diff --git a/persys-scheduler/go.mod b/persys-scheduler/go.mod index 752fb69..0ba853b 100644 --- a/persys-scheduler/go.mod +++ b/persys-scheduler/go.mod @@ -6,6 +6,7 @@ require ( github.com/google/uuid v1.6.0 github.com/hashicorp/vault/api v1.16.0 github.com/prometheus/client_golang v1.11.1 + github.com/redis/go-redis/v9 v9.19.0 github.com/sirupsen/logrus v1.6.0 go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.65.0 go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1 @@ -54,7 +55,7 @@ require ( go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.40.0 // indirect go.opentelemetry.io/otel/metric v1.40.0 // indirect go.opentelemetry.io/proto/otlp v1.9.0 // indirect - go.uber.org/atomic v1.7.0 // indirect + go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.6.0 // indirect golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409 // indirect diff --git a/persys-scheduler/go.sum b/persys-scheduler/go.sum index 9615b3a..c11a7fd 100644 --- a/persys-scheduler/go.sum +++ b/persys-scheduler/go.sum @@ -10,6 +10,10 @@ github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+Ce github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM= @@ -106,6 +110,8 @@ github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7V github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/cpuid/v2 v2.2.10 h1:tBs3QSyvjDyFTq3uoc/9xFpCuOsJQFNPiAhYdw2skhE= +github.com/klauspost/cpuid/v2 v2.2.10/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3 h1:CE8S1cTafDpPvMhIxNJKvHsGVBgn1xWYf1NbHQhywc8= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= @@ -159,6 +165,8 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3xv4= github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= +github.com/redis/go-redis/v9 v9.19.0 h1:XPVaaPSnG6RhYf7p+rmSa9zZfeVAnWsH5h3lxthOm/k= +github.com/redis/go-redis/v9 v9.19.0/go.mod h1:v/M13XI1PVCDcm01VtPFOADfZtHf8YW3baQf57KlIkA= github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/ryanuber/go-glob v1.0.0 h1:iQh3xXAumdQ+4Ufa5b25cRpC5TYKlno6hsv6Cb3pkBk= github.com/ryanuber/go-glob v1.0.0/go.mod h1:807d1WSdnB0XRJzKNil9Om6lcp/3a0v4qIHxIXzX/Yc= @@ -176,6 +184,8 @@ github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/zeebo/xxh3 v1.1.0 h1:s7DLGDK45Dyfg7++yxI0khrfwq9661w9EN78eP/UZVs= +github.com/zeebo/xxh3 v1.1.0/go.mod h1:IisAie1LELR4xhVinxWS5+zf1lA4p0MW4T+w+W07F5s= go.etcd.io/etcd/api/v3 v3.5.21 h1:A6O2/JDb3tvHhiIz3xf9nJ7REHvtEFJJ3veW3FbCnS8= go.etcd.io/etcd/api/v3 v3.5.21/go.mod h1:c3aH5wcvXv/9dqIw2Y810LDXJfhSYdHQ0vxmP3CCHVY= go.etcd.io/etcd/client/pkg/v3 v3.5.21 h1:lPBu71Y7osQmzlflM9OfeIV2JlmpBjqBNlLtcoBqUTc= @@ -204,8 +214,9 @@ go.opentelemetry.io/otel/trace v1.40.0 h1:WA4etStDttCSYuhwvEa8OP8I5EWu24lkOzp+ZY go.opentelemetry.io/otel/trace v1.40.0/go.mod h1:zeAhriXecNGP/s2SEG3+Y8X9ujcJOTqQ5RgdEJcawiA= go.opentelemetry.io/proto/otlp v1.9.0 h1:l706jCMITVouPOqEnii2fIAuO3IVGBRPV5ICjceRb/A= go.opentelemetry.io/proto/otlp v1.9.0/go.mod h1:xE+Cx5E/eEHw+ISFkwPLwCZefwVjY+pqKg1qcK03+/4= -go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= +go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4= diff --git a/persys-scheduler/internal/config/config.go b/persys-scheduler/internal/config/config.go index a4ffa01..2f20f27 100644 --- a/persys-scheduler/internal/config/config.go +++ b/persys-scheduler/internal/config/config.go @@ -20,6 +20,12 @@ type Config struct { // etcd / discovery EtcdEndpoints []string + RedisAddr string + RedisPassword string + RedisDB int + RedisReconcileTTL time.Duration + RedisEventTTL time.Duration + RedisEventMaxEntries int64 Domain string AgentsDiscoveryDomain string SchedulerShardKey string @@ -80,9 +86,15 @@ func Load(insecureFlag bool) (*Config, error) { GRPCAddr: envOr("PERSYS_GRPC_ADDR", "0.0.0.0"), GRPCPort: grpcPort, MetricsPort: metricsPort, - ExternalIP: envOr("PERSYS_EXTERNAL_IP", ""), + ExternalIP: envOr("PERSYS_EXTERNAL_IP", ""), EtcdEndpoints: splitCSV(envOr("ETCD_ENDPOINTS", "localhost:2379")), + RedisAddr: strings.TrimSpace(os.Getenv("REDIS_ADDR")), + RedisPassword: strings.TrimSpace(os.Getenv("REDIS_PASSWORD")), + RedisDB: envIntOr("REDIS_DB", 0), + RedisReconcileTTL: envDurationOrFlexibleSeconds("REDIS_RECONCILE_TTL", 24*time.Hour), + RedisEventTTL: envDurationOrFlexibleSeconds("REDIS_EVENT_TTL", 24*time.Hour), + RedisEventMaxEntries: int64(envIntOr("REDIS_EVENT_MAX_ENTRIES", 1000)), Domain: envOr("DOMAIN", "persys.local"), AgentsDiscoveryDomain: envOr("AGENTS_DISCOVERY_DOMAIN", "agents.persys.cloud"), SchedulerShardKey: envOr("SCHEDULER_SHARD_KEY", "genesis"), diff --git a/persys-scheduler/internal/metrics/metrics.go b/persys-scheduler/internal/metrics/metrics.go index 61fd783..27b7508 100644 --- a/persys-scheduler/internal/metrics/metrics.go +++ b/persys-scheduler/internal/metrics/metrics.go @@ -109,6 +109,15 @@ var ( }, []string{"desired_state"}, ) + stateStoreWritesTotal = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "persys", + Subsystem: "scheduler", + Name: "state_store_writes_total", + Help: "Number of scheduler state-store writes by category.", + }, + []string{"category"}, + ) ) var defaultNodeStatuses = []string{"ready", "active", "notready", "unknown"} @@ -132,6 +141,7 @@ func Register() { nodeStatusGauge, workloadStatusGauge, workloadDesiredGauge, + stateStoreWritesTotal, ) for _, s := range defaultNodeStatuses { @@ -146,6 +156,10 @@ func Register() { }) } +func IncStateStoreWrite(category string) { + stateStoreWritesTotal.WithLabelValues(category).Inc() +} + func GRPCUnaryServerInterceptor() grpc.UnaryServerInterceptor { return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { start := time.Now() diff --git a/persys-scheduler/internal/scheduler/reconciler.go b/persys-scheduler/internal/scheduler/reconciler.go index cb96088..531cef4 100644 --- a/persys-scheduler/internal/scheduler/reconciler.go +++ b/persys-scheduler/internal/scheduler/reconciler.go @@ -440,11 +440,8 @@ func (r *Reconciler) applyDesiredState(ctx context.Context, workload models.Work workload.Metadata[workloadReapplyRevisionKey] = strings.TrimSpace(workload.RevisionID) workload.Metadata["lastLaunchTime"] = now - workloadJSON, marshalErr := json.Marshal(workload) - if marshalErr == nil { - if err := r.scheduler.RetryableEtcdPut("/workloads/"+workload.ID, string(workloadJSON)); err != nil { - return action, fmt.Errorf("persist apply metadata: %w", err) - } + if err := r.scheduler.saveWorkload(workload); err != nil { + return action, fmt.Errorf("persist apply metadata: %w", err) } return action, nil @@ -545,13 +542,29 @@ func (r *Reconciler) updateWorkloadReconciliationStatus(workloadID string, resul workload.Metadata["lastReconciliationSuccess"] = result.Success workload.Metadata["reconciliationRetryCount"] = result.RetryCount - workloadJSON, err := json.Marshal(workload) - if err != nil { - reconcilerLogger.WithError(err).WithField("workload_id", workloadID).Warn("failed to marshal workload during reconciliation status update") + // High-churn reconciliation metadata should live in Redis when available. + if r.scheduler.redisClient != nil { + ttl := 24 * time.Hour + if r.scheduler.cfg != nil && r.scheduler.cfg.RedisReconcileTTL > 0 { + ttl = r.scheduler.cfg.RedisReconcileTTL + } + statusKey := fmt.Sprintf("workload:%s:reconcile_status", workloadID) + if payload, mErr := json.Marshal(workload.Metadata); mErr == nil { + if err := r.scheduler.redisClient.Set(context.Background(), statusKey, payload, ttl).Err(); err == nil { + return + } + } + } + + currentLastAction, _ := workload.Metadata["last_action"].(string) + if result.Action == "NoAction" && currentLastAction == "NoAction" { return } - if err := r.scheduler.RetryableEtcdPut("/workloads/"+workloadID, string(workloadJSON)); err != nil { + workload.Metadata["last_action"] = result.Action + + if err := r.scheduler.saveWorkload(workload); err != nil { reconcilerLogger.WithError(err).WithField("workload_id", workloadID).Warn("failed to persist reconciliation status") + return } } diff --git a/persys-scheduler/internal/scheduler/redis_store.go b/persys-scheduler/internal/scheduler/redis_store.go new file mode 100644 index 0000000..42b3d6c --- /dev/null +++ b/persys-scheduler/internal/scheduler/redis_store.go @@ -0,0 +1,92 @@ +package scheduler + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/persys-dev/persys-cloud/persys-scheduler/internal/logging" + "github.com/redis/go-redis/v9" + "github.com/sirupsen/logrus" +) + +var redisLogger = logging.C("scheduler.redis") + +func (s *Scheduler) initRedisStore() { + if s.cfg == nil || s.cfg.RedisAddr == "" { + return + } + client := redis.NewClient(&redis.Options{ + Addr: s.cfg.RedisAddr, + Password: s.cfg.RedisPassword, + DB: s.cfg.RedisDB, + }) + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + if err := client.Ping(ctx).Err(); err != nil { + redisLogger.WithError(err).Warn("redis configured but unavailable; falling back to etcd for reconciliation telemetry") + _ = client.Close() + return + } + s.redisClient = client + redisLogger.WithField("addr", s.cfg.RedisAddr).Info("redis telemetry store enabled") +} + +func (s *Scheduler) writeReconciliationTelemetry(workloadID, action string, success bool, reason string, attemptedAt time.Time) { + rec := map[string]interface{}{ + "workloadId": workloadID, + "action": action, + "success": success, + "reason": reason, + "attemptedAt": attemptedAt.UTC().Format(time.RFC3339Nano), + } + payload, err := json.Marshal(rec) + if err != nil { + return + } + if s.redisClient != nil { + ttl := 24 * time.Hour + if s.cfg != nil && s.cfg.RedisReconcileTTL > 0 { + ttl = s.cfg.RedisReconcileTTL + } + key := fmt.Sprintf("reconciliation:%s", workloadID) + historyKey := "reconciliation:history" + if err := s.redisClient.Set(context.Background(), key, payload, ttl).Err(); err == nil { + maxEntries := int64(2000) + if s.cfg != nil && s.cfg.RedisEventMaxEntries > 0 { + maxEntries = s.cfg.RedisEventMaxEntries + } + pipe := s.redisClient.TxPipeline() + pipe.LPush(context.Background(), historyKey, payload) + pipe.LTrim(context.Background(), historyKey, 0, maxEntries-1) + pipe.Expire(context.Background(), historyKey, ttl) + _, _ = pipe.Exec(context.Background()) + return + } + redisLogger.WithError(err).WithFields(logrus.Fields{"key": key}).Warn("failed writing reconciliation telemetry to redis") + } + _ = s.RetryableEtcdPut(reconciliationKey(workloadID), string(payload)) +} + +func (s *Scheduler) writeEventTelemetry(payload []byte) bool { + if s.redisClient == nil { + return false + } + ttl := 24 * time.Hour + maxEntries := int64(2000) + if s.cfg != nil { + if s.cfg.RedisEventTTL > 0 { + ttl = s.cfg.RedisEventTTL + } + if s.cfg.RedisEventMaxEntries > 0 { + maxEntries = s.cfg.RedisEventMaxEntries + } + } + pipe := s.redisClient.TxPipeline() + pipe.LPush(context.Background(), "events:history", payload) + pipe.LTrim(context.Background(), "events:history", 0, maxEntries-1) + pipe.Expire(context.Background(), "events:history", ttl) + _, err := pipe.Exec(context.Background()) + return err == nil +} diff --git a/persys-scheduler/internal/scheduler/scheduler.go b/persys-scheduler/internal/scheduler/scheduler.go index 98442b5..7b761fd 100644 --- a/persys-scheduler/internal/scheduler/scheduler.go +++ b/persys-scheduler/internal/scheduler/scheduler.go @@ -13,6 +13,7 @@ import ( cfgpkg "github.com/persys-dev/persys-cloud/persys-scheduler/internal/config" "github.com/persys-dev/persys-cloud/persys-scheduler/internal/logging" "github.com/persys-dev/persys-cloud/persys-scheduler/internal/models" + "github.com/redis/go-redis/v9" "github.com/sirupsen/logrus" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" @@ -31,6 +32,7 @@ var schedulerLogger = logging.C("scheduler.core") type Scheduler struct { cfg *cfgpkg.Config etcdClient *clientv3.Client + redisClient *redis.Client domain string agentsDomain string schedulerShard string @@ -86,6 +88,7 @@ func NewScheduler(cfg *cfgpkg.Config) (*Scheduler, error) { } // Initialize monitor and reconciler + scheduler.initRedisStore() scheduler.monitor = NewMonitor(scheduler) scheduler.reconciler = NewReconciler(scheduler, scheduler.monitor) @@ -95,7 +98,10 @@ func NewScheduler(cfg *cfgpkg.Config) (*Scheduler, error) { // Close shuts down the scheduler gracefully. func (s *Scheduler) Close() error { if s.etcdClient != nil { - return s.etcdClient.Close() + _ = s.etcdClient.Close() + } + if s.redisClient != nil { + _ = s.redisClient.Close() } return nil } @@ -491,7 +497,7 @@ func (s *Scheduler) DeleteNode(nodeID string) error { // GetWorkloads retrieves all workloads from etcd. func (s *Scheduler) GetWorkloads() ([]models.Workload, error) { - resp, err := s.RetryableEtcdGet("/workloads/", clientv3.WithPrefix()) + specResp, err := s.RetryableEtcdGet(workloadSpecPrefix, clientv3.WithPrefix()) if err != nil { if s.currentMode() != ModeNormal { return s.getCachedWorkloads(), nil @@ -500,16 +506,41 @@ func (s *Scheduler) GetWorkloads() ([]models.Workload, error) { } workloads := make([]models.Workload, 0) - if resp == nil { + if specResp == nil { return workloads, nil } - - for _, kv := range resp.Kvs { - var workload models.Workload - if err := json.Unmarshal(kv.Value, &workload); err != nil { + statusResp, _ := s.RetryableEtcdGet(workloadStatusPrefix, clientv3.WithPrefix()) + statusMap := map[string]workloadStatus{} + if statusResp != nil { + for _, kv := range statusResp.Kvs { + var st workloadStatus + if err := json.Unmarshal(kv.Value, &st); err == nil { + statusMap[st.ID] = st + } + } + } + for _, kv := range specResp.Kvs { + var spec workloadSpec + if err := json.Unmarshal(kv.Value, &spec); err != nil { schedulerLogger.WithError(err).WithField("key", string(kv.Key)).Warn("failed to unmarshal workload data") continue } + workload := models.Workload{ + ID: spec.ID, Name: spec.Name, Type: spec.Type, RevisionID: spec.RevisionID, Image: spec.Image, Command: spec.Command, + CommandList: spec.CommandList, Compose: spec.Compose, ComposeYAML: spec.ComposeYAML, ProjectName: spec.ProjectName, + GitRepo: spec.GitRepo, GitBranch: spec.GitBranch, GitToken: spec.GitToken, EnvVars: spec.EnvVars, Resources: spec.Resources, + DesiredState: spec.DesiredState, Labels: spec.Labels, LocalPath: spec.LocalPath, Ports: spec.Ports, Volumes: spec.Volumes, + Network: spec.Network, RestartPolicy: spec.RestartPolicy, VM: spec.VM, + } + if st, ok := statusMap[workload.ID]; ok { + workload.AssignedNode = st.AssignedNode + workload.NodeID = st.NodeID + workload.Status = st.Status + workload.Logs = st.Logs + workload.Metadata = st.Metadata + workload.Retry = st.Retry + workload.StatusInfo = st.StatusInfo + } workloads = append(workloads, workload) } if s.currentMode() != ModeNormal && len(workloads) == 0 { @@ -529,7 +560,7 @@ func (s *Scheduler) GetWorkloads() ([]models.Workload, error) { // GetWorkloadByID retrieves a specific workload by ID. func (s *Scheduler) GetWorkloadByID(workloadID string) (models.Workload, error) { - resp, err := s.RetryableEtcdGet("/workloads/" + workloadID) + specResp, err := s.RetryableEtcdGet(workloadSpecKey(workloadID)) if err != nil { if s.currentMode() != ModeNormal { if workload, ok := s.getCachedWorkload(workloadID); ok { @@ -538,15 +569,29 @@ func (s *Scheduler) GetWorkloadByID(workloadID string) (models.Workload, error) } return models.Workload{}, fmt.Errorf("failed to get workload %s: %v", workloadID, err) } - - if len(resp.Kvs) == 0 { - return models.Workload{}, fmt.Errorf("workload %s not found", workloadID) + if specResp == nil || len(specResp.Kvs) == 0 { + // compatibility shim for legacy persisted full objects. + resp, legacyErr := s.RetryableEtcdGet("/workloads/" + workloadID) + if legacyErr != nil || len(resp.Kvs) == 0 { + return models.Workload{}, fmt.Errorf("workload %s not found", workloadID) + } + var legacy models.Workload + if err := json.Unmarshal(resp.Kvs[0].Value, &legacy); err != nil { + return models.Workload{}, fmt.Errorf("failed to unmarshal legacy workload %s: %v", workloadID, err) + } + s.cacheWorkload(legacy) + return legacy, nil } - - var workload models.Workload - if err := json.Unmarshal(resp.Kvs[0].Value, &workload); err != nil { - return models.Workload{}, fmt.Errorf("failed to unmarshal workload %s: %v", workloadID, err) + var spec workloadSpec + if err := json.Unmarshal(specResp.Kvs[0].Value, &spec); err != nil { + return models.Workload{}, fmt.Errorf("failed to unmarshal workload spec %s: %v", workloadID, err) + } + statusResp, _ := s.RetryableEtcdGet(workloadStatusKey(workloadID)) + var st workloadStatus + if statusResp != nil && len(statusResp.Kvs) > 0 { + _ = json.Unmarshal(statusResp.Kvs[0].Value, &st) } + workload := models.Workload{ID: spec.ID, Name: spec.Name, Type: spec.Type, RevisionID: spec.RevisionID, Image: spec.Image, Command: spec.Command, CommandList: spec.CommandList, Compose: spec.Compose, ComposeYAML: spec.ComposeYAML, ProjectName: spec.ProjectName, GitRepo: spec.GitRepo, GitBranch: spec.GitBranch, GitToken: spec.GitToken, EnvVars: spec.EnvVars, Resources: spec.Resources, DesiredState: spec.DesiredState, Labels: spec.Labels, LocalPath: spec.LocalPath, Ports: spec.Ports, Volumes: spec.Volumes, Network: spec.Network, RestartPolicy: spec.RestartPolicy, VM: spec.VM, AssignedNode: st.AssignedNode, NodeID: st.NodeID, Status: st.Status, Logs: st.Logs, Metadata: st.Metadata, Retry: st.Retry, StatusInfo: st.StatusInfo} s.cacheWorkload(workload) return workload, nil @@ -591,9 +636,11 @@ func (s *Scheduler) DeleteWorkloadWithContext(ctx context.Context, workloadID st } } - if err := s.RetryableEtcdDelete("/workloads/" + workloadID); err != nil { + if err := s.RetryableEtcdDelete(workloadSpecKey(workloadID)); err != nil { return fmt.Errorf("failed to delete workload %s: %v", workloadID, err) } + _ = s.RetryableEtcdDelete(workloadStatusKey(workloadID)) + _ = s.RetryableEtcdDelete("/workloads/" + workloadID) _ = s.RetryableEtcdDelete(assignmentKey(workloadID)) _ = s.RetryableEtcdDelete(retryKey(workloadID)) _ = s.RetryableEtcdDelete(reconciliationKey(workloadID)) @@ -618,6 +665,9 @@ func (s *Scheduler) UpdateWorkloadStatus(workloadID, status string) error { return err } + if strings.EqualFold(workload.Status, status) && strings.EqualFold(workload.StatusInfo.ActualState, status) { + return nil + } workload.Status = status workload.StatusInfo.ActualState = status workload.StatusInfo.LastUpdated = time.Now().UTC() @@ -651,7 +701,9 @@ func (s *Scheduler) UpdateWorkloadLogs(workloadID, logs string) error { // Append logs with timestamp timestamp := time.Now().Format("2006-01-02 15:04:05") logEntry := fmt.Sprintf("[%s] %s\n", timestamp, logs) - + if strings.TrimSpace(logs) == "" { + return nil + } if workload.Logs == "" { workload.Logs = logEntry } else { @@ -681,13 +733,20 @@ func (s *Scheduler) UpdateWorkloadMetadata(workloadID string, metadata map[strin if workload.Metadata == nil { workload.Metadata = map[string]interface{}{} } + changed := false for k, v := range metadata { key := strings.TrimSpace(k) if key == "" { continue } + if existing, ok := workload.Metadata[key]; !ok || fmt.Sprintf("%v", existing) != strings.TrimSpace(v) { + changed = true + } workload.Metadata[key] = strings.TrimSpace(v) } + if !changed { + return nil + } workload.StatusInfo.LastUpdated = time.Now().UTC() if err := s.saveWorkload(workload); err != nil { return fmt.Errorf("failed to update workload %s metadata: %v", workloadID, err) diff --git a/persys-scheduler/internal/scheduler/state_store.go b/persys-scheduler/internal/scheduler/state_store.go index 8e725cb..88b521a 100644 --- a/persys-scheduler/internal/scheduler/state_store.go +++ b/persys-scheduler/internal/scheduler/state_store.go @@ -7,6 +7,7 @@ import ( "time" "github.com/google/uuid" + metricspkg "github.com/persys-dev/persys-cloud/persys-scheduler/internal/metrics" "github.com/persys-dev/persys-cloud/persys-scheduler/internal/models" clientv3 "go.etcd.io/etcd/client/v3" ) @@ -14,6 +15,8 @@ import ( const ( nodesPrefix = "/nodes/" workloadsPrefix = "/workloads/" + workloadSpecPrefix = "/workloads-spec/" + workloadStatusPrefix = "/workloads-status/" assignmentsPrefix = "/assignments/" reconciliationPrefix = "/reconciliation/" retriesPrefix = "/retries/" @@ -22,6 +25,8 @@ const ( ) func workloadKey(workloadID string) string { return workloadsPrefix + workloadID } +func workloadSpecKey(workloadID string) string { return workloadSpecPrefix + workloadID } +func workloadStatusKey(workloadID string) string { return workloadStatusPrefix + workloadID } func assignmentKey(workloadID string) string { return assignmentsPrefix + workloadID } func reconciliationKey(workloadID string) string { return reconciliationPrefix + workloadID } func retryKey(workloadID string) string { return retriesPrefix + workloadID } @@ -84,17 +89,27 @@ func (s *Scheduler) initializeWorkloadDefaults(workload *models.Workload) { } func (s *Scheduler) saveWorkload(workload models.Workload) error { - payload, err := json.Marshal(workload) + specPayload, err := json.Marshal(workloadSpecFromWorkload(workload)) if err != nil { return fmt.Errorf("marshal workload %s: %w", workload.ID, err) } - if err := s.RetryableEtcdPut(workloadKey(workload.ID), string(payload)); err != nil { + statusPayload, err := json.Marshal(workloadStatusFromWorkload(workload)) + if err != nil { + return fmt.Errorf("marshal workload status %s: %w", workload.ID, err) + } + if err := s.RetryableEtcdPut(workloadSpecKey(workload.ID), string(specPayload)); err != nil { + return err + } + metricspkg.IncStateStoreWrite("spec") + if err := s.RetryableEtcdPut(workloadStatusKey(workload.ID), string(statusPayload)); err != nil { return err } + metricspkg.IncStateStoreWrite("status") s.cacheWorkload(workload) retryPayload, err := json.Marshal(workload.Retry) if err == nil { _ = s.RetryableEtcdPut(retryKey(workload.ID), string(retryPayload)) + metricspkg.IncStateStoreWrite("retry") } return nil } @@ -113,23 +128,14 @@ func (s *Scheduler) writeAssignment(workloadID, nodeID, reason string) error { if err := s.RetryableEtcdPut(assignmentKey(workloadID), string(payload)); err != nil { return err } + metricspkg.IncStateStoreWrite("assignment") s.cacheAssignment(rec) return nil } func (s *Scheduler) writeReconciliationRecord(workloadID, action string, success bool, reason string) { - rec := models.ReconciliationRecord{ - WorkloadID: workloadID, - Action: action, - Success: success, - Reason: reason, - AttemptedAt: time.Now().UTC(), - } - payload, err := json.Marshal(rec) - if err != nil { - return - } - _ = s.RetryableEtcdPut(reconciliationKey(workloadID), string(payload)) + s.writeReconciliationTelemetry(workloadID, action, success, reason, time.Now().UTC()) + metricspkg.IncStateStoreWrite("reconciliation") } func (s *Scheduler) emitEvent(eventType, workloadID, nodeID, reason string, details map[string]interface{}) { @@ -146,7 +152,12 @@ func (s *Scheduler) emitEvent(eventType, workloadID, nodeID, reason string, deta if err != nil { return } + if s.writeEventTelemetry(payload) { + metricspkg.IncStateStoreWrite("event") + return + } _ = s.RetryableEtcdPut(eventKey(event.ID), string(payload)) + metricspkg.IncStateStoreWrite("event") } func (s *Scheduler) writeDriftRecord(record models.DriftRecord) { diff --git a/persys-scheduler/internal/scheduler/workload_projection.go b/persys-scheduler/internal/scheduler/workload_projection.go new file mode 100644 index 0000000..f1bb8b3 --- /dev/null +++ b/persys-scheduler/internal/scheduler/workload_projection.go @@ -0,0 +1,49 @@ +package scheduler + +import "github.com/persys-dev/persys-cloud/persys-scheduler/internal/models" + +type workloadSpec struct { + ID string `json:"id,omitempty"` + Name string `json:"name,omitempty"` + Type string `json:"type,omitempty"` + RevisionID string `json:"revisionId,omitempty"` + Image string `json:"image,omitempty"` + Command string `json:"command,omitempty"` + CommandList []string `json:"commandList,omitempty"` + Compose string `json:"compose,omitempty"` + ComposeYAML string `json:"composeYaml,omitempty"` + ProjectName string `json:"projectName,omitempty"` + GitRepo string `json:"gitRepo,omitempty"` + GitBranch string `json:"gitBranch,omitempty"` + GitToken string `json:"gitToken,omitempty"` + EnvVars map[string]string `json:"envVars,omitempty"` + Resources models.Resources `json:"resources"` + DesiredState string `json:"desiredState,omitempty"` + Labels map[string]string `json:"labels,omitempty"` + CreatedAt interface{} `json:"createdAt,omitempty"` + LocalPath string `json:"localPath,omitempty"` + Ports []string `json:"ports,omitempty"` + Volumes []string `json:"volumes,omitempty"` + Network string `json:"network,omitempty"` + RestartPolicy string `json:"restartPolicy,omitempty"` + VM *models.VMSpec `json:"vm,omitempty"` +} + +type workloadStatus struct { + ID string `json:"id,omitempty"` + AssignedNode string `json:"assignedNode,omitempty"` + NodeID string `json:"nodeId,omitempty"` + Status string `json:"status,omitempty"` + Logs string `json:"logs,omitempty"` + Metadata map[string]interface{} `json:"metadata,omitempty"` + Retry models.RetryState `json:"retry"` + StatusInfo models.WorkloadStatusInfo `json:"statusInfo"` +} + +func workloadSpecFromWorkload(w models.Workload) workloadSpec { + return workloadSpec{ID: w.ID, Name: w.Name, Type: w.Type, RevisionID: w.RevisionID, Image: w.Image, Command: w.Command, CommandList: w.CommandList, Compose: w.Compose, ComposeYAML: w.ComposeYAML, ProjectName: w.ProjectName, GitRepo: w.GitRepo, GitBranch: w.GitBranch, GitToken: w.GitToken, EnvVars: w.EnvVars, Resources: w.Resources, DesiredState: w.DesiredState, Labels: w.Labels, CreatedAt: w.CreatedAt, LocalPath: w.LocalPath, Ports: w.Ports, Volumes: w.Volumes, Network: w.Network, RestartPolicy: w.RestartPolicy, VM: w.VM} +} + +func workloadStatusFromWorkload(w models.Workload) workloadStatus { + return workloadStatus{ID: w.ID, AssignedNode: w.AssignedNode, NodeID: w.NodeID, Status: w.Status, Logs: w.Logs, Metadata: w.Metadata, Retry: w.Retry, StatusInfo: w.StatusInfo} +}