Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 34 additions & 1 deletion collectors/daily_closes.py
Original file line number Diff line number Diff line change
Expand Up @@ -1103,6 +1103,38 @@ def _fetch_polygon_closes_per_ticker(
return recovered


_SPLIT_RATIO_MAX = 50 # largest forward/reverse split factor worth hinting (covers 20:1 NVDA-class splits)
_SPLIT_RATIO_TOL = 0.005 # adjusted closes restate by the EXACT factor; 0.5% absorbs feed rounding


def _split_ratio_hint(prior: float, new: float) -> str:
"""Corporate-action hint when prior/new sits on a clean split ratio, else "".

A forward N-for-1 split divides the adjusted close by exactly N (a reverse
1-for-N multiplies by N), so a cross-source overwrite whose ratio is within
``_SPLIT_RATIO_TOL`` of an integer 2..``_SPLIT_RATIO_MAX`` is far more likely
a split restatement than a code bug. Surfacing the ratio in the ERROR message
hands the strongest evidence to whoever diagnoses it — the KLAC 10:1 split
(2026-06-10) was auto-diagnosed as a producer decimal-shift bug because the
message only said "90.00% diff" (data#417-419, config#1030).
"""
if prior <= 0 or new <= 0:
return ""
for big, small, template in (
(prior, new, "%d-for-1 forward stock split"),
(new, prior, "1-for-%d reverse stock split"),
):
ratio = big / small
n = round(ratio)
if 2 <= n <= _SPLIT_RATIO_MAX and abs(ratio - n) / n <= _SPLIT_RATIO_TOL:
return (
" [ratio = %d:1 — consistent with a %s restating adjusted history "
"(corporate action), check the split calendar before suspecting a code bug]"
% (n, template % n)
)
return ""


def _log_close_discrepancies(
new_df: pd.DataFrame,
prior_close: dict[str, float],
Expand Down Expand Up @@ -1151,9 +1183,10 @@ def _log_close_discrepancies(
n_restatement += 1
elif pct_diff > _DISCREPANCY_ERROR_PCT:
logger.error(
"polygon_only OVERWRITE %s @ %s: Close %.4f → %.4f (%.2f%% diff vs prior parquet) — "
"polygon_only OVERWRITE %s @ %s: Close %.4f → %.4f (%.2f%% diff vs prior parquet)%s — "
"investigate before downstream consumers re-read",
ticker, run_date, prior, float(new_close), pct_diff * 100,
_split_ratio_hint(prior, float(new_close)),
)
n_error += 1
elif pct_diff > _DISCREPANCY_WARN_PCT:
Expand Down
80 changes: 80 additions & 0 deletions collectors/metron_market_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
from __future__ import annotations

import argparse
import contextlib
import functools
import json
import logging
import time
Expand Down Expand Up @@ -155,6 +157,70 @@ def load_metron_universe(bucket: str, s3_client: Any) -> tuple[list[dict], list[
# ── yfinance fetchers (default sources) ─────────────────────────────────────


@contextlib.contextmanager
def _quiet_yfinance():
"""Demote yfinance's internal logger to CRITICAL for the duration of a fetch.

yfinance logs its own ERROR record per failing symbol — ≥5 distinctly-worded
messages for ONE unpriceable ticker (quoteSummary 404, "possibly delisted"
in two forms, "1 Failed download:", per-period repeats). Flow Doctor keys
dedup on message text, so each line became its own report/email — the
2026-06-12 PCKM storm (config#1029). Failure recording is NOT lost: each
fetcher aggregates per-symbol coverage into one record per run via
``_log_yf_coverage`` below, which is the named recording surface.
"""
yf_logger = logging.getLogger("yfinance")
prior_level = yf_logger.level
yf_logger.setLevel(logging.CRITICAL)
try:
yield
finally:
yf_logger.setLevel(prior_level)


def _yf_quiet(fn):
"""Run a yfinance fetcher under ``_quiet_yfinance`` (see rationale there)."""
@functools.wraps(fn)
def wrapper(*args, **kwargs):
with _quiet_yfinance():
return fn(*args, **kwargs)
return wrapper


def _log_yf_coverage(
kind: str,
requested: list[str],
covered: dict | set,
*,
error_on_empty: bool = False,
) -> None:
"""One aggregated coverage record per artifact per run (config#1029).

Replaces yfinance's per-symbol ERROR spray (demoted by ``_quiet_yfinance``):
symbols with no data are reported as a SINGLE WARN naming them all. A full
miss on a non-empty request escalates to ERROR only where the caller says
the artifact is load-bearing (``error_on_empty`` — closes), so a Yahoo
outage surfaces once, loudly, instead of once per artifact per symbol.
"""
missing = sorted(set(requested) - set(covered))
if not missing:
return
if error_on_empty and not covered:
logger.error(
"[metron_market_data] %s: yfinance returned NO data for any of %d requested "
"symbols (%s) — full-miss on a load-bearing artifact (provider outage?)",
kind, len(missing), ", ".join(missing),
)
return
logger.warning(
"[metron_market_data] %s: no yfinance data for %d/%d symbols: %s — "
"non-listed instruments (e.g. 401(k) CITs) are broker-snapshot-priced "
"in Metron and belong out of the published universe (config#1029)",
kind, len(missing), len(requested), ", ".join(missing),
)


@_yf_quiet
def _yfinance_closes(yf_symbols: list[str]) -> dict[str, tuple[float, str]]:
"""Latest daily close per yf_symbol via yfinance → ``{yf_symbol: (close, bar_date)}``.
Foreign listings (``.HK``/``.PA``/…) resolve natively. Unpriceable symbols omitted."""
Expand Down Expand Up @@ -192,9 +258,11 @@ def _yfinance_closes(yf_symbols: list[str]) -> dict[str, tuple[float, str]]:
except Exception as e:
logger.warning("[metron_market_data] yfinance close batch failed: %s", e)
logger.info("[metron_market_data] closes: %d/%d symbols priced", len(out), len(yf_symbols))
_log_yf_coverage("closes", yf_symbols, out, error_on_empty=True)
return out


@_yf_quiet
def _yfinance_fx(currencies: list[str], base: str = BASE_CURRENCY) -> dict[str, float]:
"""Latest FX rate per currency via yfinance ``{CCY}{BASE}=X`` → ``{CCY: rate}``
(``base`` per 1 unit of ``CCY``). Unresolvable pairs omitted — no fabrication."""
Expand Down Expand Up @@ -230,9 +298,11 @@ def _yfinance_fx(currencies: list[str], base: str = BASE_CURRENCY) -> dict[str,
except Exception as e:
logger.warning("[metron_market_data] yfinance FX batch failed: %s", e)
logger.info("[metron_market_data] fx: %d/%d currencies resolved", len(out), len(pairs))
_log_yf_coverage("fx", list(pairs.values()), out)
return out


@_yf_quiet
def _yf_history(symbols: list[str], period: str, *, is_fx: bool = False, base: str = BASE_CURRENCY) -> dict[str, list[tuple[str, float]]]:
"""Daily close series per symbol via yfinance over ``period`` →
``{key: [(bar_date, close), …]}`` ascending. ``is_fx`` maps a currency to the
Expand Down Expand Up @@ -270,6 +340,7 @@ def _yf_history(symbols: list[str], period: str, *, is_fx: bool = False, base: s
except Exception as e:
logger.warning("[metron_market_data] yfinance history batch failed: %s", e)
logger.info("[metron_market_data] history: %d/%d series captured", len(out), len(targets))
_log_yf_coverage("fx_history" if is_fx else "close_history", list(targets.values()), out)
return out


Expand All @@ -281,6 +352,7 @@ def _yfinance_fx_history(currencies: list[str], period: str = DEFAULT_HISTORY_PE
return _yf_history(currencies, period, is_fx=True)


@_yf_quiet
def _yfinance_sectors(yf_symbols: list[str]) -> dict[str, str]:
"""Canonical GICS sector per held symbol via ``yf.Ticker(sym).info['sector']``.
Fail-soft: an unclassifiable symbol is omitted (Metron shows a coverage gap)."""
Expand All @@ -297,9 +369,11 @@ def _yfinance_sectors(yf_symbols: list[str]) -> dict[str, str]:
except Exception as e:
logger.warning("[metron_market_data] sector fetch failed for %s: %s", sym, e)
logger.info("[metron_market_data] sectors: %d/%d classified", len(out), len(yf_symbols))
_log_yf_coverage("sectors", yf_symbols, out)
return out


@_yf_quiet
def _yfinance_spy_weights() -> dict[str, float]:
"""SPY's live GICS sector weights (canonical label → fraction) via
``funds_data.sector_weightings`` (snake_case → canonical). ``{}`` on failure."""
Expand All @@ -315,6 +389,7 @@ def _yfinance_spy_weights() -> dict[str, float]:
return {_FUNDS_SECTOR_KEY[k]: float(v) for k, v in raw.items() if k in _FUNDS_SECTOR_KEY}


@_yf_quiet
def _yfinance_earnings(yf_symbols: list[str]) -> dict[str, str]:
"""Next (earliest upcoming) earnings date per held symbol via yfinance →
``{yf_symbol: date_iso}``. Fail-soft: no resolvable date → omitted."""
Expand All @@ -337,9 +412,11 @@ def _yfinance_earnings(yf_symbols: list[str]) -> dict[str, str]:
except Exception as e:
logger.warning("[metron_market_data] earnings fetch failed for %s: %s", sym, e)
logger.info("[metron_market_data] earnings: %d/%d dated", len(out), len(yf_symbols))
_log_yf_coverage("earnings", yf_symbols, out)
return out


@_yf_quiet
def _yfinance_fundamentals(yf_symbols: list[str]) -> dict[str, dict]:
"""Tearsheet fundamentals per held symbol via ``yf.Ticker(sym).info`` →
``{yf_symbol: {info_key: value}}`` over ``FUNDAMENTALS_INFO_KEYS``.
Expand All @@ -363,9 +440,11 @@ def _yfinance_fundamentals(yf_symbols: list[str]) -> dict[str, dict]:
if fields:
out[sym] = fields
logger.info("[metron_market_data] fundamentals: %d/%d symbols covered", len(out), len(yf_symbols))
_log_yf_coverage("fundamentals", yf_symbols, out)
return out


@_yf_quiet
def _yfinance_intraday(yf_symbols: list[str]) -> dict[str, dict]:
"""Latest (~15-min delayed) quote + session context per symbol, one batched
2-day daily-bar download → ``{yf_symbol: quote}`` where quote =
Expand Down Expand Up @@ -414,6 +493,7 @@ def _yfinance_intraday(yf_symbols: list[str]) -> dict[str, dict]:
except Exception as e:
logger.warning("[metron_market_data] yfinance intraday batch failed: %s", e)
logger.info("[metron_market_data] intraday: %d/%d symbols quoted", len(out), len(yf_symbols))
_log_yf_coverage("intraday", yf_symbols, out)
return out


Expand Down
65 changes: 65 additions & 0 deletions tests/test_daily_closes_split_hint.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
"""Split-ratio hint on polygon_only OVERWRITE ERRORs (config#1030).

KLAC's 10-for-1 split (effective 2026-06-10) restated three windowed dates by
exactly ÷10; the ERROR messages said only "90.00% diff" and the LLM
auto-diagnosis blamed a producer decimal-shift bug (data#417-419). The hint
puts the strongest evidence — the clean integer ratio — in the message itself.
"""

from __future__ import annotations

import logging

import pandas as pd

from collectors import daily_closes


class TestSplitRatioHint:
def test_klac_forward_split_ratio_detected(self):
hint = daily_closes._split_ratio_hint(2139.37, 213.937)
assert "10:1" in hint
assert "10-for-1 forward stock split" in hint

def test_reverse_split_ratio_detected(self):
hint = daily_closes._split_ratio_hint(2.5, 25.0)
assert "10:1" in hint
assert "1-for-10 reverse stock split" in hint

def test_plain_drift_yields_no_hint(self):
# 7% cross-source drift — over the ERROR band but nowhere near a clean ratio.
assert daily_closes._split_ratio_hint(100.0, 93.0) == ""

def test_ratio_outside_tolerance_yields_no_hint(self):
# ÷9.8 is 2% off 10:1 — a genuine anomaly must not be masked as a split.
assert daily_closes._split_ratio_hint(980.0, 100.0) == ""

def test_degenerate_inputs_yield_no_hint(self):
assert daily_closes._split_ratio_hint(0.0, 100.0) == ""
assert daily_closes._split_ratio_hint(100.0, -1.0) == ""

def test_two_for_one_boundary_detected(self):
assert "2:1" in daily_closes._split_ratio_hint(100.0, 50.0)

def test_unity_ratio_never_hints(self):
# 1:1 (no diff) must not match the N>=2 floor.
assert daily_closes._split_ratio_hint(100.0, 100.0) == ""


class TestOverwriteErrorCarriesHint:
def test_error_record_includes_split_hint(self, caplog):
new_df = pd.DataFrame({"Close": [213.937]}, index=["KLAC"])
with caplog.at_level(logging.DEBUG):
daily_closes._log_close_discrepancies(new_df, {"KLAC": 2139.37}, "2026-06-09")
errors = [r for r in caplog.records if r.levelno == logging.ERROR]
assert len(errors) == 1
assert "polygon_only OVERWRITE KLAC" in errors[0].message
assert "10:1" in errors[0].message

def test_non_split_error_record_has_no_hint(self, caplog):
new_df = pd.DataFrame({"Close": [93.0]}, index=["AAPL"])
with caplog.at_level(logging.DEBUG):
daily_closes._log_close_discrepancies(new_df, {"AAPL": 100.0}, "2026-06-09")
errors = [r for r in caplog.records if r.levelno == logging.ERROR]
assert len(errors) == 1
assert "ratio" not in errors[0].message
82 changes: 82 additions & 0 deletions tests/test_metron_yf_noise_aggregation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
"""yfinance log-noise suppression + per-run coverage aggregation (config#1029).

The 2026-06-12 PCKM storm: one unpriceable holding (a 401(k) CIT yfinance can
never price) produced ≥5 distinctly-worded yfinance ERROR log records per EOD
run, each becoming its own Flow Doctor report/email. The fix demotes yfinance's
internal logger for the duration of each fetch and aggregates per-symbol
coverage into ONE record per artifact per run — the named recording surface.
"""

from __future__ import annotations

import logging

import pytest

from collectors import metron_market_data as mmd


class TestQuietYfinance:
def test_demotes_yfinance_logger_inside_and_restores_after(self):
yf_logger = logging.getLogger("yfinance")
yf_logger.setLevel(logging.DEBUG)
try:
with mmd._quiet_yfinance():
assert yf_logger.level == logging.CRITICAL
# The PCKM failure mode: yfinance ERROR records must not pass
# the logger's own level while a fetch is in flight.
assert not yf_logger.isEnabledFor(logging.ERROR)
assert yf_logger.level == logging.DEBUG
finally:
yf_logger.setLevel(logging.NOTSET)

def test_restores_level_even_when_fetch_raises(self):
yf_logger = logging.getLogger("yfinance")
yf_logger.setLevel(logging.INFO)
try:
with pytest.raises(RuntimeError):
with mmd._quiet_yfinance():
raise RuntimeError("batch failed")
assert yf_logger.level == logging.INFO
finally:
yf_logger.setLevel(logging.NOTSET)

def test_all_yfinance_fetchers_are_wrapped(self):
# The decorator is the chokepoint: every default yfinance source must
# run quieted, or one bad symbol storms Flow Doctor again.
for fn in (
mmd._yfinance_closes, mmd._yfinance_fx, mmd._yf_history,
mmd._yfinance_sectors, mmd._yfinance_spy_weights,
mmd._yfinance_earnings, mmd._yfinance_fundamentals,
mmd._yfinance_intraday,
):
assert hasattr(fn, "__wrapped__"), f"{fn.__name__} not under _yf_quiet"


class TestCoverageAggregation:
def test_partial_miss_is_one_warning_naming_all_missing(self, caplog):
with caplog.at_level(logging.DEBUG):
mmd._log_yf_coverage("closes", ["AAPL", "PCKM", "ANET"], {"AAPL": 1, "ANET": 1})
warns = [r for r in caplog.records if r.levelno == logging.WARNING]
assert len(warns) == 1
assert "PCKM" in warns[0].message
assert "1/3" in warns[0].message
assert not [r for r in caplog.records if r.levelno >= logging.ERROR]

def test_full_coverage_logs_nothing(self, caplog):
with caplog.at_level(logging.DEBUG):
mmd._log_yf_coverage("closes", ["AAPL"], {"AAPL": 1}, error_on_empty=True)
assert not caplog.records

def test_full_miss_on_load_bearing_artifact_is_single_error(self, caplog):
with caplog.at_level(logging.DEBUG):
mmd._log_yf_coverage("closes", ["AAPL", "ANET"], {}, error_on_empty=True)
errors = [r for r in caplog.records if r.levelno == logging.ERROR]
assert len(errors) == 1
assert "AAPL" in errors[0].message and "ANET" in errors[0].message

def test_full_miss_on_best_effort_artifact_stays_warn(self, caplog):
with caplog.at_level(logging.DEBUG):
mmd._log_yf_coverage("earnings", ["AAPL"], {})
assert not [r for r in caplog.records if r.levelno >= logging.ERROR]
assert [r for r in caplog.records if r.levelno == logging.WARNING]
Loading