diff --git a/.gitignore b/.gitignore index 5b57e2d..5e46985 100644 --- a/.gitignore +++ b/.gitignore @@ -34,3 +34,10 @@ docs/public docs/resources/_gen/ docs/.hugo_build.lock test/integration/**/clab-* + + +# Only for development and testing purposes +# To be removed after development of targetsource +# ignored in order to not add unnecassary logging messages +lab/dev/resources/targetsources +.scannerwork/ \ No newline at end of file diff --git a/api/v1alpha1/targetsource_types.go b/api/v1alpha1/targetsource_types.go index dd6fb59..d530033 100644 --- a/api/v1alpha1/targetsource_types.go +++ b/api/v1alpha1/targetsource_types.go @@ -17,37 +17,332 @@ limitations under the License. package v1alpha1 import ( + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) // TargetSourceSpec defines the desired state of TargetSource // +kubebuilder:validation:Required type TargetSourceSpec struct { + // Provider defines the source of targets for this TargetSource + // Only one provider can be specified per TargetSource + // +kubebuilder:validation:Required Provider *ProviderSpec `json:"provider"` + // TODO: implement in message processor + // Optional port to use for discovered targets if not specified by the provider + // +kubebuilder:validation:Optional + TargetPort int32 `json:"targetPort,omitempty"` + + // Optional labels to apply to all targets discovered by this TargetSource // +kubebuilder:validation:Optional TargetLabels map[string]string `json:"targetLabels,omitempty"` + // The TargetProfile to use for targets discovered by this TargetSource + // +kubebuilder:validation:Required // +kubebuilder:validation:MinLength=1 TargetProfile string `json:"targetProfile"` } -// +kubebuilder:validation:ExactlyOneOf=http;consul +// ProviderSpec defines the source of targets for a TargetSource +// Only one provider can be specified per TargetSource +// +kubebuilder:validation:ExactlyOneOf=http type ProviderSpec struct { - HTTP *HTTPConfig `json:"http,omitempty"` - Consul *ConsulConfig `json:"consul,omitempty"` + // HTTP defines the configuration for a HTTP provider + HTTP *HTTPConfig `json:"http,omitempty"` } +// HTTPConfig defines the configuration for the HTTP provider +// +kubebuilder:validation:AtLeastOneOf:=url;push type HTTPConfig struct { + // URL of the HTTP endpoint to pull targets from + // If defined, the loader will periodically poll this endpoint for targets + // +kubebuilder:validation:Optional + URL string `json:"url,omitempty"` + + // HTTP method used for the request. + // + // Defaults to GET if not specified. + // + // Supported values: + // - GET (default, no request body) + // - POST (supports request body) + // + // +kubebuilder:validation:Enum=GET;POST + // +kubebuilder:validation:Optional + Method string `json:"method,omitempty"` + + // Optional HTTP headers to include in the request. + // + // These map directly to HTTP headers (key-value pairs). + // + // Example: + // headers: + // Content-Type: application/json + // X-Custom-Header: value + // + // Precedence: + // - Authorization configuration overrides any conflicting headers + // + // +kubebuilder:validation:Optional + Headers map[string]string `json:"headers,omitempty"` + + // Optional raw request body. + // + // Typically used with POST requests and contains JSON payload. + // + // Example: + // body: | + // { + // "limit": 100, + // "status": "active" + // } + // + // Notes: + // - Ignored for GET requests + // - User must set appropriate Content-Type header if needed + // + // +kubebuilder:validation:Optional + Body string `json:"body,omitempty"` + + // Optional authorization configuration for accessing the HTTP endpoint + // +kubebuilder:validation:Optional + Authorization *AuthorizationSpec `json:"authorization,omitempty"` + + // Optional interval for polling the HTTP endpoint for targets + // TODO: document about default value + // +kubebuilder:default="6h" + // +kubebuilder:validation:Optional + Interval *metav1.Duration `json:"interval,omitempty"` + + // Optional timeout for HTTP requests to the endpoint + // +kubebuilder:default="10s" + // +kubebuilder:validation:Optional + Timeout *metav1.Duration `json:"timeout,omitempty"` + + // Optional TLS configuration for connecting to the HTTP endpoint + // If it is an HTTP endpoint, this will be ignored + // +kubebuilder:validation:Optional + TLS *ClientTLSConfig `json:"tls,omitempty"` + + // Optional pagination configuration for parsing responses from the HTTP endpoint + // +kubebuilder:validation:Optional + Pagination *PaginationSpec `json:"pagination,omitempty"` + + // Optional mapping configuration for parsing responses from the HTTP endpoint + // +kubebuilder:validation:Optional + ResponseMapping *ResponseMappingSpec `json:"mapping,omitempty"` + + // Optional configuration to enable push + // +kubebuilder:validation:Optional + Push *PushSpec `json:"push,omitempty"` +} + +type ClientTLSConfig struct { + // Skip TLS verification of the Provider's certificate. + // +kubebuilder:default:=false + InsecureSkipVerify bool `json:"insecureSkipVerify,omitempty"` + + // Reference to a ConfigMap containing a bundle of PEM-encoded CAs to use when + // verifying the certificate chain presented by the Provider when using HTTPS. + // Mutually exclusive with CABundle. + // +kubebuilder:validation:Optional + CABundleRef *corev1.ConfigMapKeySelector `json:"caBundleRef,omitempty"` +} + +// AuthorizationSpec defines the configuration for authentication +// +kubebuilder:validation:ExactlyOneOf=basic;token +type AuthorizationSpec struct { + // Basic authentication configuration + Basic *BasicAuthSpec `json:"basic,omitempty"` + // Token-based authentication configuration + Token *TokenAuthSpec `json:"token,omitempty"` +} + +// BasicAuthSpec defines the configuration for basic authentication +type BasicAuthSpec struct { + // Reference to a Secret containing "username" and "password" keys to use for + // basic authentication when connecting to the Provider. + // +kubebuilder:validation:Required + CredentialsSecretRef *corev1.SecretKeySelector `json:"credentialsSecretRef"` +} + +// TokenAuthSpec defines the configuration for token-based authentication +type TokenAuthSpec struct { + // Scheme for the token, e.g. "Bearer" // +kubebuilder:validation:MinLength=1 - URL string `json:"url"` + Scheme string `json:"scheme"` + // Reference to a Secret containing a key with the token value to use for + // authentication when connecting to the Provider. + // +kubebuilder:validation:Required + TokenSecretRef *corev1.SecretKeySelector `json:"tokenSecretRef,omitempty"` +} + +// PaginationSpec defines the configuration for paginating through responses from providers +type PaginationSpec struct { + // Field name in the JSON response that contains the next page reference. + // The value can be either: + // - a full URL (used directly for the next request), or + // - a pagination token (appended as a query parameter using this field name as the key). + // + // Must refer to a top-level key in the response object. + // Example: "next" or "nextToken" + NextField string `json:"nextField,omitempty"` +} + +// ResponseMappingSpec controls how targets are extracted from an HTTP JSON response. +// +// This allows you to map fields from a JSON API into targets using either: +// - simple direct field access (e.g. item["name"]) +// - or CEL expressions for more advanced logic +// +// General behavior: +// +// 1. Selecting targets: +// - `targetsField` is a CEL expression that selects the list of targets +// - It runs once on the full response (`self`) and MUST return a list +// - If not set, the response itself must be a JSON array +// +// 2. Extracting fields: +// - Each field (name, address, port, labels, etc.) is handled independently +// - If a CEL expression is provided → it is evaluated +// - If not provided → the value is read directly from the target object +// +// 3. Available variables in CEL: +// - item -> the current target object +// - self -> the full HTTP response JSON +// +// Example: +// +// Response: +// { +// "results": [ +// { "name": "device1", "ip": "10.0.0.1", "env": "prod" } +// ], +// "meta": { "region": "eu-west" } +// } +// +// Mapping: +// targetsField: "self.results" +// +// name: "" # direct → item["name"] +// address: "item.ip" # CEL +// +// labels: +// env: "item.env" +// region: "self.meta.region" +type ResponseMappingSpec struct { + // CEL expression that selects the list of target objects from the response. + // + // This is evaluated once using: + // self -> full JSON response + // + // Example: + // targetsField: "self.results" + // + // If not set, the response itself must be a JSON array with the targets. + // + // +kubebuilder:validation:Optional + TargetsField string `json:"targetsField,omitempty"` + + // CEL expression for the target name. + // + // If not set, defaults to: + // item["name"] + // + // Example: + // "item.hostname" + // + // +kubebuilder:validation:Optional + Name string `json:"name,omitempty"` + + // CEL expression for the target address. + // + // If not set, defaults to: + // item["address"] + // + // Example: + // "item.ip" + // + // +kubebuilder:validation:Optional + Address string `json:"address,omitempty"` + + // CEL expression for the target port. + // + // If not set, defaults to: + // item["port"] + // + // Example: + // "item.port" + // + // +kubebuilder:validation:Optional + Port string `json:"port,omitempty"` + + // CEL expression that returns a map of labels. + // The expression must evaluate to an object (map). + // + // Example: + // + // labels: | + // { + // "env": item.environment, + // "region": self.meta.region, + // item.dynamicKey: "value" + // } + // + // If not set, defaults to: + // item["labels"] + // + // The resulting map will be converted into labels. + // The extracted labels will be merged with the static TargetLabels defined in the TargetSourceSpec, + // with values from the response taking precedence in case of conflicts. + // // +kubebuilder:validation:Optional - AcceptPush bool `json:"acceptPush,omitempty"` + Labels string `json:"labels,omitempty"` + + // CEL expression for the target profile. + // + // If not set, defaults to: + // item["targetProfile"] + // + // Example: + // "item.type == 'edge' ? 'edge-profile' : 'default'" + // + // +kubebuilder:validation:Optional + TargetProfile string `json:"targetProfile,omitempty"` +} + +// PushSpec defines the settings for event-based update mechanism (i.e. webhooks sent from the server) +type PushSpec struct { + // +kubebuilder:default=false + Enabled bool `json:"enabled"` + + // +kubebuilder:validation:Optional + Auth *PushAuthSpec `json:"auth,omitempty"` +} + +// +kubebuilder:validation:ExactlyOneOf:=bearer;signature +type PushAuthSpec struct { + Bearer *PushBearerAuthSpec `json:"bearer,omitempty"` + Signature *PushSignatureAuthSpec `json:"signature,omitempty"` } -type ConsulConfig struct { +// +kubebuilder:validation:Required +type PushBearerAuthSpec struct { + TokenSecretRef *corev1.SecretKeySelector `json:"tokenSecretRef,omitempty"` +} + +// +kubebuilder:validation:Required +type PushSignatureAuthSpec struct { + SecretRef *corev1.SecretKeySelector `json:"secretRef"` + + // Header containing the signature // +kubebuilder:validation:MinLength=1 - URL string `json:"url,omitempty"` + Header string `json:"header"` + + // +kubebuilder:default="sha512" + // +kubebuilder:validation:Enum=sha1;sha256;sha512 + Algorithm string `json:"algorithm"` } // TargetSourceStatus defines the observed state of TargetSource diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 61e81fd..1628621 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -46,6 +46,71 @@ func (in *APIConfig) DeepCopy() *APIConfig { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *AuthorizationSpec) DeepCopyInto(out *AuthorizationSpec) { + *out = *in + if in.Basic != nil { + in, out := &in.Basic, &out.Basic + *out = new(BasicAuthSpec) + (*in).DeepCopyInto(*out) + } + if in.Token != nil { + in, out := &in.Token, &out.Token + *out = new(TokenAuthSpec) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AuthorizationSpec. +func (in *AuthorizationSpec) DeepCopy() *AuthorizationSpec { + if in == nil { + return nil + } + out := new(AuthorizationSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *BasicAuthSpec) DeepCopyInto(out *BasicAuthSpec) { + *out = *in + if in.CredentialsSecretRef != nil { + in, out := &in.CredentialsSecretRef, &out.CredentialsSecretRef + *out = new(v1.SecretKeySelector) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BasicAuthSpec. +func (in *BasicAuthSpec) DeepCopy() *BasicAuthSpec { + if in == nil { + return nil + } + out := new(BasicAuthSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ClientTLSConfig) DeepCopyInto(out *ClientTLSConfig) { + *out = *in + if in.CABundleRef != nil { + in, out := &in.CABundleRef, &out.CABundleRef + *out = new(v1.ConfigMapKeySelector) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClientTLSConfig. +func (in *ClientTLSConfig) DeepCopy() *ClientTLSConfig { + if in == nil { + return nil + } + out := new(ClientTLSConfig) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Cluster) DeepCopyInto(out *Cluster) { *out = *in @@ -213,21 +278,6 @@ func (in *ClusterTargetState) DeepCopy() *ClusterTargetState { return out } -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *ConsulConfig) DeepCopyInto(out *ConsulConfig) { - *out = *in -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ConsulConfig. -func (in *ConsulConfig) DeepCopy() *ConsulConfig { - if in == nil { - return nil - } - out := new(ConsulConfig) - in.DeepCopyInto(out) - return out -} - // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *GRPCKeepAliveConfig) DeepCopyInto(out *GRPCKeepAliveConfig) { *out = *in @@ -273,6 +323,48 @@ func (in *GRPCTunnelConfig) DeepCopy() *GRPCTunnelConfig { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *HTTPConfig) DeepCopyInto(out *HTTPConfig) { *out = *in + if in.Headers != nil { + in, out := &in.Headers, &out.Headers + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } + if in.Authorization != nil { + in, out := &in.Authorization, &out.Authorization + *out = new(AuthorizationSpec) + (*in).DeepCopyInto(*out) + } + if in.Interval != nil { + in, out := &in.Interval, &out.Interval + *out = new(metav1.Duration) + **out = **in + } + if in.Timeout != nil { + in, out := &in.Timeout, &out.Timeout + *out = new(metav1.Duration) + **out = **in + } + if in.TLS != nil { + in, out := &in.TLS, &out.TLS + *out = new(ClientTLSConfig) + (*in).DeepCopyInto(*out) + } + if in.Pagination != nil { + in, out := &in.Pagination, &out.Pagination + *out = new(PaginationSpec) + **out = **in + } + if in.ResponseMapping != nil { + in, out := &in.ResponseMapping, &out.ResponseMapping + *out = new(ResponseMappingSpec) + **out = **in + } + if in.Push != nil { + in, out := &in.Push, &out.Push + *out = new(PushSpec) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new HTTPConfig. @@ -587,6 +679,21 @@ func (in *OutputStatus) DeepCopy() *OutputStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PaginationSpec) DeepCopyInto(out *PaginationSpec) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PaginationSpec. +func (in *PaginationSpec) DeepCopy() *PaginationSpec { + if in == nil { + return nil + } + out := new(PaginationSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Pipeline) DeepCopyInto(out *Pipeline) { *out = *in @@ -824,12 +931,7 @@ func (in *ProviderSpec) DeepCopyInto(out *ProviderSpec) { if in.HTTP != nil { in, out := &in.HTTP, &out.HTTP *out = new(HTTPConfig) - **out = **in - } - if in.Consul != nil { - in, out := &in.Consul, &out.Consul - *out = new(ConsulConfig) - **out = **in + (*in).DeepCopyInto(*out) } } @@ -843,6 +945,106 @@ func (in *ProviderSpec) DeepCopy() *ProviderSpec { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PushAuthSpec) DeepCopyInto(out *PushAuthSpec) { + *out = *in + if in.Bearer != nil { + in, out := &in.Bearer, &out.Bearer + *out = new(PushBearerAuthSpec) + (*in).DeepCopyInto(*out) + } + if in.Signature != nil { + in, out := &in.Signature, &out.Signature + *out = new(PushSignatureAuthSpec) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PushAuthSpec. +func (in *PushAuthSpec) DeepCopy() *PushAuthSpec { + if in == nil { + return nil + } + out := new(PushAuthSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PushBearerAuthSpec) DeepCopyInto(out *PushBearerAuthSpec) { + *out = *in + if in.TokenSecretRef != nil { + in, out := &in.TokenSecretRef, &out.TokenSecretRef + *out = new(v1.SecretKeySelector) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PushBearerAuthSpec. +func (in *PushBearerAuthSpec) DeepCopy() *PushBearerAuthSpec { + if in == nil { + return nil + } + out := new(PushBearerAuthSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PushSignatureAuthSpec) DeepCopyInto(out *PushSignatureAuthSpec) { + *out = *in + if in.SecretRef != nil { + in, out := &in.SecretRef, &out.SecretRef + *out = new(v1.SecretKeySelector) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PushSignatureAuthSpec. +func (in *PushSignatureAuthSpec) DeepCopy() *PushSignatureAuthSpec { + if in == nil { + return nil + } + out := new(PushSignatureAuthSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PushSpec) DeepCopyInto(out *PushSpec) { + *out = *in + if in.Auth != nil { + in, out := &in.Auth, &out.Auth + *out = new(PushAuthSpec) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PushSpec. +func (in *PushSpec) DeepCopy() *PushSpec { + if in == nil { + return nil + } + out := new(PushSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ResponseMappingSpec) DeepCopyInto(out *ResponseMappingSpec) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ResponseMappingSpec. +func (in *ResponseMappingSpec) DeepCopy() *ResponseMappingSpec { + if in == nil { + return nil + } + out := new(ResponseMappingSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ServiceConfig) DeepCopyInto(out *ServiceConfig) { *out = *in @@ -1384,6 +1586,26 @@ func (in *TargetTLSConfig) DeepCopy() *TargetTLSConfig { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TokenAuthSpec) DeepCopyInto(out *TokenAuthSpec) { + *out = *in + if in.TokenSecretRef != nil { + in, out := &in.TokenSecretRef, &out.TokenSecretRef + *out = new(v1.SecretKeySelector) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TokenAuthSpec. +func (in *TokenAuthSpec) DeepCopy() *TokenAuthSpec { + if in == nil { + return nil + } + out := new(TokenAuthSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *TunnelTargetPolicy) DeepCopyInto(out *TunnelTargetPolicy) { *out = *in diff --git a/config/crd/bases/operator.gnmic.dev_targetsources.yaml b/config/crd/bases/operator.gnmic.dev_targetsources.yaml index b8d24e1..4fc6c32 100644 --- a/config/crd/bases/operator.gnmic.dev_targetsources.yaml +++ b/config/crd/bases/operator.gnmic.dev_targetsources.yaml @@ -40,33 +40,392 @@ spec: description: TargetSourceSpec defines the desired state of TargetSource properties: provider: + description: |- + Provider defines the source of targets for this TargetSource + Only one provider can be specified per TargetSource properties: - consul: - properties: - url: - minLength: 1 - type: string - type: object http: + description: HTTP defines the configuration for a HTTP provider properties: - acceptPush: - type: boolean + authorization: + description: Optional authorization configuration for accessing + the HTTP endpoint + properties: + basic: + description: Basic authentication configuration + properties: + credentialsSecretRef: + description: |- + Reference to a Secret containing "username" and "password" keys to use for + basic authentication when connecting to the Provider. + properties: + key: + description: The key of the secret to select from. Must + be a valid secret key. + type: string + name: + default: "" + description: |- + Name of the referent. + This field is effectively required, but due to backwards compatibility is + allowed to be empty. Instances of this type with an empty value here are + almost certainly wrong. + More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + type: string + optional: + description: Specify whether the Secret or its + key must be defined + type: boolean + required: + - key + type: object + x-kubernetes-map-type: atomic + required: + - credentialsSecretRef + type: object + token: + description: Token-based authentication configuration + properties: + scheme: + description: Scheme for the token, e.g. "Bearer" + minLength: 1 + type: string + tokenSecretRef: + description: |- + Reference to a Secret containing a key with the token value to use for + authentication when connecting to the Provider. + properties: + key: + description: The key of the secret to select from. Must + be a valid secret key. + type: string + name: + default: "" + description: |- + Name of the referent. + This field is effectively required, but due to backwards compatibility is + allowed to be empty. Instances of this type with an empty value here are + almost certainly wrong. + More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + type: string + optional: + description: Specify whether the Secret or its + key must be defined + type: boolean + required: + - key + type: object + x-kubernetes-map-type: atomic + required: + - scheme + - tokenSecretRef + type: object + type: object + x-kubernetes-validations: + - message: exactly one of the fields in [basic token] must + be set + rule: '[has(self.basic),has(self.token)].filter(x,x==true).size() + == 1' + body: + description: |- + Optional raw request body. + + Typically used with POST requests and contains JSON payload. + + Example: + body: | + { + "limit": 100, + "status": "active" + } + + Notes: + - Ignored for GET requests + - User must set appropriate Content-Type header if needed + type: string + headers: + additionalProperties: + type: string + description: |- + Optional HTTP headers to include in the request. + + These map directly to HTTP headers (key-value pairs). + + Example: + headers: + Content-Type: application/json + X-Custom-Header: value + + Precedence: + - Authorization configuration overrides any conflicting headers + type: object + interval: + default: 6h + description: Optional interval for polling the HTTP endpoint + for targets + type: string + mapping: + description: Optional mapping configuration for parsing responses + from the HTTP endpoint + properties: + address: + description: |- + CEL expression for the target address. + + If not set, defaults to: + item["address"] + + Example: + "item.ip" + type: string + labels: + description: |- + CEL expression that returns a map of labels. + The expression must evaluate to an object (map). + + Example: + + labels: | + { + "env": item.environment, + "region": self.meta.region, + item.dynamicKey: "value" + } + + If not set, defaults to: + item["labels"] + + The resulting map will be converted into labels. + The extracted labels will be merged with the static TargetLabels defined in the TargetSourceSpec, + with values from the response taking precedence in case of conflicts. + type: string + name: + description: |- + CEL expression for the target name. + + If not set, defaults to: + item["name"] + + Example: + "item.hostname" + type: string + port: + description: |- + CEL expression for the target port. + + If not set, defaults to: + item["port"] + + Example: + "item.port" + type: string + targetProfile: + description: |- + CEL expression for the target profile. + + If not set, defaults to: + item["targetProfile"] + + Example: + "item.type == 'edge' ? 'edge-profile' : 'default'" + type: string + targetsField: + description: |- + CEL expression that selects the list of target objects from the response. + + This is evaluated once using: + self -> full JSON response + + Example: + targetsField: "self.results" + + If not set, the response itself must be a JSON array with the targets. + type: string + type: object + method: + description: |- + HTTP method used for the request. + + Defaults to GET if not specified. + + Supported values: + - GET (default, no request body) + - POST (supports request body) + enum: + - GET + - POST + type: string + pagination: + description: Optional pagination configuration for parsing + responses from the HTTP endpoint + properties: + nextField: + description: |- + Field name in the JSON response that contains the next page reference. + The value can be either: + - a full URL (used directly for the next request), or + - a pagination token (appended as a query parameter using this field name as the key). + + Must refer to a top-level key in the response object. + Example: "next" or "nextToken" + type: string + type: object + push: + description: Optional configuration to enable push + properties: + auth: + properties: + bearer: + properties: + tokenSecretRef: + description: SecretKeySelector selects a key of + a Secret. + properties: + key: + description: The key of the secret to select + from. Must be a valid secret key. + type: string + name: + default: "" + description: |- + Name of the referent. + This field is effectively required, but due to backwards compatibility is + allowed to be empty. Instances of this type with an empty value here are + almost certainly wrong. + More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + type: string + optional: + description: Specify whether the Secret or + its key must be defined + type: boolean + required: + - key + type: object + x-kubernetes-map-type: atomic + type: object + signature: + properties: + algorithm: + default: sha512 + enum: + - sha1 + - sha256 + - sha512 + type: string + header: + description: Header containing the signature + minLength: 1 + type: string + secretRef: + description: SecretKeySelector selects a key of + a Secret. + properties: + key: + description: The key of the secret to select + from. Must be a valid secret key. + type: string + name: + default: "" + description: |- + Name of the referent. + This field is effectively required, but due to backwards compatibility is + allowed to be empty. Instances of this type with an empty value here are + almost certainly wrong. + More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + type: string + optional: + description: Specify whether the Secret or + its key must be defined + type: boolean + required: + - key + type: object + x-kubernetes-map-type: atomic + required: + - algorithm + - header + - secretRef + type: object + type: object + x-kubernetes-validations: + - message: exactly one of the fields in [bearer signature] + must be set + rule: '[has(self.bearer),has(self.signature)].filter(x,x==true).size() + == 1' + enabled: + default: false + type: boolean + required: + - enabled + type: object + timeout: + default: 10s + description: Optional timeout for HTTP requests to the endpoint + type: string + tls: + description: |- + Optional TLS configuration for connecting to the HTTP endpoint + If it is an HTTP endpoint, this will be ignored + properties: + caBundleRef: + description: |- + Reference to a ConfigMap containing a bundle of PEM-encoded CAs to use when + verifying the certificate chain presented by the Provider when using HTTPS. + Mutually exclusive with CABundle. + properties: + key: + description: The key to select. + type: string + name: + default: "" + description: |- + Name of the referent. + This field is effectively required, but due to backwards compatibility is + allowed to be empty. Instances of this type with an empty value here are + almost certainly wrong. + More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + type: string + optional: + description: Specify whether the ConfigMap or its + key must be defined + type: boolean + required: + - key + type: object + x-kubernetes-map-type: atomic + insecureSkipVerify: + default: false + description: Skip TLS verification of the Provider's certificate. + type: boolean + type: object url: - minLength: 1 + description: |- + URL of the HTTP endpoint to pull targets from + If defined, the loader will periodically poll this endpoint for targets type: string - required: - - url type: object + x-kubernetes-validations: + - message: at least one of the fields in [url push] must be set + rule: '[has(self.url),has(self.push)].filter(x,x==true).size() + >= 1' type: object x-kubernetes-validations: - - message: exactly one of the fields in [http consul] must be set - rule: '[has(self.http),has(self.consul)].filter(x,x==true).size() - == 1' + - message: exactly one of the fields in [http] must be set + rule: '[has(self.http)].filter(x,x==true).size() == 1' targetLabels: additionalProperties: type: string + description: Optional labels to apply to all targets discovered by + this TargetSource type: object + targetPort: + description: Optional port to use for discovered targets if not specified + by the provider + format: int32 + type: integer targetProfile: + description: The TargetProfile to use for targets discovered by this + TargetSource minLength: 1 type: string required: diff --git a/go.mod b/go.mod index 9dc2b78..c08b9b8 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.25.5 require ( github.com/cert-manager/cert-manager v1.19.3 github.com/go-logr/logr v1.4.3 + github.com/google/cel-go v0.28.1 github.com/google/uuid v1.6.0 github.com/onsi/ginkgo/v2 v2.28.3 github.com/onsi/gomega v1.40.0 @@ -19,8 +20,10 @@ require ( ) require ( + cel.dev/expr v0.25.1 // indirect cloud.google.com/go/compute/metadata v0.9.0 // indirect github.com/Masterminds/semver/v3 v3.4.0 // indirect + github.com/antlr4-go/antlr/v4 v4.13.1 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect @@ -63,6 +66,7 @@ require ( go.uber.org/zap v1.27.1 // indirect go.yaml.in/yaml/v2 v2.4.3 // indirect go.yaml.in/yaml/v3 v3.0.4 // indirect + golang.org/x/exp v0.0.0-20250718183923-645b1fa84792 // indirect golang.org/x/mod v0.35.0 // indirect golang.org/x/net v0.53.0 // indirect golang.org/x/oauth2 v0.34.0 // indirect @@ -73,6 +77,7 @@ require ( golang.org/x/time v0.14.0 // indirect golang.org/x/tools v0.44.0 // indirect gomodules.xyz/jsonpatch/v2 v2.5.0 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20251202230838-ff82c1b0f217 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20251222181119-0a764e51fe1b // indirect google.golang.org/grpc v1.79.3 // indirect google.golang.org/protobuf v1.36.11 // indirect diff --git a/go.sum b/go.sum index 45485f1..0a845c4 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,11 @@ +cel.dev/expr v0.25.1 h1:1KrZg61W6TWSxuNZ37Xy49ps13NUovb66QLprthtwi4= +cel.dev/expr v0.25.1/go.mod h1:hrXvqGP6G6gyx8UAHSHJ5RGk//1Oj5nXQ2NI02Nrsg4= cloud.google.com/go/compute/metadata v0.9.0 h1:pDUj4QMoPejqq20dK0Pg2N4yG9zIkYGdBtwLoEkH9Zs= cloud.google.com/go/compute/metadata v0.9.0/go.mod h1:E0bWwX5wTnLPedCKqk3pJmVgCBSM6qQI1yTBdEb3C10= github.com/Masterminds/semver/v3 v3.4.0 h1:Zog+i5UMtVoCU8oKka5P7i9q9HgrJeGzI9SA1Xbatp0= github.com/Masterminds/semver/v3 v3.4.0/go.mod h1:4V+yj/TJE1HU9XfppCwVMZq3I84lprf4nC11bSS5beM= +github.com/antlr4-go/antlr/v4 v4.13.1 h1:SqQKkuVZ+zWkMMNkjy5FZe5mr5WURWnlpmOuzYWrPrQ= +github.com/antlr4-go/antlr/v4 v4.13.1/go.mod h1:GKmUxMtwp6ZgGwZSva4eWPC5mS6vUAmOABFgjdkM7Nw= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cert-manager/cert-manager v1.19.3 h1:3d0Nk/HO3BOmAdBJNaBh+6YgaO3Ciey3xCpOjiX5Obs= @@ -76,6 +80,8 @@ github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/google/btree v1.1.3 h1:CVpQJjYgC4VbzxeGVHfvZrv1ctoYCAI8vbl07Fcxlyg= github.com/google/btree v1.1.3/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4= +github.com/google/cel-go v0.28.1 h1:YWIwi77J4xIsYUwAF/iIuS6haffzIHS8yWI8glSbLWM= +github.com/google/cel-go v0.28.1/go.mod h1:X0bD6iVNR8pkROSOoHVdgTkzmRcosof7WQqCD6wcMc8= github.com/google/gnostic-models v0.7.1 h1:SisTfuFKJSKM5CPZkffwi6coztzzeYUhc3v4yxLWH8c= github.com/google/gnostic-models v0.7.1/go.mod h1:whL5G0m6dmc5cPxKc5bdKdEN3UjI7OUGxBlw57miDrQ= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= @@ -172,6 +178,8 @@ go.yaml.in/yaml/v2 v2.4.3 h1:6gvOSjQoTB3vt1l+CU+tSyi/HOjfOjRLJ4YwYZGwRO0= go.yaml.in/yaml/v2 v2.4.3/go.mod h1:zSxWcmIDjOzPXpjlTTbAsKokqkDNAVtZO0WOMiT90s8= go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc= go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg= +golang.org/x/exp v0.0.0-20250718183923-645b1fa84792 h1:R9PFI6EUdfVKgwKjZef7QIwGcBKu86OEFpJ9nUEP2l4= +golang.org/x/exp v0.0.0-20250718183923-645b1fa84792/go.mod h1:A+z0yzpGtvnG90cToK5n2tu8UJVP2XUATh+r+sfOOOc= golang.org/x/mod v0.35.0 h1:Ww1D637e6Pg+Zb2KrWfHQUnH2dQRLBQyAtpr/haaJeM= golang.org/x/mod v0.35.0/go.mod h1:+GwiRhIInF8wPm+4AoT6L0FA1QWAad3OMdTRx4tFYlU= golang.org/x/net v0.53.0 h1:d+qAbo5L0orcWAr0a9JweQpjXF19LMXJE8Ey7hwOdUA= @@ -194,6 +202,8 @@ gomodules.xyz/jsonpatch/v2 v2.5.0 h1:JELs8RLM12qJGXU4u/TO3V25KW8GreMKl9pdkk14RM0 gomodules.xyz/jsonpatch/v2 v2.5.0/go.mod h1:AH3dM2RI6uoBZxn3LVrfvJ3E0/9dG4cSrbuBJT4moAY= gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk= gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E= +google.golang.org/genproto/googleapis/api v0.0.0-20251202230838-ff82c1b0f217 h1:fCvbg86sFXwdrl5LgVcTEvNC+2txB5mgROGmRL5mrls= +google.golang.org/genproto/googleapis/api v0.0.0-20251202230838-ff82c1b0f217/go.mod h1:+rXWjjaukWZun3mLfjmVnQi18E1AsFbDN9QdJ5YXLto= google.golang.org/genproto/googleapis/rpc v0.0.0-20251222181119-0a764e51fe1b h1:Mv8VFug0MP9e5vUxfBcE3vUkV6CImK3cMNMIDFjmzxU= google.golang.org/genproto/googleapis/rpc v0.0.0-20251222181119-0a764e51fe1b/go.mod h1:j9x/tPzZkyxcgEFkiKEEGxfvyumM01BEtsW8xzOahRQ= google.golang.org/grpc v1.79.3 h1:sybAEdRIEtvcD68Gx7dmnwjZKlyfuc61Dyo9pGXXkKE= diff --git a/internal/controller/discovery/client.go b/internal/controller/discovery/client.go index e5cc5ea..74edf29 100644 --- a/internal/controller/discovery/client.go +++ b/internal/controller/discovery/client.go @@ -14,7 +14,12 @@ import ( gnmicv1alpha1 "github.com/gnmic/operator/api/v1alpha1" ) -func fetchExistingTargets(ctx context.Context, c client.Client, ts *gnmicv1alpha1.TargetSource) ([]gnmicv1alpha1.Target, error) { +func fetchExistingTargets( + ctx context.Context, + c client.Client, + ts *gnmicv1alpha1.TargetSource, +) ([]gnmicv1alpha1.Target, error) { + var targetList gnmicv1alpha1.TargetList err := c.List( diff --git a/internal/controller/discovery/core/ressource_fetcher.go b/internal/controller/discovery/core/ressource_fetcher.go new file mode 100644 index 0000000..31a82cf --- /dev/null +++ b/internal/controller/discovery/core/ressource_fetcher.go @@ -0,0 +1,15 @@ +package core + +import ( + "context" + + corev1 "k8s.io/api/core/v1" +) + +// ResourceFetcher provides read-only access to namespaced Secret and +// ConfigMap data for loaders without requiring each loader to carry a +// Kubernetes client. +type ResourceFetcher interface { + GetSecretKey(ctx context.Context, namespace string, selector *corev1.SecretKeySelector) (string, error) + GetConfigMapKey(ctx context.Context, namespace string, selector *corev1.ConfigMapKeySelector) (string, error) +} diff --git a/internal/controller/discovery/core/types.go b/internal/controller/discovery/core/types.go index 5a1c8cf..a9a208f 100644 --- a/internal/controller/discovery/core/types.go +++ b/internal/controller/discovery/core/types.go @@ -19,9 +19,10 @@ type DiscoveryRegistryValue struct { } type CommonLoaderConfig struct { - TargetsourceNN types.NamespacedName - ChunkSize int - AcceptPush bool + TargetsourceNN types.NamespacedName + ChunkSize int + AcceptPush bool + ResourceFetcher ResourceFetcher } // EventAction represents the type of a discovery event @@ -37,9 +38,11 @@ const ( // DiscoveredTarget represents a target discovered from an external source // before it is materialized as a Kubernetes Target CR type DiscoveredTarget struct { - Name string - Address string - Labels map[string]string + Name string + Address string + Port int32 + Labels map[string]string + TargetProfile string } type DiscoveryEvent struct { diff --git a/internal/controller/discovery/loaders.go b/internal/controller/discovery/loaders.go index c888c27..2644db3 100644 --- a/internal/controller/discovery/loaders.go +++ b/internal/controller/discovery/loaders.go @@ -1,24 +1,26 @@ package discovery import ( + "context" "fmt" + "sigs.k8s.io/controller-runtime/pkg/client" + gnmicv1alpha1 "github.com/gnmic/operator/api/v1alpha1" "github.com/gnmic/operator/internal/controller/discovery/core" http "github.com/gnmic/operator/internal/controller/discovery/loaders/http" ) // NewLoader creates a loader by name -func NewLoader(cfg *core.CommonLoaderConfig, spec gnmicv1alpha1.TargetSourceSpec) (core.Loader, error) { +func NewLoader(ctx context.Context, c client.Client, cfg *core.CommonLoaderConfig, spec gnmicv1alpha1.TargetSourceSpec) (core.Loader, error) { switch { case spec.Provider.HTTP != nil: - cfg.AcceptPush = spec.Provider.HTTP.AcceptPush - return http.New(*cfg), nil - case spec.Provider.Consul != nil: - return nil, fmt.Errorf("Unimplemented targetsource provider, check TargetSource CRD for %s", cfg.TargetsourceNN) + httpSpec := *spec.Provider.HTTP + cfg.AcceptPush = httpSpec.Push != nil && httpSpec.Push.Enabled + cfg.ResourceFetcher = newK8sResourceFetcher(c) + return http.New(*cfg, httpSpec), nil default: return nil, fmt.Errorf("unknown targetsource provider, check TargetSource CRD for %s", cfg.TargetsourceNN) } - } diff --git a/internal/controller/discovery/loaders/http/auth.go b/internal/controller/discovery/loaders/http/auth.go new file mode 100644 index 0000000..109bbc8 --- /dev/null +++ b/internal/controller/discovery/loaders/http/auth.go @@ -0,0 +1,62 @@ +package http + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + + corev1 "k8s.io/api/core/v1" +) + +// fetchSecret uses the configured ResourceFetcher to resolve secret values. +func (l *Loader) fetchSecret(ctx context.Context, sel *corev1.SecretKeySelector) (string, error) { + if l.loaderCfg.ResourceFetcher == nil { + return "", nil + } + return l.loaderCfg.ResourceFetcher.GetSecretKey(ctx, l.loaderCfg.TargetsourceNN.Namespace, sel) +} + +func (l *Loader) applyAuthorization(req *http.Request) error { + auth := l.spec.Authorization + if auth == nil { + return nil + } + // Basic auth + if auth.Basic != nil { + // Secret-based credentials + if auth.Basic.CredentialsSecretRef != nil { + val, err := l.fetchSecret(req.Context(), auth.Basic.CredentialsSecretRef) + if err != nil { + return err + } + var cm map[string]string + if err := json.Unmarshal([]byte(val), &cm); err == nil { + username := cm["username"] + password := cm["password"] + if username != "" || password != "" { + req.SetBasicAuth(username, password) + return nil + } + } + return err + } + return fmt.Errorf("Basic auth enabled but no valid credentials provided") + } + + // Token-based auth: prefer secret ref if present + if auth.Token != nil { + if auth.Token.TokenSecretRef != nil { + token, err := l.fetchSecret(req.Context(), auth.Token.TokenSecretRef) + if err != nil { + return err + } + req.Header.Set("Authorization", fmt.Sprintf("%s %s", auth.Token.Scheme, token)) + return nil + } + return fmt.Errorf("Token auth enabled but no valid token secret reference provided") + } + + // No supported auth method configured + return fmt.Errorf("no supported authentication method configured") +} diff --git a/internal/controller/discovery/loaders/http/loader.go b/internal/controller/discovery/loaders/http/loader.go index 5169e59..b8eb1ef 100644 --- a/internal/controller/discovery/loaders/http/loader.go +++ b/internal/controller/discovery/loaders/http/loader.go @@ -1,49 +1,126 @@ package http import ( + "bytes" "context" + "crypto/tls" + "crypto/x509" + "encoding/json" "fmt" + "net/http" "time" + "github.com/go-logr/logr" + "github.com/google/uuid" + "sigs.k8s.io/controller-runtime/pkg/log" + gnmicv1alpha1 "github.com/gnmic/operator/api/v1alpha1" "github.com/gnmic/operator/internal/controller/discovery/core" loaderUtils "github.com/gnmic/operator/internal/controller/discovery/loaders/utils" - "github.com/google/uuid" ) +// Loader implements the HTTP pull discovery mechanism +// It periodically polls an HTTP endpoint, extracts targets from the response, +// and emits discovery snapshots downstream type Loader struct { - commonCfg core.CommonLoaderConfig + loaderCfg core.CommonLoaderConfig + spec gnmicv1alpha1.HTTPConfig } -// New instantiates the http loader with the provided config -func New(cfg core.CommonLoaderConfig) core.Loader { - return &Loader{commonCfg: cfg} +// New creates a new HTTP loader instance with the provided configuration. +// The loader is stateless apart from its config and spec +func New(cfg core.CommonLoaderConfig, httpConfig gnmicv1alpha1.HTTPConfig) core.Loader { + return &Loader{loaderCfg: cfg, spec: httpConfig} } +// Name returns the loader's name, used for logging and metrics func (l *Loader) Name() string { return "http" } +// Run starts the HTTP discovery loop +// It performs an immediate fetch and then continues polling at a fixed interval func (l *Loader) Run(ctx context.Context, out chan<- []core.DiscoveryMessage) error { + if l.spec.URL == "" { + return nil + } + logger := log.FromContext(ctx).WithValues( "component", "loader", "name", l.Name(), - "targetsource", l.commonCfg.TargetsourceNN, + "targetsource", l.loaderCfg.TargetsourceNN, ) logger.Info( "HTTP loader started", - "targetsource", l.commonCfg.TargetsourceNN.Name, - "namespace", l.commonCfg.TargetsourceNN.Namespace, + "targetsource", l.loaderCfg.TargetsourceNN.Name, + "namespace", l.loaderCfg.TargetsourceNN.Namespace, ) - // Only for debugging: emit a static snapshot every 30 seconds - ticker := time.NewTicker(30 * time.Second) + logger.Info("HTTP loader started") + + client, err := l.buildHTTPClient(ctx) + if err != nil { + return fmt.Errorf("failed to build HTTP client: %w", err) + } + interval := l.spec.Interval.Duration + ticker := time.NewTicker(interval) defer ticker.Stop() - i := 1 + logger.Info( + "HTTP polling discovery started", + "interval", interval.String(), + "url", l.spec.URL, + ) + + // helper function to fetch targets and emit discovery messages + fetchAndEmit := func() { + // Fetch targets from HTTP endpoint + targets, err := l.fetchTargetsFromHTTPEndpoint(ctx, client, logger) + if err != nil { + logger.Error( + err, + "Failed to fetch targets from HTTP endpoint", + "url", l.spec.URL, + ) + return + } + // TODO temporary log discovered targets + for _, t := range targets { + logger.Info( + "Discovered target", + "name", t.Name, + "address", t.Address, + "port", t.Port, + "labels", t.Labels, + "profile", t.TargetProfile, + ) + } + + // Emit discovery snapshot downstream + snapshotID := fmt.Sprintf("%s-%s-%s", l.loaderCfg.TargetsourceNN.Namespace, l.loaderCfg.TargetsourceNN.Name, uuid.NewString()) + if err := loaderUtils.SendSnapshot(ctx, out, targets, snapshotID, l.loaderCfg.ChunkSize); err != nil { + logger.Error( + err, + "Failed to send discovery snapshot", + "snapshotID", snapshotID, + "targets", len(targets), + ) + return + } + logger.Info( + "Discovery snapshot sent", + "snapshotID", snapshotID, + "targets", len(targets), + ) + } + + // Immediate fetch on startup + fetchAndEmit() + + // Periodic fetch for { select { case <-ctx.Done(): @@ -51,61 +128,213 @@ func (l *Loader) Run(ctx context.Context, out chan<- []core.DiscoveryMessage) er return nil case <-ticker.C: - // Switch case + i only needed to test behavior for messages with different values. - switch i { - case 1: - snapshotID := fmt.Sprintf("%s-%s-%s", l.commonCfg.TargetsourceNN.Namespace, l.commonCfg.TargetsourceNN.Name, uuid.NewString()) - targets := []core.DiscoveredTarget{ - { - Name: "ceos1", - Address: "clab-3-nodes-ceos1:6030", - Labels: map[string]string{}, - }, - { - Name: "leaf1", - Address: "clab-3-nodes-leaf1:57400", - Labels: map[string]string{"gnmic_operator_target_profile": "default1"}, - }, - } - - if err := loaderUtils.SendSnapshot(ctx, out, targets, snapshotID, l.commonCfg.ChunkSize); err != nil { - return err - } - case 2: - snapshotID := fmt.Sprintf("%s-%s-%s", l.commonCfg.TargetsourceNN.Namespace, l.commonCfg.TargetsourceNN.Name, uuid.NewString()) - targets := []core.DiscoveredTarget{ - { - Name: "ceos1", - Address: "clab-3-nodes-ceos1:6030", - Labels: map[string]string{"gnmic_operator_target_profile": "default1"}, - }, - { - Name: "leaf2", - Address: "clab-3-nodes-leaf2:57400", - Labels: map[string]string{"gnmic_operator_target_profile": "default1"}, - }, - } - - if err := loaderUtils.SendSnapshot(ctx, out, targets, snapshotID, l.commonCfg.ChunkSize); err != nil { - return err - } - - default: - snapshotID := fmt.Sprintf("%s-%s-%s", l.commonCfg.TargetsourceNN.Namespace, l.commonCfg.TargetsourceNN.Name, uuid.NewString()) - targets := []core.DiscoveredTarget{ - { - Name: "ceos1", - Address: "clab-3-nodes-ceos2:6030", - Labels: map[string]string{"gnmic_operator_target_profile": "default2"}, - }, - } - - if err := loaderUtils.SendSnapshot(ctx, out, targets, snapshotID, l.commonCfg.ChunkSize); err != nil { - return err - } + fetchAndEmit() + } + } +} + +// buildHTTPClient constructs an HTTP client with optional configuration +func (l *Loader) buildHTTPClient(ctx context.Context) (*http.Client, error) { + tlsConfig := &tls.Config{ + InsecureSkipVerify: l.spec.TLS != nil && l.spec.TLS.InsecureSkipVerify, + } + + // If a CA bundle is provided, add it to the TLS config. + if l.spec.TLS != nil { + var caBundle string + if l.spec.TLS.CABundleRef != nil { + var err error + caBundle, err = l.loaderCfg.ResourceFetcher.GetConfigMapKey(ctx, l.loaderCfg.TargetsourceNN.Namespace, l.spec.TLS.CABundleRef) + if err != nil { + return nil, fmt.Errorf("failed to fetch CA bundle from config map ref: %w", err) } + } + if len(caBundle) > 0 { + certPool := x509.NewCertPool() + if ok := certPool.AppendCertsFromPEM([]byte(caBundle)); !ok { + return nil, fmt.Errorf("failed to parse CA bundle for TargetSource %s/%s", l.loaderCfg.TargetsourceNN.Namespace, l.loaderCfg.TargetsourceNN.Name) + } + tlsConfig.RootCAs = certPool + } + } - i++ + timeout := l.spec.Timeout.Duration + // Build the HTTP client with the specified timeout and TLS config + return &http.Client{ + Timeout: timeout, + Transport: &http.Transport{ + TLSClientConfig: tlsConfig, + }, + }, nil +} + +// fetchTargetsFromHTTPEndpoint retrieves targets from the configured HTTP endpoint +func (l *Loader) fetchTargetsFromHTTPEndpoint( + ctx context.Context, + client *http.Client, + logger logr.Logger, +) ([]core.DiscoveredTarget, error) { + var allTargets []core.DiscoveredTarget + currentURL := l.spec.URL + + for { + raw, err := l.fetchPage(ctx, client, currentURL, logger) + if err != nil { + logger.Error(err, + "Failed to fetch page from HTTP endpoint", + "url", currentURL, + ) + break } + + // Extract targets from response + if targets, err := l.extractTargetsFromResponse(raw, logger); err != nil { + logger.Error(err, + "Failed to extract targets from HTTP response", + "url", currentURL, + ) + } else { + allTargets = append(allTargets, targets...) + } + + // Pagination + nextURL, stop := l.getNextURL(raw, currentURL, logger) + if stop { + break + } + currentURL = nextURL } + + return allTargets, nil +} + +// fetchPage performs an HTTP GET request to the specified URL and decodes the JSON response +// and returns the raw response +func (l *Loader) fetchPage(ctx context.Context, client *http.Client, url string, logger logr.Logger) (any, error) { + // Determine HTTP method (default GET) + method := l.spec.Method + if method == "" { + method = http.MethodGet + } + + // Build request body (only for POST) + if method == http.MethodGet && l.spec.Body != "" { + logger.Info("ignoring body for GET request") + } + var bodyReader *bytes.Reader + if method == http.MethodPost && l.spec.Body != "" { + bodyReader = bytes.NewReader([]byte(l.spec.Body)) + } else { + bodyReader = bytes.NewReader(nil) + } + + // Build HTTP request + req, err := http.NewRequestWithContext(ctx, method, url, bodyReader) + if err != nil { + return nil, fmt.Errorf("creating HTTP request failed: %w", err) + } + req.Header.Set("Accept", "application/json") + // Apply user-defined headers + for key, val := range l.spec.Headers { + req.Header.Set(key, val) + } + if err := l.applyAuthorization(req); err != nil { + return nil, fmt.Errorf("applying authorization to HTTP request failed: %w", err) + } + + // Execute HTTP request + resp, err := client.Do(req) + if err != nil { + return nil, fmt.Errorf("HTTP request failed: %w", err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("unexpected HTTP status code: %d", resp.StatusCode) + } + + // Decode HTTP response + var raw any + if err := json.NewDecoder(resp.Body).Decode(&raw); err != nil { + return nil, fmt.Errorf("failed to decode HTTP response: %w", err) + } + + return raw, nil +} + +// extractTargetsFromResponse extracts items from the response and maps each item into a DiscoveredTarget +func (l *Loader) extractTargetsFromResponse(raw any, logger logr.Logger) ([]core.DiscoveredTarget, error) { + var items []any + // If ResponseMapping is configured and TargetsField is provided we treat + // it as a CEL expression that evaluates against the whole response and + // must return an array of items. + if l.spec.ResponseMapping != nil && l.spec.ResponseMapping.TargetsField != "" { + prog, err := compileCEL(l.spec.ResponseMapping.TargetsField) + if err != nil { + return nil, fmt.Errorf("invalid TargetsField CEL expression: %w", err) + } + out, _, err := prog.Eval(map[string]any{"self": raw}) + if err != nil { + return nil, fmt.Errorf("evaluating TargetsField CEL expression failed: %w", err) + } + if out == nil { + return nil, fmt.Errorf("TargetsField expression returned nil") + } + array, ok := out.Value().([]any) + if !ok { + return nil, fmt.Errorf("invalid HTTP response: targetsField expression must evaluate to an array of objects") + } + items = array + } else { + //If TargetsField is empty, the raw response is expected to be an array of items. + array, ok := raw.([]any) + if !ok { + return nil, fmt.Errorf("invalid HTTP response: expected a JSON array when itemsField is not set") + } + items = array + } + + // Map items to targets + var targets []core.DiscoveredTarget + targets, err := l.mapItemsToTargets(items, raw, logger) + if err != nil { + return nil, fmt.Errorf("mapping items to targets failed: %w", err) + } + + return targets, nil +} + +// getNextURL determines the next page URL +// Returns: +// - nextURL: next request +// - stop: whether to terminate loop +func (l *Loader) getNextURL( + raw any, + currentURL string, + logger logr.Logger, +) (string, bool) { + // Extract pagination info + nextPageInfo, err := l.extractNextPageInfo(raw) + if err != nil { + logger.Error(err, + "Failed to extract next page info from HTTP response", + "url", currentURL, + ) + return "", true + } + + if nextPageInfo == "" { + return "", true + } + + // Build next page URL + nextURL, err := l.buildNextURL(currentURL, nextPageInfo) + if err != nil { + logger.Error(err, + "Failed to build next URL", + "url", currentURL, + ) + return "", true + } + + return nextURL, false } diff --git a/internal/controller/discovery/loaders/http/mapper.go b/internal/controller/discovery/loaders/http/mapper.go new file mode 100644 index 0000000..9fb60c7 --- /dev/null +++ b/internal/controller/discovery/loaders/http/mapper.go @@ -0,0 +1,331 @@ +package http + +import ( + "fmt" + "math" + "strconv" + + "github.com/gnmic/operator/internal/controller/discovery/core" + "github.com/go-logr/logr" + "github.com/google/cel-go/cel" + "github.com/google/cel-go/common/types/ref" + "github.com/google/cel-go/ext" +) + +// mapItemsToTargets converts a list of raw JSON items into DiscoveredTargets using the configured mapping rules +func (l *Loader) mapItemsToTargets(items []any, full any, logger logr.Logger) ([]core.DiscoveredTarget, error) { + // Compile CEL expressions once for efficiency + compiled, err := l.compileMapping() + if err != nil { + return nil, fmt.Errorf("compile mapping: %w", err) + } + + // Map items to targets + targets := make([]core.DiscoveredTarget, 0, len(items)) + for _, item := range items { + obj, ok := item.(map[string]any) + if !ok { + logger.Error(fmt.Errorf("invalid target format"), + "failed to convert target to map", + "item", item, + ) + continue + } + target, err := l.mapItemToTarget(obj, full, compiled) + if err != nil { + logger.Error(err, + "failed to map target", + "item", obj, + ) + continue + } + + targets = append(targets, target) + } + + return targets, nil +} + +type compiledMapping struct { + name cel.Program + address cel.Program + port cel.Program + + targetProfile cel.Program + labels cel.Program +} + +func (l *Loader) compileMapping() (*compiledMapping, error) { + rm := l.spec.ResponseMapping + cm := &compiledMapping{} + if rm == nil { + return cm, nil + } + + var err error + if rm.Name != "" { + cm.name, err = compileCEL(rm.Name) + if err != nil { + return nil, fmt.Errorf("name: %w", err) + } + } + if rm.Address != "" { + cm.address, err = compileCEL(rm.Address) + if err != nil { + return nil, fmt.Errorf("address: %w", err) + } + } + if rm.Port != "" { + cm.port, err = compileCEL(rm.Port) + if err != nil { + return nil, fmt.Errorf("port: %w", err) + } + } + if rm.TargetProfile != "" { + cm.targetProfile, err = compileCEL(rm.TargetProfile) + if err != nil { + return nil, fmt.Errorf("targetProfile: %w", err) + } + } + if rm.Labels != "" { + cm.labels, err = compileCEL(rm.Labels) + if err != nil { + return nil, fmt.Errorf("labels: %w", err) + } + } + + return cm, nil +} + +// mapItemToTarget converts a raw JSON object into a DiscoveredTarget +func (l *Loader) mapItemToTarget(item map[string]any, full any, cm *compiledMapping) (core.DiscoveredTarget, error) { + name, err := l.getName(item, full, cm) + if err != nil { + return core.DiscoveredTarget{}, err + } + + address, err := l.getAddress(item, full, cm) + if err != nil { + return core.DiscoveredTarget{}, err + } + + return core.DiscoveredTarget{ + Name: name, + Address: address, + Port: l.getPort(item, full, cm), + Labels: l.getLabels(item, full, cm), + TargetProfile: l.getTargetProfile(item, full, cm), + }, nil +} + +// getName extracts the target name from the item using the compiled CEL expression if provided, +// otherwise it falls back to the default "name" field +func (l *Loader) getName(item map[string]any, full any, cm *compiledMapping) (string, error) { + if cm.name != nil { + val, err := evalCEL(cm.name, item, full) + if err != nil { + return "", err + } + + str, ok := val.(string) + if !ok || str == "" { + return "", fmt.Errorf("name must be non-empty string") + } + return str, nil + } + + val, ok := item["name"].(string) + if !ok || val == "" { + return "", fmt.Errorf("name must be non-empty string") + } + return val, nil +} + +// getAddress extracts the target address from the item using the compiled CEL expression if provided, +// otherwise it falls back to the default "address" field +func (l *Loader) getAddress(item map[string]any, full any, cm *compiledMapping) (string, error) { + if cm.address != nil { + val, err := evalCEL(cm.address, item, full) + if err != nil { + return "", err + } + + str, ok := val.(string) + if !ok || str == "" { + return "", fmt.Errorf("address must be non-empty string") + } + return str, nil + } + + val, ok := item["address"].(string) + if !ok || val == "" { + return "", fmt.Errorf("address must be non-empty string") + } + return val, nil +} + +// getPort extracts the target port from the item using the compiled CEL expression if provided, +// otherwise it falls back to the default "port" field +func (l *Loader) getPort(item map[string]any, full any, cm *compiledMapping) int32 { + if cm.port != nil { + val, err := evalCEL(cm.port, item, full) + if err == nil { + return extractPort(val) + } + return 0 + } + + return extractPort(item["port"]) +} + +// getLabels extracts the target labels from the item using the compiled CEL expressions if provided, +// otherwise it falls back to the default "labels" field +func (l *Loader) getLabels(item map[string]any, full any, cm *compiledMapping) map[string]string { + result := make(map[string]string) + + if cm != nil && cm.labels != nil { + val, err := evalCEL(cm.labels, item, full) + fmt.Printf("DEBUG: CEL labels result = %#v (type: %T)\n", val, val) + if err != nil { + return result + } + if m, ok := val.(map[string]any); ok { + for k, v := range m { + result[k] = fmt.Sprintf("%v", v) + } + } + return result + } + + // fallback: direct + if raw, ok := item["labels"].(map[string]any); ok { + for key, val := range raw { + result[key] = fmt.Sprintf("%v", val) + } + } + return result +} + +// getTargetProfile extracts the target profile from the item using the compiled CEL expression if provided, +// otherwise it falls back to the default "targetProfile" field +func (l *Loader) getTargetProfile(item map[string]any, full any, cm *compiledMapping) string { + if cm.targetProfile != nil { + val, err := evalCEL(cm.targetProfile, item, full) + if err == nil { + if str, ok := val.(string); ok { + return str + } + } + return "" + } + + if val, ok := item["targetProfile"].(string); ok { + return val + } + return "" +} + +var celEnv = mustNewEnv() + +// mustNewEnv creates a CEL environment with the necessary variable declarations for evaluating expressions +func mustNewEnv() *cel.Env { + env, err := cel.NewEnv( + cel.Variable("self", cel.DynType), + cel.Variable("item", cel.DynType), + // Required for ext.Regex + cel.OptionalTypes(), + // TODO: document what extensions are included + // Include standard CEL declarations for common operations and types + ext.Strings(), + ext.Math(), + ext.Lists(), + ext.Sets(), + ext.Regex(), + ext.Bindings(), + ) + if err != nil { + panic(err) + } + return env +} + +// compileCEL compiles a CEL expression into a program that can be evaluated against items +func compileCEL(expr string) (cel.Program, error) { + ast, issues := celEnv.Compile(expr) + if issues != nil && issues.Err() != nil { + return nil, issues.Err() + } + return celEnv.Program(ast, cel.EvalOptions(cel.OptOptimize)) +} + +// evalCEL evaluates a compiled CEL program against an item +func evalCEL(p cel.Program, item map[string]any, full any) (any, error) { + out, _, err := p.Eval(map[string]any{ + "self": full, + "item": item, + }) + if err != nil { + return nil, err + } + if out == nil { + return nil, fmt.Errorf("CEL returned nil") + } + + return normalizeCEL(out.Value()), nil +} + +// normalizeCEL recursively converts CEL evaluation results into standard Go types +func normalizeCEL(v any) any { + switch raw := v.(type) { + case ref.Val: + return normalizeCEL(raw.Value()) + case map[ref.Val]ref.Val: + out := make(map[string]any) + for k, v := range raw { + key := fmt.Sprintf("%v", normalizeCEL(k)) + out[key] = normalizeCEL(v) + } + return out + case map[string]any: + out := make(map[string]any) + for k, v := range raw { + out[k] = normalizeCEL(v) + } + return out + case map[any]any: + out := make(map[string]any) + for k, v := range raw { + out[fmt.Sprintf("%v", normalizeCEL(k))] = normalizeCEL(v) + } + return out + case []any: + for i := range raw { + raw[i] = normalizeCEL(raw[i]) + } + return raw + default: + return raw + } +} + +// extractPort converts a CEL evaluation result into an int32 port number, +// handling both numeric and string representations +func extractPort(val any) int32 { + switch v := val.(type) { + case float64: + if v < 0 || v > math.MaxInt32 { + return 0 + } + return int32(v) + + case string: + p, err := strconv.ParseInt(v, 10, 32) + if err != nil { + return 0 + } + return int32(p) + + default: + return 0 + } +} diff --git a/internal/controller/discovery/loaders/http/pagination.go b/internal/controller/discovery/loaders/http/pagination.go new file mode 100644 index 0000000..7f7fd51 --- /dev/null +++ b/internal/controller/discovery/loaders/http/pagination.go @@ -0,0 +1,55 @@ +package http + +import ( + "fmt" + "net/url" +) + +// extractNextPageInfo extracts pagination information from a response +func (l *Loader) extractNextPageInfo(raw any) (string, error) { + if l.spec.Pagination == nil || l.spec.Pagination.NextField == "" { + return "", nil + } + + // Only objects can have "next" fields + obj, ok := raw.(map[string]any) + if !ok { + // array case -> no pagination + return "", nil + } + + val, ok := obj[l.spec.Pagination.NextField] + if val == nil { + // No next page -> end of pagination + return "", nil + } + if !ok { + return "", fmt.Errorf("nextField '%s' not found in response", l.spec.Pagination.NextField) + } + + next, ok := val.(string) + if !ok { + return "", fmt.Errorf("nextField '%s' is not a string in response", l.spec.Pagination.NextField) + } + + return next, nil +} + +// buildNextURL constructs the URL for the next page based on the current URL and pagination info +func (l *Loader) buildNextURL(currentURL, nextVal string) (string, error) { + // nextVal is a full URL -> return as is + if parsed, err := url.Parse(nextVal); err == nil && parsed.Scheme != "" { + return nextVal, nil + } + + // nextVal is a token -> append as query parameter + parsedURL, err := url.Parse(currentURL) + if err != nil { + return "", fmt.Errorf("failed to parse current URL in order to build next URL: %w", err) + } + q := parsedURL.Query() + q.Set(l.spec.Pagination.NextField, nextVal) + parsedURL.RawQuery = q.Encode() + + return parsedURL.String(), nil +} diff --git a/internal/controller/discovery/mapper.go b/internal/controller/discovery/mapper.go index bc42531..4690fd1 100644 --- a/internal/controller/discovery/mapper.go +++ b/internal/controller/discovery/mapper.go @@ -1,6 +1,7 @@ package discovery import ( + "fmt" "maps" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -20,10 +21,19 @@ func generateTargetResource(d core.DiscoveredTarget, ts *gnmicv1alpha1.TargetSou }, } - // Add Address from DiscoveredTarget - t.Spec.Address = d.Address - // Add default Target Profile from the TargetSource Spec TargetProfile - t.Spec.Profile = ts.Spec.TargetProfile + // Add Address + Port from DiscoveredTarget or use TargetSource.spec.targetPort + targetPort := ts.Spec.TargetPort + if d.Port != 0 { + targetPort = d.Port + } + t.Spec.Address = fmt.Sprintf("%s:%d", d.Address, targetPort) + + // Add discovered Target Profile or use TargetSource.spec.targetProfile + targetProfile := ts.Spec.TargetProfile + if d.TargetProfile != "" { + targetProfile = d.TargetProfile + } + t.Spec.Profile = targetProfile // Copy TargetLabels from TargetSource Spec & DiscoveredTarget. Discovered labels take precedence over TargetSource labels. maps.Copy(t.Labels, ts.Spec.TargetLabels) diff --git a/internal/controller/discovery/ressource_fetcher.go b/internal/controller/discovery/ressource_fetcher.go new file mode 100644 index 0000000..c544b30 --- /dev/null +++ b/internal/controller/discovery/ressource_fetcher.go @@ -0,0 +1,60 @@ +package discovery + +import ( + "context" + "fmt" + + corev1 "k8s.io/api/core/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/gnmic/operator/internal/controller/discovery/core" +) + +// k8sResourceFetcher implements core.ResourceFetcher using a controller runtime client +type k8sResourceFetcher struct { + client client.Client +} + +// GetSecretKey retrieves the value of a specific key from a Kubernetes Secret +func (f *k8sResourceFetcher) GetSecretKey(ctx context.Context, namespace string, selector *corev1.SecretKeySelector) (string, error) { + if selector == nil { + return "", nil + } + var secret corev1.Secret + key := client.ObjectKey{Namespace: namespace, Name: selector.Name} + if err := f.client.Get(ctx, key, &secret); err != nil { + return "", err + } + if selector.Key == "" { + return "", fmt.Errorf("secret key selector has empty key for secret %s/%s", namespace, selector.Name) + } + val, ok := secret.Data[selector.Key] + if !ok { + return "", fmt.Errorf("secret %s/%s does not contain key %s", namespace, selector.Name, selector.Key) + } + return string(val), nil +} + +// GetConfigMapKey retrieves the value of a specific key from a Kubernetes ConfigMap +func (f *k8sResourceFetcher) GetConfigMapKey(ctx context.Context, namespace string, selector *corev1.ConfigMapKeySelector) (string, error) { + if selector == nil { + return "", nil + } + var cm corev1.ConfigMap + key := client.ObjectKey{Namespace: namespace, Name: selector.Name} + if err := f.client.Get(ctx, key, &cm); err != nil { + return "", err + } + if selector.Key == "" { + return "", fmt.Errorf("config map key selector has empty key for config map %s/%s", namespace, selector.Name) + } + val, ok := cm.Data[selector.Key] + if !ok { + return "", fmt.Errorf("config map %s/%s does not contain key %s", namespace, selector.Name, selector.Key) + } + return val, nil +} + +func newK8sResourceFetcher(c client.Client) core.ResourceFetcher { + return &k8sResourceFetcher{client: c} +} diff --git a/internal/controller/targetsource_controller.go b/internal/controller/targetsource_controller.go index 7f30fc8..cfa5782 100644 --- a/internal/controller/targetsource_controller.go +++ b/internal/controller/targetsource_controller.go @@ -196,7 +196,7 @@ func (r *TargetSourceReconciler) startDiscovery( targetSource, targetChannel, ) - loader, err := discovery.NewLoader(&loaderConfig, targetSource.Spec) + loader, err := discovery.NewLoader(ctx, r.Client, &loaderConfig, targetSource.Spec) if err != nil { logger.Error(err, "Target loader could not be created") cleanup()