diff --git a/agents/exponential_das/normalizer.py b/agents/exponential_das/normalizer.py index 224a1d9..dffaf7e 100644 --- a/agents/exponential_das/normalizer.py +++ b/agents/exponential_das/normalizer.py @@ -1,9 +1,14 @@ """Running normalizers for observations and rewards. Both use Welford's online algorithm for numerically stable mean/variance. -Normalisation is only updated during the warmup phase (while the buffer is -filling for the first time); afterwards the statistics are frozen. This -mirrors the StateNormalizer behaviour in the source project. + +ObservationNormalizer statistics are frozen after the warmup phase (first +buffer fill) so the obs space presented to the actor/critic networks stays +stable. + +RewardNormalizer keeps updating throughout training so that its per-step +statistics track the shifting reward distribution as the agent improves. +This matches the StepwiseRewardNormalizer behaviour in the reference project. """ from __future__ import annotations diff --git a/agents/exponential_das/trainer.py b/agents/exponential_das/trainer.py index 9ab61b5..cfdc538 100644 --- a/agents/exponential_das/trainer.py +++ b/agents/exponential_das/trainer.py @@ -77,10 +77,9 @@ def train( next_obs, reward, terminated, truncated, step_info = train_env.step(action) done = terminated or truncated - # Reward normalisation (update only during warmup) - normed_reward = agent.rew_norm.normalize( - reward, step_idx, update=not agent.buffer.warmed_up - ) + # Reward normalisation: always update so stats track the shifting + # reward distribution as the agent improves (matches reference). 
+ normed_reward = agent.rew_norm.normalize(reward, step_idx, update=True) ep_reward += reward agent.buffer.add(obs, action, log_prob, value, normed_reward, done) diff --git a/das/env/das_env.py b/das/env/das_env.py index 57fbe84..9dabd6f 100644 --- a/das/env/das_env.py +++ b/das/env/das_env.py @@ -39,7 +39,8 @@ class DASEnv(gym.Env): checkpoint_division_base (cdb): cdb=1.0 → uniform checkpoints; cdb>1.0 → exponentially growing intervals. reward_option: - 1=log-scaled, 2=linear, 3=sparse, 4=binary (see das/env/reward.py). + 1=log-scaled, 2=linear, 3=log-improvement, 4=sparse, 5=binary, + 6=hybrid-binary, 7=hybrid-sign (see das/env/reward.py). n_individuals: Population size per sub-optimizer. ``None`` (default) lets each algorithm use its own built-in default. Pass a single ``int`` to @@ -107,6 +108,7 @@ def __init__( self._best_x: np.ndarray | None = None self._worst_y = -np.inf self._initial_range: tuple[float, float] = (float("inf"), -np.inf) + self._optimum: float | None = None self._stagnation_count = 0 self._choices_history: list[int] = [] @@ -121,6 +123,9 @@ def reset(self, seed=None, options=None): self._problem_idx += 1 self._problem = self.suite.get_problem(problem_id) + # Known global minimum, used only by optimum-aware reward options + # (training-only signal); None on suites that do not expose it. + self._optimum = getattr(self._problem, "optimum", None) dim = self._problem.dimension self._max_fe = self.fe_multiplier * dim known = [n for n in self.n_individuals if n is not None] @@ -129,18 +134,46 @@ def reset(self, seed=None, options=None): ) # Reset episode bookkeeping - self._n_fe = 0 self._checkpoint_idx = 0 self._optimizer_state = {} self._x_history = None self._y_history = None - self._best_y = float("inf") - self._best_x = None - self._worst_y = -np.inf - self._initial_range = (float("inf"), -np.inf) self._stagnation_count = 0 self._choices_history = [] + # Agent-independent reference via a random probe. 
Establishing best/ + # scale *before* the agent acts removes the reward-hacking incentive to + # pick a bad first optimizer just to inflate later improvement: the + # episode return telescopes to (probe_best - best_final) / scale, whose + # reference no longer depends on the agent's first action. Uses random + # sampling only, so it generalises to real problems. + lb, ub = self._problem.lower_bounds, self._problem.upper_bounds + rng = np.random.default_rng( + None + if self._seed is None + else (self._seed * 1_000_000 + self._problem_idx * 1_000) % (2**31) + ) + # Cap so the probe never consumes the whole first checkpoint's budget. + n_probe = min(max(2 * dim, 50), max(int(self._checkpoints[0]) - dim, dim + 1)) + x_probe = rng.uniform(lb, ub, size=(n_probe, dim)) + y_probe = np.array([self._problem(x) for x in x_probe], dtype=float) + i_best = int(np.argmin(y_probe)) + + self._best_y = float(y_probe[i_best]) + self._best_x = x_probe[i_best] + self._worst_y = float(y_probe.max()) + # Robust reward scale: the upper end of the range is the *median* of the + # probe, not its max. ``max`` of a uniform sample is the noisiest + # possible statistic — driven by a single outlier — so it makes the scale + # (and therefore every reward) jitter across seeds for the same run. The + # median is stable and also shrinks the scale toward the typical objective + # spread, which improves reward resolution during late-stage refinement + # near the optimum (a max-based linear scale spends ~all its range on the + # first easy descent out of the random region). 
+ robust_upper = float(np.median(y_probe)) + self._initial_range = (self._best_y, max(robust_upper, self._best_y + 1e-5)) + self._n_fe = n_probe + obs = self._build_observation() info = {"problem_id": problem_id, "dimension": dim} return obs, info @@ -166,6 +199,7 @@ def step(self, action: int): self._initial_range, option=self.reward_option, is_final=terminated, + optimum=self._optimum, ) obs = self._build_observation() @@ -251,17 +285,10 @@ def _update_episode_state(self, result: dict, prev_best_y: float): if worst_y > self._worst_y: self._worst_y = worst_y - # Set initial range on first step. - # When worst_so_far_y is absent the default is -inf, which collapses - # scale to 1e-5 and inflates every subsequent reward by 1e5. Instead, - # derive scale from the magnitude of the initial best fitness. - if self._initial_range[0] == float("inf"): - safe_worst = ( - worst_y - if np.isfinite(worst_y) - else new_best_y + max(abs(new_best_y), 1.0) - ) - self._initial_range = (new_best_y, max(safe_worst, new_best_y + 1e-5)) + # NOTE: ``_initial_range`` (the reward reference and scale) is fixed in + # reset() from an agent-independent random probe, so it is intentionally + # not updated here — that is what prevents reward hacking on the first + # optimizer choice. # Stagnation counter — prefer the FE delta from the result dict so that # stagnation accumulates correctly even when y_history is not returned. 
diff --git a/das/env/ioh_suite.py b/das/env/ioh_suite.py index a0beed5..f8c5453 100644 --- a/das/env/ioh_suite.py +++ b/das/env/ioh_suite.py @@ -41,6 +41,11 @@ def lower_bounds(self) -> np.ndarray: def upper_bounds(self) -> np.ndarray: return np.asarray(self._p.bounds.ub, dtype=np.float64) + @property + def optimum(self) -> float: + """Known global minimum (objective value) of the problem.""" + return float(self._p.optimum.y) + def __call__(self, x) -> float: return float(self._p(x)) diff --git a/das/env/reward.py b/das/env/reward.py index 933b262..7a9982d 100644 --- a/das/env/reward.py +++ b/das/env/reward.py @@ -1,12 +1,21 @@ """Reward functions for the DAS environment. -All functions take (new_best_y, old_best_y, initial_value_range, is_final) -and return a scalar reward. Improvement is scaled by the initial fitness range -so rewards are comparable across different problem instances. +All functions take (new_best_y, old_best_y, initial_value_range, is_final, +optimum) and return a scalar reward. Improvement is scaled by the initial +fitness range so rewards are comparable across different problem instances. + +``optimum`` (the known global minimum) is optional. When it is available, +optimum-aware functions measure progress in *orders of magnitude of the gap to +the optimum* (the natural BBOB metric) instead of a probe-relative ratio that +saturates near the optimum. It is training-only — the learned policy never sees +it — and every function falls back to its probe-relative behaviour when the +optimum is ``None`` (e.g. a non-BBOB suite), so the options stay portable. """ import numpy as np +_GAP_FLOOR = 1e-8 # BBOB precision target: gaps below this count as "solved". 
+ def _improvement_ratio( new_best_y: float, old_best_y: float, initial_range: tuple[float, float] @@ -15,45 +24,117 @@ def _improvement_ratio( return (old_best_y - new_best_y) / (scale + 1e-10) -def reward_log_scaled(new_best_y, old_best_y, initial_range, is_final=False): +def _log_gap_orders(y_from: float, y_to: float, optimum: float) -> float: + """Orders of magnitude the gap to the optimum shrinks going y_from -> y_to. + + Positive when ``y_to`` is closer to the optimum. Telescopes over a run to + log10(initial_gap / final_gap), i.e. the total accuracy (in decades) gained. + """ + old_gap = max(y_from - optimum, _GAP_FLOOR) + new_gap = max(y_to - optimum, _GAP_FLOOR) + return float(np.log10(old_gap) - np.log10(new_gap)) + + +def _terminal_reward(final_y, initial_range, optimum) -> float: + """Full-magnitude terminal reward, clipped to [-10, 10]. + + With a known optimum: orders of magnitude of accuracy gained relative to the + random-probe baseline — this does *not* saturate, so reaching gap 1e-8 is + rewarded far more than gap 1e-2 (the probe-scaled version cannot tell them + apart). Otherwise: probe-scaled total improvement (legacy behaviour). 
+ """ + if optimum is not None: + return float(np.clip(_log_gap_orders(initial_range[0], final_y, optimum), -10.0, 10.0)) + raw = _improvement_ratio(final_y, initial_range[0], initial_range) + return float(np.clip(raw, -10.0, 10.0)) + + +def reward_log_scaled(new_best_y, old_best_y, initial_range, is_final=False, optimum=None): """Log-scaled incremental improvement (original r1).""" - if old_best_y == float("inf"): - return float(np.log(initial_range[1] - initial_range[0] + 1e-10)) ratio = _improvement_ratio(new_best_y, old_best_y, initial_range) return float(np.log(np.clip(ratio, 0.0, 1.0) + 1e-5)) -def reward_linear(new_best_y, old_best_y, initial_range, is_final=False): +def reward_linear(new_best_y, old_best_y, initial_range, is_final=False, optimum=None): """Linear improvement clipped to [0, 1] (original r2).""" - if old_best_y == float("inf"): - return float(np.log(initial_range[1] - initial_range[0] + 1e-10)) return float( np.clip(_improvement_ratio(new_best_y, old_best_y, initial_range), 0.0, 1.0) ) -def reward_sparse(new_best_y, old_best_y, initial_range, is_final=False): - """Sparse: only reward at the final checkpoint (original r3).""" - if old_best_y == float("inf") or not is_final: - return float(np.log(initial_range[1] - initial_range[0] + 1e-10)) +def reward_log_improvement(new_best_y, old_best_y, initial_range, is_final=False, optimum=None): + """Constant reward per order-of-magnitude reduction toward the optimum.""" + if optimum is None: + return reward_linear(new_best_y, old_best_y, initial_range, is_final) + return float(max(_log_gap_orders(old_best_y, new_best_y, optimum), 0.0)) + + +def reward_sparse(new_best_y, old_best_y, initial_range, is_final=False, optimum=None): + """Sparse: reward only at the final checkpoint (original r3).""" + if not is_final: + return 0.0 + if optimum is not None: + return float(np.clip(_log_gap_orders(initial_range[0], new_best_y, optimum), 0.0, 10.0)) total_improvement = initial_range[0] - new_best_y scale = 
initial_range[1] - initial_range[0] return float(np.log(total_improvement / (scale + 1e-10) + 1e-5)) -def reward_binary(new_best_y, old_best_y, initial_range, is_final=False): +def reward_binary(new_best_y, old_best_y, initial_range, is_final=False, optimum=None): """Binary: 1 if improvement >= 0.1%, else 0 (original r4).""" - if old_best_y == float("inf"): - return 0.0 ratio = _improvement_ratio(new_best_y, old_best_y, initial_range) return 1.0 if ratio >= 1e-3 else 0.0 +def reward_hybrid_binary(new_best_y, old_best_y, initial_range, is_final=False, optimum=None): + """Hybrid A: dense +0.1 progress bonus + full-magnitude terminal reward.""" + if is_final: + return _terminal_reward(new_best_y, initial_range, optimum) + ratio = _improvement_ratio(new_best_y, old_best_y, initial_range) + return 0.1 if ratio > 1e-8 else 0 + + +# Probably the best +def reward_hybrid_sign(new_best_y, old_best_y, initial_range, is_final=False, optimum=None): + """Hybrid B: dense progress signal + full-magnitude terminal reward.""" + if is_final: + return _terminal_reward(new_best_y, initial_range, optimum) + + base, slope, penalty = 0.1, 1.0, 0.15 + + if optimum is not None: + step_threshold = 0.05 + + def gain(y_from, y_to): + return _log_gap_orders(y_from, y_to, optimum) + else: + step_threshold = 5e-3 + + def gain(y_from, y_to): + return _improvement_ratio(y_to, y_from, initial_range) + + step_gain = gain(old_best_y, new_best_y) + if step_gain > step_threshold: + return float(base + slope * np.clip(step_gain, 0.0, 1.0)) + + # Already at the precision target: a stalled step is the goal state, not + # stagnation, so don't penalise it (otherwise solving early is discouraged). 
+ if optimum is not None and (new_best_y - optimum) <= _GAP_FLOOR: + return 0.0 + + progress = max(gain(initial_range[0], new_best_y), 0.0) + shortfall = 1.0 - np.clip(step_gain / step_threshold, 0.0, 1.0) + return float(-penalty * shortfall**2 / (1.0 + progress)) + + REWARD_FNS = { 1: reward_log_scaled, 2: reward_linear, - 3: reward_sparse, - 4: reward_binary, + 3: reward_log_improvement, + 4: reward_sparse, + 5: reward_binary, + 6: reward_hybrid_binary, + 7: reward_hybrid_sign, } @@ -63,10 +144,11 @@ def compute_reward( initial_range: tuple[float, float], option: int = 1, is_final: bool = False, + optimum: float | None = None, ) -> float: fn = REWARD_FNS.get(option) if fn is None: raise ValueError( f"Unknown reward option {option}. Choose from {list(REWARD_FNS)}" ) - return fn(new_best_y, old_best_y, initial_range, is_final) + return fn(new_best_y, old_best_y, initial_range, is_final, optimum) diff --git a/tests/test_baselines.py b/tests/test_baselines.py index 8a5ec6d..30fd4f5 100644 --- a/tests/test_baselines.py +++ b/tests/test_baselines.py @@ -288,16 +288,16 @@ def test_fitness_history_y_is_nonincreasing(self): assert ys[i] < ys[i - 1] def test_fitness_history_nonempty_after_episode(self): - """At least one improvement must occur (first evaluation beats inf).""" + """reset() probe establishes a finite initial best; optimizer steps may + not improve on it, so fitness_history from steps can be empty.""" env = make_env() - _, fitness_history = run_episode(env, random_policy) - assert len(fitness_history) >= 1 + env.reset() + assert np.isfinite(env._best_y) def test_fixed_policy_runs_full_episode(self): env = make_env() - step_info, fitness_history = run_episode(env, fixed_policy(0)) + step_info, _ = run_episode(env, fixed_policy(0)) assert np.isfinite(step_info["best_y"]) - assert len(fitness_history) >= 1 def test_episode_advances_problem_idx(self): env = make_env() @@ -858,7 +858,9 @@ def test_fitness_history_step_fe_within_budget(self): assert 1 <= fe <= 
max_fe def test_fitness_history_step_accumulated_across_checkpoints(self): - """Full episode fitness history must contain at least as many points as one step.""" + """fitness_history_step records improvements over the probe best. + The probe in reset() may already be the episode's best, so this + list can legitimately be empty; verify it is a list of valid tuples.""" env = make_env() env.reset() all_history = [] @@ -868,8 +870,9 @@ def test_fitness_history_step_accumulated_across_checkpoints(self): done = terminated or truncated all_history.extend(info["fitness_history_step"]) - # At minimum one improvement in the first checkpoint (from inf) - assert len(all_history) >= 1 + assert isinstance(all_history, list) + for fe, y in all_history: + assert isinstance(fe, int) and isinstance(y, float) def test_fitness_history_step_fe_monotone_across_episode(self): """FE values accumulated across all checkpoints must be strictly increasing.""" diff --git a/tests/test_heterogeneous_portfolios.py b/tests/test_heterogeneous_portfolios.py index fb85bfe..e4fa8a4 100644 --- a/tests/test_heterogeneous_portfolios.py +++ b/tests/test_heterogeneous_portfolios.py @@ -370,6 +370,7 @@ def test_reset_clears_state_between_episodes(self, spec, fe_mult): env.reset() drain(env) env.reset() - assert env._n_fe == 0 - assert env._best_y == float("inf") + # reset() runs a random probe, so _n_fe > 0 and _best_y is finite + assert env._n_fe > 0 + assert np.isfinite(env._best_y) assert env._optimizer_state == {} diff --git a/tests/test_parallel_envs.py b/tests/test_parallel_envs.py index b17bedd..15edb8e 100644 --- a/tests/test_parallel_envs.py +++ b/tests/test_parallel_envs.py @@ -169,8 +169,9 @@ def test_reset_clears_all_episode_state(self): env.step(0) env.step(0) # mid-episode env.reset() # full reset - assert env._n_fe == 0 - assert env._best_y == float("inf") + # reset() runs a random probe, so _n_fe > 0 and _best_y is finite + assert env._n_fe > 0 + assert np.isfinite(env._best_y) assert 
env._checkpoint_idx == 0 assert env._choices_history == [] assert env._optimizer_state == {} @@ -202,10 +203,11 @@ def test_best_y_is_independent(self): env_b = make_env(suite=suite) env_a.reset() env_b.reset() + best_y_before = env_b._best_y # probe value set during reset env_a.step(0) - assert env_b._best_y == float("inf") + assert env_b._best_y == best_y_before def test_optimizer_state_does_not_leak(self): """Warm-start population in env_a must not appear in env_b."""