Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion persys-scheduler/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/google/uuid v1.6.0
github.com/hashicorp/vault/api v1.16.0
github.com/prometheus/client_golang v1.11.1
github.com/redis/go-redis/v9 v9.19.0
github.com/sirupsen/logrus v1.6.0
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.65.0
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1
Expand Down Expand Up @@ -54,7 +55,7 @@ require (
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.40.0 // indirect
go.opentelemetry.io/otel/metric v1.40.0 // indirect
go.opentelemetry.io/proto/otlp v1.9.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409 // indirect
Expand Down
13 changes: 12 additions & 1 deletion persys-scheduler/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+Ce
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM=
Expand Down Expand Up @@ -106,6 +110,8 @@ github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7V
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/cpuid/v2 v2.2.10 h1:tBs3QSyvjDyFTq3uoc/9xFpCuOsJQFNPiAhYdw2skhE=
github.com/klauspost/cpuid/v2 v2.2.10/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.3 h1:CE8S1cTafDpPvMhIxNJKvHsGVBgn1xWYf1NbHQhywc8=
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
Expand Down Expand Up @@ -159,6 +165,8 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT
github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3xv4=
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/redis/go-redis/v9 v9.19.0 h1:XPVaaPSnG6RhYf7p+rmSa9zZfeVAnWsH5h3lxthOm/k=
github.com/redis/go-redis/v9 v9.19.0/go.mod h1:v/M13XI1PVCDcm01VtPFOADfZtHf8YW3baQf57KlIkA=
github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/ryanuber/go-glob v1.0.0 h1:iQh3xXAumdQ+4Ufa5b25cRpC5TYKlno6hsv6Cb3pkBk=
github.com/ryanuber/go-glob v1.0.0/go.mod h1:807d1WSdnB0XRJzKNil9Om6lcp/3a0v4qIHxIXzX/Yc=
Expand All @@ -176,6 +184,8 @@ github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/zeebo/xxh3 v1.1.0 h1:s7DLGDK45Dyfg7++yxI0khrfwq9661w9EN78eP/UZVs=
github.com/zeebo/xxh3 v1.1.0/go.mod h1:IisAie1LELR4xhVinxWS5+zf1lA4p0MW4T+w+W07F5s=
go.etcd.io/etcd/api/v3 v3.5.21 h1:A6O2/JDb3tvHhiIz3xf9nJ7REHvtEFJJ3veW3FbCnS8=
go.etcd.io/etcd/api/v3 v3.5.21/go.mod h1:c3aH5wcvXv/9dqIw2Y810LDXJfhSYdHQ0vxmP3CCHVY=
go.etcd.io/etcd/client/pkg/v3 v3.5.21 h1:lPBu71Y7osQmzlflM9OfeIV2JlmpBjqBNlLtcoBqUTc=
Expand Down Expand Up @@ -204,8 +214,9 @@ go.opentelemetry.io/otel/trace v1.40.0 h1:WA4etStDttCSYuhwvEa8OP8I5EWu24lkOzp+ZY
go.opentelemetry.io/otel/trace v1.40.0/go.mod h1:zeAhriXecNGP/s2SEG3+Y8X9ujcJOTqQ5RgdEJcawiA=
go.opentelemetry.io/proto/otlp v1.9.0 h1:l706jCMITVouPOqEnii2fIAuO3IVGBRPV5ICjceRb/A=
go.opentelemetry.io/proto/otlp v1.9.0/go.mod h1:xE+Cx5E/eEHw+ISFkwPLwCZefwVjY+pqKg1qcK03+/4=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
Expand Down
14 changes: 13 additions & 1 deletion persys-scheduler/internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ type Config struct {

// etcd / discovery
EtcdEndpoints []string
RedisAddr string
RedisPassword string
RedisDB int
RedisReconcileTTL time.Duration
RedisEventTTL time.Duration
RedisEventMaxEntries int64
Domain string
AgentsDiscoveryDomain string
SchedulerShardKey string
Expand Down Expand Up @@ -80,9 +86,15 @@ func Load(insecureFlag bool) (*Config, error) {
GRPCAddr: envOr("PERSYS_GRPC_ADDR", "0.0.0.0"),
GRPCPort: grpcPort,
MetricsPort: metricsPort,
ExternalIP: envOr("PERSYS_EXTERNAL_IP", ""),
ExternalIP: envOr("PERSYS_EXTERNAL_IP", ""),

EtcdEndpoints: splitCSV(envOr("ETCD_ENDPOINTS", "localhost:2379")),
RedisAddr: strings.TrimSpace(os.Getenv("REDIS_ADDR")),
RedisPassword: strings.TrimSpace(os.Getenv("REDIS_PASSWORD")),
RedisDB: envIntOr("REDIS_DB", 0),
RedisReconcileTTL: envDurationOrFlexibleSeconds("REDIS_RECONCILE_TTL", 24*time.Hour),
RedisEventTTL: envDurationOrFlexibleSeconds("REDIS_EVENT_TTL", 24*time.Hour),
RedisEventMaxEntries: int64(envIntOr("REDIS_EVENT_MAX_ENTRIES", 1000)),
Domain: envOr("DOMAIN", "persys.local"),
AgentsDiscoveryDomain: envOr("AGENTS_DISCOVERY_DOMAIN", "agents.persys.cloud"),
SchedulerShardKey: envOr("SCHEDULER_SHARD_KEY", "genesis"),
Expand Down
14 changes: 14 additions & 0 deletions persys-scheduler/internal/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,15 @@ var (
},
[]string{"desired_state"},
)
stateStoreWritesTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "persys",
Subsystem: "scheduler",
Name: "state_store_writes_total",
Help: "Number of scheduler state-store writes by category.",
},
[]string{"category"},
)
)

var defaultNodeStatuses = []string{"ready", "active", "notready", "unknown"}
Expand All @@ -132,6 +141,7 @@ func Register() {
nodeStatusGauge,
workloadStatusGauge,
workloadDesiredGauge,
stateStoreWritesTotal,
)

for _, s := range defaultNodeStatuses {
Expand All @@ -146,6 +156,10 @@ func Register() {
})
}

func IncStateStoreWrite(category string) {
stateStoreWritesTotal.WithLabelValues(category).Inc()
}

func GRPCUnaryServerInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
start := time.Now()
Expand Down
31 changes: 22 additions & 9 deletions persys-scheduler/internal/scheduler/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,11 +440,8 @@ func (r *Reconciler) applyDesiredState(ctx context.Context, workload models.Work
workload.Metadata[workloadReapplyRevisionKey] = strings.TrimSpace(workload.RevisionID)
workload.Metadata["lastLaunchTime"] = now

workloadJSON, marshalErr := json.Marshal(workload)
if marshalErr == nil {
if err := r.scheduler.RetryableEtcdPut("/workloads/"+workload.ID, string(workloadJSON)); err != nil {
return action, fmt.Errorf("persist apply metadata: %w", err)
}
if err := r.scheduler.saveWorkload(workload); err != nil {
return action, fmt.Errorf("persist apply metadata: %w", err)
}

return action, nil
Expand Down Expand Up @@ -545,13 +542,29 @@ func (r *Reconciler) updateWorkloadReconciliationStatus(workloadID string, resul
workload.Metadata["lastReconciliationSuccess"] = result.Success
workload.Metadata["reconciliationRetryCount"] = result.RetryCount

workloadJSON, err := json.Marshal(workload)
if err != nil {
reconcilerLogger.WithError(err).WithField("workload_id", workloadID).Warn("failed to marshal workload during reconciliation status update")
// High-churn reconciliation metadata should live in Redis when available.
if r.scheduler.redisClient != nil {
ttl := 24 * time.Hour
if r.scheduler.cfg != nil && r.scheduler.cfg.RedisReconcileTTL > 0 {
ttl = r.scheduler.cfg.RedisReconcileTTL
}
statusKey := fmt.Sprintf("workload:%s:reconcile_status", workloadID)
if payload, mErr := json.Marshal(workload.Metadata); mErr == nil {
if err := r.scheduler.redisClient.Set(context.Background(), statusKey, payload, ttl).Err(); err == nil {
return
}
}
}

currentLastAction, _ := workload.Metadata["last_action"].(string)
if result.Action == "NoAction" && currentLastAction == "NoAction" {
return
}
if err := r.scheduler.RetryableEtcdPut("/workloads/"+workloadID, string(workloadJSON)); err != nil {
workload.Metadata["last_action"] = result.Action

if err := r.scheduler.saveWorkload(workload); err != nil {
reconcilerLogger.WithError(err).WithField("workload_id", workloadID).Warn("failed to persist reconciliation status")
return
}
}

Expand Down
92 changes: 92 additions & 0 deletions persys-scheduler/internal/scheduler/redis_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package scheduler

import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/persys-dev/persys-cloud/persys-scheduler/internal/logging"
"github.com/redis/go-redis/v9"
"github.com/sirupsen/logrus"
)

var redisLogger = logging.C("scheduler.redis")

func (s *Scheduler) initRedisStore() {
if s.cfg == nil || s.cfg.RedisAddr == "" {
return
}
client := redis.NewClient(&redis.Options{
Addr: s.cfg.RedisAddr,
Password: s.cfg.RedisPassword,
DB: s.cfg.RedisDB,
})
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
if err := client.Ping(ctx).Err(); err != nil {
redisLogger.WithError(err).Warn("redis configured but unavailable; falling back to etcd for reconciliation telemetry")
_ = client.Close()
return
}
s.redisClient = client
redisLogger.WithField("addr", s.cfg.RedisAddr).Info("redis telemetry store enabled")
}

func (s *Scheduler) writeReconciliationTelemetry(workloadID, action string, success bool, reason string, attemptedAt time.Time) {
rec := map[string]interface{}{
"workloadId": workloadID,
"action": action,
"success": success,
"reason": reason,
"attemptedAt": attemptedAt.UTC().Format(time.RFC3339Nano),
}
payload, err := json.Marshal(rec)
if err != nil {
return
}
if s.redisClient != nil {
ttl := 24 * time.Hour
if s.cfg != nil && s.cfg.RedisReconcileTTL > 0 {
ttl = s.cfg.RedisReconcileTTL
}
key := fmt.Sprintf("reconciliation:%s", workloadID)
historyKey := "reconciliation:history"
if err := s.redisClient.Set(context.Background(), key, payload, ttl).Err(); err == nil {
maxEntries := int64(2000)
if s.cfg != nil && s.cfg.RedisEventMaxEntries > 0 {
maxEntries = s.cfg.RedisEventMaxEntries
}
pipe := s.redisClient.TxPipeline()
pipe.LPush(context.Background(), historyKey, payload)
pipe.LTrim(context.Background(), historyKey, 0, maxEntries-1)
pipe.Expire(context.Background(), historyKey, ttl)
_, _ = pipe.Exec(context.Background())
return
}
redisLogger.WithError(err).WithFields(logrus.Fields{"key": key}).Warn("failed writing reconciliation telemetry to redis")
}
_ = s.RetryableEtcdPut(reconciliationKey(workloadID), string(payload))
}

func (s *Scheduler) writeEventTelemetry(payload []byte) bool {
if s.redisClient == nil {
return false
}
ttl := 24 * time.Hour
maxEntries := int64(2000)
if s.cfg != nil {
if s.cfg.RedisEventTTL > 0 {
ttl = s.cfg.RedisEventTTL
}
if s.cfg.RedisEventMaxEntries > 0 {
maxEntries = s.cfg.RedisEventMaxEntries
}
}
pipe := s.redisClient.TxPipeline()
pipe.LPush(context.Background(), "events:history", payload)
pipe.LTrim(context.Background(), "events:history", 0, maxEntries-1)
pipe.Expire(context.Background(), "events:history", ttl)
_, err := pipe.Exec(context.Background())
return err == nil
}
Loading
Loading