diff --git a/api/chat-api/src/main/java/io/flightdeck/api/KafkaEnvProps.java b/api/chat-api/src/main/java/io/flightdeck/api/KafkaEnvProps.java
new file mode 100644
index 0000000..ca951f2
--- /dev/null
+++ b/api/chat-api/src/main/java/io/flightdeck/api/KafkaEnvProps.java
@@ -0,0 +1,35 @@
+package io.flightdeck.api;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * Pass-through of {@code KAFKA_*} environment variables into a Kafka client
+ * {@link Properties}. {@code KAFKA_FOO_BAR_BAZ} becomes {@code foo.bar.baz}.
+ *
+ *
Application-level variables that share the {@code KAFKA_} prefix but are
+ * not Kafka client config are skipped (input/output topic, consumer group).
+ * These will be renamed without the prefix in the future.
+ */
+public final class KafkaEnvProps {
+
+ private static final String PREFIX = "KAFKA_";
+
+ private static final Set SKIP = Set.of(
+ "KAFKA_INPUT_TOPIC",
+ "KAFKA_OUTPUT_TOPIC",
+ "KAFKA_CONSUMER_GROUP"
+ );
+
+ private KafkaEnvProps() {}
+
+ public static void apply(Properties props) {
+ for (Map.Entry entry : System.getenv().entrySet()) {
+ String key = entry.getKey();
+ if (!key.startsWith(PREFIX) || SKIP.contains(key)) continue;
+ String mapped = key.substring(PREFIX.length()).toLowerCase().replace('_', '.');
+ props.put(mapped, entry.getValue());
+ }
+ }
+}
diff --git a/api/chat-api/src/main/java/io/flightdeck/api/KafkaMessageProducer.java b/api/chat-api/src/main/java/io/flightdeck/api/KafkaMessageProducer.java
index 59f6b59..56c1395 100644
--- a/api/chat-api/src/main/java/io/flightdeck/api/KafkaMessageProducer.java
+++ b/api/chat-api/src/main/java/io/flightdeck/api/KafkaMessageProducer.java
@@ -28,6 +28,7 @@ public class KafkaMessageProducer {
public KafkaMessageProducer() {
Properties props = new Properties();
+ KafkaEnvProps.apply(props);
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
diff --git a/api/chat-api/src/main/java/io/flightdeck/api/OutputConsumer.java b/api/chat-api/src/main/java/io/flightdeck/api/OutputConsumer.java
index 620918f..31c3b8d 100644
--- a/api/chat-api/src/main/java/io/flightdeck/api/OutputConsumer.java
+++ b/api/chat-api/src/main/java/io/flightdeck/api/OutputConsumer.java
@@ -37,6 +37,7 @@ public OutputConsumer(ChatWebSocketServer wsServer) {
@Override
public void run() {
Properties props = new Properties();
+ KafkaEnvProps.apply(props);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
diff --git a/api/chat-api/src/main/java/io/flightdeck/api/PipelineConsumer.java b/api/chat-api/src/main/java/io/flightdeck/api/PipelineConsumer.java
index 1939c98..e5038ef 100644
--- a/api/chat-api/src/main/java/io/flightdeck/api/PipelineConsumer.java
+++ b/api/chat-api/src/main/java/io/flightdeck/api/PipelineConsumer.java
@@ -55,6 +55,7 @@ public PipelineConsumer(ChatWebSocketServer wsServer) {
@Override
public void run() {
Properties props = new Properties();
+ KafkaEnvProps.apply(props);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
diff --git a/examples/system-ops/kafka-tool-service/app.py b/examples/system-ops/kafka-tool-service/app.py
index 6145e4c..ccfdc0a 100644
--- a/examples/system-ops/kafka-tool-service/app.py
+++ b/examples/system-ops/kafka-tool-service/app.py
@@ -23,7 +23,7 @@
from confluent_kafka import Consumer, TopicPartition, KafkaException
from confluent_kafka.admin import AdminClient
-from flightdeck_sdk import ToolConsumerRunner, ToolConsumerConfig
+from flightdeck_sdk import ToolConsumerRunner, ToolConsumerConfig, kafka_env_props
# ── Config ────────────────────────────────────────────────────────────────────
@@ -34,7 +34,7 @@
# ── Admin client for the monitored cluster ───────────────────────────────────
-admin = AdminClient({"bootstrap.servers": TARGET_BOOTSTRAP})
+admin = AdminClient({**kafka_env_props(), "bootstrap.servers": TARGET_BOOTSTRAP})
# ── Kafka ops tools ──────────────────────────────────────────────────────────
@@ -136,6 +136,7 @@ def kafka_consumer_group_lag(input_data):
cg_tp = future[group_id].result(timeout=10)
lag_consumer = Consumer({
+ **kafka_env_props(),
"bootstrap.servers": TARGET_BOOTSTRAP,
"group.id": f"_lag_checker_{group_id}",
"enable.auto.commit": False,
diff --git a/memoir/update-memoir-consumer/src/main/java/io/flightdeck/memoir/config/KafkaEnvProps.java b/memoir/update-memoir-consumer/src/main/java/io/flightdeck/memoir/config/KafkaEnvProps.java
new file mode 100644
index 0000000..623322e
--- /dev/null
+++ b/memoir/update-memoir-consumer/src/main/java/io/flightdeck/memoir/config/KafkaEnvProps.java
@@ -0,0 +1,35 @@
+package io.flightdeck.memoir.config;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * Pass-through of {@code KAFKA_*} environment variables into a Kafka client
+ * {@link Properties}. {@code KAFKA_FOO_BAR_BAZ} becomes {@code foo.bar.baz}.
+ *
+ * Application-level variables that share the {@code KAFKA_} prefix but are
+ * not Kafka client config are skipped (input/output topic, consumer group).
+ * These will be renamed without the prefix in the future.
+ */
+public final class KafkaEnvProps {
+
+ private static final String PREFIX = "KAFKA_";
+
+ private static final Set SKIP = Set.of(
+ "KAFKA_INPUT_TOPIC",
+ "KAFKA_OUTPUT_TOPIC",
+ "KAFKA_CONSUMER_GROUP"
+ );
+
+ private KafkaEnvProps() {}
+
+ public static void apply(Properties props) {
+ for (Map.Entry entry : System.getenv().entrySet()) {
+ String key = entry.getKey();
+ if (!key.startsWith(PREFIX) || SKIP.contains(key)) continue;
+ String mapped = key.substring(PREFIX.length()).toLowerCase().replace('_', '.');
+ props.put(mapped, entry.getValue());
+ }
+ }
+}
diff --git a/memoir/update-memoir-consumer/src/main/java/io/flightdeck/memoir/consumer/UpdateMemoirConsumer.java b/memoir/update-memoir-consumer/src/main/java/io/flightdeck/memoir/consumer/UpdateMemoirConsumer.java
index 6ab610b..91ad3e0 100644
--- a/memoir/update-memoir-consumer/src/main/java/io/flightdeck/memoir/consumer/UpdateMemoirConsumer.java
+++ b/memoir/update-memoir-consumer/src/main/java/io/flightdeck/memoir/consumer/UpdateMemoirConsumer.java
@@ -4,6 +4,7 @@
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import io.flightdeck.memoir.config.AppConfig;
+import io.flightdeck.memoir.config.KafkaEnvProps;
import io.flightdeck.memoir.model.MemoirSessionEnd;
import io.flightdeck.memoir.service.ClaudeMemoirService;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -124,6 +125,7 @@ public void close() {
private static KafkaConsumer createConsumer() {
Properties props = new Properties();
+ KafkaEnvProps.apply(props);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfig.BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, AppConfig.CONSUMER_GROUP);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
@@ -136,6 +138,7 @@ private static KafkaConsumer createConsumer() {
private static KafkaProducer createProducer() {
Properties props = new Properties();
+ KafkaEnvProps.apply(props);
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfig.BOOTSTRAP_SERVERS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
diff --git a/monitoring/logging-consumer/src/main/java/io/flightdeck/monitoring/KafkaEnvProps.java b/monitoring/logging-consumer/src/main/java/io/flightdeck/monitoring/KafkaEnvProps.java
new file mode 100644
index 0000000..3644f12
--- /dev/null
+++ b/monitoring/logging-consumer/src/main/java/io/flightdeck/monitoring/KafkaEnvProps.java
@@ -0,0 +1,35 @@
+package io.flightdeck.monitoring;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * Pass-through of {@code KAFKA_*} environment variables into a Kafka client
+ * {@link Properties}. {@code KAFKA_FOO_BAR_BAZ} becomes {@code foo.bar.baz}.
+ *
+ * Application-level variables that share the {@code KAFKA_} prefix but are
+ * not Kafka client config are skipped (input/output topic, consumer group).
+ * These will be renamed without the prefix in the future.
+ */
+public final class KafkaEnvProps {
+
+ private static final String PREFIX = "KAFKA_";
+
+ private static final Set SKIP = Set.of(
+ "KAFKA_INPUT_TOPIC",
+ "KAFKA_OUTPUT_TOPIC",
+ "KAFKA_CONSUMER_GROUP"
+ );
+
+ private KafkaEnvProps() {}
+
+ public static void apply(Properties props) {
+ for (Map.Entry entry : System.getenv().entrySet()) {
+ String key = entry.getKey();
+ if (!key.startsWith(PREFIX) || SKIP.contains(key)) continue;
+ String mapped = key.substring(PREFIX.length()).toLowerCase().replace('_', '.');
+ props.put(mapped, entry.getValue());
+ }
+ }
+}
diff --git a/monitoring/logging-consumer/src/main/java/io/flightdeck/monitoring/MonitoringApp.java b/monitoring/logging-consumer/src/main/java/io/flightdeck/monitoring/MonitoringApp.java
index ec51681..e29e600 100644
--- a/monitoring/logging-consumer/src/main/java/io/flightdeck/monitoring/MonitoringApp.java
+++ b/monitoring/logging-consumer/src/main/java/io/flightdeck/monitoring/MonitoringApp.java
@@ -51,6 +51,7 @@ public static void main(String[] args) {
log.info(" Topics: {}", TOPICS);
Properties props = new Properties();
+ KafkaEnvProps.apply(props);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
diff --git a/processor-apps/processing/src/main/java/io/flightdeck/streams/FlightDeckStreamsApp.java b/processor-apps/processing/src/main/java/io/flightdeck/streams/FlightDeckStreamsApp.java
index d40e479..836ed8a 100644
--- a/processor-apps/processing/src/main/java/io/flightdeck/streams/FlightDeckStreamsApp.java
+++ b/processor-apps/processing/src/main/java/io/flightdeck/streams/FlightDeckStreamsApp.java
@@ -146,6 +146,7 @@ private static void ensureTopicsExist(Properties streamsProps) {
String bootstrapServers = streamsProps.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
Properties adminProps = new Properties();
+ KafkaEnvProps.apply(adminProps);
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
List requiredTopics = new java.util.ArrayList<>(List.of(
@@ -207,6 +208,7 @@ public static Properties buildConfig() {
}
Properties p = new Properties();
+ KafkaEnvProps.apply(p);
p.put(StreamsConfig.APPLICATION_ID_CONFIG, Topics.AGENT_NAME + "-streams");
p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
diff --git a/processor-apps/processing/src/main/java/io/flightdeck/streams/KafkaEnvProps.java b/processor-apps/processing/src/main/java/io/flightdeck/streams/KafkaEnvProps.java
new file mode 100644
index 0000000..8838e62
--- /dev/null
+++ b/processor-apps/processing/src/main/java/io/flightdeck/streams/KafkaEnvProps.java
@@ -0,0 +1,35 @@
+package io.flightdeck.streams;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * Pass-through of {@code KAFKA_*} environment variables into a Kafka client
+ * {@link Properties}. {@code KAFKA_FOO_BAR_BAZ} becomes {@code foo.bar.baz}.
+ *
+ * Application-level variables that share the {@code KAFKA_} prefix but are
+ * not Kafka client config are skipped (input/output topic, consumer group).
+ * These will be renamed without the prefix in the future.
+ */
+public final class KafkaEnvProps {
+
+ private static final String PREFIX = "KAFKA_";
+
+ private static final Set SKIP = Set.of(
+ "KAFKA_INPUT_TOPIC",
+ "KAFKA_OUTPUT_TOPIC",
+ "KAFKA_CONSUMER_GROUP"
+ );
+
+ private KafkaEnvProps() {}
+
+ public static void apply(Properties props) {
+ for (Map.Entry entry : System.getenv().entrySet()) {
+ String key = entry.getKey();
+ if (!key.startsWith(PREFIX) || SKIP.contains(key)) continue;
+ String mapped = key.substring(PREFIX.length()).toLowerCase().replace('_', '.');
+ props.put(mapped, entry.getValue());
+ }
+ }
+}
diff --git a/sdk/python/flightdeck_sdk/__init__.py b/sdk/python/flightdeck_sdk/__init__.py
index 29bd2ad..b7270a8 100644
--- a/sdk/python/flightdeck_sdk/__init__.py
+++ b/sdk/python/flightdeck_sdk/__init__.py
@@ -1,3 +1,4 @@
+from .kafka_env_props import kafka_env_props
from .message_context import KafkaMessageContext
from .tool_consumer_runner import ToolConsumerRunner, ToolConsumerConfig
from .think_consumer_runner import ThinkConsumerRunner, ThinkConsumerConfig
@@ -8,4 +9,5 @@
"ToolConsumerConfig",
"ThinkConsumerRunner",
"ThinkConsumerConfig",
+ "kafka_env_props",
]
diff --git a/sdk/python/flightdeck_sdk/kafka_env_props.py b/sdk/python/flightdeck_sdk/kafka_env_props.py
new file mode 100644
index 0000000..e704bb7
--- /dev/null
+++ b/sdk/python/flightdeck_sdk/kafka_env_props.py
@@ -0,0 +1,27 @@
+"""Pass-through of KAFKA_* environment variables into a Kafka client config dict.
+
+KAFKA_FOO_BAR_BAZ becomes foo.bar.baz. Application-level variables that share
+the KAFKA_ prefix but are not Kafka client config are skipped (input/output
+topic, consumer group). These will be renamed without the prefix in the future.
+"""
+
+import os
+
+_PREFIX = "KAFKA_"
+
+_SKIP = {
+ "KAFKA_INPUT_TOPIC",
+ "KAFKA_OUTPUT_TOPIC",
+ "KAFKA_CONSUMER_GROUP",
+}
+
+
+def kafka_env_props() -> dict[str, str]:
+ """Return a config dict built from all KAFKA_* env vars."""
+ out: dict[str, str] = {}
+ for key, value in os.environ.items():
+ if not key.startswith(_PREFIX) or key in _SKIP:
+ continue
+ mapped = key[len(_PREFIX):].lower().replace("_", ".")
+ out[mapped] = value
+ return out
diff --git a/sdk/python/flightdeck_sdk/think_consumer_runner.py b/sdk/python/flightdeck_sdk/think_consumer_runner.py
index f4a7bb6..863d88e 100644
--- a/sdk/python/flightdeck_sdk/think_consumer_runner.py
+++ b/sdk/python/flightdeck_sdk/think_consumer_runner.py
@@ -10,6 +10,8 @@
from confluent_kafka import Consumer, Producer, TopicPartition
+from .kafka_env_props import kafka_env_props
+
logger = logging.getLogger(__name__)
@@ -84,7 +86,10 @@ def __init__(self, config: ThinkConsumerConfig):
raise ValueError("claude_api_key is required when llm_provider='claude'")
logger.info("Using Claude LLM provider (model=%s)", config.claude_model)
+ env_props = kafka_env_props()
+
self._consumer = Consumer({
+ **env_props,
"bootstrap.servers": config.brokers,
"group.id": config.group_id,
"auto.offset.reset": "earliest",
@@ -94,6 +99,7 @@ def __init__(self, config: ThinkConsumerConfig):
})
self._producer = Producer({
+ **env_props,
"bootstrap.servers": config.brokers,
"acks": "all",
"enable.idempotence": True,
diff --git a/sdk/python/flightdeck_sdk/tool_consumer_runner.py b/sdk/python/flightdeck_sdk/tool_consumer_runner.py
index 2beec1b..2b36814 100644
--- a/sdk/python/flightdeck_sdk/tool_consumer_runner.py
+++ b/sdk/python/flightdeck_sdk/tool_consumer_runner.py
@@ -6,6 +6,7 @@
from confluent_kafka import Consumer, Producer
+from .kafka_env_props import kafka_env_props
from .message_context import KafkaMessageContext
logger = logging.getLogger(__name__)
@@ -42,7 +43,10 @@ def __init__(self, config: ToolConsumerConfig):
self._config = config
self._running = False
+ env_props = kafka_env_props()
+
self._consumer = Consumer({
+ **env_props,
"bootstrap.servers": config.brokers,
"group.id": config.group_id,
"auto.offset.reset": "earliest",
@@ -51,6 +55,7 @@ def __init__(self, config: ToolConsumerConfig):
})
self._producer = Producer({
+ **env_props,
"bootstrap.servers": config.brokers,
"acks": "all",
"enable.idempotence": True,
diff --git a/think/think-consumer/src/main/java/io/flightdeck/think/config/KafkaEnvProps.java b/think/think-consumer/src/main/java/io/flightdeck/think/config/KafkaEnvProps.java
new file mode 100644
index 0000000..f708548
--- /dev/null
+++ b/think/think-consumer/src/main/java/io/flightdeck/think/config/KafkaEnvProps.java
@@ -0,0 +1,38 @@
+package io.flightdeck.think.config;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * Pass-through of {@code KAFKA_*} environment variables into a Kafka client
+ * {@link Properties}. {@code KAFKA_FOO_BAR_BAZ} becomes {@code foo.bar.baz}.
+ *
+ * Application-level variables that share the {@code KAFKA_} prefix but are
+ * not Kafka client config are skipped (input/output topic, consumer group).
+ * These will be renamed without the prefix in the future.
+ *
+ *
No validation is done — unknown keys are forwarded as-is and Kafka will
+ * log a warning and ignore them.
+ */
+public final class KafkaEnvProps {
+
+ private static final String PREFIX = "KAFKA_";
+
+ private static final Set SKIP = Set.of(
+ "KAFKA_INPUT_TOPIC",
+ "KAFKA_OUTPUT_TOPIC",
+ "KAFKA_CONSUMER_GROUP"
+ );
+
+ private KafkaEnvProps() {}
+
+ public static void apply(Properties props) {
+ for (Map.Entry entry : System.getenv().entrySet()) {
+ String key = entry.getKey();
+ if (!key.startsWith(PREFIX) || SKIP.contains(key)) continue;
+ String mapped = key.substring(PREFIX.length()).toLowerCase().replace('_', '.');
+ props.put(mapped, entry.getValue());
+ }
+ }
+}
diff --git a/think/think-consumer/src/main/java/io/flightdeck/think/consumer/ThinkConsumer.java b/think/think-consumer/src/main/java/io/flightdeck/think/consumer/ThinkConsumer.java
index cf9449f..4ccf16a 100644
--- a/think/think-consumer/src/main/java/io/flightdeck/think/consumer/ThinkConsumer.java
+++ b/think/think-consumer/src/main/java/io/flightdeck/think/consumer/ThinkConsumer.java
@@ -4,6 +4,7 @@
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import io.flightdeck.think.config.AppConfig;
+import io.flightdeck.think.config.KafkaEnvProps;
import io.flightdeck.think.model.FullSessionContext;
import io.flightdeck.think.model.MessageInput;
import io.flightdeck.think.model.ThinkResponse;
@@ -529,6 +530,7 @@ public void close() {
private static KafkaConsumer createConsumer() {
Properties props = new Properties();
+ KafkaEnvProps.apply(props);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfig.BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, AppConfig.CONSUMER_GROUP);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
@@ -541,6 +543,7 @@ private static KafkaConsumer createConsumer() {
private static KafkaProducer createProducer() {
Properties props = new Properties();
+ KafkaEnvProps.apply(props);
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfig.BOOTSTRAP_SERVERS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());