Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions examples/config/env_var_example.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Example arroyo block configuration with environment variable substitution
#
# This configuration demonstrates how to use environment variables in your
# YAML configuration files. Environment variables are expanded when the
# configuration is loaded.
#
# Supported formats:
# - ${VAR_NAME} - Replace with environment variable value (error if not set)
# - ${VAR_NAME:-default} - Replace with env var, or use default if not set
# - $VAR_NAME - Simple variable expansion (error if not set)
#
# Example usage:
# export ZMQ_INPUT_ADDRESS="tcp://127.0.0.1:5555"
# export ZMQ_OUTPUT_ADDRESS="tcp://127.0.0.1:5556"
# export MESSAGE_TIMEOUT="30"
# python -m arroyopy run examples/config/env_var_example.yaml

blocks:
- name: zmq_pipeline_with_env_vars
description: Pipeline using environment variables for configuration

operator:
class: myapp.operators.MessageProcessor
kwargs:
# Use env var with default fallback
timeout: ${MESSAGE_TIMEOUT:-60}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a .env.example file that includes all the environment variables defined in this YAML?

batch_size: ${BATCH_SIZE:-100}

listeners:
- class: arroyopy.zmq.ZMQListener
kwargs:
# Use env var (required - will fail if not set)
address: ${ZMQ_INPUT_ADDRESS}
socket_type: ${ZMQ_INPUT_TYPE:-SUB}

publishers:
- class: arroyopy.zmq.ZMQPublisher
kwargs:
# Mix env vars and defaults
address: ${ZMQ_OUTPUT_ADDRESS}
socket_type: ${ZMQ_OUTPUT_TYPE:-PUB}
# Simple $VAR syntax also works
topic: $TOPIC_NAME
4 changes: 2 additions & 2 deletions pixi.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

181 changes: 181 additions & 0 deletions src/_test/test_block.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,21 @@ async def process(self, message: Message) -> Message:
return message


# Concrete listener for YAML/config loading tests
class ConcreteListener(Listener):
"""Simple concrete listener for testing."""

def __init__(self, operator=None, **kwargs):
super().__init__(operator)
self.kwargs = kwargs

async def start(self) -> None:
pass

async def stop(self) -> None:
pass


# Simple class for testing invalid type validation
class NotAComponent:
"""A class that is not a Listener, Publisher, or Operator."""
Expand Down Expand Up @@ -780,6 +795,172 @@ async def test_operator_start_stop_loop():
assert publisher.publish.called


# ============================================================================
# Environment variable expansion tests
# ============================================================================


def test_env_var_expansion_basic(monkeypatch):
"""Test basic environment variable expansion in config."""
monkeypatch.setenv("TEST_ADDRESS", "tcp://127.0.0.1:5555")

yaml_content = """
blocks:
- name: test_unit
operator:
class: _test.test_block.ConcreteOperator
listeners:
- class: _test.test_block.ConcreteListener
kwargs:
address: ${TEST_ADDRESS}
"""

with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as f:
f.write(yaml_content)
yaml_path = f.name

try:
blocks = load_blocks_from_yaml(yaml_path)
assert len(blocks) == 1
# Environment variable was expanded
finally:
Path(yaml_path).unlink()


def test_env_var_expansion_with_default(monkeypatch):
"""Test environment variable expansion with default values."""
# Don't set TEST_MISSING so it uses the default

yaml_content = """
blocks:
- name: test_unit
operator:
class: _test.test_block.ConcreteOperator
listeners:
- class: _test.test_block.ConcreteListener
kwargs:
address: ${TEST_MISSING:-tcp://localhost:9999}
"""

with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as f:
f.write(yaml_content)
yaml_path = f.name

try:
blocks = load_blocks_from_yaml(yaml_path)
assert len(blocks) == 1
# The defaults should be used
finally:
Path(yaml_path).unlink()


def test_env_var_expansion_missing_no_default(monkeypatch):
"""Test that missing env var without default raises error."""
yaml_content = """
blocks:
- name: test_unit
operator:
class: _test.test_block.ConcreteOperator
listeners:
- class: _test.test_block.ConcreteListener
kwargs:
address: ${MISSING_VAR}
"""

with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as f:
f.write(yaml_content)
yaml_path = f.name

try:
with pytest.raises(
ConfigurationError, match="Environment variable 'MISSING_VAR' is not set"
):
load_blocks_from_yaml(yaml_path)
finally:
Path(yaml_path).unlink()


def test_env_var_expansion_in_nested_values(monkeypatch):
"""Test environment variable expansion in nested config values."""
monkeypatch.setenv("REDIS_HOST", "redis.example.com")
monkeypatch.setenv("REDIS_PORT", "6379")

yaml_content = """
blocks:
- name: test_unit
operator:
class: _test.test_block.ConcreteOperator
listeners:
- class: _test.test_block.ConcreteListener
kwargs:
connection_string: redis://${REDIS_HOST}:${REDIS_PORT}
timeout: ${TIMEOUT:-30}
"""

with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as f:
f.write(yaml_content)
yaml_path = f.name

try:
blocks = load_blocks_from_yaml(yaml_path)
assert len(blocks) == 1
# Connection string should have env vars expanded
finally:
Path(yaml_path).unlink()


def test_env_var_expansion_simple_syntax(monkeypatch):
"""Test simple $VAR syntax for environment variables."""
monkeypatch.setenv("SIMPLE_VAR", "test_value")

yaml_content = """
blocks:
- name: test_unit_$SIMPLE_VAR
operator:
class: _test.test_block.ConcreteOperator
"""

with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as f:
f.write(yaml_content)
yaml_path = f.name

try:
blocks = load_blocks_from_yaml(yaml_path)
assert len(blocks) == 1
assert blocks[0].name == "test_unit_test_value"
finally:
Path(yaml_path).unlink()


def test_env_var_no_expansion_for_non_strings(monkeypatch):
"""Test that non-string values are not affected by env var expansion."""
monkeypatch.setenv("PORT", "5555")

yaml_content = """
blocks:
- name: test_unit
operator:
class: _test.test_block.ConcreteOperator
listeners:
- class: _test.test_block.ConcreteListener
kwargs:
port: 8080
enabled: true
ratio: 0.5
"""

with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as f:
f.write(yaml_content)
yaml_path = f.name

try:
blocks = load_blocks_from_yaml(yaml_path)
assert len(blocks) == 1
# Non-string values should remain unchanged
finally:
Path(yaml_path).unlink()


# ============================================================================
# Listener tests
# ============================================================================
Expand Down
101 changes: 101 additions & 0 deletions src/arroyopy/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
"""
import importlib
import logging
import os
import re
from pathlib import Path
from typing import Any, Dict, List, Optional, Type

Expand All @@ -25,6 +27,102 @@ class ConfigurationError(Exception):
pass


def _expand_env_var(value: str) -> str:
"""
Expand environment variables in a string.

Supports the following formats:
- ${VAR_NAME} - Replace with environment variable value
- ${VAR_NAME:-default_value} - Replace with env var, or default if not set
- $VAR_NAME - Simple variable expansion

Parameters
----------
value : str
String potentially containing environment variable references

Returns
-------
str
String with environment variables expanded

Example
-------
>>> os.environ['MY_VAR'] = 'test'
>>> _expand_env_var('${MY_VAR}')
'test'
>>> _expand_env_var('${MISSING:-default}')
'default'
"""

# Pattern for ${VAR_NAME:-default} or ${VAR_NAME}
def replace_with_default(match):
var_expr = match.group(1)
if ":-" in var_expr:
var_name, default_value = var_expr.split(":-", 1)
return os.environ.get(var_name, default_value)
else:
var_name = var_expr
env_value = os.environ.get(var_name)
if env_value is None:
raise ConfigurationError(
f"Environment variable '{var_name}' is not set and no default provided"
)
return env_value

# Replace ${VAR} and ${VAR:-default}
value = re.sub(r"\$\{([^}]+)\}", replace_with_default, value)

# Replace simple $VAR (word boundaries to avoid partial matches)
def replace_simple(match):
var_name = match.group(1)
env_value = os.environ.get(var_name)
if env_value is None:
raise ConfigurationError(
f"Environment variable '{var_name}' is not set and no default provided"
)
return env_value

value = re.sub(r"\$(\w+)", replace_simple, value)

return value


def _expand_env_vars_in_config(config: Any) -> Any:
"""
Recursively expand environment variables in configuration values.

Processes dictionaries, lists, and strings to replace environment variable
references with their actual values.

Parameters
----------
config : Any
Configuration value (dict, list, str, or other type)

Returns
-------
Any
Configuration with environment variables expanded

Example
-------
>>> os.environ['PORT'] = '5555'
>>> config = {'address': 'tcp://127.0.0.1:${PORT}'}
>>> _expand_env_vars_in_config(config)
{'address': 'tcp://127.0.0.1:5555'}
"""
if isinstance(config, dict):
return {key: _expand_env_vars_in_config(value) for key, value in config.items()}
elif isinstance(config, list):
return [_expand_env_vars_in_config(item) for item in config]
elif isinstance(config, str):
return _expand_env_var(config)
else:
# Return other types (int, bool, etc.) unchanged
return config


def _import_class(class_path: str) -> Type:
"""
Dynamically import a class from a module path.
Expand Down Expand Up @@ -229,6 +327,9 @@ def load_blocks_from_yaml(yaml_path: str) -> List[Block]:
if data is None:
raise ConfigurationError("Configuration file is empty")

# Expand environment variables in the loaded data
data = _expand_env_vars_in_config(data)

# Require 'blocks' key
if "blocks" not in data:
raise ConfigurationError(
Expand Down
Loading