From 49652f12cf6e15fe45e189b434b579b09e339883 Mon Sep 17 00:00:00 2001 From: Dylan McReynolds Date: Sat, 18 Apr 2026 12:46:25 -0700 Subject: [PATCH 1/2] env variable replacement --- examples/config/env_var_example.yaml | 43 +++++++ src/_test/test_block.py | 181 +++++++++++++++++++++++++++ src/arroyopy/config.py | 101 +++++++++++++++ 3 files changed, 325 insertions(+) create mode 100644 examples/config/env_var_example.yaml diff --git a/examples/config/env_var_example.yaml b/examples/config/env_var_example.yaml new file mode 100644 index 0000000..a8de422 --- /dev/null +++ b/examples/config/env_var_example.yaml @@ -0,0 +1,43 @@ +# Example arroyo block configuration with environment variable substitution +# +# This configuration demonstrates how to use environment variables in your +# YAML configuration files. Environment variables are expanded when the +# configuration is loaded. +# +# Supported formats: +# - ${VAR_NAME} - Replace with environment variable value (error if not set) +# - ${VAR_NAME:-default} - Replace with env var, or use default if not set +# - $VAR_NAME - Simple variable expansion (error if not set) +# +# Example usage: +# export ZMQ_INPUT_ADDRESS="tcp://127.0.0.1:5555" +# export ZMQ_OUTPUT_ADDRESS="tcp://127.0.0.1:5556" +# export MESSAGE_TIMEOUT="30" +# python -m arroyopy run examples/config/env_var_example.yaml + +blocks: + - name: zmq_pipeline_with_env_vars + description: Pipeline using environment variables for configuration + + operator: + class: myapp.operators.MessageProcessor + kwargs: + # Use env var with default fallback + timeout: ${MESSAGE_TIMEOUT:-60} + batch_size: ${BATCH_SIZE:-100} + + listeners: + - class: arroyopy.zmq.ZMQListener + kwargs: + # Use env var (required - will fail if not set) + address: ${ZMQ_INPUT_ADDRESS} + socket_type: ${ZMQ_INPUT_TYPE:-SUB} + + publishers: + - class: arroyopy.zmq.ZMQPublisher + kwargs: + # Mix env vars and defaults + address: ${ZMQ_OUTPUT_ADDRESS} + socket_type: ${ZMQ_OUTPUT_TYPE:-PUB} + # Simple $VAR syntax also works + topic: $TOPIC_NAME diff --git a/src/_test/test_block.py b/src/_test/test_block.py index 2dfc441..6117a79 100644 --- a/src/_test/test_block.py +++ b/src/_test/test_block.py @@ -33,6 +33,21 @@ async def process(self, message: Message) -> Message: return message +# Concrete listener for YAML/config loading tests +class ConcreteListener(Listener): + """Simple concrete listener for testing.""" + + def __init__(self, operator=None, **kwargs): + super().__init__(operator) + self.kwargs = kwargs + + async def start(self) -> None: + pass + + async def stop(self) -> None: + pass + + # Simple class for testing invalid type validation class NotAComponent: """A class that is not a Listener, Publisher, or Operator.""" @@ -780,6 +795,172 @@ async def test_operator_start_stop_loop(): assert publisher.publish.called +# ============================================================================ +# Environment variable expansion tests +# ============================================================================ + + +def test_env_var_expansion_basic(monkeypatch): + """Test basic environment variable expansion in config.""" + monkeypatch.setenv("TEST_ADDRESS", "tcp://127.0.0.1:5555") + + yaml_content = """ +blocks: + - name: test_unit + operator: + class: _test.test_block.ConcreteOperator + listeners: + - class: _test.test_block.ConcreteListener + kwargs: + address: ${TEST_ADDRESS} +""" + + with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as f: + f.write(yaml_content) + yaml_path = f.name + + try: + blocks = load_blocks_from_yaml(yaml_path) + assert len(blocks) == 1 + # Environment variable was expanded + finally: + Path(yaml_path).unlink() + + +def test_env_var_expansion_with_default(monkeypatch): + """Test environment variable expansion with default values.""" + # Don't set TEST_MISSING so it uses the default + + yaml_content = """ +blocks: + - name: test_unit + operator: + class: _test.test_block.ConcreteOperator + listeners: + - class: _test.test_block.ConcreteListener + kwargs: + address: ${TEST_MISSING:-tcp://localhost:9999} +""" + + with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as f: + f.write(yaml_content) + yaml_path = f.name + + try: + blocks = load_blocks_from_yaml(yaml_path) + assert len(blocks) == 1 + # The defaults should be used + finally: + Path(yaml_path).unlink() + + +def test_env_var_expansion_missing_no_default(monkeypatch): + """Test that missing env var without default raises error.""" + yaml_content = """ +blocks: + - name: test_unit + operator: + class: _test.test_block.ConcreteOperator + listeners: + - class: _test.test_block.ConcreteListener + kwargs: + address: ${MISSING_VAR} +""" + + with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as f: + f.write(yaml_content) + yaml_path = f.name + + try: + with pytest.raises( + ConfigurationError, match="Environment variable 'MISSING_VAR' is not set" + ): + load_blocks_from_yaml(yaml_path) + finally: + Path(yaml_path).unlink() + + +def test_env_var_expansion_in_nested_values(monkeypatch): + """Test environment variable expansion in nested config values.""" + monkeypatch.setenv("REDIS_HOST", "redis.example.com") + monkeypatch.setenv("REDIS_PORT", "6379") + + yaml_content = """ +blocks: + - name: test_unit + operator: + class: _test.test_block.ConcreteOperator + listeners: + - class: _test.test_block.ConcreteListener + kwargs: + connection_string: redis://${REDIS_HOST}:${REDIS_PORT} + timeout: ${TIMEOUT:-30} +""" + + with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as f: + f.write(yaml_content) + yaml_path = f.name + + try: + blocks = load_blocks_from_yaml(yaml_path) + assert len(blocks) == 1 + # Connection string should have env vars expanded + finally: + Path(yaml_path).unlink() + + +def test_env_var_expansion_simple_syntax(monkeypatch): + """Test simple $VAR syntax for environment variables.""" + monkeypatch.setenv("SIMPLE_VAR", "test_value") + + yaml_content = """ +blocks: + - name: test_unit_$SIMPLE_VAR + operator: + class: _test.test_block.ConcreteOperator +""" + + with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as f: + f.write(yaml_content) + yaml_path = f.name + + try: + blocks = load_blocks_from_yaml(yaml_path) + assert len(blocks) == 1 + assert blocks[0].name == "test_unit_test_value" + finally: + Path(yaml_path).unlink() + + +def test_env_var_no_expansion_for_non_strings(monkeypatch): + """Test that non-string values are not affected by env var expansion.""" + monkeypatch.setenv("PORT", "5555") + + yaml_content = """ +blocks: + - name: test_unit + operator: + class: _test.test_block.ConcreteOperator + listeners: + - class: _test.test_block.ConcreteListener + kwargs: + port: 8080 + enabled: true + ratio: 0.5 +""" + + with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as f: + f.write(yaml_content) + yaml_path = f.name + + try: + blocks = load_blocks_from_yaml(yaml_path) + assert len(blocks) == 1 + # Non-string values should remain unchanged + finally: + Path(yaml_path).unlink() + + # ============================================================================ # Listener tests # ============================================================================ diff --git a/src/arroyopy/config.py b/src/arroyopy/config.py index cda03f7..555cca4 100644 --- a/src/arroyopy/config.py +++ b/src/arroyopy/config.py @@ -6,6 +6,8 @@ """ import importlib import logging +import os +import re from pathlib import Path from typing import Any, Dict, List, Optional, Type @@ -25,6 +27,102 @@ class ConfigurationError(Exception): pass +def _expand_env_var(value: str) -> str: + """ + Expand environment variables in a string. + + Supports the following formats: + - ${VAR_NAME} - Replace with environment variable value + - ${VAR_NAME:-default_value} - Replace with env var, or default if not set + - $VAR_NAME - Simple variable expansion + + Parameters + ---------- + value : str + String potentially containing environment variable references + + Returns + ------- + str + String with environment variables expanded + + Example + ------- + >>> os.environ['MY_VAR'] = 'test' + >>> _expand_env_var('${MY_VAR}') + 'test' + >>> _expand_env_var('${MISSING:-default}') + 'default' + """ + + # Pattern for ${VAR_NAME:-default} or ${VAR_NAME} + def replace_with_default(match): + var_expr = match.group(1) + if ":-" in var_expr: + var_name, default_value = var_expr.split(":-", 1) + return os.environ.get(var_name, default_value) + else: + var_name = var_expr + env_value = os.environ.get(var_name) + if env_value is None: + raise ConfigurationError( + f"Environment variable '{var_name}' is not set and no default provided" + ) + return env_value + + # Replace ${VAR} and ${VAR:-default} + value = re.sub(r"\$\{([^}]+)\}", replace_with_default, value) + + # Replace simple $VAR (word boundaries to avoid partial matches) + def replace_simple(match): + var_name = match.group(1) + env_value = os.environ.get(var_name) + if env_value is None: + raise ConfigurationError( + f"Environment variable '{var_name}' is not set and no default provided" + ) + return env_value + + value = re.sub(r"\$(\w+)", replace_simple, value) + + return value + + +def _expand_env_vars_in_config(config: Any) -> Any: + """ + Recursively expand environment variables in configuration values. + + Processes dictionaries, lists, and strings to replace environment variable + references with their actual values. + + Parameters + ---------- + config : Any + Configuration value (dict, list, str, or other type) + + Returns + ------- + Any + Configuration with environment variables expanded + + Example + ------- + >>> os.environ['PORT'] = '5555' + >>> config = {'address': 'tcp://127.0.0.1:${PORT}'} + >>> _expand_env_vars_in_config(config) + {'address': 'tcp://127.0.0.1:5555'} + """ + if isinstance(config, dict): + return {key: _expand_env_vars_in_config(value) for key, value in config.items()} + elif isinstance(config, list): + return [_expand_env_vars_in_config(item) for item in config] + elif isinstance(config, str): + return _expand_env_var(config) + else: + # Return other types (int, bool, etc.) unchanged + return config + + def _import_class(class_path: str) -> Type: """ Dynamically import a class from a module path. @@ -229,6 +327,9 @@ def load_blocks_from_yaml(yaml_path: str) -> List[Block]: if data is None: raise ConfigurationError("Configuration file is empty") + # Expand environment variables in the loaded data + data = _expand_env_vars_in_config(data) + # Require 'blocks' key if "blocks" not in data: raise ConfigurationError( From 8b49e3e6a88760b23c5a3b495b82e5a7dd576c05 Mon Sep 17 00:00:00 2001 From: Dylan McReynolds Date: Sat, 18 Apr 2026 12:46:59 -0700 Subject: [PATCH 2/2] update pixi.lock --- pixi.lock | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pixi.lock b/pixi.lock index d6d8e6d..68217d7 100644 --- a/pixi.lock +++ b/pixi.lock @@ -1558,8 +1558,8 @@ packages: requires_python: '>=3.9' - pypi: ./ name: arroyopy - version: 0.3.0a7 - sha256: 76dc3179c409495f9e06776fc63c6c8572405029c82a82b1368b9ce9282c9500 + version: 0.3.0a8 + sha256: 7436e19246ced34d83b1e2d9c3c4dbab699e69f4a7a9ec4b76136be477c374f4 requires_dist: - python-dotenv - pandas