A worker lifecycle library for Go — manage background goroutines with panic recovery, configurable restart, tracing, and structured shutdown.
Built on suture for Erlang-style supervisor trees. Part of the ColdBrew framework.
import "github.com/go-coldbrew/workers"Package workers provides a worker lifecycle library for Go, built on thejerf/suture. It manages background goroutines with automatic panic recovery, configurable restart with backoff, and structured shutdown.
Every worker runs inside its own supervisor subtree. This means:
- Each worker gets panic recovery and restart independently
- Workers can dynamically spawn child workers via WorkerInfo
- When a parent worker stops, all its children stop (scoped lifecycle)
- The supervisor tree prevents cascading failures and CPU-burn restart storms
Create workers with NewWorker and run them with Run:
workers.Run(ctx, []*workers.Worker{
workers.NewWorker("kafka").HandlerFunc(consume),
workers.NewWorker("cleanup").HandlerFunc(cleanup).Every(5 * time.Minute),
})
For long-running workers (no Worker.Every): the handler should block until ctx is cancelled, then return ctx.Err().
For periodic workers (with Worker.Every): the handler runs once per tick and should return quickly. Returning nil means success (the next tick will fire). Returning an error triggers restart (if enabled) or stops the worker.
Returning nil from a non-periodic handler stops the worker permanently, even with restart enabled. Use ErrDoNotRestart for explicit permanent completion from periodic handlers.
The return value from a periodic handler determines what happens next:
| Return value | Timer loop | Restart? | Use case |
|-----------------|------------|----------|--------------------------|
| nil | continues | n/a | Success, next tick fires |
| ErrSkipTick | continues | n/a | Transient failure, skip |
| ErrDoNotRestart | exits | no | Permanent completion |
| other error | exits | yes* | Failure, needs restart |
| ctx.Err() | exits | no | Graceful shutdown |
*Only if Worker.WithRestart(true) (the default).
Cross-cutting concerns like tracing, logging, and panic recovery are implemented as Middleware. The middleware chain follows the gRPC interceptor convention: a flat function that calls next to continue:
func myMiddleware(ctx context.Context, info *workers.WorkerInfo, next workers.CycleFunc) error {
// before
err := next(ctx, info)
// after
return err
}
Attach middleware per-worker via Worker.Interceptors or per-run via WithInterceptors. Built-in middleware is available in the middleware/ sub-package.
Run-level interceptors (WithInterceptors) wrap all workers and are best for cross-cutting defaults (tracing, logging, panic recovery). Worker-level interceptors (Worker.Interceptors) are best for worker-specific concerns (distributed locks with per-worker TTL, rate limiting). Children inherit run-level interceptors but not the parent's worker-level interceptors.
Common patterns are provided as helpers:
- EveryInterval — periodic execution on a fixed interval
- ChannelWorker — consume items from a channel one at a time
- BatchChannelWorker — collect items into batches, flush on size or timer
Manager workers can spawn and remove child workers at runtime using the Add, Remove, and GetChildren methods on WorkerInfo. Children join the parent's supervisor subtree and get full framework guarantees (panic recovery, restart). See [Example_dynamicWorkerPool].
Example (Dynamic Worker Pool)
Simulates a config-driven worker pool manager that reconciles desired workers against running workers on each tick.
package main
import (
"context"
"fmt"
"time"
"github.com/go-coldbrew/workers"
)
func main() {
// Simulate config that changes over 3 ticks.
// Tick 1: start worker-a
// Tick 2: add worker-b
// Tick 3: remove worker-a
configs := [][]string{
{"worker-a"},
{"worker-a", "worker-b"},
{"worker-b"},
}
tick := 0
manager := workers.NewWorker("pool-manager").HandlerFunc(func(ctx context.Context, info *workers.WorkerInfo) error {
ticker := time.NewTicker(40 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
if tick >= len(configs) {
continue
}
desired := map[string]bool{}
for _, name := range configs[tick] {
desired[name] = true
}
tick++
running := map[string]bool{}
for _, name := range info.GetChildren() {
running[name] = true
}
// Remove workers no longer desired.
for name := range running {
if !desired[name] {
info.Remove(name)
}
}
// Add workers that aren't already running.
for name := range desired {
if !running[name] {
info.Add(workers.NewWorker(name).HandlerFunc(func(ctx context.Context, _ *workers.WorkerInfo) error {
<-ctx.Done()
return ctx.Err()
}))
}
}
time.Sleep(10 * time.Millisecond) // let children start
fmt.Printf("tick %d: children=%v\n", tick, info.GetChildren())
}
}
})
ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
defer cancel()
workers.Run(ctx, []*workers.Worker{manager})
fmt.Println("pool shut down")
}
Output:
tick 1: children=[worker-a]
tick 2: children=[worker-a worker-b]
tick 3: children=[worker-b]
pool shut down
Example (Reconciler With Change Detection)
Demonstrates config-driven reconciliation with change detection using the handler-as-metadata pattern. The handler struct carries a config version that the reconciler inspects via GetChild().GetHandler() type assertion, eliminating the need for a parallel tracking map.
package main
import (
"context"
"fmt"
"time"
"github.com/go-coldbrew/workers"
)
// solverHandler is used in Example_reconcilerWithChangeDetection to
// demonstrate the handler-as-metadata pattern.
type solverHandler struct {
version int
}
func (h *solverHandler) RunCycle(ctx context.Context, _ *workers.WorkerInfo) error {
<-ctx.Done()
return ctx.Err()
}
func (h *solverHandler) Close() error { return nil }
func main() {
type solverConfig struct {
version int
}
// Simulate config that changes over 3 ticks.
configs := []map[string]solverConfig{
{"a": {version: 1}},
{"a": {version: 1}, "b": {version: 1}},
{"a": {version: 2}, "b": {version: 1}}, // a gets new version
}
tick := 0
manager := workers.NewWorker("reconciler").HandlerFunc(func(ctx context.Context, info *workers.WorkerInfo) error {
ticker := time.NewTicker(40 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
if tick >= len(configs) {
continue
}
desired := configs[tick]
tick++
// Remove workers no longer desired.
for _, name := range info.GetChildren() {
if _, ok := desired[name]; !ok {
info.Remove(name)
}
}
// Add new or replace changed workers.
for key, cfg := range desired {
child, exists := info.GetChild(key)
if exists {
// Check if config changed via handler type assertion.
if h, ok := child.GetHandler().(*solverHandler); ok && h.version == cfg.version {
continue // unchanged, skip
}
info.Remove(key) // config changed, replace
time.Sleep(10 * time.Millisecond) // let old worker stop
}
info.Add(workers.NewWorker(key).Handler(&solverHandler{version: cfg.version}))
}
time.Sleep(10 * time.Millisecond)
fmt.Printf("tick %d: children=%v count=%d\n", tick, info.GetChildren(), len(info.GetChildren()))
}
}
})
ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
defer cancel()
workers.Run(ctx, []*workers.Worker{manager})
}
Output:
tick 1: children=[a] count=1
tick 2: children=[a b] count=2
tick 3: children=[a b] count=2
Example (Standalone)
Standalone usage with signal handling — no ColdBrew required.
package main
import (
"context"
"fmt"
"time"
"github.com/go-coldbrew/workers"
)
func main() {
// In production you'd use signal.NotifyContext(ctx, os.Interrupt).
// For the example, use a short timeout.
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()
workers.Run(ctx, []*workers.Worker{
workers.NewWorker("kafka").HandlerFunc(func(ctx context.Context, _ *workers.WorkerInfo) error {
fmt.Println("consuming messages")
<-ctx.Done()
return ctx.Err()
}),
})
fmt.Println("shutdown complete")
}
Output:
consuming messages
shutdown complete
- Variables
- func Run(ctx context.Context, workers []*Worker, opts ...RunOption) error
- func RunWorker(ctx context.Context, w *Worker, opts ...RunOption)
- type BaseMetrics
- func (BaseMetrics) ObserveRunDuration(string, time.Duration)
- func (BaseMetrics) SetActiveWorkers(int)
- func (BaseMetrics) WorkerFailed(string, error)
- func (BaseMetrics) WorkerPanicked(string)
- func (BaseMetrics) WorkerRestarted(string, int)
- func (BaseMetrics) WorkerStarted(string)
- func (BaseMetrics) WorkerStopped(string)
- type CycleFunc
- func BatchChannelWorker[T any](ch <-chan T, maxSize int, maxDelay time.Duration, fn func(ctx context.Context, info *WorkerInfo, batch []T) error) CycleFunc
- func ChannelWorker[T any](ch <-chan T, fn func(ctx context.Context, info *WorkerInfo, item T) error) CycleFunc
- func EveryInterval(d time.Duration, fn CycleFunc) CycleFunc
- func (fn CycleFunc) Close() error
- func (fn CycleFunc) RunCycle(ctx context.Context, info *WorkerInfo) error
- type CycleHandler
- type Metrics
- type Middleware
- type RunOption
- type Worker
- func NewWorker(name string) *Worker
- func (w *Worker) AddInterceptors(mw ...Middleware) *Worker
- func (w *Worker) Every(d time.Duration) *Worker
- func (w *Worker) GetHandler() CycleHandler
- func (w *Worker) GetInitialDelay() time.Duration
- func (w *Worker) GetInterval() time.Duration
- func (w *Worker) GetJitterPercent() int
- func (w *Worker) GetName() string
- func (w *Worker) GetRestartOnFail() bool
- func (w *Worker) Handler(h CycleHandler) *Worker
- func (w *Worker) HandlerFunc(fn CycleFunc) *Worker
- func (w *Worker) Interceptors(mw ...Middleware) *Worker
- func (w *Worker) WithBackoffJitter(jitter func(time.Duration) time.Duration) *Worker
- func (w *Worker) WithFailureBackoff(d time.Duration) *Worker
- func (w *Worker) WithFailureDecay(decay float64) *Worker
- func (w *Worker) WithFailureThreshold(threshold float64) *Worker
- func (w *Worker) WithInitialDelay(d time.Duration) *Worker
- func (w *Worker) WithJitter(percent int) *Worker
- func (w *Worker) WithMetrics(m Metrics) *Worker
- func (w *Worker) WithRestart(restart bool) *Worker
- func (w *Worker) WithTimeout(d time.Duration) *Worker
- type WorkerInfo
- func NewWorkerInfo(name string, attempt int, opts ...WorkerInfoOption) *WorkerInfo
- func (info *WorkerInfo) Add(w *Worker) bool
- func (info *WorkerInfo) GetAttempt() int
- func (info *WorkerInfo) GetChild(name string) (Worker, bool)
- func (info *WorkerInfo) GetChildCount() int
- func (info *WorkerInfo) GetChildren() []string
- func (info *WorkerInfo) GetHandler() CycleHandler
- func (info *WorkerInfo) GetName() string
- func (info *WorkerInfo) Remove(name string)
- type WorkerInfoOption
ErrDoNotRestart can be returned from a handler to signal that the worker should not be restarted, even when restart is enabled. Use this for permanent completion (e.g., channel closed, work exhausted).
var ErrDoNotRestart = suture.ErrDoNotRestart
ErrSkipTick can be returned from a periodic handler to skip the current tick without triggering restart. The timer continues and the next tick fires normally. Only meaningful for periodic workers (with Worker.Every).
var ErrSkipTick = errors.New("workers: skip tick")
func Run
func Run(ctx context.Context, workers []*Worker, opts ...RunOption) error
Run starts all workers under a suture supervisor and blocks until ctx is cancelled and all workers have exited. Each worker gets its own child supervisor — when a worker stops, its children stop too. A worker exiting early (without restart) does not stop other workers. Returns nil on clean shutdown.
Example
Run multiple workers concurrently. All workers start together and stop when the context is cancelled.
package main
import (
"context"
"fmt"
"time"
"github.com/go-coldbrew/workers"
)
func main() {
w1 := workers.NewWorker("api-poller").HandlerFunc(func(ctx context.Context, _ *workers.WorkerInfo) error {
fmt.Println("api-poller started")
<-ctx.Done()
return ctx.Err()
})
w2 := workers.NewWorker("cache-warmer").HandlerFunc(func(ctx context.Context, _ *workers.WorkerInfo) error {
fmt.Println("cache-warmer started")
<-ctx.Done()
return ctx.Err()
})
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()
workers.Run(ctx, []*workers.Worker{w1, w2})
fmt.Println("all workers stopped")
}
Output:
api-poller started
cache-warmer started
all workers stopped
func RunWorker
func RunWorker(ctx context.Context, w *Worker, opts ...RunOption)
RunWorker runs a single worker with panic recovery and optional restart. Blocks until ctx is cancelled or the worker exits without restart. Unlike Run, RunWorker discards the error. Use Run if you need the error.
Example
RunWorker runs a single worker — useful for dynamic managers that spawn child workers in their own goroutines.
package main
import (
"context"
"fmt"
"time"
"github.com/go-coldbrew/workers"
)
func main() {
w := workers.NewWorker("single").HandlerFunc(func(ctx context.Context, _ *workers.WorkerInfo) error {
fmt.Println("running")
<-ctx.Done()
return ctx.Err()
})
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()
workers.RunWorker(ctx, w)
fmt.Println("done")
}
Output:
running
done
type BaseMetrics
BaseMetrics provides no-op implementations of all Metrics methods. Embed it in custom Metrics implementations so that new methods added to the Metrics interface in future versions get safe no-op defaults instead of breaking your build:
type myMetrics struct {
workers.BaseMetrics // forward-compatible
client *statsd.Client
}
func (m *myMetrics) WorkerStarted(name string) {
m.client.Incr("worker.started", []string{"worker:" + name}, 1)
}
type BaseMetrics struct{}
func (BaseMetrics) ObserveRunDuration
func (BaseMetrics) ObserveRunDuration(string, time.Duration)
func (BaseMetrics) SetActiveWorkers
func (BaseMetrics) SetActiveWorkers(int)
func (BaseMetrics) WorkerFailed
func (BaseMetrics) WorkerFailed(string, error)
func (BaseMetrics) WorkerPanicked
func (BaseMetrics) WorkerPanicked(string)
func (BaseMetrics) WorkerRestarted
func (BaseMetrics) WorkerRestarted(string, int)
func (BaseMetrics) WorkerStarted
func (BaseMetrics) WorkerStarted(string)
func (BaseMetrics) WorkerStopped
func (BaseMetrics) WorkerStopped(string)
type CycleFunc
CycleFunc adapts a plain function into a CycleHandler. Close is a no-op — use this for simple, stateless handlers.
type CycleFunc func(ctx context.Context, info *WorkerInfo) error
func BatchChannelWorker
func BatchChannelWorker[T any](ch <-chan T, maxSize int, maxDelay time.Duration, fn func(ctx context.Context, info *WorkerInfo, batch []T) error) CycleFunc
BatchChannelWorker collects items from ch into batches and calls fn when either the batch reaches maxSize or maxDelay elapses since the first item in the current batch — whichever comes first. Flushes any partial batch on context cancellation or channel close before returning.
Example
BatchChannelWorker collects items into batches and flushes on maxSize or maxDelay — whichever comes first.
package main
import (
"context"
"fmt"
"time"
"github.com/go-coldbrew/workers"
)
func main() {
ch := make(chan int, 10)
for i := 1; i <= 6; i++ {
ch <- i
}
close(ch)
fn := workers.BatchChannelWorker(ch, 3, time.Hour, func(_ context.Context, _ *workers.WorkerInfo, batch []int) error {
fmt.Println(batch)
return nil
})
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
w := workers.NewWorker("batcher").HandlerFunc(fn)
workers.Run(ctx, []*workers.Worker{w})
}
Output:
[1 2 3]
[4 5 6]
func ChannelWorker
func ChannelWorker[T any](ch <-chan T, fn func(ctx context.Context, info *WorkerInfo, item T) error) CycleFunc
ChannelWorker consumes items from ch one at a time, calling fn for each. Returns when ctx is cancelled or ch is closed.
Example
ChannelWorker consumes items from a channel one at a time.
package main
import (
"context"
"fmt"
"time"
"github.com/go-coldbrew/workers"
)
func main() {
ch := make(chan string, 3)
ch <- "hello"
ch <- "world"
ch <- "!"
close(ch)
fn := workers.ChannelWorker(ch, func(_ context.Context, _ *workers.WorkerInfo, item string) error {
fmt.Println(item)
return nil
})
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
w := workers.NewWorker("consumer").HandlerFunc(fn)
workers.Run(ctx, []*workers.Worker{w})
}
Output:
hello
world
!
func EveryInterval
func EveryInterval(d time.Duration, fn CycleFunc) CycleFunc
EveryInterval wraps fn in a timer loop that calls fn at the given interval. Returns when ctx is cancelled. If fn returns an error, EveryInterval returns that error (the supervisor decides whether to restart based on Worker.WithRestart).
Example
EveryInterval wraps a function in a timer loop.
package main
import (
"context"
"fmt"
"time"
"github.com/go-coldbrew/workers"
)
func main() {
count := 0
fn := workers.EveryInterval(20*time.Millisecond, func(_ context.Context, _ *workers.WorkerInfo) error {
count++
fmt.Printf("tick %d\n", count)
return nil
})
w := workers.NewWorker("periodic").HandlerFunc(fn)
ctx, cancel := context.WithTimeout(context.Background(), 55*time.Millisecond)
defer cancel()
workers.Run(ctx, []*workers.Worker{w})
}
Output:
tick 1
tick 2
func (CycleFunc) Close
func (fn CycleFunc) Close() error
Close is a no-op for CycleFunc.
func (CycleFunc) RunCycle
func (fn CycleFunc) RunCycle(ctx context.Context, info *WorkerInfo) error
type CycleHandler
CycleHandler handles worker execution cycles. For periodic workers, RunCycle is called once per tick. Close is called once when the worker stops, allowing cleanup of resources.
type CycleHandler interface {
RunCycle(ctx context.Context, info *WorkerInfo) error
Close() error
}
type Metrics
Metrics collects worker lifecycle metrics. Implement this interface to provide custom metrics (e.g., Datadog, StatsD). Use BaseMetrics{} to disable metrics, or NewPrometheusMetrics for the built-in Prometheus implementation.
type Metrics interface {
WorkerStarted(name string)
WorkerStopped(name string)
WorkerPanicked(name string)
WorkerFailed(name string, err error)
WorkerRestarted(name string, attempt int)
// ObserveRunDuration records the attempt lifetime — the duration from
// when the worker started to when it stopped or failed. For per-cycle
// timing, use middleware.Duration instead.
ObserveRunDuration(name string, duration time.Duration)
SetActiveWorkers(count int)
}
func NewPrometheusMetrics
func NewPrometheusMetrics(namespace string) Metrics
NewPrometheusMetrics creates a Metrics implementation backed by Prometheus. The namespace is prepended to all metric names (e.g., "myapp" → "myapp_worker_started_total"). Metrics are auto-registered with the default Prometheus registry. Safe to call multiple times with the same namespace — returns the cached instance. The cache is process-global; use a small number of static namespaces (not per-request/tenant values).
type Middleware
Middleware intercepts each execution cycle. Call next to continue the chain. Matches gRPC interceptor convention.
type Middleware func(ctx context.Context, info *WorkerInfo, next CycleFunc) error
type RunOption
RunOption configures the behavior of Run.
type RunOption func(*runConfig)
func AddInterceptors
func AddInterceptors(mw ...Middleware) RunOption
AddInterceptors appends to the run-level interceptor list.
func WithDefaultJitter
func WithDefaultJitter(percent int) RunOption
WithDefaultJitter sets a run-level default jitter percentage for all periodic workers. Worker-level Worker.WithJitter takes precedence. Setting Worker.WithJitter(0) disables jitter for a specific worker even when a run-level default is set.
func WithInterceptors
func WithInterceptors(mw ...Middleware) RunOption
WithInterceptors replaces the run-level interceptor list. Run-level interceptors wrap outside worker-level interceptors.
func WithMetrics
func WithMetrics(m Metrics) RunOption
WithMetrics sets the metrics implementation for all workers started by Run. Workers inherit this unless they override via Worker.WithMetrics. If not set, BaseMetrics is used.
type Worker
Worker represents a background goroutine managed by the framework. Create with NewWorker and configure with builder methods.
type Worker struct {
// contains filtered or unexported fields
}
func NewWorker
func NewWorker(name string) *Worker
NewWorker creates a Worker with the given name. Set the handler via Worker.Handler or Worker.HandlerFunc.
Example
A simple worker that runs until cancelled.
package main
import (
"context"
"fmt"
"time"
"github.com/go-coldbrew/workers"
)
func main() {
w := workers.NewWorker("greeter").HandlerFunc(func(ctx context.Context, info *workers.WorkerInfo) error {
fmt.Printf("worker %q started (attempt %d)\n", info.GetName(), info.GetAttempt())
<-ctx.Done()
return ctx.Err()
})
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()
workers.Run(ctx, []*workers.Worker{w})
}worker "greeter" started (attempt 0)
func (*Worker) AddInterceptors
func (w *Worker) AddInterceptors(mw ...Middleware) *Worker
AddInterceptors appends to the worker-level interceptor list.
func (*Worker) Every
func (w *Worker) Every(d time.Duration) *Worker
Every configures the worker to run periodically at the given interval. The interval is stored as data — wrapping is deferred to startup where jitter configuration is resolved.
Example
A periodic worker that runs a function on a fixed interval.
package main
import (
"context"
"fmt"
"time"
"github.com/go-coldbrew/workers"
)
func main() {
count := 0
w := workers.NewWorker("ticker").HandlerFunc(func(_ context.Context, _ *workers.WorkerInfo) error {
count++
fmt.Printf("tick %d\n", count)
return nil
}).Every(20 * time.Millisecond)
ctx, cancel := context.WithTimeout(context.Background(), 55*time.Millisecond)
defer cancel()
workers.Run(ctx, []*workers.Worker{w})
}
Output:
tick 1
tick 2
func (*Worker) GetHandler
func (w *Worker) GetHandler() CycleHandler
GetHandler returns the worker's CycleHandler, or nil if not set.
func (*Worker) GetInitialDelay
func (w *Worker) GetInitialDelay() time.Duration
GetInitialDelay returns the initial delay before the first tick, or 0 if not set.
func (*Worker) GetInterval
func (w *Worker) GetInterval() time.Duration
GetInterval returns the periodic interval, or 0 if this is not a periodic worker.
func (*Worker) GetJitterPercent
func (w *Worker) GetJitterPercent() int
GetJitterPercent returns the jitter percentage. -1 means inherit run-level default, 0 means no jitter.
func (*Worker) GetName
func (w *Worker) GetName() string
GetName returns the worker's name.
func (*Worker) GetRestartOnFail
func (w *Worker) GetRestartOnFail() bool
GetRestartOnFail returns whether the worker restarts on failure.
func (*Worker) Handler
func (w *Worker) Handler(h CycleHandler) *Worker
Handler sets the worker's CycleHandler. Use this for handlers that need cleanup via Close (e.g., database connections, leases).
func (*Worker) HandlerFunc
func (w *Worker) HandlerFunc(fn CycleFunc) *Worker
HandlerFunc sets the worker's handler from a plain function. This is the common case for simple, stateless workers.
func (*Worker) Interceptors
func (w *Worker) Interceptors(mw ...Middleware) *Worker
Interceptors replaces the worker-level interceptor list.
Example
Per-worker middleware using the interceptor pattern.
package main
import (
"context"
"fmt"
"time"
"github.com/go-coldbrew/workers"
)
func main() {
loggingMW := func(ctx context.Context, info *workers.WorkerInfo, next workers.CycleFunc) error {
fmt.Printf("[%s] cycle start\n", info.GetName())
err := next(ctx, info)
fmt.Printf("[%s] cycle end\n", info.GetName())
return err
}
w := workers.NewWorker("with-logging").
HandlerFunc(func(_ context.Context, info *workers.WorkerInfo) error {
fmt.Printf("[%s] doing work\n", info.GetName())
return nil
}).
Every(20 * time.Millisecond).
Interceptors(loggingMW)
ctx, cancel := context.WithTimeout(context.Background(), 35*time.Millisecond)
defer cancel()
workers.Run(ctx, []*workers.Worker{w})
}
Output:
[with-logging] cycle start
[with-logging] doing work
[with-logging] cycle end
func (*Worker) WithBackoffJitter
func (w *Worker) WithBackoffJitter(jitter func(time.Duration) time.Duration) *Worker
WithBackoffJitter adds random jitter to the backoff duration to prevent thundering herd on coordinated restarts. The function receives the base backoff duration and returns a jittered duration.
func (*Worker) WithFailureBackoff
func (w *Worker) WithFailureBackoff(d time.Duration) *Worker
WithFailureBackoff sets the duration to wait between restarts. Suture default is 15 seconds.
func (*Worker) WithFailureDecay
func (w *Worker) WithFailureDecay(decay float64) *Worker
WithFailureDecay sets the rate at which failure count decays over time. A value of 1.0 means failures decay by one per second. Suture default is 1.0.
func (*Worker) WithFailureThreshold
func (w *Worker) WithFailureThreshold(threshold float64) *Worker
WithFailureThreshold sets the number of failures allowed before the supervisor gives up restarting. Suture default is 5.
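Threshold and decay interact like a leaky failure counter: each crash adds one, time drains the count, and the supervisor gives up only when crashes arrive faster than they drain. The model below follows the linear decay described here; it is an illustration of the documented behavior, not suture's actual bookkeeping:

```go
package main

import "fmt"

// failureWindow models the restart-intensity bookkeeping described
// above: each failure adds 1, the count decays linearly at `decay`
// per second, and the supervisor gives up once it exceeds `threshold`.
type failureWindow struct {
	failures, decay, threshold float64
	lastSec                    float64 // time of the previous failure
}

// fail records a failure at time atSec and reports whether the
// supervisor would give up restarting.
func (w *failureWindow) fail(atSec float64) (gaveUp bool) {
	elapsed := atSec - w.lastSec
	w.lastSec = atSec
	w.failures -= elapsed * w.decay
	if w.failures < 0 {
		w.failures = 0
	}
	w.failures++
	return w.failures > w.threshold
}

func main() {
	// Defaults from the docs above: decay 1.0, threshold 5.
	w := &failureWindow{decay: 1.0, threshold: 5}
	// Six crashes within the same second exceed the threshold.
	for i := 0; i < 6; i++ {
		fmt.Println(w.fail(0))
	}
	// Failures spaced 2s apart decay away and never trip it.
	w2 := &failureWindow{decay: 1.0, threshold: 5}
	tripped := false
	for t := 0.0; t < 60; t += 2 {
		tripped = tripped || w2.fail(t)
	}
	fmt.Println("spaced failures tripped:", tripped)
}
```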
func (*Worker) WithInitialDelay
func (w *Worker) WithInitialDelay(d time.Duration) *Worker
WithInitialDelay delays the first tick to stagger startup. Requires Worker.Every.
func (*Worker) WithJitter
func (w *Worker) WithJitter(percent int) *Worker
WithJitter sets per-worker jitter as a percentage of the base interval. Each tick is randomized within ±percent of the base. Requires Worker.Every. Setting WithJitter(0) explicitly disables jitter even when a run-level default is set via WithDefaultJitter.
func (*Worker) WithMetrics
func (w *Worker) WithMetrics(m Metrics) *Worker
WithMetrics sets a per-worker metrics implementation, overriding the metrics inherited from the parent WorkerInfo or Run options.
func (*Worker) WithRestart
func (w *Worker) WithRestart(restart bool) *Worker
WithRestart configures whether the worker should be restarted on failure. Default is true. Set to false for one-shot workers that should exit after completion or failure. Note: a handler returning nil always stops the worker permanently, regardless of this setting.
Periodic workers (with Worker.Every) should generally keep the default (restart enabled). Use ErrSkipTick for transient failures and ErrDoNotRestart for permanent completion instead of disabling restart.
Example
A worker with automatic restart on failure. The supervisor logs restart events; the worker succeeds on the third attempt.
package main
import (
"context"
"fmt"
"time"
"github.com/go-coldbrew/workers"
)
func main() {
attempt := 0
w := workers.NewWorker("resilient").HandlerFunc(func(ctx context.Context, _ *workers.WorkerInfo) error {
attempt++
if attempt <= 2 {
return fmt.Errorf("transient error")
}
fmt.Printf("succeeded on attempt %d\n", attempt)
<-ctx.Done()
return ctx.Err()
}).WithRestart(true)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
workers.Run(ctx, []*workers.Worker{w})
// This example demonstrates restart behavior. Log output from the
// supervisor is expected between restarts. The worker prints on success.
}
func (*Worker) WithTimeout
func (w *Worker) WithTimeout(d time.Duration) *Worker
WithTimeout sets the maximum time to wait for the worker to stop during graceful shutdown. Suture default is 10 seconds.
type WorkerInfo
WorkerInfo carries worker metadata and child management. The framework always creates it — it is never nil. context.Context handles cancellation/deadlines/values; WorkerInfo handles everything worker-specific.
type WorkerInfo struct {
// contains filtered or unexported fields
}
func NewWorkerInfo
func NewWorkerInfo(name string, attempt int, opts ...WorkerInfoOption) *WorkerInfo
NewWorkerInfo creates a WorkerInfo with the given name and attempt. This is useful for testing middleware and handlers — the framework creates fully populated instances internally.
Use WithTestChildren to enable Add/Remove/GetChildren in tests.
func (*WorkerInfo) Add
func (info *WorkerInfo) Add(w *Worker) bool
Add starts a child worker under this worker's supervisor subtree. Returns true if the worker was added, false if a worker with the same name is already running (no-op). To replace a running worker, call WorkerInfo.Remove first then Add.
Note: Remove + Add is not atomic — there is a brief window where the worker is not running. For most reconciliation patterns this is fine.
Children inherit run-level interceptors, metrics (unless overridden via Worker.WithMetrics), and scoped lifecycle — when this worker stops, all its children stop too.
When a child permanently stops, it is automatically removed from the children map on the next call to WorkerInfo.Add, WorkerInfo.GetChildren, WorkerInfo.GetChild, or WorkerInfo.GetChildCount.
Example
A manager worker that dynamically spawns and removes child workers using WorkerInfo.Add, Remove, and GetChildren.
package main
import (
"context"
"fmt"
"time"
"github.com/go-coldbrew/workers"
)
func main() {
manager := workers.NewWorker("manager").HandlerFunc(func(ctx context.Context, info *workers.WorkerInfo) error {
// Spawn two child workers dynamically.
info.Add(workers.NewWorker("child-a").HandlerFunc(func(ctx context.Context, childInfo *workers.WorkerInfo) error {
fmt.Printf("%s started\n", childInfo.GetName())
<-ctx.Done()
return ctx.Err()
}))
info.Add(workers.NewWorker("child-b").HandlerFunc(func(ctx context.Context, childInfo *workers.WorkerInfo) error {
fmt.Printf("%s started\n", childInfo.GetName())
<-ctx.Done()
return ctx.Err()
}))
// Give children time to start.
time.Sleep(30 * time.Millisecond)
fmt.Printf("children: %v\n", info.GetChildren())
// Remove one child.
info.Remove("child-a")
time.Sleep(30 * time.Millisecond)
fmt.Printf("after remove: %v\n", info.GetChildren())
<-ctx.Done()
return ctx.Err()
})
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
workers.Run(ctx, []*workers.Worker{manager})
}
Output:
child-a started
child-b started
children: [child-a child-b]
after remove: [child-b]
Example (Replace)
Replace a child worker using Remove + Add. Add is a no-op if the name exists, so Remove first.
package main
import (
"context"
"fmt"
"time"
"github.com/go-coldbrew/workers"
)
func main() {
manager := workers.NewWorker("manager").HandlerFunc(func(ctx context.Context, info *workers.WorkerInfo) error {
info.Add(workers.NewWorker("processor").HandlerFunc(func(ctx context.Context, _ *workers.WorkerInfo) error {
fmt.Println("processor v1")
<-ctx.Done()
return ctx.Err()
}))
time.Sleep(30 * time.Millisecond)
// Replace: Remove the old worker, then Add the new one.
info.Remove("processor")
time.Sleep(10 * time.Millisecond) // brief gap while old stops
info.Add(workers.NewWorker("processor").HandlerFunc(func(ctx context.Context, _ *workers.WorkerInfo) error {
fmt.Println("processor v2")
<-ctx.Done()
return ctx.Err()
}))
time.Sleep(30 * time.Millisecond)
<-ctx.Done()
return ctx.Err()
})
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
workers.Run(ctx, []*workers.Worker{manager})
}
Output:
processor v1
processor v2
func (*WorkerInfo) GetAttempt
func (info *WorkerInfo) GetAttempt() int
GetAttempt returns the restart attempt number (0 on first run).
func (*WorkerInfo) GetChild
func (info *WorkerInfo) GetChild(name string) (Worker, bool)
GetChild returns a copy of a running child worker and true, or the zero value and false if not found. The returned value is a snapshot — mutations to the Worker fields have no effect on the running worker.
The CycleHandler (accessible via Worker.GetHandler) is shared with the running worker, not copied. Use type assertion to inspect handler state (e.g., config versions for reconciliation). See [Example_reconcilerWithChangeDetection].
func (*WorkerInfo) GetChildCount
func (info *WorkerInfo) GetChildCount() int
GetChildCount returns the number of currently running child workers. This is more efficient than calling WorkerInfo.GetChildren and taking len, as it avoids allocating a sorted slice. Stopped children are lazily pruned.
func (*WorkerInfo) GetChildren
func (info *WorkerInfo) GetChildren() []string
GetChildren returns the names of currently running child workers. Stopped children are lazily pruned before building the list.
func (*WorkerInfo) GetHandler
func (info *WorkerInfo) GetHandler() CycleHandler
GetHandler returns the worker's CycleHandler, or nil if not set. Use type assertion to access handler-specific state or interfaces:
if h, ok := info.GetHandler().(MyHandler); ok {
// access h.Config, h.Version, etc.
}
func (*WorkerInfo) GetName
func (info *WorkerInfo) GetName() string
GetName returns the worker's name as passed to NewWorker.
func (*WorkerInfo) Remove
func (info *WorkerInfo) Remove(name string)
Remove stops a child worker by name.
type WorkerInfoOption
WorkerInfoOption configures a WorkerInfo created by NewWorkerInfo.
type WorkerInfoOption func(*WorkerInfo)
func WithTestChildren
func WithTestChildren(ctx context.Context) WorkerInfoOption
WithTestChildren creates an internal supervisor so that Add, Remove, and GetChildren work in tests without calling Run. The caller must cancel the returned context when the test is done to stop the supervisor.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
info := workers.NewWorkerInfo("test", 0, workers.WithTestChildren(ctx))
func WithTestHandler
func WithTestHandler(h CycleHandler) WorkerInfoOption
WithTestHandler sets the handler on a test WorkerInfo so that WorkerInfo.GetHandler works in unit tests.
Generated by gomarkdoc