From 1a33a0a1bbe884f7d757f57c673374c77229ba27 Mon Sep 17 00:00:00 2001 From: Brian McMahon Date: Fri, 12 Jun 2026 13:56:52 -0700 Subject: [PATCH] fix: quiet yfinance per-symbol ERROR spray + aggregate coverage per run; split-ratio hint on OVERWRITE errors (config#1029, config#1030) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two Flow Doctor noise/diagnosis fixes from the 2026-06-12 session: 1. metron_market_data: yfinance's internal logger emits >=5 distinctly-worded ERROR records for ONE unpriceable symbol (the PCKM 401(k) CIT storm — 5 emails per EOD run). All default yfinance fetchers now run under _quiet_yfinance (logger held to CRITICAL for the fetch), and per-symbol coverage aggregates into ONE record per artifact per run via _log_yf_coverage: partial miss = single WARN naming the symbols; full miss on the load-bearing closes artifact = single ERROR (outage surface). Recording surface preserved per no-silent-fails. 2. daily_closes: polygon_only OVERWRITE ERRORs now carry a split-ratio hint when prior/new sits within 0.5% of a clean N:1 ratio (2..50, both directions). KLAC's 10-for-1 split restated three dates by exactly /10 and the LLM auto-diagnosis blamed a producer decimal-shift bug because the message only said "90.00% diff" (data#417-419). The hint puts the strongest evidence in the message for any diagnoser, human or LLM. Co-Authored-By: Claude Fable 5 --- collectors/daily_closes.py | 35 +++++++++- collectors/metron_market_data.py | 80 ++++++++++++++++++++++ tests/test_daily_closes_split_hint.py | 65 ++++++++++++++++++ tests/test_metron_yf_noise_aggregation.py | 82 +++++++++++++++++++++++ 4 files changed, 261 insertions(+), 1 deletion(-) create mode 100644 tests/test_daily_closes_split_hint.py create mode 100644 tests/test_metron_yf_noise_aggregation.py diff --git a/collectors/daily_closes.py b/collectors/daily_closes.py index cfc23cc..221a689 100644 --- a/collectors/daily_closes.py +++ b/collectors/daily_closes.py @@ -1103,6 +1103,38 @@ def _fetch_polygon_closes_per_ticker( return recovered +_SPLIT_RATIO_MAX = 50 # largest forward/reverse split factor worth hinting (covers 20:1 NVDA-class splits) +_SPLIT_RATIO_TOL = 0.005 # adjusted closes restate by the EXACT factor; 0.5% absorbs feed rounding + + +def _split_ratio_hint(prior: float, new: float) -> str: + """Corporate-action hint when prior/new sits on a clean split ratio, else "". + + A forward N-for-1 split divides the adjusted close by exactly N (a reverse + 1-for-N multiplies by N), so a cross-source overwrite whose ratio is within + ``_SPLIT_RATIO_TOL`` of an integer 2..``_SPLIT_RATIO_MAX`` is far more likely + a split restatement than a code bug. Surfacing the ratio in the ERROR message + hands the strongest evidence to whoever diagnoses it — the KLAC 10:1 split + (2026-06-10) was auto-diagnosed as a producer decimal-shift bug because the + message only said "90.00% diff" (data#417-419, config#1030). + """ + if prior <= 0 or new <= 0: + return "" + for big, small, template in ( + (prior, new, "%d-for-1 forward stock split"), + (new, prior, "1-for-%d reverse stock split"), + ): + ratio = big / small + n = round(ratio) + if 2 <= n <= _SPLIT_RATIO_MAX and abs(ratio - n) / n <= _SPLIT_RATIO_TOL: + return ( + " [ratio = %d:1 — consistent with a %s restating adjusted history " + "(corporate action), check the split calendar before suspecting a code bug]" + % (n, template % n) + ) + return "" + + def _log_close_discrepancies( new_df: pd.DataFrame, prior_close: dict[str, float], @@ -1151,9 +1183,10 @@ def _log_close_discrepancies( n_restatement += 1 elif pct_diff > _DISCREPANCY_ERROR_PCT: logger.error( - "polygon_only OVERWRITE %s @ %s: Close %.4f → %.4f (%.2f%% diff vs prior parquet) — " + "polygon_only OVERWRITE %s @ %s: Close %.4f → %.4f (%.2f%% diff vs prior parquet)%s — " "investigate before downstream consumers re-read", ticker, run_date, prior, float(new_close), pct_diff * 100, + _split_ratio_hint(prior, float(new_close)), ) n_error += 1 elif pct_diff > _DISCREPANCY_WARN_PCT: diff --git a/collectors/metron_market_data.py b/collectors/metron_market_data.py index 214386c..2a7ddd8 100644 --- a/collectors/metron_market_data.py +++ b/collectors/metron_market_data.py @@ -30,6 +30,8 @@ from __future__ import annotations import argparse +import contextlib +import functools import json import logging import time @@ -155,6 +157,70 @@ def load_metron_universe(bucket: str, s3_client: Any) -> tuple[list[dict], list[ # ── yfinance fetchers (default sources) ───────────────────────────────────── +@contextlib.contextmanager +def _quiet_yfinance(): + """Demote yfinance's internal logger to CRITICAL for the duration of a fetch. + + yfinance logs its own ERROR record per failing symbol — ≥5 distinctly-worded + messages for ONE unpriceable ticker (quoteSummary 404, "possibly delisted" + in two forms, "1 Failed download:", per-period repeats). Flow Doctor keys + dedup on message text, so each line became its own report/email — the + 2026-06-12 PCKM storm (config#1029). Failure recording is NOT lost: each + fetcher aggregates per-symbol coverage into one record per run via + ``_log_yf_coverage`` below, which is the named recording surface. + """ + yf_logger = logging.getLogger("yfinance") + prior_level = yf_logger.level + yf_logger.setLevel(logging.CRITICAL) + try: + yield + finally: + yf_logger.setLevel(prior_level) + + +def _yf_quiet(fn): + """Run a yfinance fetcher under ``_quiet_yfinance`` (see rationale there).""" + @functools.wraps(fn) + def wrapper(*args, **kwargs): + with _quiet_yfinance(): + return fn(*args, **kwargs) + return wrapper + + +def _log_yf_coverage( + kind: str, + requested: list[str], + covered: dict | set, + *, + error_on_empty: bool = False, +) -> None: + """One aggregated coverage record per artifact per run (config#1029). + + Replaces yfinance's per-symbol ERROR spray (demoted by ``_quiet_yfinance``): + symbols with no data are reported as a SINGLE WARN naming them all. A full + miss on a non-empty request escalates to ERROR only where the caller says + the artifact is load-bearing (``error_on_empty`` — closes), so a Yahoo + outage surfaces once, loudly, instead of once per artifact per symbol. + """ + missing = sorted(set(requested) - set(covered)) + if not missing: + return + if error_on_empty and not covered: + logger.error( + "[metron_market_data] %s: yfinance returned NO data for any of %d requested " + "symbols (%s) — full-miss on a load-bearing artifact (provider outage?)", + kind, len(missing), ", ".join(missing), + ) + return + logger.warning( + "[metron_market_data] %s: no yfinance data for %d/%d symbols: %s — " + "non-listed instruments (e.g. 401(k) CITs) are broker-snapshot-priced " + "in Metron and belong out of the published universe (config#1029)", + kind, len(missing), len(requested), ", ".join(missing), + ) + + +@_yf_quiet def _yfinance_closes(yf_symbols: list[str]) -> dict[str, tuple[float, str]]: """Latest daily close per yf_symbol via yfinance → ``{yf_symbol: (close, bar_date)}``. Foreign listings (``.HK``/``.PA``/…) resolve natively. Unpriceable symbols omitted.""" @@ -192,9 +258,11 @@ def _yfinance_closes(yf_symbols: list[str]) -> dict[str, tuple[float, str]]: except Exception as e: logger.warning("[metron_market_data] yfinance close batch failed: %s", e) logger.info("[metron_market_data] closes: %d/%d symbols priced", len(out), len(yf_symbols)) + _log_yf_coverage("closes", yf_symbols, out, error_on_empty=True) return out +@_yf_quiet def _yfinance_fx(currencies: list[str], base: str = BASE_CURRENCY) -> dict[str, float]: """Latest FX rate per currency via yfinance ``{CCY}{BASE}=X`` → ``{CCY: rate}`` (``base`` per 1 unit of ``CCY``). Unresolvable pairs omitted — no fabrication.""" @@ -230,9 +298,11 @@ def _yfinance_fx(currencies: list[str], base: str = BASE_CURRENCY) -> dict[str, except Exception as e: logger.warning("[metron_market_data] yfinance FX batch failed: %s", e) logger.info("[metron_market_data] fx: %d/%d currencies resolved", len(out), len(pairs)) + _log_yf_coverage("fx", list(pairs.values()), out) return out +@_yf_quiet def _yf_history(symbols: list[str], period: str, *, is_fx: bool = False, base: str = BASE_CURRENCY) -> dict[str, list[tuple[str, float]]]: """Daily close series per symbol via yfinance over ``period`` → ``{key: [(bar_date, close), …]}`` ascending. ``is_fx`` maps a currency to the @@ -270,6 +340,7 @@ def _yf_history(symbols: list[str], period: str, *, is_fx: bool = False, base: s except Exception as e: logger.warning("[metron_market_data] yfinance history batch failed: %s", e) logger.info("[metron_market_data] history: %d/%d series captured", len(out), len(targets)) + _log_yf_coverage("fx_history" if is_fx else "close_history", list(targets.values()), out) return out @@ -281,6 +352,7 @@ def _yfinance_fx_history(currencies: list[str], period: str = DEFAULT_HISTORY_PE return _yf_history(currencies, period, is_fx=True) +@_yf_quiet def _yfinance_sectors(yf_symbols: list[str]) -> dict[str, str]: """Canonical GICS sector per held symbol via ``yf.Ticker(sym).info['sector']``. Fail-soft: an unclassifiable symbol is omitted (Metron shows a coverage gap).""" @@ -297,9 +369,11 @@ def _yfinance_sectors(yf_symbols: list[str]) -> dict[str, str]: except Exception as e: logger.warning("[metron_market_data] sector fetch failed for %s: %s", sym, e) logger.info("[metron_market_data] sectors: %d/%d classified", len(out), len(yf_symbols)) + _log_yf_coverage("sectors", yf_symbols, out) return out +@_yf_quiet def _yfinance_spy_weights() -> dict[str, float]: """SPY's live GICS sector weights (canonical label → fraction) via ``funds_data.sector_weightings`` (snake_case → canonical). ``{}`` on failure.""" @@ -315,6 +389,7 @@ def _yfinance_spy_weights() -> dict[str, float]: return {_FUNDS_SECTOR_KEY[k]: float(v) for k, v in raw.items() if k in _FUNDS_SECTOR_KEY} +@_yf_quiet def _yfinance_earnings(yf_symbols: list[str]) -> dict[str, str]: """Next (earliest upcoming) earnings date per held symbol via yfinance → ``{yf_symbol: date_iso}``. Fail-soft: no resolvable date → omitted.""" @@ -337,9 +412,11 @@ def _yfinance_earnings(yf_symbols: list[str]) -> dict[str, str]: except Exception as e: logger.warning("[metron_market_data] earnings fetch failed for %s: %s", sym, e) logger.info("[metron_market_data] earnings: %d/%d dated", len(out), len(yf_symbols)) + _log_yf_coverage("earnings", yf_symbols, out) return out +@_yf_quiet def _yfinance_fundamentals(yf_symbols: list[str]) -> dict[str, dict]: """Tearsheet fundamentals per held symbol via ``yf.Ticker(sym).info`` → ``{yf_symbol: {info_key: value}}`` over ``FUNDAMENTALS_INFO_KEYS``. @@ -363,9 +440,11 @@ def _yfinance_fundamentals(yf_symbols: list[str]) -> dict[str, dict]: if fields: out[sym] = fields logger.info("[metron_market_data] fundamentals: %d/%d symbols covered", len(out), len(yf_symbols)) + _log_yf_coverage("fundamentals", yf_symbols, out) return out +@_yf_quiet def _yfinance_intraday(yf_symbols: list[str]) -> dict[str, dict]: """Latest (~15-min delayed) quote + session context per symbol, one batched 2-day daily-bar download → ``{yf_symbol: quote}`` where quote = @@ -414,6 +493,7 @@ def _yfinance_intraday(yf_symbols: list[str]) -> dict[str, dict]: except Exception as e: logger.warning("[metron_market_data] yfinance intraday batch failed: %s", e) logger.info("[metron_market_data] intraday: %d/%d symbols quoted", len(out), len(yf_symbols)) + _log_yf_coverage("intraday", yf_symbols, out) return out diff --git a/tests/test_daily_closes_split_hint.py b/tests/test_daily_closes_split_hint.py new file mode 100644 index 0000000..0f724f5 --- /dev/null +++ b/tests/test_daily_closes_split_hint.py @@ -0,0 +1,65 @@ +"""Split-ratio hint on polygon_only OVERWRITE ERRORs (config#1030). + +KLAC's 10-for-1 split (effective 2026-06-10) restated three windowed dates by +exactly ÷10; the ERROR messages said only "90.00% diff" and the LLM +auto-diagnosis blamed a producer decimal-shift bug (data#417-419). The hint +puts the strongest evidence — the clean integer ratio — in the message itself. +""" + +from __future__ import annotations + +import logging + +import pandas as pd + +from collectors import daily_closes + + +class TestSplitRatioHint: + def test_klac_forward_split_ratio_detected(self): + hint = daily_closes._split_ratio_hint(2139.37, 213.937) + assert "10:1" in hint + assert "10-for-1 forward stock split" in hint + + def test_reverse_split_ratio_detected(self): + hint = daily_closes._split_ratio_hint(2.5, 25.0) + assert "10:1" in hint + assert "1-for-10 reverse stock split" in hint + + def test_plain_drift_yields_no_hint(self): + # 7% cross-source drift — over the ERROR band but nowhere near a clean ratio. + assert daily_closes._split_ratio_hint(100.0, 93.0) == "" + + def test_ratio_outside_tolerance_yields_no_hint(self): + # ÷9.8 is 2% off 10:1 — a genuine anomaly must not be masked as a split. + assert daily_closes._split_ratio_hint(980.0, 100.0) == "" + + def test_degenerate_inputs_yield_no_hint(self): + assert daily_closes._split_ratio_hint(0.0, 100.0) == "" + assert daily_closes._split_ratio_hint(100.0, -1.0) == "" + + def test_two_for_one_boundary_detected(self): + assert "2:1" in daily_closes._split_ratio_hint(100.0, 50.0) + + def test_unity_ratio_never_hints(self): + # 1:1 (no diff) must not match the N>=2 floor. + assert daily_closes._split_ratio_hint(100.0, 100.0) == "" + + +class TestOverwriteErrorCarriesHint: + def test_error_record_includes_split_hint(self, caplog): + new_df = pd.DataFrame({"Close": [213.937]}, index=["KLAC"]) + with caplog.at_level(logging.DEBUG): + daily_closes._log_close_discrepancies(new_df, {"KLAC": 2139.37}, "2026-06-09") + errors = [r for r in caplog.records if r.levelno == logging.ERROR] + assert len(errors) == 1 + assert "polygon_only OVERWRITE KLAC" in errors[0].message + assert "10:1" in errors[0].message + + def test_non_split_error_record_has_no_hint(self, caplog): + new_df = pd.DataFrame({"Close": [93.0]}, index=["AAPL"]) + with caplog.at_level(logging.DEBUG): + daily_closes._log_close_discrepancies(new_df, {"AAPL": 100.0}, "2026-06-09") + errors = [r for r in caplog.records if r.levelno == logging.ERROR] + assert len(errors) == 1 + assert "ratio" not in errors[0].message diff --git a/tests/test_metron_yf_noise_aggregation.py b/tests/test_metron_yf_noise_aggregation.py new file mode 100644 index 0000000..4b1db7b --- /dev/null +++ b/tests/test_metron_yf_noise_aggregation.py @@ -0,0 +1,82 @@ +"""yfinance log-noise suppression + per-run coverage aggregation (config#1029). + +The 2026-06-12 PCKM storm: one unpriceable holding (a 401(k) CIT yfinance can +never price) produced ≥5 distinctly-worded yfinance ERROR log records per EOD +run, each becoming its own Flow Doctor report/email. The fix demotes yfinance's +internal logger for the duration of each fetch and aggregates per-symbol +coverage into ONE record per artifact per run — the named recording surface. +""" + +from __future__ import annotations + +import logging + +import pytest + +from collectors import metron_market_data as mmd + + +class TestQuietYfinance: + def test_demotes_yfinance_logger_inside_and_restores_after(self): + yf_logger = logging.getLogger("yfinance") + yf_logger.setLevel(logging.DEBUG) + try: + with mmd._quiet_yfinance(): + assert yf_logger.level == logging.CRITICAL + # The PCKM failure mode: yfinance ERROR records must not pass + # the logger's own level while a fetch is in flight. + assert not yf_logger.isEnabledFor(logging.ERROR) + assert yf_logger.level == logging.DEBUG + finally: + yf_logger.setLevel(logging.NOTSET) + + def test_restores_level_even_when_fetch_raises(self): + yf_logger = logging.getLogger("yfinance") + yf_logger.setLevel(logging.INFO) + try: + with pytest.raises(RuntimeError): + with mmd._quiet_yfinance(): + raise RuntimeError("batch failed") + assert yf_logger.level == logging.INFO + finally: + yf_logger.setLevel(logging.NOTSET) + + def test_all_yfinance_fetchers_are_wrapped(self): + # The decorator is the chokepoint: every default yfinance source must + # run quieted, or one bad symbol storms Flow Doctor again. + for fn in ( + mmd._yfinance_closes, mmd._yfinance_fx, mmd._yf_history, + mmd._yfinance_sectors, mmd._yfinance_spy_weights, + mmd._yfinance_earnings, mmd._yfinance_fundamentals, + mmd._yfinance_intraday, + ): + assert hasattr(fn, "__wrapped__"), f"{fn.__name__} not under _yf_quiet" + + +class TestCoverageAggregation: + def test_partial_miss_is_one_warning_naming_all_missing(self, caplog): + with caplog.at_level(logging.DEBUG): + mmd._log_yf_coverage("closes", ["AAPL", "PCKM", "ANET"], {"AAPL": 1, "ANET": 1}) + warns = [r for r in caplog.records if r.levelno == logging.WARNING] + assert len(warns) == 1 + assert "PCKM" in warns[0].message + assert "1/3" in warns[0].message + assert not [r for r in caplog.records if r.levelno >= logging.ERROR] + + def test_full_coverage_logs_nothing(self, caplog): + with caplog.at_level(logging.DEBUG): + mmd._log_yf_coverage("closes", ["AAPL"], {"AAPL": 1}, error_on_empty=True) + assert not caplog.records + + def test_full_miss_on_load_bearing_artifact_is_single_error(self, caplog): + with caplog.at_level(logging.DEBUG): + mmd._log_yf_coverage("closes", ["AAPL", "ANET"], {}, error_on_empty=True) + errors = [r for r in caplog.records if r.levelno == logging.ERROR] + assert len(errors) == 1 + assert "AAPL" in errors[0].message and "ANET" in errors[0].message + + def test_full_miss_on_best_effort_artifact_stays_warn(self, caplog): + with caplog.at_level(logging.DEBUG): + mmd._log_yf_coverage("earnings", ["AAPL"], {}) + assert not [r for r in caplog.records if r.levelno >= logging.ERROR] + assert [r for r in caplog.records if r.levelno == logging.WARNING]