
Workers


A worker lifecycle library for Go — manage background goroutines with panic recovery, configurable restart, tracing, and structured shutdown.

Built on suture for Erlang-style supervisor trees. Part of the ColdBrew framework.


API Reference

workers

import "github.com/go-coldbrew/workers"

Package workers provides a worker lifecycle library for Go, built on thejerf/suture. It manages background goroutines with automatic panic recovery, configurable restart with backoff, and structured shutdown.

Architecture

Every worker runs inside its own supervisor subtree. This means:

  • Each worker gets panic recovery and restart independently
  • Workers can dynamically spawn child workers via WorkerInfo
  • When a parent worker stops, all its children stop (scoped lifecycle)
  • The supervisor tree prevents cascading failures and CPU-burn restart storms

Quick Start

Create workers with NewWorker and run them with Run:

workers.Run(ctx, []*workers.Worker{
    workers.NewWorker("kafka").HandlerFunc(consume),
    workers.NewWorker("cleanup").HandlerFunc(cleanup).Every(5 * time.Minute),
})

Handler Contract

For long-running workers (no Worker.Every): the handler should block until ctx is cancelled, then return ctx.Err().

For periodic workers (with Worker.Every): the handler runs once per tick and should return quickly. Returning nil means success (the next tick will fire). Returning an error triggers restart (if enabled) or stops the worker.

Returning nil from a non-periodic handler stops the worker permanently, even with restart enabled. Use ErrDoNotRestart for explicit permanent completion from periodic handlers.

Error Semantics for Periodic Handlers

The return value from a periodic handler determines what happens next:

| Return value    | Timer loop | Restart? | Use case                 |
|-----------------|------------|----------|--------------------------|
| nil             | continues  | n/a      | Success, next tick fires |
| ErrSkipTick     | continues  | n/a      | Transient failure, skip  |
| ErrDoNotRestart | exits      | no       | Permanent completion     |
| other error     | exits      | yes*     | Failure, needs restart   |
| ctx.Err()       | exits      | no       | Graceful shutdown        |

*Only if Worker.WithRestart(true) (the default).

Middleware

Cross-cutting concerns like tracing, logging, and panic recovery are implemented as Middleware. The middleware chain follows the gRPC interceptor convention: a flat function that calls next to continue:

func myMiddleware(ctx context.Context, info *workers.WorkerInfo, next workers.CycleFunc) error {
    // before
    err := next(ctx, info)
    // after
    return err
}

Attach middleware per-worker via Worker.Interceptors or per-run via WithInterceptors. Built-in middleware is available in the middleware/ sub-package.

Run-level interceptors (WithInterceptors) wrap all workers and are best for cross-cutting defaults (tracing, logging, panic recovery). Worker-level interceptors (Worker.Interceptors) are best for worker-specific concerns (distributed locks with per-worker TTL, rate limiting). Children inherit run-level interceptors but not the parent's worker-level interceptors.

Helpers

Common patterns are provided as helpers: EveryInterval wraps a function in a timer loop, ChannelWorker consumes a channel item by item, and BatchChannelWorker batches channel items by size or delay. Each is documented below.

Dynamic Workers

Manager workers can spawn and remove child workers at runtime using the Add, Remove, and GetChildren methods on WorkerInfo. Children join the parent's supervisor subtree and get full framework guarantees (panic recovery, restart). See the Dynamic Worker Pool example below.

Example (Dynamic Worker Pool)

Simulates a config-driven worker pool manager that reconciles desired workers against running workers on each tick.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-coldbrew/workers"
)

func main() {
	// Simulate config that changes over 3 ticks.
	// Tick 1: start worker-a
	// Tick 2: add worker-b
	// Tick 3: remove worker-a
	configs := [][]string{
		{"worker-a"},
		{"worker-a", "worker-b"},
		{"worker-b"},
	}

	tick := 0
	manager := workers.NewWorker("pool-manager").HandlerFunc(func(ctx context.Context, info *workers.WorkerInfo) error {
		ticker := time.NewTicker(40 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-ticker.C:
				if tick >= len(configs) {
					continue
				}
				desired := map[string]bool{}
				for _, name := range configs[tick] {
					desired[name] = true
				}
				tick++

				running := map[string]bool{}
				for _, name := range info.GetChildren() {
					running[name] = true
				}

				// Remove workers no longer desired.
				for name := range running {
					if !desired[name] {
						info.Remove(name)
					}
				}
				// Add workers that aren't already running.
				for name := range desired {
					if !running[name] {
						info.Add(workers.NewWorker(name).HandlerFunc(func(ctx context.Context, _ *workers.WorkerInfo) error {
							<-ctx.Done()
							return ctx.Err()
						}))
					}
				}
				time.Sleep(10 * time.Millisecond) // let children start
				fmt.Printf("tick %d: children=%v\n", tick, info.GetChildren())
			}
		}
	})

	ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
	defer cancel()

	workers.Run(ctx, []*workers.Worker{manager})
	fmt.Println("pool shut down")
}

Output

tick 1: children=[worker-a]
tick 2: children=[worker-a worker-b]
tick 3: children=[worker-b]
pool shut down

Example (Reconciler With Change Detection)

Demonstrates config-driven reconciliation with change detection using the handler-as-metadata pattern. The handler struct carries a config version that the reconciler inspects via GetChild().GetHandler() type assertion, eliminating the need for a parallel tracking map.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-coldbrew/workers"
)

// solverHandler is used in Example_reconcilerWithChangeDetection to
// demonstrate the handler-as-metadata pattern.
type solverHandler struct {
	version int
}

func (h *solverHandler) RunCycle(ctx context.Context, _ *workers.WorkerInfo) error {
	<-ctx.Done()
	return ctx.Err()
}

func (h *solverHandler) Close() error { return nil }

func main() {
	type solverConfig struct {
		version int
	}

	// Simulate config that changes over 3 ticks.
	configs := []map[string]solverConfig{
		{"a": {version: 1}},
		{"a": {version: 1}, "b": {version: 1}},
		{"a": {version: 2}, "b": {version: 1}}, // a gets new version
	}

	tick := 0
	manager := workers.NewWorker("reconciler").HandlerFunc(func(ctx context.Context, info *workers.WorkerInfo) error {
		ticker := time.NewTicker(40 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-ticker.C:
				if tick >= len(configs) {
					continue
				}
				desired := configs[tick]
				tick++

				// Remove workers no longer desired.
				for _, name := range info.GetChildren() {
					if _, ok := desired[name]; !ok {
						info.Remove(name)
					}
				}

				// Add new or replace changed workers.
				for key, cfg := range desired {
					child, exists := info.GetChild(key)
					if exists {
						// Check if config changed via handler type assertion.
						if h, ok := child.GetHandler().(*solverHandler); ok && h.version == cfg.version {
							continue // unchanged, skip
						}
						info.Remove(key)                  // config changed, replace
						time.Sleep(10 * time.Millisecond) // let old worker stop
					}
					info.Add(workers.NewWorker(key).Handler(&solverHandler{version: cfg.version}))
				}
				time.Sleep(10 * time.Millisecond)
				fmt.Printf("tick %d: children=%v count=%d\n", tick, info.GetChildren(), len(info.GetChildren()))
			}
		}
	})

	ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
	defer cancel()

	workers.Run(ctx, []*workers.Worker{manager})
}

Output

tick 1: children=[a] count=1
tick 2: children=[a b] count=2
tick 3: children=[a b] count=2

Example (Standalone)

Standalone usage with signal handling — no ColdBrew required.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-coldbrew/workers"
)

func main() {
	// In production you'd use signal.NotifyContext(ctx, os.Interrupt).
	// For the example, use a short timeout.
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	workers.Run(ctx, []*workers.Worker{
		workers.NewWorker("kafka").HandlerFunc(func(ctx context.Context, _ *workers.WorkerInfo) error {
			fmt.Println("consuming messages")
			<-ctx.Done()
			return ctx.Err()
		}),
	})
	fmt.Println("shutdown complete")
}

Output

consuming messages
shutdown complete

Index

Variables

ErrDoNotRestart can be returned from a handler to signal that the worker should not be restarted, even when restart is enabled. Use this for permanent completion (e.g., channel closed, work exhausted).

var ErrDoNotRestart = suture.ErrDoNotRestart

ErrSkipTick can be returned from a periodic handler to skip the current tick without triggering restart. The timer continues and the next tick fires normally. Only meaningful for periodic workers (with Worker.Every).

var ErrSkipTick = errors.New("workers: skip tick")

func Run

func Run(ctx context.Context, workers []*Worker, opts ...RunOption) error

Run starts all workers under a suture supervisor and blocks until ctx is cancelled and all workers have exited. Each worker gets its own child supervisor — when a worker stops, its children stop too. A worker exiting early (without restart) does not stop other workers. Returns nil on clean shutdown.

Example

Run multiple workers concurrently. All workers start together and stop when the context is cancelled.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-coldbrew/workers"
)

func main() {
	w1 := workers.NewWorker("api-poller").HandlerFunc(func(ctx context.Context, _ *workers.WorkerInfo) error {
		fmt.Println("api-poller started")
		<-ctx.Done()
		return ctx.Err()
	})
	w2 := workers.NewWorker("cache-warmer").HandlerFunc(func(ctx context.Context, _ *workers.WorkerInfo) error {
		fmt.Println("cache-warmer started")
		<-ctx.Done()
		return ctx.Err()
	})

	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	workers.Run(ctx, []*workers.Worker{w1, w2})
	fmt.Println("all workers stopped")
}

Output

api-poller started
cache-warmer started
all workers stopped

func RunWorker

func RunWorker(ctx context.Context, w *Worker, opts ...RunOption)

RunWorker runs a single worker with panic recovery and optional restart. Blocks until ctx is cancelled or the worker exits without restart. Unlike Run, RunWorker discards the error. Use Run if you need the error.

Example

RunWorker runs a single worker — useful for dynamic managers that spawn child workers in their own goroutines.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-coldbrew/workers"
)

func main() {
	w := workers.NewWorker("single").HandlerFunc(func(ctx context.Context, _ *workers.WorkerInfo) error {
		fmt.Println("running")
		<-ctx.Done()
		return ctx.Err()
	})

	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	workers.RunWorker(ctx, w)
	fmt.Println("done")
}

Output

running
done

type BaseMetrics

BaseMetrics provides no-op implementations of all Metrics methods. Embed it in custom Metrics implementations so that new methods added to the Metrics interface in future versions get safe no-op defaults instead of breaking your build:

type myMetrics struct {
    workers.BaseMetrics // forward-compatible
    client *statsd.Client
}

func (m *myMetrics) WorkerStarted(name string) {
    m.client.Incr("worker.started", []string{"worker:" + name}, 1)
}
type BaseMetrics struct{}

func (BaseMetrics) ObserveRunDuration

func (BaseMetrics) ObserveRunDuration(string, time.Duration)

func (BaseMetrics) SetActiveWorkers

func (BaseMetrics) SetActiveWorkers(int)

func (BaseMetrics) WorkerFailed

func (BaseMetrics) WorkerFailed(string, error)

func (BaseMetrics) WorkerPanicked

func (BaseMetrics) WorkerPanicked(string)

func (BaseMetrics) WorkerRestarted

func (BaseMetrics) WorkerRestarted(string, int)

func (BaseMetrics) WorkerStarted

func (BaseMetrics) WorkerStarted(string)

func (BaseMetrics) WorkerStopped

func (BaseMetrics) WorkerStopped(string)

type CycleFunc

CycleFunc adapts a plain function into a CycleHandler. Close is a no-op — use this for simple, stateless handlers.

type CycleFunc func(ctx context.Context, info *WorkerInfo) error

func BatchChannelWorker[T any](ch <-chan T, maxSize int, maxDelay time.Duration, fn func(ctx context.Context, info *WorkerInfo, batch []T) error) CycleFunc

BatchChannelWorker collects items from ch into batches and calls fn when either the batch reaches maxSize or maxDelay elapses since the first item in the current batch — whichever comes first. Flushes any partial batch on context cancellation or channel close before returning.

Example

BatchChannelWorker collects items into batches and flushes on maxSize or maxDelay — whichever comes first.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-coldbrew/workers"
)

func main() {
	ch := make(chan int, 10)
	for i := 1; i <= 6; i++ {
		ch <- i
	}
	close(ch)

	fn := workers.BatchChannelWorker(ch, 3, time.Hour, func(_ context.Context, _ *workers.WorkerInfo, batch []int) error {
		fmt.Println(batch)
		return nil
	})

	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer cancel()

	w := workers.NewWorker("batcher").HandlerFunc(fn)
	workers.Run(ctx, []*workers.Worker{w})
}

Output

[1 2 3]
[4 5 6]

func ChannelWorker[T any](ch <-chan T, fn func(ctx context.Context, info *WorkerInfo, item T) error) CycleFunc

ChannelWorker consumes items from ch one at a time, calling fn for each. Returns when ctx is cancelled or ch is closed.

Example

ChannelWorker consumes items from a channel one at a time.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-coldbrew/workers"
)

func main() {
	ch := make(chan string, 3)
	ch <- "hello"
	ch <- "world"
	ch <- "!"
	close(ch)

	fn := workers.ChannelWorker(ch, func(_ context.Context, _ *workers.WorkerInfo, item string) error {
		fmt.Println(item)
		return nil
	})

	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer cancel()

	w := workers.NewWorker("consumer").HandlerFunc(fn)
	workers.Run(ctx, []*workers.Worker{w})
}

Output

hello
world
!

func EveryInterval(d time.Duration, fn CycleFunc) CycleFunc

EveryInterval wraps fn in a timer loop that calls fn at the given interval. Returns when ctx is cancelled. If fn returns an error, EveryInterval returns that error (the supervisor decides whether to restart based on Worker.WithRestart).

Example

EveryInterval wraps a function in a timer loop.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-coldbrew/workers"
)

func main() {
	count := 0
	fn := workers.EveryInterval(20*time.Millisecond, func(_ context.Context, _ *workers.WorkerInfo) error {
		count++
		fmt.Printf("tick %d\n", count)
		return nil
	})

	w := workers.NewWorker("periodic").HandlerFunc(fn)

	ctx, cancel := context.WithTimeout(context.Background(), 55*time.Millisecond)
	defer cancel()

	workers.Run(ctx, []*workers.Worker{w})
}

Output

tick 1
tick 2

func (CycleFunc) Close

func (fn CycleFunc) Close() error

Close is a no-op for CycleFunc.

func (CycleFunc) RunCycle

func (fn CycleFunc) RunCycle(ctx context.Context, info *WorkerInfo) error

type CycleHandler

CycleHandler handles worker execution cycles. For periodic workers, RunCycle is called once per tick. Close is called once when the worker stops, allowing cleanup of resources.

type CycleHandler interface {
    RunCycle(ctx context.Context, info *WorkerInfo) error
    Close() error
}

type Metrics

Metrics collects worker lifecycle metrics. Implement this interface to provide custom metrics (e.g., Datadog, StatsD). Use BaseMetrics{} to disable metrics, or NewPrometheusMetrics for the built-in Prometheus implementation.

type Metrics interface {
    WorkerStarted(name string)
    WorkerStopped(name string)
    WorkerPanicked(name string)
    WorkerFailed(name string, err error)
    WorkerRestarted(name string, attempt int)
    // ObserveRunDuration records the attempt lifetime — the duration from
    // when the worker started to when it stopped or failed. For per-cycle
    // timing, use middleware.Duration instead.
    ObserveRunDuration(name string, duration time.Duration)
    SetActiveWorkers(count int)
}

func NewPrometheusMetrics(namespace string) Metrics

NewPrometheusMetrics creates a Metrics implementation backed by Prometheus. The namespace is prepended to all metric names (e.g., "myapp" → "myapp_worker_started_total"). Metrics are auto-registered with the default Prometheus registry. Safe to call multiple times with the same namespace — returns the cached instance. The cache is process-global; use a small number of static namespaces (not per-request/tenant values).

type Middleware

Middleware intercepts each execution cycle. Call next to continue the chain. Matches the gRPC interceptor convention.

type Middleware func(ctx context.Context, info *WorkerInfo, next CycleFunc) error

type RunOption

RunOption configures the behavior of Run.

type RunOption func(*runConfig)

func AddInterceptors(mw ...Middleware) RunOption

AddInterceptors appends to the run-level interceptor list.

func WithDefaultJitter(percent int) RunOption

WithDefaultJitter sets a run-level default jitter percentage for all periodic workers. Worker-level Worker.WithJitter takes precedence. Setting Worker.WithJitter(0) disables jitter for a specific worker even when a run-level default is set.

func WithInterceptors(mw ...Middleware) RunOption

WithInterceptors replaces the run-level interceptor list. Run-level interceptors wrap outside worker-level interceptors.

func WithMetrics(m Metrics) RunOption

WithMetrics sets the metrics implementation for all workers started by Run. Workers inherit this unless they override via Worker.WithMetrics. If not set, BaseMetrics is used.

type Worker

Worker represents a background goroutine managed by the framework. Create with NewWorker and configure with builder methods.

type Worker struct {
    // contains filtered or unexported fields
}

func NewWorker(name string) *Worker

NewWorker creates a Worker with the given name. Set the handler via Worker.Handler or Worker.HandlerFunc.

Example

A simple worker that runs until cancelled.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-coldbrew/workers"
)

func main() {
	w := workers.NewWorker("greeter").HandlerFunc(func(ctx context.Context, info *workers.WorkerInfo) error {
		fmt.Printf("worker %q started (attempt %d)\n", info.GetName(), info.GetAttempt())
		<-ctx.Done()
		return ctx.Err()
	})

	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	workers.Run(ctx, []*workers.Worker{w})
}

Output

worker "greeter" started (attempt 0)

func (*Worker) AddInterceptors

func (w *Worker) AddInterceptors(mw ...Middleware) *Worker

AddInterceptors appends to the worker-level interceptor list.

func (*Worker) Every

func (w *Worker) Every(d time.Duration) *Worker

Every configures the worker to run periodically at the given interval. The interval is stored as data — wrapping is deferred to startup where jitter configuration is resolved.

Example

A periodic worker that runs a function on a fixed interval.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-coldbrew/workers"
)

func main() {
	count := 0
	w := workers.NewWorker("ticker").HandlerFunc(func(_ context.Context, _ *workers.WorkerInfo) error {
		count++
		fmt.Printf("tick %d\n", count)
		return nil
	}).Every(20 * time.Millisecond)

	ctx, cancel := context.WithTimeout(context.Background(), 55*time.Millisecond)
	defer cancel()

	workers.Run(ctx, []*workers.Worker{w})
}

Output

tick 1
tick 2

func (*Worker) GetHandler

func (w *Worker) GetHandler() CycleHandler

GetHandler returns the worker's CycleHandler, or nil if not set.

func (*Worker) GetInitialDelay

func (w *Worker) GetInitialDelay() time.Duration

GetInitialDelay returns the initial delay before the first tick, or 0 if not set.

func (*Worker) GetInterval

func (w *Worker) GetInterval() time.Duration

GetInterval returns the periodic interval, or 0 if this is not a periodic worker.

func (*Worker) GetJitterPercent

func (w *Worker) GetJitterPercent() int

GetJitterPercent returns the jitter percentage. -1 means inherit run-level default, 0 means no jitter.

func (*Worker) GetName

func (w *Worker) GetName() string

GetName returns the worker's name.

func (*Worker) GetRestartOnFail

func (w *Worker) GetRestartOnFail() bool

GetRestartOnFail returns whether the worker restarts on failure.

func (*Worker) Handler

func (w *Worker) Handler(h CycleHandler) *Worker

Handler sets the worker's CycleHandler. Use this for handlers that need cleanup via Close (e.g., database connections, leases).

func (*Worker) HandlerFunc

func (w *Worker) HandlerFunc(fn CycleFunc) *Worker

HandlerFunc sets the worker's handler from a plain function. This is the common case for simple, stateless workers.

func (*Worker) Interceptors

func (w *Worker) Interceptors(mw ...Middleware) *Worker

Interceptors replaces the worker-level interceptor list.

Example

Per-worker middleware using the interceptor pattern.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-coldbrew/workers"
)

func main() {
	loggingMW := func(ctx context.Context, info *workers.WorkerInfo, next workers.CycleFunc) error {
		fmt.Printf("[%s] cycle start\n", info.GetName())
		err := next(ctx, info)
		fmt.Printf("[%s] cycle end\n", info.GetName())
		return err
	}

	w := workers.NewWorker("with-logging").
		HandlerFunc(func(_ context.Context, info *workers.WorkerInfo) error {
			fmt.Printf("[%s] doing work\n", info.GetName())
			return nil
		}).
		Every(20 * time.Millisecond).
		Interceptors(loggingMW)

	ctx, cancel := context.WithTimeout(context.Background(), 35*time.Millisecond)
	defer cancel()

	workers.Run(ctx, []*workers.Worker{w})
}

Output

[with-logging] cycle start
[with-logging] doing work
[with-logging] cycle end

func (*Worker) WithBackoffJitter

func (w *Worker) WithBackoffJitter(jitter func(time.Duration) time.Duration) *Worker

WithBackoffJitter adds random jitter to the backoff duration to prevent thundering herd on coordinated restarts. The function receives the base backoff duration and returns a jittered duration.

func (*Worker) WithFailureBackoff

func (w *Worker) WithFailureBackoff(d time.Duration) *Worker

WithFailureBackoff sets the duration to wait between restarts. Suture default is 15 seconds.

func (*Worker) WithFailureDecay

func (w *Worker) WithFailureDecay(decay float64) *Worker

WithFailureDecay sets the rate at which failure count decays over time. A value of 1.0 means failures decay by one per second. Suture default is 1.0.

func (*Worker) WithFailureThreshold

func (w *Worker) WithFailureThreshold(threshold float64) *Worker

WithFailureThreshold sets the number of failures allowed before the supervisor gives up restarting. Suture default is 5.

func (*Worker) WithInitialDelay

func (w *Worker) WithInitialDelay(d time.Duration) *Worker

WithInitialDelay delays the first tick to stagger startup. Requires Worker.Every.

func (*Worker) WithJitter

func (w *Worker) WithJitter(percent int) *Worker

WithJitter sets per-worker jitter as a percentage of the base interval. Each tick is randomized within ±percent of the base. Requires Worker.Every. Setting WithJitter(0) explicitly disables jitter even when a run-level default is set via WithDefaultJitter.

func (*Worker) WithMetrics

func (w *Worker) WithMetrics(m Metrics) *Worker

WithMetrics sets a per-worker metrics implementation, overriding the metrics inherited from the parent WorkerInfo or Run options.

func (*Worker) WithRestart

func (w *Worker) WithRestart(restart bool) *Worker

WithRestart configures whether the worker should be restarted on failure. Default is true. Set to false for one-shot workers that should exit after completion or failure. Note: a handler returning nil always stops the worker permanently, regardless of this setting.

Periodic workers (with Worker.Every) should generally keep the default (restart enabled). Use ErrSkipTick for transient failures and ErrDoNotRestart for permanent completion instead of disabling restart.

Example

A worker with automatic restart on failure. The supervisor logs restart events; the worker succeeds on the third attempt.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-coldbrew/workers"
)

func main() {
	attempt := 0
	w := workers.NewWorker("resilient").HandlerFunc(func(ctx context.Context, _ *workers.WorkerInfo) error {
		attempt++
		if attempt <= 2 {
			return fmt.Errorf("transient error")
		}
		fmt.Printf("succeeded on attempt %d\n", attempt)
		<-ctx.Done()
		return ctx.Err()
	}).WithRestart(true)

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	workers.Run(ctx, []*workers.Worker{w})
	// This example demonstrates restart behavior. Log output from the
	// supervisor is expected between restarts. The worker prints on success.
}

func (*Worker) WithTimeout

func (w *Worker) WithTimeout(d time.Duration) *Worker

WithTimeout sets the maximum time to wait for the worker to stop during graceful shutdown. Suture default is 10 seconds.

type WorkerInfo

WorkerInfo carries worker metadata and child management. The framework always creates it — it is never nil. context.Context handles cancellation/deadlines/values; WorkerInfo handles everything worker-specific.

type WorkerInfo struct {
    // contains filtered or unexported fields
}

func NewWorkerInfo(name string, attempt int, opts ...WorkerInfoOption) *WorkerInfo

NewWorkerInfo creates a WorkerInfo with the given name and attempt. This is useful for testing middleware and handlers — the framework creates fully populated instances internally.

Use WithTestChildren to enable Add/Remove/GetChildren in tests.

func (*WorkerInfo) Add

func (info *WorkerInfo) Add(w *Worker) bool

Add starts a child worker under this worker's supervisor subtree. Returns true if the worker was added, false if a worker with the same name is already running (no-op). To replace a running worker, call WorkerInfo.Remove first then Add.

Note: Remove + Add is not atomic — there is a brief window where the worker is not running. For most reconciliation patterns this is fine.

Children inherit run-level interceptors, metrics (unless overridden via Worker.WithMetrics), and scoped lifecycle — when this worker stops, all its children stop too.

When a child permanently stops, it is automatically removed from the children map on the next call to WorkerInfo.Add, WorkerInfo.GetChildren, WorkerInfo.GetChild, or WorkerInfo.GetChildCount.

Example

A manager worker that dynamically spawns and removes child workers using WorkerInfo.Add, Remove, and GetChildren.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-coldbrew/workers"
)

func main() {
	manager := workers.NewWorker("manager").HandlerFunc(func(ctx context.Context, info *workers.WorkerInfo) error {
		// Spawn two child workers dynamically.
		info.Add(workers.NewWorker("child-a").HandlerFunc(func(ctx context.Context, childInfo *workers.WorkerInfo) error {
			fmt.Printf("%s started\n", childInfo.GetName())
			<-ctx.Done()
			return ctx.Err()
		}))
		info.Add(workers.NewWorker("child-b").HandlerFunc(func(ctx context.Context, childInfo *workers.WorkerInfo) error {
			fmt.Printf("%s started\n", childInfo.GetName())
			<-ctx.Done()
			return ctx.Err()
		}))

		// Give children time to start.
		time.Sleep(30 * time.Millisecond)
		fmt.Printf("children: %v\n", info.GetChildren())

		// Remove one child.
		info.Remove("child-a")
		time.Sleep(30 * time.Millisecond)
		fmt.Printf("after remove: %v\n", info.GetChildren())

		<-ctx.Done()
		return ctx.Err()
	})

	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()

	workers.Run(ctx, []*workers.Worker{manager})
}

Output

child-a started
child-b started
children: [child-a child-b]
after remove: [child-b]

Example (Replace)

Replace a child worker using Remove + Add. Add is a no-op if the name exists, so Remove first.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-coldbrew/workers"
)

func main() {
	manager := workers.NewWorker("manager").HandlerFunc(func(ctx context.Context, info *workers.WorkerInfo) error {
		info.Add(workers.NewWorker("processor").HandlerFunc(func(ctx context.Context, _ *workers.WorkerInfo) error {
			fmt.Println("processor v1")
			<-ctx.Done()
			return ctx.Err()
		}))
		time.Sleep(30 * time.Millisecond)

		// Replace: Remove the old worker, then Add the new one.
		info.Remove("processor")
		time.Sleep(10 * time.Millisecond) // brief gap while old stops
		info.Add(workers.NewWorker("processor").HandlerFunc(func(ctx context.Context, _ *workers.WorkerInfo) error {
			fmt.Println("processor v2")
			<-ctx.Done()
			return ctx.Err()
		}))
		time.Sleep(30 * time.Millisecond)

		<-ctx.Done()
		return ctx.Err()
	})

	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()

	workers.Run(ctx, []*workers.Worker{manager})
}

Output

processor v1
processor v2

func (*WorkerInfo) GetAttempt

func (info *WorkerInfo) GetAttempt() int

GetAttempt returns the restart attempt number (0 on first run).

func (*WorkerInfo) GetChild

func (info *WorkerInfo) GetChild(name string) (Worker, bool)

GetChild returns a copy of a running child worker and true, or the zero value and false if not found. The returned value is a snapshot — mutations to the Worker fields have no effect on the running worker.

The CycleHandler (accessible via Worker.GetHandler) is shared with the running worker, not copied. Use type assertion to inspect handler state (e.g., config versions for reconciliation). See the Reconciler With Change Detection example above.

func (*WorkerInfo) GetChildCount

func (info *WorkerInfo) GetChildCount() int

GetChildCount returns the number of currently running child workers. This is more efficient than calling WorkerInfo.GetChildren and taking len, as it avoids allocating a sorted slice. Stopped children are lazily pruned.

func (*WorkerInfo) GetChildren

func (info *WorkerInfo) GetChildren() []string

GetChildren returns the names of currently running child workers. Stopped children are lazily pruned before building the list.

func (*WorkerInfo) GetHandler

func (info *WorkerInfo) GetHandler() CycleHandler

GetHandler returns the worker's CycleHandler, or nil if not set. Use type assertion to access handler-specific state or interfaces:

if h, ok := info.GetHandler().(MyHandler); ok {
    // access h.Config, h.Version, etc.
}

func (*WorkerInfo) GetName

func (info *WorkerInfo) GetName() string

GetName returns the worker's name as passed to NewWorker.

func (*WorkerInfo) Remove

func (info *WorkerInfo) Remove(name string)

Remove stops a child worker by name.

type WorkerInfoOption

WorkerInfoOption configures a WorkerInfo created by NewWorkerInfo.

type WorkerInfoOption func(*WorkerInfo)

func WithTestChildren(ctx context.Context) WorkerInfoOption

WithTestChildren creates an internal supervisor so that Add, Remove, and GetChildren work in tests without calling Run. The caller must cancel the returned context when the test is done to stop the supervisor.

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
info := workers.NewWorkerInfo("test", 0, workers.WithTestChildren(ctx))

func WithTestHandler(h CycleHandler) WorkerInfoOption

WithTestHandler sets the handler on a test WorkerInfo so that WorkerInfo.GetHandler works in unit tests.

Generated by gomarkdoc
