Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 114 additions & 18 deletions internal/controller/reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,11 @@ func (c *Controller) processSyncEvents(ctx context.Context, syncClusterPods []an
)
}

// Log per-deployment at INFO now that the job is queued, matching the
// individual "Posted record" path so startup sync has equivalent
// per-deployment visibility. Records rejected at submission are excluded.
logSyncedRecords(jobResp, syncRecords)

// Wait for job completion with a timeout to prevent indefinite startup delay.
jobCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
defer cancel()
Expand Down Expand Up @@ -197,9 +202,20 @@ func (c *Controller) processSyncEvents(ctx context.Context, syncClusterPods []an
return nil
}

// syncCandidate is a single container observed on a pod that is eligible to
// become the deployment record for a given deployment_name during startup sync.
type syncCandidate struct {
pod *corev1.Pod
container corev1.Container
wlName string
}

func (c *Controller) makeSyncRecords(ctx context.Context, syncClusterPods []any) []*deploymentrecord.Record {
seenSyncRecords := make(map[string]bool)
var syncRecords []*deploymentrecord.Record
// The API requires a unique set of deployment_names for a snapshot job.
// During a rollout the same deployment_name can appear with multiple digests
// (old and new pods running at once), so we must collapse to a single digest per workload and
// container name. We pick the newest running pod so the snapshot reflects the rollout target.
Comment thread
ajbeattie marked this conversation as resolved.
winners := make(map[string]syncCandidate)
for _, p := range syncClusterPods {
pod, ok := p.(*corev1.Pod)
if !ok {
Expand Down Expand Up @@ -227,34 +243,75 @@ func (c *Controller) makeSyncRecords(ctx context.Context, syncClusterPods []any)
allContainers = append(allContainers, pod.Spec.Containers...)
allContainers = append(allContainers, pod.Spec.InitContainers...)

// Filter out containers already in syncRecords
var newContainers []corev1.Container
// Dedupe to one record per deployment_name. On a digest conflict
// (rollout in progress) the preferred pod wins (running over terminal; otherwise newest).
for _, container := range allContainers {
dn := getARDeploymentName(pod, container, c.cfg.Template, wl.Name)
digest := getContainerDigest(pod, container.Name)
if dn == "" || digest == "" {
continue
}
key := getCacheKey(EventCreated, dn, digest)
if seenSyncRecords[key] {

existing, exists := winners[dn]
if !exists {
winners[dn] = syncCandidate{pod: pod, container: container, wlName: wl.Name}
continue
}
seenSyncRecords[key] = true
newContainers = append(newContainers, container)
}

if len(newContainers) == 0 {
continue
// Same deployment_name with the same digest is just another replica
// of the same version. The submitted record (deployment_name, digest,
// image) is identical regardless of which pod we pick, so keep the
// one we already have.
existingDigest := getContainerDigest(existing.pod, existing.container.Name)
if existingDigest == digest {
continue
}

// Collision with a different digest (e.g. rollout in progress).
// Keep the newest running pod's digest and log the decision so the
// rollout is visible.
if isPreferredSyncPod(pod, existing.pod) {
slog.Info("Multiple digests observed for deployment_name during sync, choosing preferred pod",
"deployment_name", dn,
"chosen_pod", pod.Name,
"chosen_digest", digest,
"replaced_pod", existing.pod.Name,
"replaced_digest", existingDigest,
)
winners[dn] = syncCandidate{pod: pod, container: container, wlName: wl.Name}
} else {
slog.Info("Multiple digests observed for deployment_name during sync, keeping preferred pod",
"deployment_name", dn,
"kept_pod", existing.pod.Name,
"kept_digest", existingDigest,
"skipped_pod", pod.Name,
"skipped_digest", digest,
)
}
}
}

// Only gather aggregate metadata if there are new containers
aggPodMetadata := c.metadataAggregator.BuildAggregatePodMetadata(ctx, podToPartialMetadata(pod))
// Build records from the winning candidates. Iterate in sorted
// deployment_name order for deterministic output, and cache aggregate
// metadata per pod so pods hosting multiple winners aren't recomputed.
aggCache := make(map[*corev1.Pod]*metadata.AggregatePodMetadata)
dns := make([]string, 0, len(winners))
for dn := range winners {
dns = append(dns, dn)
}
slices.Sort(dns)

for _, container := range newContainers {
record := c.buildRecord(pod, container, EventCreated, wl.Name, aggPodMetadata)
if record != nil {
syncRecords = append(syncRecords, record)
}
syncRecords := make([]*deploymentrecord.Record, 0, len(winners))
for _, dn := range dns {
cand := winners[dn]
agg, ok := aggCache[cand.pod]
if !ok {
agg = c.metadataAggregator.BuildAggregatePodMetadata(ctx, podToPartialMetadata(cand.pod))
aggCache[cand.pod] = agg
}
record := c.buildRecord(cand.pod, cand.container, EventCreated, cand.wlName, agg)
if record != nil {
syncRecords = append(syncRecords, record)
}
}

Expand All @@ -264,6 +321,45 @@ func (c *Controller) makeSyncRecords(ctx context.Context, syncClusterPods []any)
return syncRecords
}

// isPreferredSyncPod reports whether candidate should replace current as the
// chosen pod for a deployment_name during startup sync. Running pods are
// preferred over terminal pods; among pods in the same category the most
// recently created pod wins. This makes the startup snapshot reflect the
// rollout target (newest running digest) when multiple digests coexist.
func isPreferredSyncPod(candidate, current *corev1.Pod) bool {
candRunning := candidate.Status.Phase == corev1.PodRunning
curRunning := current.Status.Phase == corev1.PodRunning
if candRunning != curRunning {
return candRunning
}
return candidate.CreationTimestamp.After(current.CreationTimestamp.Time)
}

// logSyncedRecords emits one log line per accepted record after a cluster job
// has been queued.
func logSyncedRecords(jobResp *deploymentrecord.JobResponse, records []*deploymentrecord.Record) {
rejected := make(map[string]bool, len(jobResp.Errors))
for _, e := range jobResp.Errors {
rejected[e.Name] = true
}

for _, r := range records {
if rejected[r.Name] {
continue
}
slog.Info("Submitted record for sync",
"event_type", EventCreated,
"name", r.Name,
"deployment_name", r.DeploymentName,
"status", r.Status,
"digest", r.Digest,
"runtime_risks", r.RuntimeRisks,
"tags", r.Tags,
"job_id", jobResp.JobID,
)
}
}

// fillCachesFromSubmitted populates the observedDeployments cache from the
// records we submitted, without waiting for a response. Used when we can't
// get a response (409 conflict, wait timeout, job failure).
Expand Down
180 changes: 180 additions & 0 deletions internal/controller/reporting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,186 @@ func TestProcessSyncEvents_DedupeContainers(t *testing.T) {
assert.Equal(t, 1, poster.clusterRecordCount, "CreateClusterJob should receive only 1 record")
}

func TestMakeSyncRecords_RolloutPicksNewestRunningDigest(t *testing.T) {
t.Parallel()
oldDigest := "sha256:old"
newDigest := "sha256:new"
now := time.Now()

// Same deployment_name (default/test-deploy/app) with two digests, as
// happens mid-rollout when old and new pods are both Running.
oldPod := makeTestPod("app", "test-deploy-old", oldDigest, "ReplicaSet")
oldPod.Name = "pod-old"
oldPod.CreationTimestamp = metav1.NewTime(now.Add(-10 * time.Minute))

newPod := makeTestPod("app", "test-deploy-new", newDigest, "ReplicaSet")
newPod.Name = "pod-new"
newPod.CreationTimestamp = metav1.NewTime(now)

ctrl := newTestController(&mockPoster{})
ctrl.workloadResolver = &mockResolver{name: "test-deploy"}

// Feed old first so we exercise replacement, not just first-seen.
records := ctrl.makeSyncRecords(context.Background(), []any{oldPod, newPod})
require.Len(t, records, 1, "rollout should collapse to one record per deployment_name")
assert.Equal(t, newDigest, records[0].Digest, "newest running pod digest should win")
assert.Equal(t, "default/test-deploy/app", records[0].DeploymentName)
}

func TestMakeSyncRecords_NewestWinsRegardlessOfOrder(t *testing.T) {
t.Parallel()
oldDigest := "sha256:old"
newDigest := "sha256:new"
now := time.Now()

oldPod := makeTestPod("app", "test-deploy-old", oldDigest, "ReplicaSet")
oldPod.Name = "pod-old"
oldPod.CreationTimestamp = metav1.NewTime(now.Add(-10 * time.Minute))

newPod := makeTestPod("app", "test-deploy-new", newDigest, "ReplicaSet")
newPod.Name = "pod-new"
newPod.CreationTimestamp = metav1.NewTime(now)

ctrl := newTestController(&mockPoster{})
ctrl.workloadResolver = &mockResolver{name: "test-deploy"}

// Feed newest first, then older. The older pod must not overwrite.
records := ctrl.makeSyncRecords(context.Background(), []any{newPod, oldPod})
require.Len(t, records, 1)
assert.Equal(t, newDigest, records[0].Digest, "newest wins even when the older pod is processed last")
}

func TestMakeSyncRecords_PrefersRunningOverNewerTerminal(t *testing.T) {
t.Parallel()
runningDigest := "sha256:running"
terminalDigest := "sha256:terminal"
now := time.Now()

// Running pod is OLDER, terminal Job pod is NEWER. Running should still
// win because a running pod reflects current cluster state.
runningPod := makeTestPod("app", "test-job", runningDigest, "Job")
runningPod.Name = "pod-running"
runningPod.Status.Phase = corev1.PodRunning
runningPod.CreationTimestamp = metav1.NewTime(now.Add(-10 * time.Minute))

terminalPod := makeTestPod("app", "test-job", terminalDigest, "Job")
terminalPod.Name = "pod-terminal"
terminalPod.Status.Phase = corev1.PodSucceeded
terminalPod.CreationTimestamp = metav1.NewTime(now)

ctrl := newTestController(&mockPoster{})
ctrl.workloadResolver = &mockResolver{name: "test-job"}

records := ctrl.makeSyncRecords(context.Background(), []any{runningPod, terminalPod})
require.Len(t, records, 1)
assert.Equal(t, runningDigest, records[0].Digest, "running pod should win over a newer terminal pod")
}

func TestMakeSyncRecords_TerminalPicksNewest(t *testing.T) {
t.Parallel()
oldDigest := "sha256:old"
newDigest := "sha256:new"
now := time.Now()

// Two terminal Job pods (no running pod). Newest terminal wins.
oldPod := makeTestPod("app", "test-job", oldDigest, "Job")
oldPod.Name = "pod-old"
oldPod.Status.Phase = corev1.PodSucceeded
oldPod.CreationTimestamp = metav1.NewTime(now.Add(-10 * time.Minute))

newPod := makeTestPod("app", "test-job", newDigest, "Job")
newPod.Name = "pod-new"
newPod.Status.Phase = corev1.PodSucceeded
newPod.CreationTimestamp = metav1.NewTime(now)

ctrl := newTestController(&mockPoster{})
ctrl.workloadResolver = &mockResolver{name: "test-job"}

records := ctrl.makeSyncRecords(context.Background(), []any{oldPod, newPod})
require.Len(t, records, 1)
assert.Equal(t, newDigest, records[0].Digest, "newest terminal pod digest should win")
}

func TestMakeSyncRecords_DistinctDeploymentNamesNotCollapsed(t *testing.T) {
t.Parallel()
now := time.Now()

// Different container names produce different deployment_names, so both
// must be kept even though they share a pod-creation window.
podA := makeTestPod("app", "test-deploy-1", "sha256:aaa", "ReplicaSet")
podA.Name = "pod-a"
podA.CreationTimestamp = metav1.NewTime(now)

podB := makeTestPod("sidecar", "test-deploy-2", "sha256:bbb", "ReplicaSet")
podB.Name = "pod-b"
podB.CreationTimestamp = metav1.NewTime(now)

ctrl := newTestController(&mockPoster{})
ctrl.workloadResolver = &mockResolver{name: "test-deploy"}

records := ctrl.makeSyncRecords(context.Background(), []any{podA, podB})
require.Len(t, records, 2, "distinct deployment_names should not be collapsed")

byName := map[string]string{}
for _, r := range records {
byName[r.DeploymentName] = r.Digest
}
assert.Equal(t, "sha256:aaa", byName["default/test-deploy/app"])
assert.Equal(t, "sha256:bbb", byName["default/test-deploy/sidecar"])
}

// TestMakeSyncRecords_NoDuplicateDeploymentNames is the regression guard for the
// 400 "Duplicate deployment_name" error: under a messy real-world mix (a rollout
// with multiple replicas of two digests, plus a second workload), every submitted
// record must have a unique deployment_name.
func TestMakeSyncRecords_NoDuplicateDeploymentNames(t *testing.T) {
t.Parallel()
now := time.Now()

mkPod := func(name, container, digest string, phase corev1.PodPhase, created time.Time) *corev1.Pod {
p := makeTestPod(container, "test-deploy-rs", digest, "ReplicaSet")
p.Name = name
p.Status.Phase = phase
p.CreationTimestamp = metav1.NewTime(created)
return p
}

old := now.Add(-10 * time.Minute)
pods := []any{
// Workload A ("app"): rollout with 2 old replicas + 2 new replicas.
mkPod("a-old-1", "app", "sha256:old", corev1.PodRunning, old),
mkPod("a-old-2", "app", "sha256:old", corev1.PodRunning, old),
mkPod("a-new-1", "app", "sha256:new", corev1.PodRunning, now),
mkPod("a-new-2", "app", "sha256:new", corev1.PodRunning, now),
// Workload B ("sidecar"): single stable version with 3 replicas.
mkPod("b-1", "sidecar", "sha256:bbb", corev1.PodRunning, now),
mkPod("b-2", "sidecar", "sha256:bbb", corev1.PodRunning, now),
mkPod("b-3", "sidecar", "sha256:bbb", corev1.PodRunning, now),
}

ctrl := newTestController(&mockPoster{})
ctrl.workloadResolver = &mockResolver{name: "test-deploy"}

records := ctrl.makeSyncRecords(context.Background(), pods)

// Ensure no duplicate deployment_names are submitted.
seen := map[string]bool{}
for _, r := range records {
assert.Falsef(t, seen[r.DeploymentName],
"duplicate deployment_name submitted: %s", r.DeploymentName)
seen[r.DeploymentName] = true
}

// And the winning digest for the rolled-out workload is the newest.
require.Len(t, records, 2, "two distinct deployment_names expected")
byName := map[string]string{}
for _, r := range records {
byName[r.DeploymentName] = r.Digest
}
assert.Equal(t, "sha256:new", byName["default/test-deploy/app"], "newest digest should win for the rollout")
assert.Equal(t, "sha256:bbb", byName["default/test-deploy/sidecar"])
}

func TestProcessSyncEvents_409Conflict(t *testing.T) {
t.Parallel()
digest := "sha256:abc123"
Expand Down