diff --git a/api/v1alpha1/targetsource_types.go b/api/v1alpha1/targetsource_types.go index 3d69743..dd6fb59 100644 --- a/api/v1alpha1/targetsource_types.go +++ b/api/v1alpha1/targetsource_types.go @@ -52,9 +52,10 @@ type ConsulConfig struct { // TargetSourceStatus defines the observed state of TargetSource type TargetSourceStatus struct { - Status string `json:"status"` - TargetsCount int32 `json:"targetsCount"` - LastSync metav1.Time `json:"lastSync"` + Status string `json:"status,omitempty"` + ObservedGeneration int64 `json:"observedGeneration"` + TargetsCount int32 `json:"targetsCount,omitempty"` + LastSync metav1.Time `json:"lastSync,omitempty"` } //+kubebuilder:object:root=true diff --git a/cmd/main.go b/cmd/main.go index aaf398a..3bb04f7 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -41,8 +41,8 @@ import ( operatorv1alpha1 "github.com/gnmic/operator/api/v1alpha1" "github.com/gnmic/operator/internal/apiserver" "github.com/gnmic/operator/internal/controller" - "github.com/gnmic/operator/internal/controller/discovery/core" "github.com/gnmic/operator/internal/controller/discovery" + "github.com/gnmic/operator/internal/controller/discovery/core" webhookv1alpha1 "github.com/gnmic/operator/internal/webhook/v1alpha1" //+kubebuilder:scaffold:imports ) diff --git a/config/crd/bases/operator.gnmic.dev_targetsources.yaml b/config/crd/bases/operator.gnmic.dev_targetsources.yaml index 37d6919..b8d24e1 100644 --- a/config/crd/bases/operator.gnmic.dev_targetsources.yaml +++ b/config/crd/bases/operator.gnmic.dev_targetsources.yaml @@ -79,15 +79,16 @@ spec: lastSync: format: date-time type: string + observedGeneration: + format: int64 + type: integer status: type: string targetsCount: format: int32 type: integer required: - - lastSync - - status - - targetsCount + - observedGeneration type: object type: object served: true diff --git a/internal/controller/discovery/client.go b/internal/controller/discovery/client.go index cb02161..e5cc5ea 100644 --- a/internal/controller/discovery/client.go +++ b/internal/controller/discovery/client.go @@ -3,17 +3,18 @@ package discovery import ( "context" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/retry" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" gnmicv1alpha1 "github.com/gnmic/operator/api/v1alpha1" ) -func fetchExistingTargets( - ctx context.Context, - c client.Client, - ts *gnmicv1alpha1.TargetSource, -) ([]gnmicv1alpha1.Target, error) { - +func fetchExistingTargets(ctx context.Context, c client.Client, ts *gnmicv1alpha1.TargetSource) ([]gnmicv1alpha1.Target, error) { var targetList gnmicv1alpha1.TargetList err := c.List( @@ -30,3 +31,59 @@ func fetchExistingTargets( return targetList.Items, nil } + +func applyTarget(ctx context.Context, c client.Client, s *runtime.Scheme, desired *gnmicv1alpha1.Target, ts *gnmicv1alpha1.TargetSource) error { + existing := &gnmicv1alpha1.Target{ + ObjectMeta: metav1.ObjectMeta{ + Name: desired.Name, + Namespace: desired.Namespace, + }, + } + + _, err := controllerutil.CreateOrUpdate(ctx, c, existing, func() error { + existing.Spec = desired.Spec + existing.Labels = desired.Labels + + return controllerutil.SetControllerReference(ts, existing, s) + }) + + return err +} + +func deleteTarget(ctx context.Context, c client.Client, name string, namespace string) error { + existing := &gnmicv1alpha1.Target{} + + err := c.Get(ctx, types.NamespacedName{ + Name: name, + Namespace: namespace, + }, existing) + if apierrors.IsNotFound(err) { + return nil + } else if err != nil { + return err + } + + err = c.Delete(ctx, existing) + if apierrors.IsNotFound(err) { + return nil + } + + return err +} + +// updateTargetSourceStatus updates the status of the TargetSource Object ts. The only fields updated are targetCount and LastSync, which takes the current timestamp. +func updateTargetSourceStatus(ctx context.Context, c client.Client, ts *gnmicv1alpha1.TargetSource, targetCount int32) error { + err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + latest := &gnmicv1alpha1.TargetSource{} + if err := c.Get(ctx, client.ObjectKeyFromObject(ts), latest); err != nil { + return err + } + + latest.Status.TargetsCount = targetCount + latest.Status.LastSync = metav1.Now() + + return c.Status().Update(ctx, latest) + }) + + return err +} diff --git a/internal/controller/discovery/const.go b/internal/controller/discovery/const.go index ac7a57f..b48331d 100644 --- a/internal/controller/discovery/const.go +++ b/internal/controller/discovery/const.go @@ -1,6 +1,6 @@ package discovery const ( - // Labels + // Kubernetes Side Labels LabelTargetSourceName = "operator.gnmic.dev/targetsource" ) diff --git a/internal/controller/discovery/core/types.go b/internal/controller/discovery/core/types.go index 99605b9..5a1c8cf 100644 --- a/internal/controller/discovery/core/types.go +++ b/internal/controller/discovery/core/types.go @@ -47,6 +47,17 @@ type DiscoveryEvent struct { Event EventAction } +func (e EventAction) String() string { + switch e { + case EventDelete: + return "DELETE" + case EventApply: + return "APPLY" + default: + return "UNKNOWN" + } +} + type DiscoverySnapshot struct { SnapshotID string ChunkIndex int diff --git a/internal/controller/discovery/loaders/http/loader.go b/internal/controller/discovery/loaders/http/loader.go index 3325adb..5169e59 100644 --- a/internal/controller/discovery/loaders/http/loader.go +++ b/internal/controller/discovery/loaders/http/loader.go @@ -42,6 +42,8 @@ func (l *Loader) Run(ctx context.Context, out chan<- []core.DiscoveryMessage) er ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() + i := 1 + for { select { case <-ctx.Done(): @@ -49,24 +51,61 @@ func (l *Loader) Run(ctx context.Context, out chan<- []core.DiscoveryMessage) er return nil case <-ticker.C: - // Example snapshot (placeholder) - snapshotID := fmt.Sprintf("%s-%s-%s", l.commonCfg.TargetsourceNN.Namespace, l.commonCfg.TargetsourceNN.Name, uuid.NewString()) - targets := []core.DiscoveredTarget{ - { - Name: "ceos1", - Address: "clab-3-nodes-ceos1:6030", - Labels: map[string]string{"TargetSource": l.commonCfg.TargetsourceNN.String()}, - }, - { - Name: "leaf1", - Address: "clab-3-nodes-leaf1:57400", - Labels: map[string]string{"TargetSource": l.commonCfg.TargetsourceNN.String()}, - }, - } + // Switch case + i only needed to test behavior for messages with different values. + switch i { + case 1: + snapshotID := fmt.Sprintf("%s-%s-%s", l.commonCfg.TargetsourceNN.Namespace, l.commonCfg.TargetsourceNN.Name, uuid.NewString()) + targets := []core.DiscoveredTarget{ + { + Name: "ceos1", + Address: "clab-3-nodes-ceos1:6030", + Labels: map[string]string{}, + }, + { + Name: "leaf1", + Address: "clab-3-nodes-leaf1:57400", + Labels: map[string]string{"gnmic_operator_target_profile": "default1"}, + }, + } + + if err := loaderUtils.SendSnapshot(ctx, out, targets, snapshotID, l.commonCfg.ChunkSize); err != nil { + return err + } + case 2: + snapshotID := fmt.Sprintf("%s-%s-%s", l.commonCfg.TargetsourceNN.Namespace, l.commonCfg.TargetsourceNN.Name, uuid.NewString()) + targets := []core.DiscoveredTarget{ + { + Name: "ceos1", + Address: "clab-3-nodes-ceos1:6030", + Labels: map[string]string{"gnmic_operator_target_profile": "default1"}, + }, + { + Name: "leaf2", + Address: "clab-3-nodes-leaf2:57400", + Labels: map[string]string{"gnmic_operator_target_profile": "default1"}, + }, + } - if err := loaderUtils.SendSnapshot(ctx, out, targets, snapshotID, l.commonCfg.ChunkSize); err != nil { - return err + if err := loaderUtils.SendSnapshot(ctx, out, targets, snapshotID, l.commonCfg.ChunkSize); err != nil { + return err + } + + default: + snapshotID := fmt.Sprintf("%s-%s-%s", l.commonCfg.TargetsourceNN.Namespace, l.commonCfg.TargetsourceNN.Name, uuid.NewString()) + targets := []core.DiscoveredTarget{ + { + Name: "ceos1", + Address: "clab-3-nodes-ceos2:6030", + Labels: map[string]string{"gnmic_operator_target_profile": "default2"}, + }, + } + + if err := loaderUtils.SendSnapshot(ctx, out, targets, snapshotID, l.commonCfg.ChunkSize); err != nil { + return err + } } + + i++ } } } diff --git a/internal/controller/discovery/mapper.go b/internal/controller/discovery/mapper.go new file mode 100644 index 0000000..bc42531 --- /dev/null +++ b/internal/controller/discovery/mapper.go @@ -0,0 +1,74 @@ +package discovery + +import ( + "maps" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + gnmicv1alpha1 "github.com/gnmic/operator/api/v1alpha1" + "github.com/gnmic/operator/internal/controller/discovery/core" +) + +// generateTargetResource converts a DiscoveredTarget into a Kubernetes Target Object based on the TargetSource Spec. +func generateTargetResource(d core.DiscoveredTarget, ts *gnmicv1alpha1.TargetSource) *gnmicv1alpha1.Target { + // Create object instance + t := &gnmicv1alpha1.Target{ + ObjectMeta: metav1.ObjectMeta{ + Name: d.Name, + Namespace: ts.Namespace, + Labels: make(map[string]string), + }, + } + + // Add Address from DiscoveredTarget + t.Spec.Address = d.Address + // Add default Target Profile from the TargetSource Spec TargetProfile + t.Spec.Profile = ts.Spec.TargetProfile + + // Copy TargetLabels from TargetSource Spec & DiscoveredTarget. Discovered labels take precedence over TargetSource labels. + maps.Copy(t.Labels, ts.Spec.TargetLabels) + maps.Copy(t.Labels, d.Labels) + + // Add TargetSource Label to the Target (precedence over all labels) + t.Labels[LabelTargetSourceName] = ts.Name + + return t +} + +// generateEvents returns a list of DiscoveryEvents. Needed for snapshot handling to determine which devices get deleted and which applied. +func generateEvents(existing []gnmicv1alpha1.Target, discovered []core.DiscoveredTarget) []core.DiscoveryEvent { + var events []core.DiscoveryEvent + + discoveredMap := make(map[string]core.DiscoveredTarget) + for _, d := range discovered { + discoveredMap[d.Name] = d + } + + // Create delete events for targets which are present in existing but not in discovered + for _, e := range existing { + if _, found := discoveredMap[e.Name]; !found { + events = append(events, core.DiscoveryEvent{ + Target: core.DiscoveredTarget{ + Name: e.Name, + }, + Event: core.EventDelete, + }) + } + } + + // Create apply events for all targets in discovered + for _, d := range discovered { + events = append(events, core.DiscoveryEvent{ + Target: d, + Event: core.EventApply, + }) + } + + return events +} + +// normalizeTarget adds the prefix to the target name for identification in Kubernetes +func normalizeTarget(t core.DiscoveredTarget, tsName string) core.DiscoveredTarget { + t.Name = tsName + "-" + t.Name + return t +} diff --git a/internal/controller/discovery/mapper_test.go b/internal/controller/discovery/mapper_test.go new file mode 100644 index 0000000..ffaad40 --- /dev/null +++ b/internal/controller/discovery/mapper_test.go @@ -0,0 +1,452 @@ +package discovery + +import ( + "fmt" + "testing" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + gnmicv1alpha1 "github.com/gnmic/operator/api/v1alpha1" + "github.com/gnmic/operator/internal/controller/discovery/core" +) + +func mockDiscoveredTargetList(len int) []core.DiscoveredTarget { + targets := make([]core.DiscoveredTarget, len) + + if len > 100 { + len = 100 + } + + for i := range len { + targets[i] = core.DiscoveredTarget{ + Address: fmt.Sprintf("192.168.1.%d", i+1), + Name: fmt.Sprintf("router%d", i+1), + } + } + + return targets +} + +func mockDiscoveryTarget(opts ...func(*core.DiscoveredTarget)) core.DiscoveredTarget { + t := core.DiscoveredTarget{ + Name: "target1", + Address: "10.0.0.1", + Labels: map[string]string{}, + } + + for _, opt := range opts { + opt(&t) + } + + return t +} + +func withDiscoveredTargetName(name string) func(*core.DiscoveredTarget) { + return func(t *core.DiscoveredTarget) { + t.Name = name + } +} + +func withDiscoveredTargetAddress(address string) func(*core.DiscoveredTarget) { + return func(t *core.DiscoveredTarget) { + t.Address = address + } +} + +func withDiscoveredTargetLabels(labels map[string]string) func(*core.DiscoveredTarget) { + return func(t *core.DiscoveredTarget) { + t.Labels = labels + } +} + +func mockTargetSource(opts ...func(*gnmicv1alpha1.TargetSource)) gnmicv1alpha1.TargetSource { + ts := gnmicv1alpha1.TargetSource{ + ObjectMeta: metav1.ObjectMeta{ + Name: "ts1", + Namespace: "default", + }, + Spec: gnmicv1alpha1.TargetSourceSpec{ + TargetProfile: "default", + TargetLabels: map[string]string{}, + }, + } + + for _, opt := range opts { + opt(&ts) + } + + return ts +} + +func withTargetSourceName(name string) func(*gnmicv1alpha1.TargetSource) { + return func(ts *gnmicv1alpha1.TargetSource) { + ts.ObjectMeta.Name = name + } +} + +func withTargetSourceNamespace(namespace string) func(*gnmicv1alpha1.TargetSource) { + return func(ts *gnmicv1alpha1.TargetSource) { + ts.ObjectMeta.Namespace = namespace + } +} + +func withTargetSourceTargetProfile(profile string) func(*gnmicv1alpha1.TargetSource) { + return func(ts *gnmicv1alpha1.TargetSource) { + ts.Spec.TargetProfile = profile + } +} + +func withTargetSourceTargetLabels(labels map[string]string) func(*gnmicv1alpha1.TargetSource) { + return func(ts *gnmicv1alpha1.TargetSource) { + ts.Spec.TargetLabels = labels + } +} + +func mockGnmicTargetList(len int) []gnmicv1alpha1.Target { + targets := make([]gnmicv1alpha1.Target, len) + + if len > 100 { + len = 100 + } + + for i := range len { + targets[i] = gnmicv1alpha1.Target{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("router%d", i+1), + Namespace: "default", + }, + Spec: gnmicv1alpha1.TargetSpec{ + Address: fmt.Sprintf("192.168.1.%d", i+1), + Profile: "default", + }, + } + } + + return targets +} + +func TestGenerateEvents_EmptyLists(t *testing.T) { + events := generateEvents( + mockGnmicTargetList(0), + mockDiscoveredTargetList(0), + ) + + if len(events) != 0 { + t.Fatalf("expected 0 events, got %d", len(events)) + } +} + +func TestGenerateEvents_AllDiscoveredTargetsBecomeApplyEvents(t *testing.T) { + discovered := mockDiscoveredTargetList(5) + + events := generateEvents( + mockGnmicTargetList(0), + discovered, + ) + + if len(events) != len(discovered) { + t.Fatalf("expected %d events, got %d", len(discovered), len(events)) + } + + for _, event := range events { + if event.Event != core.EventApply { + t.Fatalf( + "expected all events to be %s, got %s", + core.EventApply.String(), + event.Event.String(), + ) + } + } +} + +func TestGenerateEvents_AllExistingTargetsBecomeDeleteEvents(t *testing.T) { + existing := mockGnmicTargetList(5) + + events := generateEvents( + existing, + mockDiscoveredTargetList(0), + ) + + if len(events) != len(existing) { + t.Fatalf("expected %d events, got %d", len(existing), len(events)) + } + + for _, event := range events { + if event.Event != core.EventDelete { + t.Fatalf( + "expected all events to be %s, got %s", + core.EventDelete.String(), + event.Event.String(), + ) + } + } +} + +func TestGenerateEvents_GeneratesDeleteThenApplyEvents(t *testing.T) { + existing := mockGnmicTargetList(5) + discovered := mockDiscoveredTargetList(3) + + events := generateEvents(existing, discovered) + + var ( + numDelete int + numApply int + seenApply bool + ) + + for _, event := range events { + switch event.Event { + case core.EventDelete: + if seenApply { + t.Fatalf("expected delete events before apply events") + } + numDelete++ + + case core.EventApply: + seenApply = true + numApply++ + } + } + + if numDelete != 2 { + t.Fatalf("expected 2 delete events, got %d", numDelete) + } + + if numApply != 3 { + t.Fatalf("expected 3 apply events, got %d", numApply) + } +} + +func TestGenerateEvents_OnlyApplyEventsAreGeneratedForNewTargets(t *testing.T) { + existing := mockGnmicTargetList(3) + discovered := mockDiscoveredTargetList(5) + + events := generateEvents(existing, discovered) + + var ( + numDelete int + numApply int + ) + + for _, event := range events { + switch event.Event { + case core.EventDelete: + numDelete++ + + case core.EventApply: + numApply++ + } + } + + if numDelete != 0 { + t.Fatalf("expected 0 delete events, got %d", numDelete) + } + + if numApply != 5 { + t.Fatalf("expected 5 apply events, got %d", numApply) + } +} + +func TestGenerateEvents_NonOverlappingListsGenerateDeleteAndApplyEvents(t *testing.T) { + existing := mockGnmicTargetList(5) + + discovered := mockDiscoveredTargetList(10)[5:] + + events := generateEvents(existing, discovered) + + var ( + numDelete int + numApply int + seenApply bool + ) + + for _, event := range events { + switch event.Event { + case core.EventDelete: + if seenApply { + t.Fatalf("expected delete events before apply events") + } + numDelete++ + + case core.EventApply: + seenApply = true + numApply++ + } + } + + if numDelete != 5 { + t.Fatalf("expected 5 delete events, got %d", numDelete) + } + + if numApply != 5 { + t.Fatalf("expected 5 apply events, got %d", numApply) + } +} + +func TestGenerateTargetResource_SetsTargetSourceNameLabel(t *testing.T) { + ts := mockTargetSource() + d := mockDiscoveryTarget() + + target := generateTargetResource(d, &ts) + + if got := target.Labels[LabelTargetSourceName]; got != ts.Name { + t.Fatalf( + "expected %s=%q, got %q", + LabelTargetSourceName, + ts.Name, + got, + ) + } +} + +func TestGenerateTargetResource_CopiesDiscoveredLabels(t *testing.T) { + d := mockDiscoveryTarget( + withDiscoveredTargetLabels(map[string]string{ + "discoveredLabel1": "discoveredValue1", + "discoveredLabel2": "discoveredValue2", + }), + ) + + ts := mockTargetSource() + + target := generateTargetResource(d, &ts) + + tests := map[string]string{ + "discoveredLabel1": "discoveredValue1", + "discoveredLabel2": "discoveredValue2", + } + + for k, want := range tests { + if got := target.Labels[k]; got != want { + t.Fatalf("expected label %s=%q, got %q", k, want, got) + } + } +} + +func TestGenerateTargetResource_CopiesTargetSourceLabels(t *testing.T) { + ts := mockTargetSource( + withTargetSourceTargetLabels(map[string]string{ + "targetSourceLabel1": "targetSourceValue1", + "targetSourceLabel2": "targetSourceValue2", + }), + ) + + d := mockDiscoveryTarget() + + target := generateTargetResource(d, &ts) + + tests := map[string]string{ + "targetSourceLabel1": "targetSourceValue1", + "targetSourceLabel2": "targetSourceValue2", + } + + for k, want := range tests { + if got := target.Labels[k]; got != want { + t.Fatalf("expected label %s=%q, got %q", k, want, got) + } + } +} + +func TestGenerateTargetResource_OverridesReservedTargetSourceNameLabel(t *testing.T) { + ts := mockTargetSource( + withTargetSourceTargetLabels(map[string]string{ + LabelTargetSourceName: "wrong-value", + }), + ) + + d := mockDiscoveryTarget( + withDiscoveredTargetLabels(map[string]string{ + LabelTargetSourceName: "another-wrong-value", + }), + ) + + target := generateTargetResource(d, &ts) + + if got := target.Labels[LabelTargetSourceName]; got != ts.Name { + t.Fatalf( + "expected reserved label %s=%q, got %q", + LabelTargetSourceName, + ts.Name, + got, + ) + } +} + +func TestGenerateTargetResource_DiscoveredLabelsOverrideTargetSourceLabels(t *testing.T) { + ts := mockTargetSource( + withTargetSourceTargetLabels(map[string]string{ + "sharedLabel": "targetSourceValue", + }), + ) + + d := mockDiscoveryTarget( + withDiscoveredTargetLabels(map[string]string{ + "sharedLabel": "discoveredValue", + }), + ) + + target := generateTargetResource(d, &ts) + + if got := target.Labels["sharedLabel"]; got != "discoveredValue" { + t.Fatalf( + "expected target source label to override discovered label, got %q", + got, + ) + } +} + +func TestNormalizeTarget_PrefixesTargetName(t *testing.T) { + target := mockDiscoveryTarget( + withDiscoveredTargetName("router1"), + ) + + normalized := normalizeTarget(target, "ts1") + + if got := normalized.Name; got != "ts1-router1" { + t.Fatalf( + "expected normalized name %q, got %q", + "ts1-router1", + got, + ) + } +} + +func TestNormalizeTarget_PreservesTargetAddress(t *testing.T) { + target := mockDiscoveryTarget( + withDiscoveredTargetAddress("192.168.1.10"), + ) + + normalized := normalizeTarget(target, "ts1") + + if got := normalized.Address; got != "192.168.1.10" { + t.Fatalf( + "expected address %q, got %q", + "192.168.1.10", + got, + ) + } +} + +func TestNormalizeTarget_PreservesTargetLabels(t *testing.T) { + labels := map[string]string{ + "env": "prod", + "role": "leaf", + } + + target := mockDiscoveryTarget( + withDiscoveredTargetLabels(labels), + ) + + normalized := normalizeTarget(target, "ts1") + + for k, want := range labels { + if got := normalized.Labels[k]; got != want { + t.Fatalf( + "expected label %s=%q, got %q", + k, + want, + got, + ) + } + } +} diff --git a/internal/controller/discovery/message_processor.go b/internal/controller/discovery/message_processor.go index f7aafb1..f573b1b 100644 --- a/internal/controller/discovery/message_processor.go +++ b/internal/controller/discovery/message_processor.go @@ -30,6 +30,7 @@ type MessageProcessor struct { activeSnapshot *snapshotBuffer // Events are deferred while snapshot is in progress deferredEvents []core.DiscoveryEvent + targetCount int32 } // NewMessageProcessor wires a MessageProcessor instance @@ -53,6 +54,13 @@ func (m *MessageProcessor) Run(ctx context.Context) error { logger.Info("Message processor started") + // Update internal counter in case of a process restart + if existing, err := fetchExistingTargets(ctx, m.client, m.targetSource); err != nil { + logger.Error(err, "error fetching existing targets") + } else { + m.targetCount = int32(len(existing)) + } + for { select { case batch, ok := <-m.in: @@ -90,6 +98,7 @@ func (m *MessageProcessor) Run(ctx context.Context) error { } } +// processMessage handles all of the incoming messages from the channel func (m *MessageProcessor) processMessage(ctx context.Context, message core.DiscoveryMessage, logger logr.Logger) error { if err := ctx.Err(); err != nil { return err @@ -105,6 +114,11 @@ func (m *MessageProcessor) processMessage(ctx context.Context, message core.Disc "chunkIndex", msg.ChunkIndex, "targets", len(msg.Targets), ) + + for i := range msg.Targets { + msg.Targets[i] = normalizeTarget(msg.Targets[i], m.targetSource.Name) + } + return m.processSnapshot(ctx, msg, logger) case core.DiscoveryEvent: @@ -114,6 +128,8 @@ func (m *MessageProcessor) processMessage(ctx context.Context, message core.Disc "event", msg.Event, "target", msg.Target.Name, ) + + msg.Target = normalizeTarget(msg.Target, m.targetSource.Name) return m.processEvent(ctx, msg, logger) default: @@ -204,6 +220,31 @@ func (m *MessageProcessor) collectSnapshot(ctx context.Context, chunk core.Disco return nil } +// processEvent handles a single DiscoveryEvent message. If a snapshot is in the queue, the events get deferred and applied after. +func (m *MessageProcessor) processEvent(ctx context.Context, event core.DiscoveryEvent, logger logr.Logger) error { + // If snapshot collecting is active defer events + if m.activeSnapshot != nil { + m.deferredEvents = append(m.deferredEvents, event) + return nil + } + + // Apply events + err := m.applyEvent(ctx, event, logger) + if err == nil { + switch event.Event { + case core.EventApply: + m.targetCount++ + m.updateStatus(ctx, logger) + case core.EventDelete: + m.targetCount-- + m.updateStatus(ctx, logger) + } + } + + return err +} + +// applySnapshot is in charge of getting the Events for the discovered targets and applying them through applyEvent func (m *MessageProcessor) applySnapshot(ctx context.Context, snapshot *snapshotBuffer, logger logr.Logger) error { select { case <-ctx.Done(): @@ -240,8 +281,37 @@ func (m *MessageProcessor) applySnapshot(ctx context.Context, snapshot *snapshot "targets", len(allTargets), ) - // todo: apply all targets - // a.applyTargets + existing, err := fetchExistingTargets(ctx, m.client, m.targetSource) + if err != nil { + logger.Error(err, "error fetching existing targets") + } else { + logger.Info("fetched existing targets", + "numOfTargets", len(existing), + ) + } + + events := generateEvents(existing, allTargets) + + nApply := 0 + nDelete := 0 + + for _, e := range events { + switch e.Event { + case core.EventApply: + nApply++ + case core.EventDelete: + nDelete++ + } + } + + logger.Info("generated events", + "numOfApply", nApply, + "numOfDelete", nDelete, + ) + + for _, e := range events { + m.applyEvent(ctx, e, logger) + } // Replay deferred events for _, event := range m.deferredEvents { @@ -250,47 +320,61 @@ func (m *MessageProcessor) applySnapshot(ctx context.Context, snapshot *snapshot return nil default: } - if err := m.applyEvent(ctx, event, logger); err != nil { + if err := m.processEvent(ctx, event, logger); err != nil { return err } } + // Because of idempotency, allTargets = desired state = targets existing in Kubernetes. Overwrites the counter to "reset" it. + m.targetCount = int32(len(allTargets)) + m.updateStatus(ctx, logger) + m.resetSnapshot() m.deferredEvents = nil return nil } -func (m *MessageProcessor) processEvent(ctx context.Context, event core.DiscoveryEvent, logger logr.Logger) error { - // If snapshot collecting is active defer events - if m.activeSnapshot != nil { - m.deferredEvents = append(m.deferredEvents, event) - return nil - } - - // Apply events - return m.applyEvent(ctx, event, logger) -} - func (m *MessageProcessor) applyEvent(ctx context.Context, event core.DiscoveryEvent, logger logr.Logger) error { switch event.Event { case core.EventDelete: - logger.Info( - "Deleting Target", - "target", event.Target.Name, - "targetsource", m.targetSource.Name, - ) + if err := deleteTarget(ctx, m.client, event.Target.Name, m.targetSource.Namespace); err != nil { + logger.Error(err, "error deleting target", + "targetName", event.Target.Name, + ) + return err + } else { + logger.Info("deleted target object", + "name", event.Target.Name, + ) + } case core.EventApply: - logger.Info( - "Applying Target", - "target", event.Target.Name, - "address", event.Target.Address, - "labels", event.Target.Labels, - "targetsource", m.targetSource.Name, - ) + target := generateTargetResource(event.Target, m.targetSource) + + if err := applyTarget(ctx, m.client, m.scheme, target, m.targetSource); err != nil { + logger.Error(err, "error applying target", + "targetName", event.Target.Name, + ) + return err + } else { + logger.Info("applied target object", + "name", event.Target.Name, + ) + } } + return nil } +func (m *MessageProcessor) updateStatus(ctx context.Context, logger logr.Logger) { + if err := updateTargetSourceStatus(ctx, m.client, m.targetSource, m.targetCount); err != nil { + logger.Error(err, "error updating TargetSource status") + } else { + logger.Info("updated target source status", + "targetCount", m.targetCount, + ) + } +} + func (m *MessageProcessor) resetSnapshot() { m.activeSnapshot = nil } diff --git a/internal/controller/targetsource_controller.go b/internal/controller/targetsource_controller.go index 522aabd..7f30fc8 100644 --- a/internal/controller/targetsource_controller.go +++ b/internal/controller/targetsource_controller.go @@ -23,9 +23,11 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/predicate" gnmicv1alpha1 "github.com/gnmic/operator/api/v1alpha1" "github.com/gnmic/operator/internal/controller/discovery" @@ -90,14 +92,23 @@ func (r *TargetSourceReconciler) Reconcile(ctx context.Context, req ctrl.Request } if r.DiscoveryRegistry.Exists(req.NamespacedName) { - logger.Info("Discovery runtime already running; reconciliation completed") - return ctrl.Result{}, nil + if targetSource.Generation != targetSource.Status.ObservedGeneration { + return r.reconcileDeletion(ctx, req.NamespacedName, targetSource) + } else { + logger.Info("Discovery runtime already running; reconciliation completed") + return ctrl.Result{}, nil + } } if err := r.startDiscovery(req.NamespacedName, targetSource, logger); err != nil { return ctrl.Result{}, err } + targetSource.Status.ObservedGeneration = targetSource.Generation + if err := r.Status().Update(ctx, targetSource); err != nil { + return ctrl.Result{}, err + } + logger.Info("Started discovery runtime") return ctrl.Result{}, nil } @@ -230,7 +241,10 @@ func (r *TargetSourceReconciler) startDiscovery( // SetupWithManager sets up the controller with the Manager. func (r *TargetSourceReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). - For(&gnmicv1alpha1.TargetSource{}). + For( + &gnmicv1alpha1.TargetSource{}, + builder.WithPredicates(predicate.GenerationChangedPredicate{}), + ). Named("targetsource"). Complete(r) }