Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions api/chat-api/src/main/java/io/flightdeck/api/KafkaEnvProps.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package io.flightdeck.api;

import java.util.Map;
import java.util.Properties;
import java.util.Set;

/**
* Pass-through of {@code KAFKA_*} environment variables into a Kafka client
* {@link Properties}. {@code KAFKA_FOO_BAR_BAZ} becomes {@code foo.bar.baz}.
*
* <p>Application-level variables that share the {@code KAFKA_} prefix but are
* not Kafka client config are skipped (input/output topic, consumer group).
* These will be renamed without the prefix in the future.
*/
public final class KafkaEnvProps {

private static final String PREFIX = "KAFKA_";

private static final Set<String> SKIP = Set.of(
"KAFKA_INPUT_TOPIC",
"KAFKA_OUTPUT_TOPIC",
"KAFKA_CONSUMER_GROUP"
);

private KafkaEnvProps() {}

public static void apply(Properties props) {
for (Map.Entry<String, String> entry : System.getenv().entrySet()) {
String key = entry.getKey();
if (!key.startsWith(PREFIX) || SKIP.contains(key)) continue;
String mapped = key.substring(PREFIX.length()).toLowerCase().replace('_', '.');
props.put(mapped, entry.getValue());
Comment on lines +29 to +32
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public class KafkaMessageProducer {

public KafkaMessageProducer() {
Properties props = new Properties();
KafkaEnvProps.apply(props);
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public OutputConsumer(ChatWebSocketServer wsServer) {
@Override
public void run() {
Properties props = new Properties();
KafkaEnvProps.apply(props);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public PipelineConsumer(ChatWebSocketServer wsServer) {
@Override
public void run() {
Properties props = new Properties();
KafkaEnvProps.apply(props);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
Expand Down
5 changes: 3 additions & 2 deletions examples/system-ops/kafka-tool-service/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from confluent_kafka import Consumer, TopicPartition, KafkaException
from confluent_kafka.admin import AdminClient

from flightdeck_sdk import ToolConsumerRunner, ToolConsumerConfig
from flightdeck_sdk import ToolConsumerRunner, ToolConsumerConfig, kafka_env_props

# ── Config ────────────────────────────────────────────────────────────────────

Expand All @@ -34,7 +34,7 @@

# ── Admin client for the monitored cluster ───────────────────────────────────

admin = AdminClient({"bootstrap.servers": TARGET_BOOTSTRAP})
admin = AdminClient({**kafka_env_props(), "bootstrap.servers": TARGET_BOOTSTRAP})

# ── Kafka ops tools ──────────────────────────────────────────────────────────

Expand Down Expand Up @@ -136,6 +136,7 @@ def kafka_consumer_group_lag(input_data):
cg_tp = future[group_id].result(timeout=10)

lag_consumer = Consumer({
**kafka_env_props(),
"bootstrap.servers": TARGET_BOOTSTRAP,
"group.id": f"_lag_checker_{group_id}",
"enable.auto.commit": False,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package io.flightdeck.memoir.config;

import java.util.Map;
import java.util.Properties;
import java.util.Set;

/**
* Pass-through of {@code KAFKA_*} environment variables into a Kafka client
* {@link Properties}. {@code KAFKA_FOO_BAR_BAZ} becomes {@code foo.bar.baz}.
*
* <p>Application-level variables that share the {@code KAFKA_} prefix but are
* not Kafka client config are skipped (input/output topic, consumer group).
* These will be renamed without the prefix in the future.
*/
public final class KafkaEnvProps {

private static final String PREFIX = "KAFKA_";

private static final Set<String> SKIP = Set.of(
"KAFKA_INPUT_TOPIC",
"KAFKA_OUTPUT_TOPIC",
"KAFKA_CONSUMER_GROUP"
);

private KafkaEnvProps() {}

public static void apply(Properties props) {
for (Map.Entry<String, String> entry : System.getenv().entrySet()) {
String key = entry.getKey();
if (!key.startsWith(PREFIX) || SKIP.contains(key)) continue;
String mapped = key.substring(PREFIX.length()).toLowerCase().replace('_', '.');
props.put(mapped, entry.getValue());
Comment on lines +29 to +32
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import io.flightdeck.memoir.config.AppConfig;
import io.flightdeck.memoir.config.KafkaEnvProps;
import io.flightdeck.memoir.model.MemoirSessionEnd;
import io.flightdeck.memoir.service.ClaudeMemoirService;
import org.apache.kafka.clients.consumer.ConsumerConfig;
Expand Down Expand Up @@ -124,6 +125,7 @@ public void close() {

private static KafkaConsumer<String, String> createConsumer() {
Properties props = new Properties();
KafkaEnvProps.apply(props);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfig.BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, AppConfig.CONSUMER_GROUP);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
Expand All @@ -136,6 +138,7 @@ private static KafkaConsumer<String, String> createConsumer() {

private static KafkaProducer<String, String> createProducer() {
Properties props = new Properties();
KafkaEnvProps.apply(props);
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfig.BOOTSTRAP_SERVERS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package io.flightdeck.monitoring;

import java.util.Map;
import java.util.Properties;
import java.util.Set;

/**
* Pass-through of {@code KAFKA_*} environment variables into a Kafka client
* {@link Properties}. {@code KAFKA_FOO_BAR_BAZ} becomes {@code foo.bar.baz}.
*
* <p>Application-level variables that share the {@code KAFKA_} prefix but are
* not Kafka client config are skipped (input/output topic, consumer group).
* These will be renamed without the prefix in the future.
*/
public final class KafkaEnvProps {

private static final String PREFIX = "KAFKA_";

private static final Set<String> SKIP = Set.of(
"KAFKA_INPUT_TOPIC",
"KAFKA_OUTPUT_TOPIC",
"KAFKA_CONSUMER_GROUP"
);

private KafkaEnvProps() {}

public static void apply(Properties props) {
for (Map.Entry<String, String> entry : System.getenv().entrySet()) {
String key = entry.getKey();
if (!key.startsWith(PREFIX) || SKIP.contains(key)) continue;
String mapped = key.substring(PREFIX.length()).toLowerCase().replace('_', '.');
props.put(mapped, entry.getValue());
Comment on lines +29 to +32
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public static void main(String[] args) {
log.info(" Topics: {}", TOPICS);

Properties props = new Properties();
KafkaEnvProps.apply(props);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ private static void ensureTopicsExist(Properties streamsProps) {
String bootstrapServers = streamsProps.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);

Properties adminProps = new Properties();
KafkaEnvProps.apply(adminProps);
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

List<String> requiredTopics = new java.util.ArrayList<>(List.of(
Expand Down Expand Up @@ -207,6 +208,7 @@ public static Properties buildConfig() {
}

Properties p = new Properties();
KafkaEnvProps.apply(p);
p.put(StreamsConfig.APPLICATION_ID_CONFIG, Topics.AGENT_NAME + "-streams");
p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package io.flightdeck.streams;

import java.util.Map;
import java.util.Properties;
import java.util.Set;

/**
* Pass-through of {@code KAFKA_*} environment variables into a Kafka client
* {@link Properties}. {@code KAFKA_FOO_BAR_BAZ} becomes {@code foo.bar.baz}.
*
* <p>Application-level variables that share the {@code KAFKA_} prefix but are
* not Kafka client config are skipped (input/output topic, consumer group).
* These will be renamed without the prefix in the future.
*/
public final class KafkaEnvProps {

Comment on lines +7 to +16
private static final String PREFIX = "KAFKA_";

private static final Set<String> SKIP = Set.of(
"KAFKA_INPUT_TOPIC",
"KAFKA_OUTPUT_TOPIC",
"KAFKA_CONSUMER_GROUP"
);

private KafkaEnvProps() {}

public static void apply(Properties props) {
for (Map.Entry<String, String> entry : System.getenv().entrySet()) {
String key = entry.getKey();
if (!key.startsWith(PREFIX) || SKIP.contains(key)) continue;
String mapped = key.substring(PREFIX.length()).toLowerCase().replace('_', '.');
props.put(mapped, entry.getValue());
Comment on lines +29 to +32
}
Comment on lines +27 to +33
}
}
2 changes: 2 additions & 0 deletions sdk/python/flightdeck_sdk/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from .kafka_env_props import kafka_env_props
from .message_context import KafkaMessageContext
from .tool_consumer_runner import ToolConsumerRunner, ToolConsumerConfig
from .think_consumer_runner import ThinkConsumerRunner, ThinkConsumerConfig
Expand All @@ -8,4 +9,5 @@
"ToolConsumerConfig",
"ThinkConsumerRunner",
"ThinkConsumerConfig",
"kafka_env_props",
]
27 changes: 27 additions & 0 deletions sdk/python/flightdeck_sdk/kafka_env_props.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
"""Pass-through of KAFKA_* environment variables into a Kafka client config dict.

KAFKA_FOO_BAR_BAZ becomes foo.bar.baz. Application-level variables that share
the KAFKA_ prefix but are not Kafka client config are skipped (input/output
topic, consumer group). These will be renamed without the prefix in the future.
"""

import os

_PREFIX = "KAFKA_"

_SKIP = {
"KAFKA_INPUT_TOPIC",
"KAFKA_OUTPUT_TOPIC",
"KAFKA_CONSUMER_GROUP",
}


def kafka_env_props() -> dict[str, str]:
"""Return a config dict built from all KAFKA_* env vars."""
out: dict[str, str] = {}
for key, value in os.environ.items():
if not key.startswith(_PREFIX) or key in _SKIP:
continue
mapped = key[len(_PREFIX):].lower().replace("_", ".")
out[mapped] = value
return out
6 changes: 6 additions & 0 deletions sdk/python/flightdeck_sdk/think_consumer_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

from confluent_kafka import Consumer, Producer, TopicPartition

from .kafka_env_props import kafka_env_props

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -84,7 +86,10 @@ def __init__(self, config: ThinkConsumerConfig):
raise ValueError("claude_api_key is required when llm_provider='claude'")
logger.info("Using Claude LLM provider (model=%s)", config.claude_model)

env_props = kafka_env_props()

self._consumer = Consumer({
**env_props,
"bootstrap.servers": config.brokers,
"group.id": config.group_id,
"auto.offset.reset": "earliest",
Expand All @@ -94,6 +99,7 @@ def __init__(self, config: ThinkConsumerConfig):
})

self._producer = Producer({
**env_props,
"bootstrap.servers": config.brokers,
"acks": "all",
"enable.idempotence": True,
Expand Down
5 changes: 5 additions & 0 deletions sdk/python/flightdeck_sdk/tool_consumer_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from confluent_kafka import Consumer, Producer

from .kafka_env_props import kafka_env_props
from .message_context import KafkaMessageContext

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -42,7 +43,10 @@ def __init__(self, config: ToolConsumerConfig):
self._config = config
self._running = False

env_props = kafka_env_props()

self._consumer = Consumer({
**env_props,
"bootstrap.servers": config.brokers,
"group.id": config.group_id,
"auto.offset.reset": "earliest",
Expand All @@ -51,6 +55,7 @@ def __init__(self, config: ToolConsumerConfig):
})

self._producer = Producer({
**env_props,
"bootstrap.servers": config.brokers,
"acks": "all",
"enable.idempotence": True,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package io.flightdeck.think.config;

import java.util.Map;
import java.util.Properties;
import java.util.Set;

/**
* Pass-through of {@code KAFKA_*} environment variables into a Kafka client
* {@link Properties}. {@code KAFKA_FOO_BAR_BAZ} becomes {@code foo.bar.baz}.
*
* <p>Application-level variables that share the {@code KAFKA_} prefix but are
* not Kafka client config are skipped (input/output topic, consumer group).
* These will be renamed without the prefix in the future.
*
* <p>No validation is done — unknown keys are forwarded as-is and Kafka will
* log a warning and ignore them.
*/
public final class KafkaEnvProps {

private static final String PREFIX = "KAFKA_";

private static final Set<String> SKIP = Set.of(
"KAFKA_INPUT_TOPIC",
"KAFKA_OUTPUT_TOPIC",
"KAFKA_CONSUMER_GROUP"
);

private KafkaEnvProps() {}

public static void apply(Properties props) {
for (Map.Entry<String, String> entry : System.getenv().entrySet()) {
String key = entry.getKey();
if (!key.startsWith(PREFIX) || SKIP.contains(key)) continue;
String mapped = key.substring(PREFIX.length()).toLowerCase().replace('_', '.');
props.put(mapped, entry.getValue());
Comment on lines +33 to +35
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import io.flightdeck.think.config.AppConfig;
import io.flightdeck.think.config.KafkaEnvProps;
import io.flightdeck.think.model.FullSessionContext;
import io.flightdeck.think.model.MessageInput;
import io.flightdeck.think.model.ThinkResponse;
Expand Down Expand Up @@ -529,6 +530,7 @@ public void close() {

private static KafkaConsumer<String, String> createConsumer() {
Properties props = new Properties();
KafkaEnvProps.apply(props);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfig.BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, AppConfig.CONSUMER_GROUP);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
Expand All @@ -541,6 +543,7 @@ private static KafkaConsumer<String, String> createConsumer() {

private static KafkaProducer<String, String> createProducer() {
Properties props = new Properties();
KafkaEnvProps.apply(props);
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfig.BOOTSTRAP_SERVERS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
Expand Down
Loading