Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions agents/exponential_das/normalizer.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
"""Running normalizers for observations and rewards.

Both use Welford's online algorithm for numerically stable mean/variance.
Normalisation is only updated during the warmup phase (while the buffer is
filling for the first time); afterwards the statistics are frozen. This
mirrors the StateNormalizer behaviour in the source project.

ObservationNormalizer statistics are frozen after the warmup phase (first
buffer fill) so the obs space presented to the actor/critic networks stays
stable.

RewardNormalizer keeps updating throughout training so that its per-step
statistics track the shifting reward distribution as the agent improves.
This matches the StepwiseRewardNormalizer behaviour in the reference project.
"""

from __future__ import annotations
Expand Down
7 changes: 3 additions & 4 deletions agents/exponential_das/trainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,9 @@ def train(
next_obs, reward, terminated, truncated, step_info = train_env.step(action)
done = terminated or truncated

# Reward normalisation (update only during warmup)
normed_reward = agent.rew_norm.normalize(
reward, step_idx, update=not agent.buffer.warmed_up
)
# Reward normalisation: always update so stats track the shifting
# reward distribution as the agent improves (matches reference).
normed_reward = agent.rew_norm.normalize(reward, step_idx, update=True)
ep_reward += reward

agent.buffer.add(obs, action, log_prob, value, normed_reward, done)
Expand Down
61 changes: 44 additions & 17 deletions das/env/das_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ class DASEnv(gym.Env):
checkpoint_division_base (cdb):
cdb=1.0 → uniform checkpoints; cdb>1.0 → exponentially growing intervals.
reward_option:
1=log-scaled, 2=linear, 3=sparse, 4=binary (see das/env/reward.py).
1=log-scaled, 2=linear, 3=log-improvement, 4=sparse, 5=binary,
6=hybrid-binary, 7=hybrid-sign (see REWARD_FNS in das/env/reward.py).
n_individuals:
Population size per sub-optimizer. ``None`` (default) lets each
algorithm use its own built-in default. Pass a single ``int`` to
Expand Down Expand Up @@ -107,6 +108,7 @@ def __init__(
self._best_x: np.ndarray | None = None
self._worst_y = -np.inf
self._initial_range: tuple[float, float] = (float("inf"), -np.inf)
self._optimum: float | None = None
self._stagnation_count = 0
self._choices_history: list[int] = []

Expand All @@ -121,6 +123,9 @@ def reset(self, seed=None, options=None):
self._problem_idx += 1

self._problem = self.suite.get_problem(problem_id)
# Known global minimum, used only by optimum-aware reward options
# (training-only signal); None on suites that do not expose it.
self._optimum = getattr(self._problem, "optimum", None)
dim = self._problem.dimension
self._max_fe = self.fe_multiplier * dim
known = [n for n in self.n_individuals if n is not None]
Expand All @@ -129,18 +134,46 @@ def reset(self, seed=None, options=None):
)

# Reset episode bookkeeping
self._n_fe = 0
self._checkpoint_idx = 0
self._optimizer_state = {}
self._x_history = None
self._y_history = None
self._best_y = float("inf")
self._best_x = None
self._worst_y = -np.inf
self._initial_range = (float("inf"), -np.inf)
self._stagnation_count = 0
self._choices_history = []

# Agent-independent reference via a random probe. Establishing best/
# scale *before* the agent acts removes the reward-hacking incentive to
# pick a bad first optimizer just to inflate later improvement: the
# episode return telescopes to (probe_best - best_final) / scale, whose
# reference no longer depends on the agent's first action. Uses random
# sampling only, so it generalises to real problems.
lb, ub = self._problem.lower_bounds, self._problem.upper_bounds
rng = np.random.default_rng(
None
if self._seed is None
else (self._seed * 1_000_000 + self._problem_idx * 1_000) % (2**31)
)
# Cap so the probe never consumes the whole first checkpoint's budget.
n_probe = min(max(2 * dim, 50), max(int(self._checkpoints[0]) - dim, dim + 1))
x_probe = rng.uniform(lb, ub, size=(n_probe, dim))
y_probe = np.array([self._problem(x) for x in x_probe], dtype=float)
i_best = int(np.argmin(y_probe))

self._best_y = float(y_probe[i_best])
self._best_x = x_probe[i_best]
self._worst_y = float(y_probe.max())
# Robust reward scale: the upper end of the range is the *median* of the
# probe, not its max. ``max`` of a uniform sample is the noisiest
# possible statistic — driven by a single outlier — so it makes the scale
# (and therefore every reward) jitter across seeds for the same run. The
# median is stable and also shrinks the scale toward the typical objective
# spread, which improves reward resolution during late-stage refinement
# near the optimum (a max-based linear scale spends ~all its range on the
# first easy descent out of the random region).
robust_upper = float(np.median(y_probe))
self._initial_range = (self._best_y, max(robust_upper, self._best_y + 1e-5))
self._n_fe = n_probe

obs = self._build_observation()
info = {"problem_id": problem_id, "dimension": dim}
return obs, info
Expand All @@ -166,6 +199,7 @@ def step(self, action: int):
self._initial_range,
option=self.reward_option,
is_final=terminated,
optimum=self._optimum,
)

obs = self._build_observation()
Expand Down Expand Up @@ -251,17 +285,10 @@ def _update_episode_state(self, result: dict, prev_best_y: float):
if worst_y > self._worst_y:
self._worst_y = worst_y

# Set initial range on first step.
# When worst_so_far_y is absent the default is -inf, which collapses
# scale to 1e-5 and inflates every subsequent reward by 1e5. Instead,
# derive scale from the magnitude of the initial best fitness.
if self._initial_range[0] == float("inf"):
safe_worst = (
worst_y
if np.isfinite(worst_y)
else new_best_y + max(abs(new_best_y), 1.0)
)
self._initial_range = (new_best_y, max(safe_worst, new_best_y + 1e-5))
# NOTE: ``_initial_range`` (the reward reference and scale) is fixed in
# reset() from an agent-independent random probe, so it is intentionally
# not updated here — that is what prevents reward hacking on the first
# optimizer choice.

# Stagnation counter — prefer the FE delta from the result dict so that
# stagnation accumulates correctly even when y_history is not returned.
Expand Down
5 changes: 5 additions & 0 deletions das/env/ioh_suite.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ def lower_bounds(self) -> np.ndarray:
def upper_bounds(self) -> np.ndarray:
return np.asarray(self._p.bounds.ub, dtype=np.float64)

@property
def optimum(self) -> float:
    """Known global minimum (objective value) of the problem.

    Reads ``optimum.y`` from the wrapped problem object ``self._p`` and
    converts it to a built-in ``float``.
    """
    return float(self._p.optimum.y)

def __call__(self, x) -> float:
return float(self._p(x))

Expand Down
120 changes: 101 additions & 19 deletions das/env/reward.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,21 @@
"""Reward functions for the DAS environment.

All functions take (new_best_y, old_best_y, initial_value_range, is_final)
and return a scalar reward. Improvement is scaled by the initial fitness range
so rewards are comparable across different problem instances.
All functions take (new_best_y, old_best_y, initial_range, is_final,
optimum) and return a scalar reward. Improvement is scaled by the initial
fitness range so rewards are comparable across different problem instances.

``optimum`` (the known global minimum) is optional. When it is available,
optimum-aware functions measure progress in *orders of magnitude of the gap to
the optimum* (the natural BBOB metric) instead of a probe-relative ratio that
saturates near the optimum. It is training-only — the learned policy never sees
it — and every function falls back to its probe-relative behaviour when the
optimum is ``None`` (e.g. a non-BBOB suite), so the options stay portable.
"""

import numpy as np

_GAP_FLOOR = 1e-8 # BBOB precision target: gaps below this count as "solved".

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice cap. It may interfere with the AOCC computation; double-check that fitness isn't clipped twice there.



def _improvement_ratio(
new_best_y: float, old_best_y: float, initial_range: tuple[float, float]
Expand All @@ -15,45 +24,117 @@ def _improvement_ratio(
return (old_best_y - new_best_y) / (scale + 1e-10)


def reward_log_scaled(new_best_y, old_best_y, initial_range, is_final=False):
def _log_gap_orders(y_from: float, y_to: float, optimum: float) -> float:
    """Return how many decades the optimality gap shrinks from y_from to y_to.

    Each gap is the distance to ``optimum``, floored at ``_GAP_FLOOR`` so the
    logarithm stays finite. The result is positive when ``y_to`` has the
    smaller gap and negative when it has the larger one. Summed over
    consecutive steps the values telescope to log10(first_gap / last_gap).
    """
    gap_before, gap_after = (
        max(y - optimum, _GAP_FLOOR) for y in (y_from, y_to)
    )
    return float(np.log10(gap_before) - np.log10(gap_after))


def _terminal_reward(final_y, initial_range, optimum) -> float:
    """Return the end-of-episode reward, clipped to [-10, 10].

    The baseline is ``initial_range[0]``. When ``optimum`` is given, the
    reward is the number of decades by which the gap to the optimum shrank
    between the baseline and ``final_y``. When ``optimum`` is ``None``, the
    reward is the improvement ratio of ``final_y`` over the baseline, scaled
    by ``initial_range`` (legacy behaviour).
    """
    baseline = initial_range[0]
    if optimum is None:
        total = _improvement_ratio(final_y, baseline, initial_range)
    else:
        total = _log_gap_orders(baseline, final_y, optimum)
    return float(np.clip(total, -10.0, 10.0))


def reward_log_scaled(new_best_y, old_best_y, initial_range, is_final=False, optimum=None):
"""Log-scaled incremental improvement (original r1)."""
if old_best_y == float("inf"):

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logarithm of the first reward was added to avoid reward hacking. In general, rewards that do not take the global optimum into account are hard to protect from hacking. It is also important to keep in mind that feeding the global minimum into the reward makes the meta-BBO task significantly easier. It would be nice to compare global-optimum-aware rewards with each other, but not necessarily with the ones that do not take the global optimum into account.

return float(np.log(initial_range[1] - initial_range[0] + 1e-10))
ratio = _improvement_ratio(new_best_y, old_best_y, initial_range)
return float(np.log(np.clip(ratio, 0.0, 1.0) + 1e-5))


def reward_linear(new_best_y, old_best_y, initial_range, is_final=False):
def reward_linear(new_best_y, old_best_y, initial_range, is_final=False, optimum=None):
    """Linear improvement clipped to [0, 1] (original r2).

    When ``old_best_y`` is ``+inf`` (no previous best), the natural log of the
    width of ``initial_range`` is returned instead. ``is_final`` and
    ``optimum`` are accepted for signature parity and are not used.
    """
    has_previous_best = old_best_y != float("inf")
    if has_previous_best:
        ratio = _improvement_ratio(new_best_y, old_best_y, initial_range)
        return float(np.clip(ratio, 0.0, 1.0))
    span = initial_range[1] - initial_range[0]
    return float(np.log(span + 1e-10))


def reward_sparse(new_best_y, old_best_y, initial_range, is_final=False):
"""Sparse: only reward at the final checkpoint (original r3)."""
if old_best_y == float("inf") or not is_final:
return float(np.log(initial_range[1] - initial_range[0] + 1e-10))
def reward_log_improvement(new_best_y, old_best_y, initial_range, is_final=False, optimum=None):
    """Reward each order-of-magnitude reduction of the gap to the optimum.

    With a known ``optimum`` the reward is the number of decades the gap
    shrank during this step, floored at zero. Without one the call is
    delegated to ``reward_linear``.
    """
    if optimum is not None:
        decades = _log_gap_orders(old_best_y, new_best_y, optimum)
        return float(max(decades, 0.0))
    return reward_linear(new_best_y, old_best_y, initial_range, is_final)


def reward_sparse(new_best_y, old_best_y, initial_range, is_final=False, optimum=None):
    """Sparse reward: zero on every step except the final checkpoint (original r3).

    Args:
        new_best_y: Best objective value after the step.
        old_best_y: Best objective value before the step (unused; kept so all
            reward functions share one signature).
        initial_range: ``(reference_best, reference_upper)`` pair; the
            difference of its two entries is the reward scale.
        is_final: Whether this is the final checkpoint of the episode.
        optimum: Known global minimum, or ``None`` when unavailable.

    Returns:
        ``0.0`` when ``is_final`` is false. Otherwise, with a known optimum,
        the decades of gap reduction from ``initial_range[0]`` to
        ``new_best_y`` clipped to ``[0, 10]``; without one, the natural log of
        the scaled total improvement (floored at zero) plus ``1e-5``.
    """
    if not is_final:
        return 0.0
    if optimum is not None:
        return float(np.clip(_log_gap_orders(initial_range[0], new_best_y, optimum), 0.0, 10.0))
    scale = initial_range[1] - initial_range[0]
    ratio = (initial_range[0] - new_best_y) / (scale + 1e-10)
    # Clamp at zero, mirroring the lower clip of the optimum-aware branch:
    # a final value worse than the reference would otherwise push the log
    # argument negative and yield NaN, which corrupts downstream statistics.
    return float(np.log(max(ratio, 0.0) + 1e-5))


def reward_binary(new_best_y, old_best_y, initial_range, is_final=False):
def reward_binary(new_best_y, old_best_y, initial_range, is_final=False, optimum=None):

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to have unit tests for all of these reward definitions.

"""Binary: 1 if improvement >= 0.1%, else 0 (original r4)."""
if old_best_y == float("inf"):
return 0.0
ratio = _improvement_ratio(new_best_y, old_best_y, initial_range)
return 1.0 if ratio >= 1e-3 else 0.0


def reward_hybrid_binary(new_best_y, old_best_y, initial_range, is_final=False, optimum=None):
    """Hybrid A: dense +0.1 progress bonus + full-magnitude terminal reward.

    Args:
        new_best_y: Best objective value after the step.
        old_best_y: Best objective value before the step.
        initial_range: Reference range used to scale the improvement.
        is_final: Whether this is the final checkpoint of the episode.
        optimum: Known global minimum, or ``None``; only forwarded to the
            terminal reward.

    Returns:
        On the final step, the terminal reward. On any other step, ``0.1``
        if the scaled improvement exceeds ``1e-8`` and ``0.0`` otherwise.
    """
    if is_final:
        return _terminal_reward(new_best_y, initial_range, optimum)
    ratio = _improvement_ratio(new_best_y, old_best_y, initial_range)
    # Return a float on both branches; the no-progress case previously
    # returned the int 0, unlike every other reward path.
    return 0.1 if ratio > 1e-8 else 0.0


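# NOTE(review): flagged by the author as probably the best-performing option;
# no benchmark in this file backs that up — confirm before relying on it.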
def reward_hybrid_sign(new_best_y, old_best_y, initial_range, is_final=False, optimum=None):
    """Hybrid B: dense progress signal + full-magnitude terminal reward.

    On the final step the terminal reward is returned. On other steps the
    step gain is measured in decades of gap reduction when ``optimum`` is
    known, and as a scaled improvement ratio otherwise. A gain above the
    threshold earns ``0.1`` plus the gain clipped to [0, 1]. A smaller gain
    earns a non-positive penalty that shrinks as overall progress since
    ``initial_range[0]`` grows, and is zero once the precision target is hit.
    """
    if is_final:
        return _terminal_reward(new_best_y, initial_range, optimum)

    bonus_base, bonus_slope, stall_penalty = 0.1, 1.0, 0.15
    knows_optimum = optimum is not None
    # The two gain measures live on different scales, hence two thresholds.
    min_gain = 0.05 if knows_optimum else 5e-3

    def measure(y_start, y_end):
        if knows_optimum:
            return _log_gap_orders(y_start, y_end, optimum)
        return _improvement_ratio(y_end, y_start, initial_range)

    delta = measure(old_best_y, new_best_y)
    if delta > min_gain:
        return float(bonus_base + bonus_slope * np.clip(delta, 0.0, 1.0))

    # A stalled step at the precision target is the goal state, not
    # stagnation, so it carries no penalty.
    if knows_optimum and (new_best_y - optimum) <= _GAP_FLOOR:
        return 0.0

    overall = max(measure(initial_range[0], new_best_y), 0.0)
    miss = 1.0 - np.clip(delta / min_gain, 0.0, 1.0)
    return float(-stall_penalty * miss**2 / (1.0 + overall))


# Maps the integer reward option id to its reward function. Each id appears
# exactly once: a duplicated key in a dict literal is silently overridden by
# its last occurrence, which hides the effective mapping from readers.
REWARD_FNS = {
    1: reward_log_scaled,
    2: reward_linear,
    3: reward_log_improvement,
    4: reward_sparse,
    5: reward_binary,
    6: reward_hybrid_binary,
    7: reward_hybrid_sign,
}


Expand All @@ -63,10 +144,11 @@ def compute_reward(
initial_range: tuple[float, float],
option: int = 1,
is_final: bool = False,
optimum: float | None = None,
) -> float:
fn = REWARD_FNS.get(option)
if fn is None:
raise ValueError(
f"Unknown reward option {option}. Choose from {list(REWARD_FNS)}"
)
return fn(new_best_y, old_best_y, initial_range, is_final)
return fn(new_best_y, old_best_y, initial_range, is_final, optimum)
19 changes: 11 additions & 8 deletions tests/test_baselines.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,16 +288,16 @@ def test_fitness_history_y_is_nonincreasing(self):
assert ys[i] < ys[i - 1]

def test_fitness_history_nonempty_after_episode(self):
"""At least one improvement must occur (first evaluation beats inf)."""
"""reset() probe establishes a finite initial best; optimizer steps may
not improve on it, so fitness_history from steps can be empty."""
env = make_env()
_, fitness_history = run_episode(env, random_policy)
assert len(fitness_history) >= 1
env.reset()
assert np.isfinite(env._best_y)

def test_fixed_policy_runs_full_episode(self):
env = make_env()
step_info, fitness_history = run_episode(env, fixed_policy(0))
step_info, _ = run_episode(env, fixed_policy(0))
assert np.isfinite(step_info["best_y"])
assert len(fitness_history) >= 1

def test_episode_advances_problem_idx(self):
env = make_env()
Expand Down Expand Up @@ -858,7 +858,9 @@ def test_fitness_history_step_fe_within_budget(self):
assert 1 <= fe <= max_fe

def test_fitness_history_step_accumulated_across_checkpoints(self):
"""Full episode fitness history must contain at least as many points as one step."""
"""fitness_history_step records improvements over the probe best.
The probe in reset() may already be the episode's best, so this
list can legitimately be empty; verify it is a list of valid tuples."""
env = make_env()
env.reset()
all_history = []
Expand All @@ -868,8 +870,9 @@ def test_fitness_history_step_accumulated_across_checkpoints(self):
done = terminated or truncated
all_history.extend(info["fitness_history_step"])

# At minimum one improvement in the first checkpoint (from inf)
assert len(all_history) >= 1
assert isinstance(all_history, list)
for fe, y in all_history:
assert isinstance(fe, int) and isinstance(y, float)

def test_fitness_history_step_fe_monotone_across_episode(self):
"""FE values accumulated across all checkpoints must be strictly increasing."""
Expand Down
5 changes: 3 additions & 2 deletions tests/test_heterogeneous_portfolios.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ def test_reset_clears_state_between_episodes(self, spec, fe_mult):
env.reset()
drain(env)
env.reset()
assert env._n_fe == 0
assert env._best_y == float("inf")
# reset() runs a random probe, so _n_fe > 0 and _best_y is finite
assert env._n_fe > 0
assert np.isfinite(env._best_y)
assert env._optimizer_state == {}
Loading
Loading