diff --git a/api/chat-api/src/main/java/io/flightdeck/api/KafkaEnvProps.java b/api/chat-api/src/main/java/io/flightdeck/api/KafkaEnvProps.java new file mode 100644 index 0000000..ca951f2 --- /dev/null +++ b/api/chat-api/src/main/java/io/flightdeck/api/KafkaEnvProps.java @@ -0,0 +1,35 @@ +package io.flightdeck.api; + +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +/** + * Pass-through of {@code KAFKA_*} environment variables into a Kafka client + * {@link Properties}. {@code KAFKA_FOO_BAR_BAZ} becomes {@code foo.bar.baz}. + * + *

Application-level variables that share the {@code KAFKA_} prefix but are + * not Kafka client config are skipped (input/output topic, consumer group). + * These will be renamed without the prefix in the future. + */ +public final class KafkaEnvProps { + + private static final String PREFIX = "KAFKA_"; + + private static final Set SKIP = Set.of( + "KAFKA_INPUT_TOPIC", + "KAFKA_OUTPUT_TOPIC", + "KAFKA_CONSUMER_GROUP" + ); + + private KafkaEnvProps() {} + + public static void apply(Properties props) { + for (Map.Entry entry : System.getenv().entrySet()) { + String key = entry.getKey(); + if (!key.startsWith(PREFIX) || SKIP.contains(key)) continue; + String mapped = key.substring(PREFIX.length()).toLowerCase().replace('_', '.'); + props.put(mapped, entry.getValue()); + } + } +} diff --git a/api/chat-api/src/main/java/io/flightdeck/api/KafkaMessageProducer.java b/api/chat-api/src/main/java/io/flightdeck/api/KafkaMessageProducer.java index 59f6b59..56c1395 100644 --- a/api/chat-api/src/main/java/io/flightdeck/api/KafkaMessageProducer.java +++ b/api/chat-api/src/main/java/io/flightdeck/api/KafkaMessageProducer.java @@ -28,6 +28,7 @@ public class KafkaMessageProducer { public KafkaMessageProducer() { Properties props = new Properties(); + KafkaEnvProps.apply(props); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); diff --git a/api/chat-api/src/main/java/io/flightdeck/api/OutputConsumer.java b/api/chat-api/src/main/java/io/flightdeck/api/OutputConsumer.java index 620918f..31c3b8d 100644 --- a/api/chat-api/src/main/java/io/flightdeck/api/OutputConsumer.java +++ b/api/chat-api/src/main/java/io/flightdeck/api/OutputConsumer.java @@ -37,6 +37,7 @@ public OutputConsumer(ChatWebSocketServer wsServer) { @Override public void run() { Properties props = new Properties(); + KafkaEnvProps.apply(props); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); diff --git a/api/chat-api/src/main/java/io/flightdeck/api/PipelineConsumer.java b/api/chat-api/src/main/java/io/flightdeck/api/PipelineConsumer.java index 1939c98..e5038ef 100644 --- a/api/chat-api/src/main/java/io/flightdeck/api/PipelineConsumer.java +++ b/api/chat-api/src/main/java/io/flightdeck/api/PipelineConsumer.java @@ -55,6 +55,7 @@ public PipelineConsumer(ChatWebSocketServer wsServer) { @Override public void run() { Properties props = new Properties(); + KafkaEnvProps.apply(props); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); diff --git a/examples/system-ops/kafka-tool-service/app.py b/examples/system-ops/kafka-tool-service/app.py index 6145e4c..ccfdc0a 100644 --- a/examples/system-ops/kafka-tool-service/app.py +++ b/examples/system-ops/kafka-tool-service/app.py @@ -23,7 +23,7 @@ from confluent_kafka import Consumer, TopicPartition, KafkaException from confluent_kafka.admin import AdminClient -from flightdeck_sdk import ToolConsumerRunner, ToolConsumerConfig +from flightdeck_sdk import ToolConsumerRunner, ToolConsumerConfig, kafka_env_props # ── Config ──────────────────────────────────────────────────────────────────── @@ -34,7 +34,7 @@ # ── Admin client for the monitored cluster ─────────────────────────────────── -admin = AdminClient({"bootstrap.servers": TARGET_BOOTSTRAP}) +admin = AdminClient({**kafka_env_props(), "bootstrap.servers": TARGET_BOOTSTRAP}) # ── Kafka ops tools ────────────────────────────────────────────────────────── @@ -136,6 +136,7 @@ def kafka_consumer_group_lag(input_data): cg_tp = future[group_id].result(timeout=10) lag_consumer = Consumer({ + **kafka_env_props(), "bootstrap.servers": TARGET_BOOTSTRAP, "group.id": f"_lag_checker_{group_id}", "enable.auto.commit": False, diff --git a/memoir/update-memoir-consumer/src/main/java/io/flightdeck/memoir/config/KafkaEnvProps.java b/memoir/update-memoir-consumer/src/main/java/io/flightdeck/memoir/config/KafkaEnvProps.java new file mode 100644 index 0000000..623322e --- /dev/null +++ b/memoir/update-memoir-consumer/src/main/java/io/flightdeck/memoir/config/KafkaEnvProps.java @@ -0,0 +1,35 @@ +package io.flightdeck.memoir.config; + +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +/** + * Pass-through of {@code KAFKA_*} environment variables into a Kafka client + * {@link Properties}. {@code KAFKA_FOO_BAR_BAZ} becomes {@code foo.bar.baz}. + * + *

Application-level variables that share the {@code KAFKA_} prefix but are + * not Kafka client config are skipped (input/output topic, consumer group). + * These will be renamed without the prefix in the future. + */ +public final class KafkaEnvProps { + + private static final String PREFIX = "KAFKA_"; + + private static final Set SKIP = Set.of( + "KAFKA_INPUT_TOPIC", + "KAFKA_OUTPUT_TOPIC", + "KAFKA_CONSUMER_GROUP" + ); + + private KafkaEnvProps() {} + + public static void apply(Properties props) { + for (Map.Entry entry : System.getenv().entrySet()) { + String key = entry.getKey(); + if (!key.startsWith(PREFIX) || SKIP.contains(key)) continue; + String mapped = key.substring(PREFIX.length()).toLowerCase().replace('_', '.'); + props.put(mapped, entry.getValue()); + } + } +} diff --git a/memoir/update-memoir-consumer/src/main/java/io/flightdeck/memoir/consumer/UpdateMemoirConsumer.java b/memoir/update-memoir-consumer/src/main/java/io/flightdeck/memoir/consumer/UpdateMemoirConsumer.java index 6ab610b..91ad3e0 100644 --- a/memoir/update-memoir-consumer/src/main/java/io/flightdeck/memoir/consumer/UpdateMemoirConsumer.java +++ b/memoir/update-memoir-consumer/src/main/java/io/flightdeck/memoir/consumer/UpdateMemoirConsumer.java @@ -4,6 +4,7 @@ import com.fasterxml.jackson.databind.SerializationFeature; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import io.flightdeck.memoir.config.AppConfig; +import io.flightdeck.memoir.config.KafkaEnvProps; import io.flightdeck.memoir.model.MemoirSessionEnd; import io.flightdeck.memoir.service.ClaudeMemoirService; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -124,6 +125,7 @@ public void close() { private static KafkaConsumer createConsumer() { Properties props = new Properties(); + KafkaEnvProps.apply(props); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfig.BOOTSTRAP_SERVERS); props.put(ConsumerConfig.GROUP_ID_CONFIG, AppConfig.CONSUMER_GROUP); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); @@ -136,6 +138,7 @@ private static KafkaConsumer createConsumer() { private static KafkaProducer createProducer() { Properties props = new Properties(); + KafkaEnvProps.apply(props); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfig.BOOTSTRAP_SERVERS); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); diff --git a/monitoring/logging-consumer/src/main/java/io/flightdeck/monitoring/KafkaEnvProps.java b/monitoring/logging-consumer/src/main/java/io/flightdeck/monitoring/KafkaEnvProps.java new file mode 100644 index 0000000..3644f12 --- /dev/null +++ b/monitoring/logging-consumer/src/main/java/io/flightdeck/monitoring/KafkaEnvProps.java @@ -0,0 +1,35 @@ +package io.flightdeck.monitoring; + +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +/** + * Pass-through of {@code KAFKA_*} environment variables into a Kafka client + * {@link Properties}. {@code KAFKA_FOO_BAR_BAZ} becomes {@code foo.bar.baz}. + * + *

Application-level variables that share the {@code KAFKA_} prefix but are + * not Kafka client config are skipped (input/output topic, consumer group). + * These will be renamed without the prefix in the future. + */ +public final class KafkaEnvProps { + + private static final String PREFIX = "KAFKA_"; + + private static final Set SKIP = Set.of( + "KAFKA_INPUT_TOPIC", + "KAFKA_OUTPUT_TOPIC", + "KAFKA_CONSUMER_GROUP" + ); + + private KafkaEnvProps() {} + + public static void apply(Properties props) { + for (Map.Entry entry : System.getenv().entrySet()) { + String key = entry.getKey(); + if (!key.startsWith(PREFIX) || SKIP.contains(key)) continue; + String mapped = key.substring(PREFIX.length()).toLowerCase().replace('_', '.'); + props.put(mapped, entry.getValue()); + } + } +} diff --git a/monitoring/logging-consumer/src/main/java/io/flightdeck/monitoring/MonitoringApp.java b/monitoring/logging-consumer/src/main/java/io/flightdeck/monitoring/MonitoringApp.java index ec51681..e29e600 100644 --- a/monitoring/logging-consumer/src/main/java/io/flightdeck/monitoring/MonitoringApp.java +++ b/monitoring/logging-consumer/src/main/java/io/flightdeck/monitoring/MonitoringApp.java @@ -51,6 +51,7 @@ public static void main(String[] args) { log.info(" Topics: {}", TOPICS); Properties props = new Properties(); + KafkaEnvProps.apply(props); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); diff --git a/processor-apps/processing/src/main/java/io/flightdeck/streams/FlightDeckStreamsApp.java b/processor-apps/processing/src/main/java/io/flightdeck/streams/FlightDeckStreamsApp.java index d40e479..836ed8a 100644 --- a/processor-apps/processing/src/main/java/io/flightdeck/streams/FlightDeckStreamsApp.java +++ b/processor-apps/processing/src/main/java/io/flightdeck/streams/FlightDeckStreamsApp.java @@ -146,6 +146,7 @@ private static void ensureTopicsExist(Properties streamsProps) { String bootstrapServers = streamsProps.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG); Properties adminProps = new Properties(); + KafkaEnvProps.apply(adminProps); adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); List requiredTopics = new java.util.ArrayList<>(List.of( @@ -207,6 +208,7 @@ public static Properties buildConfig() { } Properties p = new Properties(); + KafkaEnvProps.apply(p); p.put(StreamsConfig.APPLICATION_ID_CONFIG, Topics.AGENT_NAME + "-streams"); p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, diff --git a/processor-apps/processing/src/main/java/io/flightdeck/streams/KafkaEnvProps.java b/processor-apps/processing/src/main/java/io/flightdeck/streams/KafkaEnvProps.java new file mode 100644 index 0000000..8838e62 --- /dev/null +++ b/processor-apps/processing/src/main/java/io/flightdeck/streams/KafkaEnvProps.java @@ -0,0 +1,35 @@ +package io.flightdeck.streams; + +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +/** + * Pass-through of {@code KAFKA_*} environment variables into a Kafka client + * {@link Properties}. {@code KAFKA_FOO_BAR_BAZ} becomes {@code foo.bar.baz}. + * + *

Application-level variables that share the {@code KAFKA_} prefix but are + * not Kafka client config are skipped (input/output topic, consumer group). + * These will be renamed without the prefix in the future. + */ +public final class KafkaEnvProps { + + private static final String PREFIX = "KAFKA_"; + + private static final Set SKIP = Set.of( + "KAFKA_INPUT_TOPIC", + "KAFKA_OUTPUT_TOPIC", + "KAFKA_CONSUMER_GROUP" + ); + + private KafkaEnvProps() {} + + public static void apply(Properties props) { + for (Map.Entry entry : System.getenv().entrySet()) { + String key = entry.getKey(); + if (!key.startsWith(PREFIX) || SKIP.contains(key)) continue; + String mapped = key.substring(PREFIX.length()).toLowerCase().replace('_', '.'); + props.put(mapped, entry.getValue()); + } + } +} diff --git a/sdk/python/flightdeck_sdk/__init__.py b/sdk/python/flightdeck_sdk/__init__.py index 29bd2ad..b7270a8 100644 --- a/sdk/python/flightdeck_sdk/__init__.py +++ b/sdk/python/flightdeck_sdk/__init__.py @@ -1,3 +1,4 @@ +from .kafka_env_props import kafka_env_props from .message_context import KafkaMessageContext from .tool_consumer_runner import ToolConsumerRunner, ToolConsumerConfig from .think_consumer_runner import ThinkConsumerRunner, ThinkConsumerConfig @@ -8,4 +9,5 @@ "ToolConsumerConfig", "ThinkConsumerRunner", "ThinkConsumerConfig", + "kafka_env_props", ] diff --git a/sdk/python/flightdeck_sdk/kafka_env_props.py b/sdk/python/flightdeck_sdk/kafka_env_props.py new file mode 100644 index 0000000..e704bb7 --- /dev/null +++ b/sdk/python/flightdeck_sdk/kafka_env_props.py @@ -0,0 +1,27 @@ +"""Pass-through of KAFKA_* environment variables into a Kafka client config dict. + +KAFKA_FOO_BAR_BAZ becomes foo.bar.baz. Application-level variables that share +the KAFKA_ prefix but are not Kafka client config are skipped (input/output +topic, consumer group). These will be renamed without the prefix in the future. +""" + +import os + +_PREFIX = "KAFKA_" + +_SKIP = { + "KAFKA_INPUT_TOPIC", + "KAFKA_OUTPUT_TOPIC", + "KAFKA_CONSUMER_GROUP", +} + + +def kafka_env_props() -> dict[str, str]: + """Return a config dict built from all KAFKA_* env vars.""" + out: dict[str, str] = {} + for key, value in os.environ.items(): + if not key.startswith(_PREFIX) or key in _SKIP: + continue + mapped = key[len(_PREFIX):].lower().replace("_", ".") + out[mapped] = value + return out diff --git a/sdk/python/flightdeck_sdk/think_consumer_runner.py b/sdk/python/flightdeck_sdk/think_consumer_runner.py index f4a7bb6..863d88e 100644 --- a/sdk/python/flightdeck_sdk/think_consumer_runner.py +++ b/sdk/python/flightdeck_sdk/think_consumer_runner.py @@ -10,6 +10,8 @@ from confluent_kafka import Consumer, Producer, TopicPartition +from .kafka_env_props import kafka_env_props + logger = logging.getLogger(__name__) @@ -84,7 +86,10 @@ def __init__(self, config: ThinkConsumerConfig): raise ValueError("claude_api_key is required when llm_provider='claude'") logger.info("Using Claude LLM provider (model=%s)", config.claude_model) + env_props = kafka_env_props() + self._consumer = Consumer({ + **env_props, "bootstrap.servers": config.brokers, "group.id": config.group_id, "auto.offset.reset": "earliest", @@ -94,6 +99,7 @@ def __init__(self, config: ThinkConsumerConfig): }) self._producer = Producer({ + **env_props, "bootstrap.servers": config.brokers, "acks": "all", "enable.idempotence": True, diff --git a/sdk/python/flightdeck_sdk/tool_consumer_runner.py b/sdk/python/flightdeck_sdk/tool_consumer_runner.py index 2beec1b..2b36814 100644 --- a/sdk/python/flightdeck_sdk/tool_consumer_runner.py +++ b/sdk/python/flightdeck_sdk/tool_consumer_runner.py @@ -6,6 +6,7 @@ from confluent_kafka import Consumer, Producer +from .kafka_env_props import kafka_env_props from .message_context import KafkaMessageContext logger = logging.getLogger(__name__) @@ -42,7 +43,10 @@ def __init__(self, config: ToolConsumerConfig): self._config = config self._running = False + env_props = kafka_env_props() + self._consumer = Consumer({ + **env_props, "bootstrap.servers": config.brokers, "group.id": config.group_id, "auto.offset.reset": "earliest", @@ -51,6 +55,7 @@ def __init__(self, config: ToolConsumerConfig): }) self._producer = Producer({ + **env_props, "bootstrap.servers": config.brokers, "acks": "all", "enable.idempotence": True, diff --git a/think/think-consumer/src/main/java/io/flightdeck/think/config/KafkaEnvProps.java b/think/think-consumer/src/main/java/io/flightdeck/think/config/KafkaEnvProps.java new file mode 100644 index 0000000..f708548 --- /dev/null +++ b/think/think-consumer/src/main/java/io/flightdeck/think/config/KafkaEnvProps.java @@ -0,0 +1,38 @@ +package io.flightdeck.think.config; + +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +/** + * Pass-through of {@code KAFKA_*} environment variables into a Kafka client + * {@link Properties}. {@code KAFKA_FOO_BAR_BAZ} becomes {@code foo.bar.baz}. + * + *

Application-level variables that share the {@code KAFKA_} prefix but are + * not Kafka client config are skipped (input/output topic, consumer group). + * These will be renamed without the prefix in the future. + * + *

No validation is done — unknown keys are forwarded as-is and Kafka will + * log a warning and ignore them. + */ +public final class KafkaEnvProps { + + private static final String PREFIX = "KAFKA_"; + + private static final Set SKIP = Set.of( + "KAFKA_INPUT_TOPIC", + "KAFKA_OUTPUT_TOPIC", + "KAFKA_CONSUMER_GROUP" + ); + + private KafkaEnvProps() {} + + public static void apply(Properties props) { + for (Map.Entry entry : System.getenv().entrySet()) { + String key = entry.getKey(); + if (!key.startsWith(PREFIX) || SKIP.contains(key)) continue; + String mapped = key.substring(PREFIX.length()).toLowerCase().replace('_', '.'); + props.put(mapped, entry.getValue()); + } + } +} diff --git a/think/think-consumer/src/main/java/io/flightdeck/think/consumer/ThinkConsumer.java b/think/think-consumer/src/main/java/io/flightdeck/think/consumer/ThinkConsumer.java index cf9449f..4ccf16a 100644 --- a/think/think-consumer/src/main/java/io/flightdeck/think/consumer/ThinkConsumer.java +++ b/think/think-consumer/src/main/java/io/flightdeck/think/consumer/ThinkConsumer.java @@ -4,6 +4,7 @@ import com.fasterxml.jackson.databind.SerializationFeature; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import io.flightdeck.think.config.AppConfig; +import io.flightdeck.think.config.KafkaEnvProps; import io.flightdeck.think.model.FullSessionContext; import io.flightdeck.think.model.MessageInput; import io.flightdeck.think.model.ThinkResponse; @@ -529,6 +530,7 @@ public void close() { private static KafkaConsumer createConsumer() { Properties props = new Properties(); + KafkaEnvProps.apply(props); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfig.BOOTSTRAP_SERVERS); props.put(ConsumerConfig.GROUP_ID_CONFIG, AppConfig.CONSUMER_GROUP); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); @@ -541,6 +543,7 @@ private static KafkaConsumer createConsumer() { private static KafkaProducer createProducer() { Properties props = new Properties(); + KafkaEnvProps.apply(props); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfig.BOOTSTRAP_SERVERS); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());