Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/deployment-tracker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func main() {
GHAppPrivateKey: ghAppPrivateKey,
GHAppPrivateKeyPath: getEnvOrDefault("GH_APP_PRIV_KEY_PATH", ""),
Organization: os.Getenv("GITHUB_ORG"),
BulkClusterSync: strings.EqualFold(getEnvOrDefault("BULK_CLUSTER_SYNC", "false"), "true"),
}

if len(cntrlCfg.GHAppPrivateKey) > 0 && cntrlCfg.GHAppPrivateKeyPath != "" {
Expand Down
5 changes: 5 additions & 0 deletions internal/controller/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ type Config struct {
GHAppPrivateKey []byte
GHAppPrivateKeyPath string
Organization string
// BulkClusterSync enables the async cluster job endpoint for startup
// state sync. When false, startup sync is skipped and only individual
// PostOne calls are used.
//
// Note: this is experimental and not yet available for public use.
BulkClusterSync bool
}

// ValidTemplate verifies that at least one placeholder is present
Expand Down
27 changes: 26 additions & 1 deletion internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"log/slog"
"strings"
"sync/atomic"
"time"

"github.com/github/deployment-tracker/internal/metadata"
Expand Down Expand Up @@ -47,7 +48,9 @@ type ttlCache interface {
}

type deploymentRecordPoster interface {
PostOne(ctx context.Context, record *deploymentrecord.DeploymentRecord) error
PostOne(ctx context.Context, record *deploymentrecord.Record) error
CreateClusterJob(ctx context.Context, records []*deploymentrecord.Record, cluster string) (*deploymentrecord.JobResponse, error)
WaitForClusterJob(ctx context.Context, cluster string, jobID int64) (*deploymentrecord.JobStatus, error)
}

type podMetadataAggregator interface {
Expand Down Expand Up @@ -87,6 +90,11 @@ type Controller struct {
// informerSyncTimeout is the maximum time allowed for all informers to sync
// and prevents sync from hanging indefinitely.
informerSyncTimeout time.Duration
// syncing gates informer event handlers during startup. When true,
// pod add events are suppressed so they can be reported via the bulk
// cluster job instead of individual PostOne calls. Only set when
// BulkClusterSync is enabled.
syncing atomic.Bool
}

// New creates a new deployment tracker controller.
Expand Down Expand Up @@ -147,10 +155,21 @@ func New(clientset kubernetes.Interface, metadataAggregator podMetadataAggregato
unknownArtifacts: amcache.NewExpiring(),
informerSyncTimeout: informerSyncTimeoutDuration,
}
// Only gate informer events when bulk cluster sync is enabled.
// When disabled, all pods discovered during informer sync will be
// enqueued as individual events.
if cfg.BulkClusterSync {
cntrl.syncing.Store(true)
}

// Add event handlers to the informer
_, err = podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj any) {
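// While the startup sync gate is set, drop add events; those pods are
// reported through the bulk cluster job instead of individual PostOne calls.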
if cntrl.syncing.Load() {
return
}
Comment thread
ajbeattie marked this conversation as resolved.

pod, ok := obj.(*corev1.Pod)
if !ok {
slog.Error("Invalid object returned",
Expand Down Expand Up @@ -314,6 +333,12 @@ func (c *Controller) Run(ctx context.Context, workers int) error {
return errors.New("timed out waiting for caches to sync - please ensure deployment tracker has the correct kubernetes permissions")
}
}
c.syncing.Store(false)
syncClusterPods := c.podInformer.GetIndexer().List()
err := c.processSyncEvents(ctx, syncClusterPods)
if err != nil {
return fmt.Errorf("sync events failed: %w", err)
}

slog.Info("Starting workers",
"count", workers,
Expand Down
14 changes: 11 additions & 3 deletions internal/controller/controller_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,27 @@ import (

type mockRecordPoster struct {
mu sync.Mutex
records []*deploymentrecord.DeploymentRecord
records []*deploymentrecord.Record
err error // to simulate failures
}

func (m *mockRecordPoster) PostOne(_ context.Context, record *deploymentrecord.DeploymentRecord) error {
func (m *mockRecordPoster) PostOne(_ context.Context, record *deploymentrecord.Record) error {
m.mu.Lock()
defer m.mu.Unlock()
m.records = append(m.records, record)
return m.err
}

// CreateClusterJob ignores its arguments and always reports success with an
// empty job response.
func (m *mockRecordPoster) CreateClusterJob(_ context.Context, _ []*deploymentrecord.Record, _ string) (*deploymentrecord.JobResponse, error) {
	emptyResp := &deploymentrecord.JobResponse{}
	return emptyResp, nil
}

// WaitForClusterJob ignores its arguments and always returns a job status of
// "completed" with no error.
func (m *mockRecordPoster) WaitForClusterJob(_ context.Context, _ string, _ int64) (*deploymentrecord.JobStatus, error) {
	done := &deploymentrecord.JobStatus{Status: "completed"}
	return done, nil
}

// Helper that allows tests to read captured records safely.
func (m *mockRecordPoster) getRecords() []*deploymentrecord.DeploymentRecord {
func (m *mockRecordPoster) getRecords() []*deploymentrecord.Record {
m.mu.Lock()
defer m.mu.Unlock()
return slices.Clone(m.records)
Expand Down
62 changes: 54 additions & 8 deletions internal/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"testing"
"time"

"github.com/github/deployment-tracker/internal/metadata"
"github.com/github/deployment-tracker/internal/workload"
"github.com/github/deployment-tracker/pkg/deploymentrecord"
"github.com/stretchr/testify/assert"
Expand All @@ -23,35 +24,78 @@ import (

// mockPoster counts PostOne, CreateClusterJob, and WaitForClusterJob calls
// and returns configurable responses and errors for each.
type mockPoster struct {
mu sync.Mutex
calls int
lastErr error
mu sync.Mutex
calls int
clusterRecordCount int
jobCalls int
jobWaitCalls int
lastErr error
jobResp *deploymentrecord.JobResponse
jobErr error
jobStatus *deploymentrecord.JobStatus
jobWaitErr error
}

func (m *mockPoster) PostOne(_ context.Context, _ *deploymentrecord.DeploymentRecord) error {
func (m *mockPoster) PostOne(_ context.Context, _ *deploymentrecord.Record) error {
m.mu.Lock()
defer m.mu.Unlock()
m.calls++
return m.lastErr
}

func (m *mockPoster) getCalls() int {
// CreateClusterJob counts the call, remembers how many records were passed in
// the most recent call, and returns the configured jobResp and jobErr.
func (m *mockPoster) CreateClusterJob(_ context.Context, records []*deploymentrecord.Record, _ string) (*deploymentrecord.JobResponse, error) {
	m.mu.Lock()
	defer m.mu.Unlock()

	m.clusterRecordCount = len(records)
	m.jobCalls++

	resp, err := m.jobResp, m.jobErr
	return resp, err
}

// WaitForClusterJob counts the call and returns the configured jobStatus and
// jobWaitErr; its arguments are ignored.
func (m *mockPoster) WaitForClusterJob(_ context.Context, _ string, _ int64) (*deploymentrecord.JobStatus, error) {
	m.mu.Lock()
	defer m.mu.Unlock()

	m.jobWaitCalls++

	status, err := m.jobStatus, m.jobWaitErr
	return status, err
}

// getPostOneCalls returns the number of PostOne calls seen so far, read
// under the mock's mutex.
func (m *mockPoster) getPostOneCalls() int {
	m.mu.Lock()
	count := m.calls
	m.mu.Unlock()
	return count
}

// getCreateClusterJobCalls returns the number of CreateClusterJob calls seen
// so far, read under the mock's mutex.
func (m *mockPoster) getCreateClusterJobCalls() int {
	m.mu.Lock()
	count := m.jobCalls
	m.mu.Unlock()
	return count
}

// getWaitForClusterJobCalls returns the number of WaitForClusterJob calls
// seen so far, read under the mock's mutex.
func (m *mockPoster) getWaitForClusterJobCalls() int {
	m.mu.Lock()
	count := m.jobWaitCalls
	m.mu.Unlock()
	return count
}

// mockResolver is a test double for the workloadResolver interface.
type mockResolver struct{}
type mockResolver struct {
name string
}

func (*mockResolver) Resolve(_ *corev1.Pod) workload.Identity {
return workload.Identity{}
func (m *mockResolver) Resolve(_ *corev1.Pod) workload.Identity {
return workload.Identity{Name: m.name}
}

// IsActive always reports false, regardless of its arguments.
func (*mockResolver) IsActive(_ string, _ workload.Identity) bool {
	const active = false
	return active
}

// mockMetadataAggregator is a stateless test double for the
// podMetadataAggregator interface.
type mockMetadataAggregator struct{}

// BuildAggregatePodMetadata ignores its arguments and always returns nil.
func (*mockMetadataAggregator) BuildAggregatePodMetadata(_ context.Context, _ *metav1.PartialObjectMetadata) *metadata.AggregatePodMetadata {
	var none *metadata.AggregatePodMetadata
	return none
}

// newTestController creates a minimal Controller suitable for unit-testing
// recordContainer without a real Kubernetes cluster.
func newTestController(poster *mockPoster) *Controller {
Expand All @@ -62,8 +106,10 @@ func newTestController(poster *mockPoster) *Controller {
LogicalEnvironment: "test",
PhysicalEnvironment: "test",
Cluster: "test",
BulkClusterSync: true,
},
workloadResolver: &mockResolver{},
metadataAggregator: &mockMetadataAggregator{},
observedDeployments: amcache.NewExpiring(),
unknownArtifacts: amcache.NewExpiring(),
}
Expand Down
Loading