Implementation of data harvester #6
Merged
Captures the design for the new harvester CLI that proactively warms the local Parquet cache via category-specific strategies (discover-then-fill, walk-back-until-checkpoint, gap-fill via CachedClient, snapshot overwrite) to stay within the FMP starter-plan 20 GB/month bandwidth cap. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds four more high-volume categories to the harvester v2 spec:
- commodities (eod + intraday off-by-default)
- forex (eod + intraday off-by-default)
- indexes (eod + intraday off-by-default)
- dcf (snapshot overwrite, P4)
All three price-history categories reuse FMP's shared historical-price-eod and historical-chart wire endpoints. CachedClient registry will register the same endpoint under multiple (category, method) keys so user-driven calls hit the same parquet location regardless of which category they went through.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds seven harvester categories (commodities_eod/intraday, forex_eod/intraday, indexes_eod/intraday, dcf) plus the corresponding symbol universes and multi-category CachedClient registry sharing for the historical-price-eod and historical-chart endpoints. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
31 TDD tasks covering: setup (deps + skeleton), foundation (config, state, budget, catalog, base, manager), CachedClient extensions (PAGE_WALK pattern, SnapshotStore, multi-category registration), all 17 concrete category harvesters (P3 chart/commodities/forex/indexes/news/economics/TI, P1 statements + safety-net sweep, P2 analyst_estimates/insider_trades/form13f, P4 analyst_snapshots/dcf), CLI integration (harvest + harvest-status), end-to-end smoke test, README + example config. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…extvar, FMPBudgetError
Extends TemporalPattern enum with PAGE_WALK, adds page_param/default_page_size/walk_date_field fields to CacheableEndpoint, and adds _read_page_walk method to CachedCategoryProxy that returns stored records sorted by date descending without touching the upstream API.
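A minimal sketch of the PAGE_WALK read path described above. The field names `page_param`, `default_page_size`, and `walk_date_field` come from the commit message; their default values, the free function `read_page_walk` (standing in for `CachedCategoryProxy._read_page_walk`), and the choice to drop undated rows are assumptions made for illustration.

```python
from dataclasses import dataclass
from enum import Enum


class TemporalPattern(Enum):
    PAGE_WALK = "page_walk"


@dataclass(frozen=True)
class CacheableEndpoint:
    name: str
    pattern: TemporalPattern
    page_param: str = "page"          # assumed default
    default_page_size: int = 100      # assumed default
    walk_date_field: str = "date"     # assumed default


def read_page_walk(endpoint: CacheableEndpoint, stored: list[dict]) -> list[dict]:
    """Return stored records newest-first; never calls the upstream API."""
    key = endpoint.walk_date_field
    # Assumption: records without the date field are dropped rather than sorted.
    dated = [r for r in stored if r.get(key)]
    # ISO-8601 date strings sort correctly as plain strings.
    return sorted(dated, key=lambda r: r[key], reverse=True)
```

The read touches only records already on disk, which is what keeps user-driven calls from spending bandwidth.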
…form13f Adds three PAGE_WALK registrations to build_default_registry() — analyst.financial_estimates, insider_trades.latest_insider_trades, and form13f.latest_filings — with corresponding tests in TestPageWalkEndpointRegistrations.
Adds TechnicalIndicatorsHarvester (P3) that fans out over symbol universe and configurable indicator list; validates method names at build time.
Implements Task 21: discover-then-fill incremental cycle via earnings_calendar plus a periodic safety-net sweep over the full financial_symbols universe.
Implements Task 22: per-symbol page-walk harvester for analyst financial estimates, with checkpoint-based stop conditions, ParquetStorage writes, and per-symbol error isolation.
Adds DcfHarvester with _INCLUDE_MAP for dcf_valuation and levered_dcf endpoints, writing snapshot overwrites over financial_symbols. Registers the category and wires it into the categories __init__.py.
Install `_on_response_size` on `FmpClient` in `HarvesterManager.start()` so every API response is recorded via `BudgetTracker.record_bytes()` and checked against the hard cap with `BudgetTracker.check_hard_cap()`.
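The hook wiring can be sketched roughly as follows. `BudgetTracker.record_bytes()`, `BudgetTracker.check_hard_cap()`, and `FMPBudgetError` are named in this PR; the constructor argument, the `used_bytes` attribute, and the factory `make_on_response_size` are hypothetical and only illustrate the idea.

```python
class FMPBudgetError(Exception):
    """Raised when the bandwidth hard cap has been reached."""


class BudgetTracker:
    def __init__(self, hard_cap_bytes: int) -> None:
        self.hard_cap_bytes = hard_cap_bytes
        self.used_bytes = 0

    def record_bytes(self, n_bytes: int) -> None:
        self.used_bytes += n_bytes

    def check_hard_cap(self) -> None:
        if self.used_bytes >= self.hard_cap_bytes:
            raise FMPBudgetError(
                f"used {self.used_bytes} of {self.hard_cap_bytes} bytes"
            )


def make_on_response_size(tracker: BudgetTracker):
    """Build the callback a client would invoke with each response size."""

    def _on_response_size(n_bytes: int) -> None:
        # Record first, then check, so the response that crosses the cap
        # is still counted.
        tracker.record_bytes(n_bytes)
        tracker.check_hard_cap()

    return _on_response_size
```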
Adds `aiofmp harvest` (--once, --dry-run, --category, --api-key) and `aiofmp harvest-status` subcommands under a new `cli` click group. Refactors aiofmp/cli.py to expose a `cli` group with `mcp-server` as a subcommand while keeping the existing `aiofmp-mcp-server` entry point intact.
Introduce FMPServerError(FMPError) raised on HTTP status >= 500 and refactor _run_cycle_with_retry to handle both FMPRateLimitError and FMPServerError with separate attempt counters using their respective retry policies (on_429 and on_5xx).
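A rough sketch of the two-counter retry loop, assuming a simple exponential backoff. The exception names and the `on_429`/`on_5xx` policy names come from the commit message; the `RetryPolicy` shape, the synchronous signature, and the injectable `sleep` are assumptions chosen to keep the example short and testable.

```python
import time
from dataclasses import dataclass


class FMPError(Exception):
    pass


class FMPRateLimitError(FMPError):
    pass


class FMPServerError(FMPError):
    pass


@dataclass
class RetryPolicy:
    max_attempts: int        # retries allowed for this error class
    backoff_seconds: float   # base delay, doubled on each retry


def run_cycle_with_retry(run_cycle, on_429: RetryPolicy, on_5xx: RetryPolicy,
                         sleep=time.sleep):
    attempts_429 = 0
    attempts_5xx = 0
    while True:
        try:
            return run_cycle()
        except FMPRateLimitError:
            attempts_429 += 1
            if attempts_429 > on_429.max_attempts:
                raise
            sleep(on_429.backoff_seconds * 2 ** (attempts_429 - 1))
        except FMPServerError:
            attempts_5xx += 1
            if attempts_5xx > on_5xx.max_attempts:
                raise
            sleep(on_5xx.backoff_seconds * 2 ** (attempts_5xx - 1))
```

Keeping the counters separate means a burst of 5xx responses does not consume the 429 retry allowance, and vice versa.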
Seed the statements_safetynet checkpoint on first incremental run so the 30-day timer starts immediately, making the monthly sweep reachable. Add _run_safety_net_with_row to record safety-net cycles under their own statements_safetynet category_runs row (using current_harvest_category) instead of sharing the statements row, satisfying spec §6 observability.
Add `_stop_event` attribute and `should_stop()` helper to `CategoryHarvester` so subclasses can poll for shutdown between loop iterations. Wire the check into all per-symbol and per-page loops (gap_fill_base, statements, analyst_estimates, analyst_snapshots, dcf, technical_indicators, news, economics) and page-walk loops (insider_trades, form13f); both styles return PARTIAL and — for page walkers — still persist accumulated data before returning. Add unit tests for the new mechanism in test_base.py and test_gap_fill_base.py.
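The polling pattern might look like the sketch below. `_stop_event`, `should_stop()`, and `RunStatus.PARTIAL` are named in the commit; using `threading.Event` (the real code may use an asyncio event), the `RunStatus.OK` member, and the `run_cycle` signature are assumptions.

```python
import threading
from enum import Enum


class RunStatus(Enum):
    OK = "ok"            # assumed member name
    PARTIAL = "partial"


class CategoryHarvester:
    def __init__(self) -> None:
        self._stop_event = threading.Event()

    def should_stop(self) -> bool:
        return self._stop_event.is_set()

    def run_cycle(self, symbols, fetch):
        fetched = []
        for symbol in symbols:
            # Poll between iterations so shutdown never interrupts a fetch.
            if self.should_stop():
                return RunStatus.PARTIAL, fetched
            fetched.append(fetch(symbol))
        return RunStatus.OK, fetched
```

Returning the accumulated list alongside PARTIAL mirrors the page walkers, which persist what they already fetched before returning.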
…, doc
- Fix D: wrap insider_trades/form13f page-walk in try/finally so pages fetched before a mid-walk exception are always persisted; count fetch_errors and surface RunStatus.PARTIAL accordingly.
- Fix E: news run_cycle now walks pages per variant up to a safety cap of 10, stopping when a batch is empty.
- Fix F: --config on harvest/harvest-status is now optional; falls back to ./harvester.yaml then ~/.aiofmp/harvester.yaml with a clear error if neither exists.
- Fix G: document shutdown_grace_seconds in harvester.example.yaml.
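The Fix F lookup order can be sketched as below. The two fallback paths come from the commit message; the function name `resolve_config_path`, its `cwd`/`home` parameters, and the error type are hypothetical.

```python
from pathlib import Path


def resolve_config_path(explicit=None, cwd=None, home=None) -> Path:
    """Pick the harvester config: explicit --config, then cwd, then home."""
    if explicit is not None:
        return Path(explicit)
    cwd = Path.cwd() if cwd is None else Path(cwd)
    home = Path.home() if home is None else Path(home)
    candidates = [
        cwd / "harvester.yaml",
        home / ".aiofmp" / "harvester.yaml",
    ]
    for candidate in candidates:
        if candidate.is_file():
            return candidate
    searched = ", ".join(str(c) for c in candidates)
    raise FileNotFoundError(f"no harvester config found; looked in: {searched}")
```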
…permanent errors
- directory.financial_symbols was calling /financial-symbols-list which 404s; the actual FMP path is /financial-statement-symbol-list. Pre-existing SDK bug surfaced once the harvester started using the financial_symbols universe to drive statements + dcf.
- base._make_request had a broad `except Exception` that retried every error with exponential backoff, including permanent 4xx (402 Payment Required, 403, 404) and FMP-side response/parse errors. Removed it so only transient network failures (asyncio.TimeoutError, aiohttp.ClientError) retry; all FMP* exceptions now propagate immediately. The harvester's _run_cycle_with_retry still applies the 429/5xx retry policies on top.
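A sketch of the narrowed retry in `_make_request`, under two loudly stated assumptions: the builtin `ConnectionError` stands in for `aiohttp.ClientError` so the example runs without aiohttp, and the `send` callable plus the `max_retries`/`base_delay` parameters are hypothetical.

```python
import asyncio


class FMPError(Exception):
    """Permanent, FMP-side failure (4xx, parse errors); never retried here."""


# Assumption: ConnectionError substitutes for aiohttp.ClientError.
TRANSIENT_ERRORS = (asyncio.TimeoutError, ConnectionError)


async def make_request(send, max_retries: int = 3, base_delay: float = 0.0):
    for attempt in range(max_retries + 1):
        try:
            return await send()
        except TRANSIENT_ERRORS:
            if attempt == max_retries:
                raise
            # Exponential backoff between transient failures.
            await asyncio.sleep(base_delay * 2 ** attempt)
        # No broad `except Exception`: FMPError and everything else
        # propagates on the first occurrence.
```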
…walls Wire FMPPaywallError handling into the per-item loop of all 9 concrete category harvesters. Each now calls note_paywall() on 402 and breaks the full symbol/page/indicator loop when PAYWALL_THRESHOLD is reached, returning RunStatus.PARTIAL. note_success() resets the counter after each good fetch. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
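The per-item paywall handling might be sketched like this. `note_paywall()`, `note_success()`, `PAYWALL_THRESHOLD`, and `FMPPaywallError` are from the commit; the standalone `PaywallCounter` class, the boolean return of `note_paywall()`, and the `harvest` loop with string statuses are simplifications invented for the example. The threshold of 5 is the value this PR states at this point in its history.

```python
class FMPPaywallError(Exception):
    """Stands in for the error raised on HTTP 402."""


class PaywallCounter:
    PAYWALL_THRESHOLD = 5

    def __init__(self) -> None:
        self.consecutive = 0

    def note_paywall(self) -> bool:
        """Count a 402; return True once the threshold is reached."""
        self.consecutive += 1
        return self.consecutive >= self.PAYWALL_THRESHOLD

    def note_success(self) -> None:
        self.consecutive = 0


def harvest(symbols, fetch, counter: PaywallCounter):
    fetched = []
    for symbol in symbols:
        try:
            fetched.append(fetch(symbol))
        except FMPPaywallError:
            if counter.note_paywall():
                return "PARTIAL", fetched   # stop the whole loop
            continue
        counter.note_success()              # a good fetch resets the streak
    return "OK", fetched
```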
…ly filter
Adds a ``plan`` field to harvester.yaml (basic|starter|premium|ultimate) that drives three runtime behaviours so the harvester operates safely within FMP's per-tier limits:
1. Sliding-window rate limiter in FMPBaseClient paces every HTTP request to the plan's calls-per-minute cap (Starter=300, Premium=750, Ultimate=3000). This eliminates the 429 storm that hit when 17 categories ran in parallel against Starter's 300/min cap.
2. New FMPPaywallError raised on HTTP 402. The base client no longer buries 402s under a generic FMPError, and CategoryHarvester counts consecutive paywalls per cycle. After 5 consecutive 402s in a single category (PAYWALL_THRESHOLD), the cycle short-circuits to PARTIAL with one warning line, instead of churning through 26k symbols at one 402 each.
3. SymbolCatalog now accepts an optional symbol_filter. When the plan has us_only_coverage=True (Basic/Starter), the manager installs ``is_us_symbol`` which drops anything containing a ``.`` (foreign exchange suffix) or starting with ``0P`` (Morningstar fund IDs). Statements also strips ``quarter`` from periods when the plan only includes annual fundamentals.
Plan capability matrix lives in aiofmp/harvester/plan.py (calls_per_minute, monthly_bandwidth_gb, historical_years, has_quarterly_fundamentals, us_only_coverage). Mirrors FMP's pricing page snapshot.
Tests: 39 new tests in test_plan.py covering plan lookup, US-symbol heuristics, sliding-window pacing, FMPPaywallError mapping, and the paywall-threshold counter. Existing statements tests updated to declare ``plan: premium`` so the existing ``period: quarter`` assertions remain valid. Full suite: 850 passing.
Example YAML documents the new ``plan`` field and lists the categories that are known-paywalled on Starter so operators can disable them explicitly when desired.
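For the rate limiter in item 1, a sliding window can be sketched with a deque of timestamps. This is an illustration of the general technique, not the code in FMPBaseClient; the class name, method names, and injectable `clock` are all assumptions.

```python
import collections
import time


class SlidingWindowLimiter:
    def __init__(self, max_calls: int, window_seconds: float = 60.0,
                 clock=time.monotonic) -> None:
        self.max_calls = max_calls
        self.window_seconds = window_seconds
        self._clock = clock
        self._calls = collections.deque()   # timestamps of recent calls

    def wait_time(self) -> float:
        """Seconds to wait before another call fits in the window."""
        now = self._clock()
        # Evict calls that have aged out of the window.
        while self._calls and now - self._calls[0] >= self.window_seconds:
            self._calls.popleft()
        if len(self._calls) < self.max_calls:
            return 0.0
        # The oldest call leaves the window at this moment.
        return self._calls[0] + self.window_seconds - now

    def record(self) -> None:
        self._calls.append(self._clock())
```

A caller would sleep for `wait_time()` seconds and then call `record()` immediately before sending each request.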
Two follow-ups after running with plan=starter and seeing seven residual warning lines:
- aiofmp/base.py: ``_handle_response`` now treats HTTP 200 with an empty
or whitespace-only body as ``data = None`` (silent no-op) instead of
raising ``FMPError("Failed to parse response")``. FMP returns this
shape for paywalled resources on some endpoints (e.g.
economics.economic_indicators for UNRATE/FEDFUNDS/DFF on Starter)
instead of a clean 402. Without this, every cycle logged three parse
errors that weren't actual bugs.
- aiofmp/harvester/catalog.py: ``SymbolCatalog.symbols()`` now applies
the symbol filter on read, not just on refresh. The previous run
populated the indexes/commodities universes BEFORE the plan-aware
filter existed, so non-US symbols (DX-Y.NYB, etc.) remained in
SQLite for the full 7-day catalog TTL. Filtering on read makes plan
changes take effect immediately without touching the underlying
store; the cache itself remains plan-agnostic.
Combined with the prior plan-aware rate limiter and paywall short-
circuit, a clean Starter run now logs ~4 short-circuit warnings (one
per entirely-paywalled category) instead of thousands of individual
402/429 lines.
Tests: 4 new test_plan.py cases for empty-body handling and read-side
filtering. Full suite: 854 passing.
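The empty-body rule from the first bullet can be illustrated as follows. `FMPError` and the "Failed to parse response" message come from the commit; `parse_body` is a hypothetical stand-in for the relevant part of `_handle_response`, which in the real client also handles status codes not shown here.

```python
import json


class FMPError(Exception):
    pass


def parse_body(status: int, text: str):
    """Return parsed JSON, or None for an empty 200 (paywall-shaped reply)."""
    if status == 200 and not text.strip():
        return None   # silent no-op instead of a parse error
    try:
        return json.loads(text)
    except json.JSONDecodeError as exc:
        raise FMPError("Failed to parse response") from exc
```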
…probe
Probed FMP's Starter plan against the live API (scripts/probe_plan.sh) and replaced coarse plan-tier flags with fine-grained per-endpoint capability sets. The harvester now applies five plan-aware filters at category construction:
1. paywalled_categories: manager auto-skips entirely-paywalled categories at start-up with one info line (Starter: form13f). No more 5-call discovery cost per cycle.
2. quarterly_paywalled_statement_endpoints: statements.py now skips period=quarter PER ENDPOINT instead of dropping it wholesale. Starter still gets quarterly income/balance/cash flow/growth/owner_earnings/enterprise_values, just not key_metrics/ratios/segmentations. (Previous code dropped quarter for all 13 endpoints, a big over-correction.)
3. intraday_paywalled_timeframes: chart_intraday drops 1min on Starter; 5min and up still work fine.
4. paywalled_news_variants: news drops press_releases on Starter; general/stock/crypto/forex still work.
5. quarterly_analyst_estimates_paywalled: analyst_estimates drops period=quarter (annual still works).
Plus a directly user-visible bug fix:
- economics defaults switched from FRED codes (UNRATE/FEDFUNDS/DFF), which FMP rejected with "Invalid name", to FMP's actual indicator names (GDP, realGDP, CPI, inflationRate, consumerSentiment, unemploymentRate, federalFunds, retailSales). All eight return data on Starter.
Updated examples/harvester.example.yaml + the matching default harvester.yaml to document the per-plan auto-filtering and use the working indicator names. Categories that previously logged short-circuit warnings (form13f, news.press_releases, etc.) now run silently within plan limits.
Test scope changes: every category test's manager fixture now sets `m.config.plan = "premium"` to keep the existing per-period and per-variant assertions valid. plan.py tests assert the probed capability data. Full suite: 855 passing.
The probe script (scripts/probe_plan.sh) is checked in so the capability map can be re-verified against future plan changes or re-probed on a different key.
FMP occasionally returns integers larger than 2^53 (e.g. growth metrics
on illiquid symbols, junk market caps). pyarrow.Table.from_pylist infers
``double`` (float64) for any numeric column that also contains a float
or None, and float64 can only represent integers exactly up to 2^53.
pyarrow then raises ``ArrowInvalid: Integer value N is outside the
range exactly representable by a IEEE 754 double precision value`` and
the entire write fails, losing the whole batch.
Fix: ParquetStorage.write() now runs records through
_sanitize_records_for_parquet() first. The sanitizer:
- Scans columns for any int with |x| > 2^53 (excluding bool).
- Stringifies ALL values in those columns across every record, so
pyarrow sees a homogeneous string column.
- Leaves every other column (and records with only safe values)
untouched and unmutated.
Stringification preserves full precision and is reversible
(``int(s)``). Most columns stay numeric — this only affects rare cases
of FMP returning unsafe magnitudes.
Tests: 8 cases covering pass-through, exact-boundary, big positive,
big negative, None handling, bool-not-int, empty input, and an
end-to-end write+read of a record that previously crashed pyarrow.
Full suite: 863 passing.
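The sanitizer's column scan can be sketched in plain Python, without pyarrow. `_sanitize_records_for_parquet` is the real name from the commit; `sanitize_records` below is a simplified stand-in, and leaving `None` values as `None` in a stringified column is an assumption, since the commit only says None handling is tested.

```python
SAFE_INT_LIMIT = 2 ** 53   # largest magnitude float64 represents exactly


def sanitize_records(records: list[dict]) -> list[dict]:
    # Pass 1: find columns holding any int beyond the safe magnitude.
    unsafe = set()
    for record in records:
        for key, value in record.items():
            is_int = isinstance(value, int) and not isinstance(value, bool)
            if is_int and abs(value) > SAFE_INT_LIMIT:
                unsafe.add(key)
    if not unsafe:
        return records   # nothing to do; input passes through untouched

    # Pass 2: stringify those columns in every record, building new dicts
    # so the caller's records are not mutated.
    return [
        {
            key: str(value) if key in unsafe and value is not None else value
            for key, value in record.items()
        }
        for record in records
    ]
```

Reading the value back is a plain `int(s)`, which is what makes the conversion lossless.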
…eshold, atomic parquet
After running the harvester against the real FMP starter API for several minutes, four classes of issues showed up. All four are fixed here.
1. Server-side 429s under sustained 300 RPM (the documented Starter cap). FMP's counting drifts vs. our deterministic sliding window. Lower the per-plan RPM ~17% (starter 300→250, premium 750→625, ultimate 3000→2500) to leave a safety margin.
2. The residual 429s that slip through the margin now retry transparently inside _make_request (FMPRateLimitError handler, 5s/10s/20s backoff, capped by max_retries). Previously a single 429 = one lost symbol per cycle; now most 429s are invisible to callers.
3. PAYWALL_THRESHOLD 5 → 10. Small universes (commodities, ~40 symbols) were short-circuiting when the alphabetical prefix happened to be all paywalled (DXUSD/ESUSD/...), even though later symbols (GCUSD, SIUSD) would work. 10 rides through realistic paywalled prefixes while still bailing out fast when the whole endpoint is paywalled.
4. ParquetStorage now reads 0-byte and otherwise-corrupt parquet files defensively (logs once, removes the bad file, returns []) instead of propagating an ArrowInvalid that fails the whole per-symbol call. Root-cause prevention: writes now go to a sibling ``.tmp`` and atomically rename to the final path, so an interruption mid-flush can't leave a half-written file.
Tests: updated test_plan.py to assert the new RPM and threshold values (picked up via CategoryHarvester.PAYWALL_THRESHOLD rather than a literal). 863 passing.
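The write-to-temp-then-rename step in item 4 follows a standard pattern, sketched here on raw bytes. The helper name `atomic_write_bytes`, the exact `.tmp` naming, and the `fsync` call are assumptions; ParquetStorage itself writes parquet rather than raw bytes.

```python
import os
from pathlib import Path


def atomic_write_bytes(path, data: bytes) -> None:
    path = Path(path)
    # Sibling temp file: same directory, so the rename stays on one filesystem.
    tmp = path.with_name(path.name + ".tmp")
    with open(tmp, "wb") as handle:
        handle.write(data)
        handle.flush()
        os.fsync(handle.fileno())   # push bytes to disk before the rename
    # os.replace swaps the file in atomically on POSIX and Windows,
    # so readers see either the old file or the complete new one.
    os.replace(tmp, path)
```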
After iter 1, the remaining short-circuit warning in indexes_eod was firing on symbols like ^AVFOCGRW, ^AFLI, ^AEX, non-US indexes that the symbol-only is_us_symbol heuristic couldn't catch (no dot suffix, no 0P prefix). The FMP record for each index carries a currency field that's the authoritative signal.
Changes:
- is_us_symbol(symbol, payload=None): when payload is provided and has a non-empty, non-USD currency field, drop the symbol immediately. Falls back to the existing symbol-shape heuristic otherwise. Safe to call either way (payload default = None).
- StateStore.list_symbol_records(universe): new helper that returns (symbol, payload) tuples by joining symbol + parsed payload_json. SymbolCatalog uses this on read so cached non-US indexes get filtered out without waiting for the next 7-day catalog refresh.
- SymbolCatalog._refresh: passes the FMP record dict to the filter so fresh writes also benefit from the richer signal.
- SymbolFilter type alias updated to Callable[[str, dict], bool].
Tests: 3 new cases for currency-based filtering, plus the existing symbol-only tests still pass via the default payload=None path. 866 total tests passing.
The harvester correctly identifies entirely-paywalled categories via
PAYWALL_THRESHOLD consecutive 402s. But every cycle re-discovers the
same paywall (10 wasted API calls + 1 warning line per category, per
cycle). For Starter users with 3+ partly-paywalled categories that's
30+ wasted requests every cycle, indefinitely.
Add in-memory paywall memory:
- CategoryHarvester._paywalled_at: timestamp set when note_paywall()
trips the threshold; cleared by note_success() (proves reachability).
- _paywall_skip_remaining_seconds(): how long until the next re-probe
is due. Defaults to a 24h window via PAYWALL_REPROBE_SECONDS.
- _run_once_and_record() checks this BEFORE setting up the cycle. When
the window is active, logs one info line ("paywalled; re-probe in ~N
hours") and writes a PAUSED_FOR_BUDGET row to the state store so the
history reflects the skip.
Result: first cycle after process start probes each category (one
short-circuit warning per genuinely-paywalled category, ~10 requests
each). Subsequent cycles silently skip until the 24h re-probe window
elapses. If the user upgrades their plan, restart the process to
re-probe immediately.
5 new tests cover the marker lifecycle (set on threshold trip, cleared
on success, expires after the configured window, not set below
threshold). 870 total tests passing.
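The marker lifecycle described in this commit can be sketched as a small class. `_paywalled_at`, `note_paywall()`, `note_success()`, `PAYWALL_THRESHOLD`, and `PAYWALL_REPROBE_SECONDS` are names from the PR; the standalone `PaywallMemory` class, the public `skip_remaining_seconds` name, and the injectable `clock` are illustrative assumptions.

```python
import time


class PaywallMemory:
    PAYWALL_THRESHOLD = 10
    PAYWALL_REPROBE_SECONDS = 24 * 3600

    def __init__(self, clock=time.time) -> None:
        self._clock = clock
        self._consecutive = 0
        self._paywalled_at = None   # set when the threshold trips

    def note_paywall(self) -> None:
        self._consecutive += 1
        if self._consecutive >= self.PAYWALL_THRESHOLD:
            self._paywalled_at = self._clock()

    def note_success(self) -> None:
        # A good fetch proves reachability, so forget the paywall.
        self._consecutive = 0
        self._paywalled_at = None

    def skip_remaining_seconds(self) -> float:
        """Seconds until the next re-probe is due; 0.0 means run now."""
        if self._paywalled_at is None:
            return 0.0
        due = self._paywalled_at + self.PAYWALL_REPROBE_SECONDS
        return max(0.0, due - self._clock())
```

Because the marker lives only in memory, restarting the process clears it, which matches the upgrade advice above.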
Adds two behavioral tests:
- test_paywall_memory_skips_subsequent_cycle: forges a fresh paywall marker and confirms _run_once_and_record skips run_cycle() entirely and records a PAUSED_FOR_BUDGET row with the 'paywalled' error.
- test_paywall_memory_expires_after_window: forges a marker older than PAYWALL_REPROBE_SECONDS and confirms the cycle runs normally (the re-probe window has elapsed).
These cover the integration between note_paywall() → _paywalled_at → _run_once_and_record's pre-cycle skip check, which the iter-4 live run can't observe directly (24h re-probe interval). 872 tests passing.
CI on PR #6 failed at the lint step. Eight ruff errors:
- F821 in aiofmp/harvester/plan.py: typing.Any used in dict[str, Any] annotation but never imported. Critical: annotation strings under ``from __future__ import annotations`` parse lazily so it didn't crash locally, but ruff catches it strictly. Added the import.
- F841 in aiofmp/base.py: ``except FMPRateLimitError as e:`` where ``e`` was unused (we re-raise the current exception). Dropped ``as e``.
- F541 in aiofmp/base.py: f-string with no placeholders for the 402 message. Dropped the ``f`` prefix.
- F401 in tests/test_harvester/test_plan.py: ``PLAN_LIMITS`` imported but not referenced. Removed.
- I001 x4: import-block ordering in three test files. Ruff auto-sorted.
Plus ``ruff format`` reformatted 24 files (test fixtures and category modules touched in recent commits). Pure whitespace/style, no behaviour change.
After fixes:
  uv run ruff check .          -> All checks passed!
  uv run ruff format --check . -> 147 files already formatted
  uv run pytest -q             -> 872 passed
…ts, 5xx retry
Three new pyarrow / FMP-side failure modes surfaced in the latest live harvester run. All three are fixed here.
1. ``Expected bytes, got a 'int'/'float' object`` (chart_eod for ARHVF, BINI, CBDL, NICH). The original sanitizer (commit 94da395) only stringifies columns when the **current** batch contains an int |x| > 2^53. On **append**, a previous batch may have already stringified the column on disk; the merged record list then has string values from storage plus normal-sized ints from the new API response, and pyarrow infers the column as string from the first non-null value, then chokes on the int. Extended _sanitize_records_for_parquet to also flag columns where the same column has BOTH str and (int or float) values across rows; those get stringified for type consistency.
2. ``Cannot write struct type 'data' with no child field to Parquet`` (statements.revenue_geographic_segmentation for COSM). FMP returns ``{"data": {}}`` for symbols with no breakdown. pyarrow infers a struct type from the dict but has zero fields to write. Sanitizer now also detects columns where any value is an empty dict and replaces those empties with None. Populated dicts in the same column keep their struct shape; pyarrow infers from them.
3. ``Server error: 502`` (dcf.levered_dcf for SPFI). FMPServerError was already raised by _handle_response for 5xx, but only the harvester's per-cycle retry handled it, so a single 5xx on one symbol was lost data. Mirroring the 429 transparent retry pattern, _make_request now retries FMPServerError with shorter backoff (2s/4s/8s; gateway errors clear faster than rate limits).
Tests: 9 new cases covering mixed str+int / str+float columns, empty dict nullification (both per-row and all-rows), populated-dict pass-through, end-to-end write+read of both failure modes, and a 5xx-then-ok retry. 881 total tests passing; ruff lint+format clean.
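The two sanitizer extensions (items 1 and 2) can be sketched together in plain Python. This is a simplified stand-in for the additions to `_sanitize_records_for_parquet`; the function name `harmonize_records` is hypothetical, and treating bools as non-numeric is an assumption carried over from the earlier big-int rule.

```python
def _is_number(value) -> bool:
    return isinstance(value, (int, float)) and not isinstance(value, bool)


def harmonize_records(records: list[dict]) -> list[dict]:
    # Pass 1: note which kinds of value each column holds.
    has_str, has_num = set(), set()
    for record in records:
        for key, value in record.items():
            if isinstance(value, str):
                has_str.add(key)
            elif _is_number(value):
                has_num.add(key)
    mixed = has_str & has_num   # columns with both str and numeric values

    # Pass 2: build cleaned copies.
    cleaned = []
    for record in records:
        row = {}
        for key, value in record.items():
            if isinstance(value, dict) and not value:
                row[key] = None           # empty struct has no child fields
            elif key in mixed and _is_number(value):
                row[key] = str(value)     # match the already-stringified rows
            else:
                row[key] = value
        cleaned.append(row)
    return cleaned
```

Populated dicts are left alone, so the struct shape is still available for type inference from the rows that have one.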
codemug added a commit that referenced this pull request on May 24, 2026
No description provided.