enable sending any kafka props#25
Merged
Merged
Conversation
There was a problem hiding this comment.
Pull request overview
This PR adds a mechanism to pass through KAFKA_* environment variables into Kafka client configurations across the Java services and the Python SDK, enabling users to inject Kafka authentication and other client properties (issue #24).
Changes:
- Introduced
KafkaEnvProps(Java) /kafka_env_props()(Python) helpers that mapKAFKA_FOO_BAR→foo.barand skip app-levelKAFKA_*vars (topics, consumer group). - Applied the env-derived Kafka properties to Kafka Consumers/Producers/Admin clients across multiple apps (chat-api, think-consumer, memoir consumer, monitoring consumer, streams processing app).
- Exported the Python helper in the SDK and updated an example to use it.
Reviewed changes
Copilot reviewed 17 out of 17 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| think/think-consumer/src/main/java/io/flightdeck/think/consumer/ThinkConsumer.java | Applies env-derived Kafka props to the Think consumer/producer configs. |
| think/think-consumer/src/main/java/io/flightdeck/think/config/KafkaEnvProps.java | Adds env → Kafka Properties pass-through helper for think-consumer. |
| sdk/python/flightdeck_sdk/tool_consumer_runner.py | Merges env-derived Kafka props into SDK tool runner Consumer/Producer configs. |
| sdk/python/flightdeck_sdk/think_consumer_runner.py | Merges env-derived Kafka props into SDK think runner Consumer/Producer configs. |
| sdk/python/flightdeck_sdk/kafka_env_props.py | Adds Python env → Kafka config dict helper. |
| sdk/python/flightdeck_sdk/init.py | Exposes kafka_env_props as part of the public SDK API. |
| processor-apps/processing/src/main/java/io/flightdeck/streams/KafkaEnvProps.java | Adds env → Kafka Properties helper for the Streams processing app. |
| processor-apps/processing/src/main/java/io/flightdeck/streams/FlightDeckStreamsApp.java | Applies env-derived Kafka props to Streams and AdminClient configs. |
| monitoring/logging-consumer/src/main/java/io/flightdeck/monitoring/MonitoringApp.java | Applies env-derived Kafka props to monitoring consumer config. |
| monitoring/logging-consumer/src/main/java/io/flightdeck/monitoring/KafkaEnvProps.java | Adds env → Kafka Properties helper for monitoring module. |
| memoir/update-memoir-consumer/src/main/java/io/flightdeck/memoir/consumer/UpdateMemoirConsumer.java | Applies env-derived Kafka props to memoir consumer/producer configs. |
| memoir/update-memoir-consumer/src/main/java/io/flightdeck/memoir/config/KafkaEnvProps.java | Adds env → Kafka Properties helper for memoir module. |
| examples/system-ops/kafka-tool-service/app.py | Uses SDK helper to pass env-derived Kafka props into AdminClient / lag Consumer. |
| api/chat-api/src/main/java/io/flightdeck/api/PipelineConsumer.java | Applies env-derived Kafka props to pipeline consumer config. |
| api/chat-api/src/main/java/io/flightdeck/api/OutputConsumer.java | Applies env-derived Kafka props to output consumer config. |
| api/chat-api/src/main/java/io/flightdeck/api/KafkaMessageProducer.java | Applies env-derived Kafka props to chat-api producer config. |
| api/chat-api/src/main/java/io/flightdeck/api/KafkaEnvProps.java | Adds env → Kafka Properties helper for chat-api module. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Comment on lines
+33
to
+35
| if (!key.startsWith(PREFIX) || SKIP.contains(key)) continue; | ||
| String mapped = key.substring(PREFIX.length()).toLowerCase().replace('_', '.'); | ||
| props.put(mapped, entry.getValue()); |
Comment on lines
+29
to
+32
| String key = entry.getKey(); | ||
| if (!key.startsWith(PREFIX) || SKIP.contains(key)) continue; | ||
| String mapped = key.substring(PREFIX.length()).toLowerCase().replace('_', '.'); | ||
| props.put(mapped, entry.getValue()); |
Comment on lines
+29
to
+32
| String key = entry.getKey(); | ||
| if (!key.startsWith(PREFIX) || SKIP.contains(key)) continue; | ||
| String mapped = key.substring(PREFIX.length()).toLowerCase().replace('_', '.'); | ||
| props.put(mapped, entry.getValue()); |
Comment on lines
+29
to
+32
| String key = entry.getKey(); | ||
| if (!key.startsWith(PREFIX) || SKIP.contains(key)) continue; | ||
| String mapped = key.substring(PREFIX.length()).toLowerCase().replace('_', '.'); | ||
| props.put(mapped, entry.getValue()); |
Comment on lines
+29
to
+32
| String key = entry.getKey(); | ||
| if (!key.startsWith(PREFIX) || SKIP.contains(key)) continue; | ||
| String mapped = key.substring(PREFIX.length()).toLowerCase().replace('_', '.'); | ||
| props.put(mapped, entry.getValue()); |
Comment on lines
+7
to
+16
| /** | ||
| * Pass-through of {@code KAFKA_*} environment variables into a Kafka client | ||
| * {@link Properties}. {@code KAFKA_FOO_BAR_BAZ} becomes {@code foo.bar.baz}. | ||
| * | ||
| * <p>Application-level variables that share the {@code KAFKA_} prefix but are | ||
| * not Kafka client config are skipped (input/output topic, consumer group). | ||
| * These will be renamed without the prefix in the future. | ||
| */ | ||
| public final class KafkaEnvProps { | ||
|
|
Comment on lines
+27
to
+33
| public static void apply(Properties props) { | ||
| for (Map.Entry<String, String> entry : System.getenv().entrySet()) { | ||
| String key = entry.getKey(); | ||
| if (!key.startsWith(PREFIX) || SKIP.contains(key)) continue; | ||
| String mapped = key.substring(PREFIX.length()).toLowerCase().replace('_', '.'); | ||
| props.put(mapped, entry.getValue()); | ||
| } |
| continue | ||
| mapped = key[len(_PREFIX):].lower().replace("_", ".") | ||
| out[mapped] = value | ||
| return out |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
partially fixes #24