diff --git a/internal/controller/reporting.go b/internal/controller/reporting.go index 7815989..367ee85 100644 --- a/internal/controller/reporting.go +++ b/internal/controller/reporting.go @@ -163,6 +163,11 @@ func (c *Controller) processSyncEvents(ctx context.Context, syncClusterPods []an ) } + // Log per-deployment at INFO now that the job is queued, matching the + // individual "Posted record" path so startup sync has equivalent + // per-deployment visibility. Records rejected at submission are excluded. + logSyncedRecords(jobResp, syncRecords) + // Wait for job completion with a timeout to prevent indefinite startup delay. jobCtx, cancel := context.WithTimeout(ctx, 5*time.Minute) defer cancel() @@ -197,9 +202,20 @@ func (c *Controller) processSyncEvents(ctx context.Context, syncClusterPods []an return nil } +// syncCandidate is a single container observed on a pod that is eligible to +// become the deployment record for a given deployment_name during startup sync. +// wlName carries wl.Name from the pod loop and is handed to buildRecord when the +// record is built. +type syncCandidate struct { + pod *corev1.Pod + container corev1.Container + wlName string +} + func (c *Controller) makeSyncRecords(ctx context.Context, syncClusterPods []any) []*deploymentrecord.Record { - seenSyncRecords := make(map[string]bool) - var syncRecords []*deploymentrecord.Record + // The API requires a unique set of deployment_names for a snapshot job. + // During a rollout the same deployment_name can appear with multiple digests + // (old and new pods running at once), so we must collapse to a single digest per workload and + // container name. We pick the newest running pod so the snapshot reflects the rollout target. + winners := make(map[string]syncCandidate) for _, p := range syncClusterPods { pod, ok := p.(*corev1.Pod) if !ok { @@ -227,34 +243,75 @@ func (c *Controller) makeSyncRecords(ctx context.Context, syncClusterPods []any) allContainers = append(allContainers, pod.Spec.Containers...) allContainers = append(allContainers, pod.Spec.InitContainers...) 
- // Filter out containers already in syncRecords - var newContainers []corev1.Container + // Dedupe to one record per deployment_name. On a digest conflict + // (rollout in progress) the preferred pod wins (running over terminal; otherwise newest). for _, container := range allContainers { dn := getARDeploymentName(pod, container, c.cfg.Template, wl.Name) digest := getContainerDigest(pod, container.Name) if dn == "" || digest == "" { continue } - key := getCacheKey(EventCreated, dn, digest) - if seenSyncRecords[key] { + + existing, exists := winners[dn] + if !exists { + winners[dn] = syncCandidate{pod: pod, container: container, wlName: wl.Name} continue } - seenSyncRecords[key] = true - newContainers = append(newContainers, container) - } - if len(newContainers) == 0 { - continue + // Same deployment_name with the same digest is just another replica + // of the same version. The submitted record (deployment_name, digest, + // image) is identical regardless of which pod we pick, so keep the + // one we already have. + existingDigest := getContainerDigest(existing.pod, existing.container.Name) + if existingDigest == digest { + continue + } + + // Collision with a different digest (e.g. rollout in progress). + // Keep the newest running pod's digest and log the decision so the + // rollout is visible. 
+ if isPreferredSyncPod(pod, existing.pod) { + slog.Info("Multiple digests observed for deployment_name during sync, choosing preferred pod", + "deployment_name", dn, + "chosen_pod", pod.Name, + "chosen_digest", digest, + "replaced_pod", existing.pod.Name, + "replaced_digest", existingDigest, + ) + winners[dn] = syncCandidate{pod: pod, container: container, wlName: wl.Name} + } else { + slog.Info("Multiple digests observed for deployment_name during sync, keeping preferred pod", + "deployment_name", dn, + "kept_pod", existing.pod.Name, + "kept_digest", existingDigest, + "skipped_pod", pod.Name, + "skipped_digest", digest, + ) + } } + } - // Only gather aggregate metadata if there are new containers - aggPodMetadata := c.metadataAggregator.BuildAggregatePodMetadata(ctx, podToPartialMetadata(pod)) + // Build records from the winning candidates. Iterate in sorted + // deployment_name order for deterministic output, and cache aggregate + // metadata per pod so pods hosting multiple winners aren't recomputed. 
+ aggCache := make(map[*corev1.Pod]*metadata.AggregatePodMetadata) + dns := make([]string, 0, len(winners)) + for dn := range winners { + dns = append(dns, dn) + } + slices.Sort(dns) - for _, container := range newContainers { - record := c.buildRecord(pod, container, EventCreated, wl.Name, aggPodMetadata) - if record != nil { - syncRecords = append(syncRecords, record) - } + syncRecords := make([]*deploymentrecord.Record, 0, len(winners)) + for _, dn := range dns { + cand := winners[dn] + agg, ok := aggCache[cand.pod] + if !ok { + agg = c.metadataAggregator.BuildAggregatePodMetadata(ctx, podToPartialMetadata(cand.pod)) + aggCache[cand.pod] = agg + } + record := c.buildRecord(cand.pod, cand.container, EventCreated, cand.wlName, agg) + if record != nil { + syncRecords = append(syncRecords, record) } } @@ -264,6 +321,45 @@ func (c *Controller) makeSyncRecords(ctx context.Context, syncClusterPods []any) return syncRecords } +// isPreferredSyncPod reports whether candidate should replace current as the +// chosen pod for a deployment_name during startup sync. Running pods are +// preferred over terminal pods; among pods in the same category the most +// recently created pod wins. This makes the startup snapshot reflect the +// rollout target (newest running digest) when multiple digests coexist. +// NOTE(review): pods in the same category with equal creation timestamps never +// replace current, so a tie is resolved by input order; confirm that is acceptable. +func isPreferredSyncPod(candidate, current *corev1.Pod) bool { + candRunning := candidate.Status.Phase == corev1.PodRunning + curRunning := current.Status.Phase == corev1.PodRunning + if candRunning != curRunning { + return candRunning + } + return candidate.CreationTimestamp.After(current.CreationTimestamp.Time) +} + +// logSyncedRecords emits one log line per accepted record after a cluster job +// has been queued. Records whose Name matches the Name of an entry in +// jobResp.Errors are skipped. +// NOTE(review): jobResp is dereferenced without a nil check; confirm callers +// only pass a non-nil response. 
+func logSyncedRecords(jobResp *deploymentrecord.JobResponse, records []*deploymentrecord.Record) { + rejected := make(map[string]bool, len(jobResp.Errors)) + for _, e := range jobResp.Errors { + rejected[e.Name] = true + } + + for _, r := range records { + if rejected[r.Name] { + continue + } + slog.Info("Submitted record for sync", + "event_type", EventCreated, + "name", r.Name, + "deployment_name", r.DeploymentName, + "status", r.Status, + "digest", r.Digest, + "runtime_risks", r.RuntimeRisks, + "tags", r.Tags, + "job_id", jobResp.JobID, + ) + } +} + // fillCachesFromSubmitted populates the observedDeployments cache from the // records we submitted, without waiting for a response. Used when we can't // get a response (409 conflict, wait timeout, job failure). diff --git a/internal/controller/reporting_test.go b/internal/controller/reporting_test.go index 377941c..31dcdf8 100644 --- a/internal/controller/reporting_test.go +++ b/internal/controller/reporting_test.go @@ -252,6 +252,186 @@ func TestProcessSyncEvents_DedupeContainers(t *testing.T) { assert.Equal(t, 1, poster.clusterRecordCount, "CreateClusterJob should receive only 1 record") } +func TestMakeSyncRecords_RolloutPicksNewestRunningDigest(t *testing.T) { + t.Parallel() + oldDigest := "sha256:old" + newDigest := "sha256:new" + now := time.Now() + + // Same deployment_name (default/test-deploy/app) with two digests, as + // happens mid-rollout when old and new pods are both Running. + // NOTE(review): Status.Phase is not set in this test, so this relies on the + // phase makeTestPod assigns; confirm it is Running. + oldPod := makeTestPod("app", "test-deploy-old", oldDigest, "ReplicaSet") + oldPod.Name = "pod-old" + oldPod.CreationTimestamp = metav1.NewTime(now.Add(-10 * time.Minute)) + + newPod := makeTestPod("app", "test-deploy-new", newDigest, "ReplicaSet") + newPod.Name = "pod-new" + newPod.CreationTimestamp = metav1.NewTime(now) + + ctrl := newTestController(&mockPoster{}) + ctrl.workloadResolver = &mockResolver{name: "test-deploy"} + + // Feed old first so we exercise replacement, not just first-seen. 
+ records := ctrl.makeSyncRecords(context.Background(), []any{oldPod, newPod}) + require.Len(t, records, 1, "rollout should collapse to one record per deployment_name") + assert.Equal(t, newDigest, records[0].Digest, "newest running pod digest should win") + assert.Equal(t, "default/test-deploy/app", records[0].DeploymentName) +} + +func TestMakeSyncRecords_NewestWinsRegardlessOfOrder(t *testing.T) { + t.Parallel() + oldDigest := "sha256:old" + newDigest := "sha256:new" + now := time.Now() + + oldPod := makeTestPod("app", "test-deploy-old", oldDigest, "ReplicaSet") + oldPod.Name = "pod-old" + oldPod.CreationTimestamp = metav1.NewTime(now.Add(-10 * time.Minute)) + + newPod := makeTestPod("app", "test-deploy-new", newDigest, "ReplicaSet") + newPod.Name = "pod-new" + newPod.CreationTimestamp = metav1.NewTime(now) + + ctrl := newTestController(&mockPoster{}) + ctrl.workloadResolver = &mockResolver{name: "test-deploy"} + + // Feed newest first, then older. The older pod must not overwrite. + records := ctrl.makeSyncRecords(context.Background(), []any{newPod, oldPod}) + require.Len(t, records, 1) + assert.Equal(t, newDigest, records[0].Digest, "newest wins even when the older pod is processed last") +} + +func TestMakeSyncRecords_PrefersRunningOverNewerTerminal(t *testing.T) { + t.Parallel() + runningDigest := "sha256:running" + terminalDigest := "sha256:terminal" + now := time.Now() + + // Running pod is OLDER, terminal Job pod is NEWER. Running should still + // win because a running pod reflects current cluster state. 
+ runningPod := makeTestPod("app", "test-job", runningDigest, "Job") + runningPod.Name = "pod-running" + runningPod.Status.Phase = corev1.PodRunning + runningPod.CreationTimestamp = metav1.NewTime(now.Add(-10 * time.Minute)) + + terminalPod := makeTestPod("app", "test-job", terminalDigest, "Job") + terminalPod.Name = "pod-terminal" + terminalPod.Status.Phase = corev1.PodSucceeded + terminalPod.CreationTimestamp = metav1.NewTime(now) + + ctrl := newTestController(&mockPoster{}) + ctrl.workloadResolver = &mockResolver{name: "test-job"} + + records := ctrl.makeSyncRecords(context.Background(), []any{runningPod, terminalPod}) + require.Len(t, records, 1) + assert.Equal(t, runningDigest, records[0].Digest, "running pod should win over a newer terminal pod") +} + +func TestMakeSyncRecords_TerminalPicksNewest(t *testing.T) { + t.Parallel() + oldDigest := "sha256:old" + newDigest := "sha256:new" + now := time.Now() + + // Two terminal Job pods (no running pod). Newest terminal wins. + oldPod := makeTestPod("app", "test-job", oldDigest, "Job") + oldPod.Name = "pod-old" + oldPod.Status.Phase = corev1.PodSucceeded + oldPod.CreationTimestamp = metav1.NewTime(now.Add(-10 * time.Minute)) + + newPod := makeTestPod("app", "test-job", newDigest, "Job") + newPod.Name = "pod-new" + newPod.Status.Phase = corev1.PodSucceeded + newPod.CreationTimestamp = metav1.NewTime(now) + + ctrl := newTestController(&mockPoster{}) + ctrl.workloadResolver = &mockResolver{name: "test-job"} + + records := ctrl.makeSyncRecords(context.Background(), []any{oldPod, newPod}) + require.Len(t, records, 1) + assert.Equal(t, newDigest, records[0].Digest, "newest terminal pod digest should win") +} + +func TestMakeSyncRecords_DistinctDeploymentNamesNotCollapsed(t *testing.T) { + t.Parallel() + now := time.Now() + + // Different container names produce different deployment_names, so both + // must be kept even though both pods have the same creation timestamp. 
+ podA := makeTestPod("app", "test-deploy-1", "sha256:aaa", "ReplicaSet") + podA.Name = "pod-a" + podA.CreationTimestamp = metav1.NewTime(now) + + podB := makeTestPod("sidecar", "test-deploy-2", "sha256:bbb", "ReplicaSet") + podB.Name = "pod-b" + podB.CreationTimestamp = metav1.NewTime(now) + + ctrl := newTestController(&mockPoster{}) + ctrl.workloadResolver = &mockResolver{name: "test-deploy"} + + records := ctrl.makeSyncRecords(context.Background(), []any{podA, podB}) + require.Len(t, records, 2, "distinct deployment_names should not be collapsed") + + byName := map[string]string{} + for _, r := range records { + byName[r.DeploymentName] = r.Digest + } + assert.Equal(t, "sha256:aaa", byName["default/test-deploy/app"]) + assert.Equal(t, "sha256:bbb", byName["default/test-deploy/sidecar"]) +} + +// TestMakeSyncRecords_NoDuplicateDeploymentNames is the regression guard for the +// 400 "Duplicate deployment_name" error: under a messy real-world mix (a rollout +// with multiple replicas of two digests, plus a second container name under the +// same workload), every submitted record must have a unique deployment_name. +func TestMakeSyncRecords_NoDuplicateDeploymentNames(t *testing.T) { + t.Parallel() + now := time.Now() + + mkPod := func(name, container, digest string, phase corev1.PodPhase, created time.Time) *corev1.Pod { + p := makeTestPod(container, "test-deploy-rs", digest, "ReplicaSet") + p.Name = name + p.Status.Phase = phase + p.CreationTimestamp = metav1.NewTime(created) + return p + } + + old := now.Add(-10 * time.Minute) + pods := []any{ + // Deployment name A (container "app"): rollout with 2 old replicas + 2 new replicas. + mkPod("a-old-1", "app", "sha256:old", corev1.PodRunning, old), + mkPod("a-old-2", "app", "sha256:old", corev1.PodRunning, old), + mkPod("a-new-1", "app", "sha256:new", corev1.PodRunning, now), + mkPod("a-new-2", "app", "sha256:new", corev1.PodRunning, now), + // Deployment name B (container "sidecar"): single stable version with 3 replicas. 
+ mkPod("b-1", "sidecar", "sha256:bbb", corev1.PodRunning, now), + mkPod("b-2", "sidecar", "sha256:bbb", corev1.PodRunning, now), + mkPod("b-3", "sidecar", "sha256:bbb", corev1.PodRunning, now), + } + + ctrl := newTestController(&mockPoster{}) + ctrl.workloadResolver = &mockResolver{name: "test-deploy"} + + records := ctrl.makeSyncRecords(context.Background(), pods) + + // Ensure no duplicate deployment_names are submitted. + seen := map[string]bool{} + for _, r := range records { + assert.Falsef(t, seen[r.DeploymentName], + "duplicate deployment_name submitted: %s", r.DeploymentName) + seen[r.DeploymentName] = true + } + + // And the winning digest for the rolled-out workload is the newest. + require.Len(t, records, 2, "two distinct deployment_names expected") + byName := map[string]string{} + for _, r := range records { + byName[r.DeploymentName] = r.Digest + } + assert.Equal(t, "sha256:new", byName["default/test-deploy/app"], "newest digest should win for the rollout") + assert.Equal(t, "sha256:bbb", byName["default/test-deploy/sidecar"]) +} + func TestProcessSyncEvents_409Conflict(t *testing.T) { t.Parallel() digest := "sha256:abc123"