# Patch overview (this preamble precedes the first "diff --git" header and is not part of any hunk).
#
# sqlmesh/core/context.py
#   * plan_builder: evaluates `execution_time or now()` once into `plan_execution_time` and passes that
#     value to the two call sites that previously each evaluated `execution_time or now()` themselves.
#   * plan_builder: after `refresh_snapshot_intervals(...)`, reassigns `max_interval_end_per_model` to the
#     result of `_filter_stale_end_overrides(max_interval_end_per_model, context_diff.snapshots_by_name)`.
#   * _filter_stale_end_overrides (new static method): returns a new dict that keeps an entry when its
#     key is absent from `snapshots_by_name`, or when the mapped snapshot's `intervals` is truthy; all
#     other entries are dropped.
#
# tests/core/test_context.py
#   * Imports `SeedModel` alongside the existing model imports.
#   * Adds `test_seed_model_pr_plan_filters_stale_end_override`, which patches
#     `_get_max_interval_end_per_model` to inject a 2026-01-01 end for `sushi.waiter_as_customer_by_day`
#     and asserts that both that model and the new `sushi.waiter_rollup` show up in the plan's missing
#     intervals and in the promoted snapshots of the `dev` environment.
#
# NOTE(review): the copy this was restored from had lost its line breaks and leading whitespace. Line
# structure was rebuilt so that every hunk matches the line counts in its `@@` header; indentation
# (including the whitespace inside the triple-quoted SQL string and the nesting of the new
# `_filter_stale_end_overrides` call under `if not run:`) is inferred from Python syntax -- confirm
# against the target files before applying.
diff --git a/sqlmesh/core/context.py b/sqlmesh/core/context.py
index dc51aad2a7..0c49b557b7 100644
--- a/sqlmesh/core/context.py
+++ b/sqlmesh/core/context.py
@@ -1666,6 +1666,7 @@ def plan_builder(
             # This ensures that no models outside the impacted sub-DAG(s) will be backfilled unexpectedly.
             backfill_models = modified_model_names or None
 
+        plan_execution_time = execution_time or now()
         max_interval_end_per_model = None
         default_start, default_end = None, None
         if not run:
@@ -1680,17 +1681,21 @@ def plan_builder(
                 max_interval_end_per_model,
                 backfill_models,
                 modified_model_names,
-                execution_time or now(),
+                plan_execution_time,
             )
 
             # Refresh snapshot intervals to ensure that they are up to date with values reflected in the max_interval_end_per_model.
             self.state_sync.refresh_snapshot_intervals(context_diff.snapshots.values())
+            max_interval_end_per_model = self._filter_stale_end_overrides(
+                max_interval_end_per_model,
+                context_diff.snapshots_by_name,
+            )
 
         start_override_per_model = self._calculate_start_override_per_model(
             min_intervals,
             start or default_start,
             end or default_end,
-            execution_time or now(),
+            plan_execution_time,
             backfill_models,
             snapshots,
             max_interval_end_per_model,
@@ -3181,6 +3186,20 @@ def _get_max_interval_end_per_model(
             ).items()
         }
 
+    @staticmethod
+    def _filter_stale_end_overrides(
+        max_interval_end_per_model: t.Dict[str, datetime],
+        snapshots_by_name: t.Dict[str, Snapshot],
+    ) -> t.Dict[str, datetime]:
+        # Drop stale interval ends for snapshots whose new versions have no intervals yet. Otherwise the old
+        # prod end is reused as an end_override, causing missing_intervals() to skip the new snapshot entirely
+        # when the requested start is newer than that stale end.
+        return {
+            model_fqn: end
+            for model_fqn, end in max_interval_end_per_model.items()
+            if model_fqn not in snapshots_by_name or snapshots_by_name[model_fqn].intervals
+        }
+
     @staticmethod
     def _get_models_for_interval_end(
         snapshots: t.Dict[str, Snapshot], backfill_models: t.Set[str]
diff --git a/tests/core/test_context.py b/tests/core/test_context.py
index c3d88e205e..f4dbde2f25 100644
--- a/tests/core/test_context.py
+++ b/tests/core/test_context.py
@@ -38,7 +38,7 @@
 from sqlmesh.core.environment import Environment, EnvironmentNamingInfo, EnvironmentStatements
 from sqlmesh.core.plan.definition import Plan
 from sqlmesh.core.macros import MacroEvaluator, RuntimeStage
-from sqlmesh.core.model import load_sql_based_model, model, SqlModel, Model
+from sqlmesh.core.model import SeedModel, load_sql_based_model, model, SqlModel, Model
 from sqlmesh.core.model.common import ParsableSql
 from sqlmesh.core.model.cache import OptimizedQueryCache
 from sqlmesh.core.renderer import render_statements
@@ -1223,6 +1223,84 @@ def test_plan_seed_model_excluded_from_default_end(copy_to_temp_path: t.Callable
     context.close()
 
 
+@pytest.mark.slow
+def test_seed_model_pr_plan_filters_stale_end_override(
+    copy_to_temp_path: t.Callable, mocker: MockerFixture
+):
+    path = copy_to_temp_path("examples/sushi")
+
+    with time_machine.travel("2024-06-01 00:00:00 UTC"):
+        context = Context(paths=path, gateway="duckdb_persistent")
+        context.plan("prod", no_prompts=True, auto_apply=True)
+        context.close()
+
+    with time_machine.travel("2026-04-13 00:00:00 UTC"):
+        context = Context(paths=path, gateway="duckdb_persistent")
+
+        model = t.cast(SeedModel, context.get_model("sushi.waiter_names").copy())
+        model.seed.content += "10,Trey\n"
+        context.upsert_model(model)
+        context.upsert_model(
+            load_sql_based_model(
+                parse(
+                    """
+                    MODEL (
+                        name sushi.waiter_rollup,
+                        kind FULL,
+                        cron '@daily'
+                    );
+
+                    SELECT waiter_id, waiter_name, event_date
+                    FROM sushi.waiter_as_customer_by_day
+                    """
+                ),
+                default_catalog=context.default_catalog,
+            )
+        )
+
+        waiter_as_customer_by_day = context.get_snapshot(
+            "sushi.waiter_as_customer_by_day", raise_if_missing=True
+        )
+        orders = context.get_snapshot("sushi.orders", raise_if_missing=True)
+        original_get_max_interval_end_per_model = context._get_max_interval_end_per_model
+
+        def _mocked_max_interval_end_per_model(
+            snapshots: t.Dict[str, t.Any], backfill_models: t.Optional[t.Set[str]]
+        ) -> t.Dict[str, datetime]:
+            result = original_get_max_interval_end_per_model(snapshots, backfill_models)
+            # Keep the overall plan end recent via another affected model while making the old prod end for
+            # waiter_as_customer_by_day older than the PR start. Without filtering, that stale end_override
+            # causes the new waiter_as_customer_by_day snapshot to be skipped and waiter_rollup fails when it
+            # references the missing physical table.
+            result[waiter_as_customer_by_day.name] = to_datetime("2026-01-01")
+            result[orders.name] = to_datetime("2026-04-13")
+            return result
+
+        mocker.patch.object(
+            context,
+            "_get_max_interval_end_per_model",
+            side_effect=_mocked_max_interval_end_per_model,
+        )
+
+        plan = context.plan("dev", start="2 months ago", no_prompts=True)
+        missing_interval_names = {si.snapshot_id.name for si in plan.missing_intervals}
+
+        assert plan.user_provided_flags == {"start": "2 months ago"}
+        assert to_timestamp(plan.start) == to_timestamp("2026-02-13")
+        assert to_timestamp(plan.end) == to_timestamp("2026-04-13")
+        assert any("waiter_as_customer_by_day" in name for name in missing_interval_names)
+        assert any("waiter_rollup" in name for name in missing_interval_names)
+
+        context.apply(plan)
+
+        environment = context.state_sync.get_environment("dev")
+        assert environment is not None
+        promoted_snapshot_names = {snapshot.name for snapshot in environment.promoted_snapshots}
+        assert any("waiter_as_customer_by_day" in name for name in promoted_snapshot_names)
+        assert any("waiter_rollup" in name for name in promoted_snapshot_names)
+        context.close()
+
+
 @pytest.mark.slow
 def test_schema_error_no_default(sushi_context_pre_scheduling) -> None:
     context = sushi_context_pre_scheduling