diff --git a/README.md b/README.md index 2569088..31a14c4 100644 --- a/README.md +++ b/README.md @@ -392,6 +392,34 @@ There maybe more restrictions that I'm not aware of yet. Open an issue if any ar To simplify migration, if your workflows and activities don't use a single struct input type, use `NewWorkflowPositional`/`NewActivityPositional`. Don't use these functions in new code. +## Claude Code Skill + +This repository includes a Claude Code skill that provides AI-assisted development guidance for building Temporal applications with tempts. The skill covers type declarations, worker setup, signals, queries, testing, Nexus operations, and migration from the standard Go SDK. + +### Installation + +#### Via `npx skills` + +```bash +npx skills add vikstrous/tempts -s tempts-developer +``` + +#### Via manual clone + +```bash +git clone https://github.com/vikstrous/tempts ~/.claude/skills/tempts-developer --no-checkout +cd ~/.claude/skills/tempts-developer +git sparse-checkout set skill +git checkout +``` + +Or clone the full repo and symlink: + +```bash +git clone https://github.com/vikstrous/tempts ~/tempts +ln -s ~/tempts/skill ~/.claude/skills/tempts-developer +``` + ## Potential future improvements * Return typed futures instead of generic ones diff --git a/skill/SKILL.md b/skill/SKILL.md new file mode 100644 index 0000000..d48d588 --- /dev/null +++ b/skill/SKILL.md @@ -0,0 +1,128 @@ +--- +name: tempts-developer +description: This skill should be used when the user asks to "create a Temporal workflow with tempts", "write a type-safe Temporal activity", "tempts workflow", "tempts activity", "tempts signal", "tempts query", "tempts worker", "tempts Nexus", "type-safe Temporal Go", "tempts schedule", "tempts replay test", "tempts mock test", "migrate to tempts", or mentions the tempts library or type-safe Temporal Go SDK development. +version: 0.1.0 +--- + +# Skill: tempts-developer + +## Overview + +`tempts` (Temporal Type-Safe) is a Go wrapper library around the Temporal Go SDK that provides compile-time type safety for workflows, activities, signals, queries, schedules, and Nexus operations. It eliminates runtime type mismatches by using Go generics. + +This skill provides guidance for building Temporal applications in Go using the `tempts` library. + +## Core Architecture + +``` +┌─────────────────────────────────────────────────────────────────┐ +│ Temporal Cluster │ +│ ┌─────────────────┐ ┌─────────────────┐ ┌────────────────┐ │ +│ │ Event History │ │ Task Queues │ │ Visibility │ │ +│ │ (Durable Log) │ │ (Work Router) │ │ (Search) │ │ +│ └─────────────────┘ └─────────────────┘ └────────────────┘ │ +└─────────────────────────────────────────────────────────────────┘ + ▲ + │ Poll / Complete + ▼ +┌─────────────────────────────────────────────────────────────────┐ +│ Worker (tempts) │ +│ ┌─────────────────────────┐ ┌──────────────────────────────┐ │ +│ │ Workflow Definitions │ │ Activity Implementations │ │ +│ │ (Type-safe, Generic) │ │ (Type-safe, Generic) │ │ +│ └─────────────────────────┘ └──────────────────────────────┘ │ +│ ┌─────────────────────────────────────────────────────────────┐│ +│ │ tempts.NewWorker validates all registrations at startup ││ +│ └─────────────────────────────────────────────────────────────┘│ +└─────────────────────────────────────────────────────────────────┘ +``` + +**tempts enforces at compile time and startup:** +- Workflows and activities are called with correct parameter and return types +- Workflows and activities are routed to the correct queue +- Workers have all required implementations registered (no more, no less) +- Signals are sent and received with matching types +- Queries are called and handled with matching types +- Nexus operations have matching types and complete service implementations + +**Components (same as Temporal, but type-safe):** +- **Queues** (`tempts.NewQueue`) - Named task queues; workflows and activities are declared on a queue +- **Workflows** (`tempts.NewWorkflow[Param, Return]`) - Durable, deterministic functions +- **Activities** (`tempts.NewActivity[Param, Return]`) - Non-deterministic operations (API calls, I/O) +- **Workers** (`tempts.NewWorker`) - Validates and runs all registered implementations +- **Signals** (`tempts.NewWorkflowSignal[SignalParam]`) - Type-safe signals scoped to a workflow +- **Queries** (`tempts.NewQueryHandler[Param, Return]`) - Type-safe query handlers +- **Nexus Operations** - Type-safe sync and async Nexus operations on services + +## Constraint: All parameter and return types must be Go structs + +tempts enforces that all Param and Return type parameters are Go structs (or pointers to structs). Using non-struct types (string, int, etc.) will panic at declaration time. Use a wrapper struct: + +```go +// BAD - will panic +var myWorkflow = tempts.NewWorkflow[string, string](queue, "MyWorkflow") + +// GOOD +type MyWorkflowParams struct { + Name string +} +type MyWorkflowResult struct { + Greeting string +} +var myWorkflow = tempts.NewWorkflow[MyWorkflowParams, MyWorkflowResult](queue, "MyWorkflow") + +// For no parameters or return, use struct{} +var myWorkflow = tempts.NewWorkflow[struct{}, struct{}](queue, "MyWorkflow") +``` + +## History Replay: Why Determinism Matters + +Temporal achieves durability through **history replay**. This is unchanged when using tempts -- all standard Temporal determinism rules apply: + +1. **Initial Execution** - Worker runs workflow, generates Commands, stored as Events in history +2. **Recovery** - On restart/failure, Worker re-executes workflow from beginning +3. **Matching** - SDK compares generated Commands against stored Events +4. **Restoration** - Uses stored Activity results instead of re-executing + +**If Commands don't match Events = Non-determinism Error = Workflow blocked** + +See `references/determinism.md` for detailed determinism rules. + +## Getting Started + +### Ensure Temporal CLI is installed + +Check if `temporal` CLI is installed. If not, follow these instructions: + +#### macOS + +``` +brew install temporal +``` + +#### Linux + +Check your machine's architecture and download the appropriate archive: + +- [Linux amd64](https://temporal.download/cli/archive/latest?platform=linux&arch=amd64) +- [Linux arm64](https://temporal.download/cli/archive/latest?platform=linux&arch=arm64) + +Once you've downloaded the file, extract the downloaded archive and add the temporal binary to your PATH by copying it to a directory like /usr/local/bin + +### Read All Relevant References + +1. First, read `references/getting-started.md` for the quick start guide +2. Then read references relevant to your task: + - `references/patterns.md` - Signals, queries, child workflows, parallel execution + - `references/testing.md` - Unit tests, mock tests, replay tests + - `references/nexus.md` - Nexus service operations + - `references/determinism.md` - Determinism rules and safe alternatives + - `references/migration.md` - Migrating from the standard Go SDK + +## Primary References +- **`references/getting-started.md`** - Quick start, file organization, worker setup +- **`references/patterns.md`** - Signals, queries, child workflows, schedules, selectors, saga pattern, continue-as-new, cancellation +- **`references/testing.md`** - Unit testing, mock testing, replay/fixture testing +- **`references/nexus.md`** - Nexus service declarations, sync/async operations, calling from workflows +- **`references/determinism.md`** - Determinism rules, forbidden operations, safe alternatives +- **`references/migration.md`** - Migration guide from the standard Temporal Go SDK to tempts diff --git a/skill/references/determinism.md b/skill/references/determinism.md new file mode 100644 index 0000000..e0dfcce --- /dev/null +++ b/skill/references/determinism.md @@ -0,0 +1,82 @@ +# Determinism in tempts Workflows + +## Overview + +tempts wraps the Temporal Go SDK but does not change the determinism model. All standard Temporal determinism rules apply. The Go SDK has no runtime sandbox -- determinism is the developer's responsibility. + +## Why Determinism Matters + +Temporal achieves durability through **history replay**. When a Worker needs to restore workflow state (after crash, cache eviction, or a long timer), it re-executes the workflow code from the beginning, comparing generated Commands against stored Events in history. If they don't match, a non-determinism error occurs and the workflow becomes blocked. + +``` +Initial Execution: + Code runs → Generates Commands → Server stores as Events + +Replay (Recovery): + Code runs again → Generates Commands → SDK compares to Events + If match: Use stored results, continue + If mismatch: NondeterminismError +``` + +## Forbidden Operations in Workflow Code + +Do not use any of the following in workflow functions: + +- **Native goroutines** (`go func()`) -- use `workflow.Go()` instead +- **Native channels** (`chan`, send, receive) -- use `workflow.Channel` instead +- **Native `select`** -- use `workflow.Selector` instead +- **`time.Now()`** -- use `workflow.Now(ctx)` instead +- **`time.Sleep()`** -- use `workflow.Sleep(ctx, duration)` instead +- **`math/rand` global** -- use `workflow.SideEffect` instead +- **Map range iteration** (`for k, v := range myMap`) -- sort keys first, then iterate +- **I/O operations** (network, filesystem, database) -- move to activities +- **Reading environment variables** -- move to activities or pass as workflow parameters +- **`fmt.Println`** -- use `workflow.GetLogger(ctx)` for replay-safe logging + +## Safe Alternatives + +| Instead of | Use | +|---|---| +| `go func() { ... }()` | `workflow.Go(ctx, func(ctx workflow.Context) { ... })` | +| `chan T` | `workflow.NewChannel(ctx)` / `workflow.NewBufferedChannel(ctx, size)` | +| `select { ... }` | `workflow.NewSelector(ctx)` | +| `time.Now()` | `workflow.Now(ctx)` | +| `time.Sleep(d)` | `workflow.Sleep(ctx, d)` | +| `rand.Intn(100)` | `workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} { return rand.Intn(100) })` | +| `log.Println(...)` | `workflow.GetLogger(ctx).Info(...)` | + +## What tempts Does NOT Change + +- Workflow functions still receive `workflow.Context` (not `context.Context`) +- Activity functions still receive `context.Context` +- All `workflow.*` APIs (Go, Channel, Selector, Sleep, Now, SideEffect, GetLogger) are used directly -- tempts does not wrap these +- The `workflowcheck` static analysis tool works with tempts workflows + +## Static Analysis with workflowcheck + +The optional `workflowcheck` tool can catch many non-deterministic operations at compile time: + +```bash +go install go.temporal.io/sdk/contrib/tools/workflowcheck@latest +workflowcheck ./... +``` + +## Detecting Non-Determinism + +### During Execution +- `NondeterminismError` is raised when Commands don't match Events +- The workflow becomes blocked until code is fixed + +### Testing with Replay +Use tempts replay tests to verify backward compatibility. See `references/testing.md` for details. + +## Recovery from Non-Determinism + +### Accidental Change +1. Revert code to match what's in history +2. Restart worker +3. Workflow auto-recovers + +### Intentional Change +1. Use the Temporal Patching API (`workflow.GetVersion`) to support both old and new code paths +2. Or terminate old workflows and start new ones with updated code diff --git a/skill/references/getting-started.md b/skill/references/getting-started.md new file mode 100644 index 0000000..a207da3 --- /dev/null +++ b/skill/references/getting-started.md @@ -0,0 +1,295 @@ +# Getting Started with tempts + +## Overview + +`tempts` wraps the Temporal Go SDK to provide compile-time type safety via Go generics. All workflows, activities, signals, queries, and Nexus operations are declared as typed global variables, and `tempts.NewWorker` validates that all implementations are correctly registered at startup. + +## Quick Start + +**Add Dependency:** +```bash +go get github.com/vikstrous/tempts@latest +``` + +**Complete working example:** + +```go +package main + +import ( + "context" + "fmt" + + "github.com/vikstrous/tempts" + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/worker" + "go.temporal.io/sdk/workflow" +) + +// Declare a task queue +var queueMain = tempts.NewQueue("main") + +// Declare a workflow with typed parameters and return +var workflowTypeHello = tempts.NewWorkflow[struct{}, struct{}](queueMain, "HelloWorkflow") + +// Declare an activity with typed parameters and return +var activityTypeHello = tempts.NewActivity[struct{}, struct{}](queueMain, "HelloActivity") + +func main() { + c, err := tempts.Dial(client.Options{}) + if err != nil { + panic(err) + } + defer c.Close() + + // Create worker -- validates all implementations match declarations + wrk, err := tempts.NewWorker(queueMain, []tempts.Registerable{ + workflowTypeHello.WithImplementation(helloWorkflow), + activityTypeHello.WithImplementation(helloActivity), + }) + if err != nil { + panic(err) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func() { + err = wrk.Run(ctx, c, worker.Options{}) + if err != nil { + panic(err) + } + }() + + // Execute the workflow (type-safe: params and return are checked at compile time) + _, err = workflowTypeHello.Run(ctx, c, client.StartWorkflowOptions{}, struct{}{}) + if err != nil { + panic(err) + } + + fmt.Println("Workflow completed.") +} + +func helloWorkflow(ctx workflow.Context, _ struct{}) (struct{}, error) { + return activityTypeHello.Run(ctx, struct{}{}) +} + +func helloActivity(ctx context.Context, _ struct{}) (struct{}, error) { + fmt.Println("Hello, Temporal!") + return struct{}{}, nil +} +``` + +**Start the dev server:** `temporal server start-dev` + +**Run the example:** `go run .` + +## Key Concepts + +### Declaration Pattern + +tempts uses a **declare-then-implement** pattern. Types are declared as package-level variables, and implementations are attached when creating a worker. + +```go +// 1. Declare types as package-level variables (shared across packages) +var queueMain = tempts.NewQueue("main") + +type GreetParams struct { + Name string +} +type GreetResult struct { + Message string +} + +var workflowTypeGreet = tempts.NewWorkflow[GreetParams, GreetResult](queueMain, "Greet") +var activityTypeGreet = tempts.NewActivity[GreetParams, GreetResult](queueMain, "GreetActivity") + +// 2. Implement the functions (matching the declared signatures) +func greetWorkflow(ctx workflow.Context, params GreetParams) (GreetResult, error) { + return activityTypeGreet.Run(ctx, params) +} + +func greetActivity(ctx context.Context, params GreetParams) (GreetResult, error) { + return GreetResult{Message: "Hello, " + params.Name}, nil +} + +// 3. Register implementations in a worker +wrk, err := tempts.NewWorker(queueMain, []tempts.Registerable{ + workflowTypeGreet.WithImplementation(greetWorkflow), + activityTypeGreet.WithImplementation(greetActivity), +}) +``` + +### Client + +tempts wraps the Temporal SDK client: + +```go +// Connect to Temporal server +c, err := tempts.Dial(client.Options{}) + +// Lazy connection (defers until first use) +c, err := tempts.NewLazyClient(client.Options{}) + +// Wrap an existing SDK client +c, err := tempts.NewFromSDK(existingClient, "default") + +defer c.Close() +``` + +### Workflow Execution + +```go +// Synchronous -- blocks until workflow completes +result, err := workflowTypeGreet.Run(ctx, c, client.StartWorkflowOptions{}, GreetParams{Name: "World"}) + +// Asynchronous -- returns a WorkflowRun handle +run, err := workflowTypeGreet.Execute(ctx, c, client.StartWorkflowOptions{}, GreetParams{Name: "World"}) +// ... do other work ... +var result GreetResult +err = run.Get(ctx, &result) +``` + +The queue is set automatically from the workflow declaration. `client.StartWorkflowOptions.TaskQueue` does not need to be set. + +### Activity Execution (from within a workflow) + +```go +// Synchronous +result, err := activityTypeGreet.Run(ctx, GreetParams{Name: "World"}) + +// Asynchronous -- returns a workflow.Future +future := activityTypeGreet.Execute(ctx, GreetParams{Name: "World"}) +var result GreetResult +err := future.Get(ctx, &result) + +// Append to a futures slice for parallel fan-out +var futures []workflow.Future +activityTypeGreet.AppendFuture(ctx, &futures, GreetParams{Name: "Alice"}) +activityTypeGreet.AppendFuture(ctx, &futures, GreetParams{Name: "Bob"}) +for _, f := range futures { + var r GreetResult + err := f.Get(ctx, &r) + // handle result +} +``` + +The queue is set automatically. Activity options (including timeouts) are inherited from the workflow context. tempts sets a default `StartToCloseTimeout` of 10 seconds via the workflow's `WithImplementation` wrapper. Override by calling `workflow.WithActivityOptions` before executing activities: + +```go +func myWorkflow(ctx workflow.Context, params MyParams) (MyResult, error) { + ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 5 * time.Minute, + }) + return myActivity.Run(ctx, params) +} +``` + +### Worker + +```go +wrk, err := tempts.NewWorker(queueMain, []tempts.Registerable{ + workflowTypeGreet.WithImplementation(greetWorkflow), + activityTypeGreet.WithImplementation(greetActivity), +}) +if err != nil { + // Validation failed: missing implementation, extra implementation, + // wrong queue, duplicate name, etc. + panic(err) +} + +// Run blocks until context is cancelled +err = wrk.Run(ctx, c, worker.Options{}) +``` + +`NewWorker` validates: +- Every workflow/activity declared on the queue has an implementation +- No extra implementations are provided that aren't declared on the queue +- No duplicate names +- All Nexus operations for a service are complete + +## File Organization + +``` +myapp/ +├── types/ +│ └── types.go # Queue, workflow, activity, signal declarations (shared) +├── workflows/ +│ └── greeting.go # Workflow implementations +├── activities/ +│ └── greet.go # Activity implementations +├── worker/ +│ └── main.go # Worker setup, registers implementations +└── starter/ + └── main.go # Client code to start workflows +``` + +**types/types.go** -- shared type declarations: +```go +package types + +import "github.com/vikstrous/tempts" + +var QueueMain = tempts.NewQueue("main") + +type GreetParams struct { + Name string +} +type GreetResult struct { + Message string +} + +var WorkflowTypeGreet = tempts.NewWorkflow[GreetParams, GreetResult](QueueMain, "Greet") +var ActivityTypeGreet = tempts.NewActivity[GreetParams, GreetResult](QueueMain, "GreetActivity") +``` + +**worker/main.go** -- worker setup: +```go +package main + +import ( + "context" + "log" + + "yourmodule/activities" + "yourmodule/types" + "yourmodule/workflows" + + "github.com/vikstrous/tempts" + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/worker" +) + +func main() { + c, err := tempts.Dial(client.Options{}) + if err != nil { + log.Fatal(err) + } + defer c.Close() + + wrk, err := tempts.NewWorker(types.QueueMain, []tempts.Registerable{ + types.WorkflowTypeGreet.WithImplementation(workflows.GreetWorkflow), + types.ActivityTypeGreet.WithImplementation(activities.GreetActivity), + }) + if err != nil { + log.Fatal(err) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + err = wrk.Run(ctx, c, worker.Options{}) + if err != nil { + log.Fatal(err) + } +} +``` + +## Common Pitfalls + +1. **Using non-struct types as parameters** -- All Param and Return type parameters must be Go structs. `tempts.NewWorkflow[string, string](...)` will panic. Wrap in a struct. +2. **Forgetting to register an implementation** -- `NewWorker` returns an error if any declared workflow/activity on the queue is missing an implementation. +3. **Registering an extra implementation** -- `NewWorker` returns an error if you provide an implementation for something not declared on the queue. +4. **Using native goroutines/channels/select in workflows** -- Use `workflow.Go()`, `workflow.Channel`, `workflow.Selector` (same as standard Temporal Go SDK). +5. **Using `time.Sleep` or `time.Now` in workflows** -- Use `workflow.Sleep()` and `workflow.Now()`. +6. **Not setting activity timeouts** -- tempts sets a default 10-second `StartToCloseTimeout`. Override with `workflow.WithActivityOptions` for longer activities. diff --git a/skill/references/migration.md b/skill/references/migration.md new file mode 100644 index 0000000..2224384 --- /dev/null +++ b/skill/references/migration.md @@ -0,0 +1,221 @@ +# Migrating from the Standard Go SDK to tempts + +## Prerequisites + +Before migrating a queue to tempts: + +- **Queue names must be static** -- tempts declares queues as global variables +- **All workflows and activities for a given queue must be migrated at once** -- tempts validates completeness at worker creation +- **All parameter types must be Go structs** -- tempts enforces struct types for all Param and Return generics + +## Migration Reference + +### Connect to Temporal + +**Before:** +```go +c, err := client.Dial(client.Options{}) +``` + +**After:** +```go +c, err := tempts.Dial(client.Options{}) +``` + +### Declare Queues + +**Before:** +```go +const taskQueue = "main" +``` + +**After:** +```go +var queueMain = tempts.NewQueue("main") +``` + +### Declare and Run Workflows + +**Before:** +```go +// Start workflow +var ret ReturnType +run, err := c.ExecuteWorkflow(ctx, client.StartWorkflowOptions{ + TaskQueue: "main", +}, "MyWorkflow", param) +err = run.Get(ctx, &ret) +``` + +**After:** +```go +// Declare (once, as a package-level variable) +var myWorkflowType = tempts.NewWorkflow[MyParam, MyReturn](queueMain, "MyWorkflow") + +// Start workflow (queue is set automatically) +ret, err := myWorkflowType.Run(ctx, c, client.StartWorkflowOptions{}, MyParam{Name: "test"}) +``` + +### Declare and Run Activities + +**Before:** +```go +// In workflow +var ret ReturnType +err := workflow.ExecuteActivity(ctx, "MyActivity", param).Get(ctx, &ret) +``` + +**After:** +```go +// Declare (once, as a package-level variable) +var myActivityType = tempts.NewActivity[MyParam, MyReturn](queueMain, "MyActivity") + +// In workflow +ret, err := myActivityType.Run(ctx, MyParam{Name: "test"}) +``` + +### Create a Worker + +**Before:** +```go +w := worker.New(c, "main", worker.Options{}) +w.RegisterWorkflow(MyWorkflow) +w.RegisterActivity(&Activities{}) +err = w.Run(worker.InterruptCh()) +``` + +**After:** +```go +wrk, err := tempts.NewWorker(queueMain, []tempts.Registerable{ + myWorkflowType.WithImplementation(myWorkflowFn), + myActivityType.WithImplementation(myActivityFn), +}) +if err != nil { + panic(err) // validation failure +} +err = wrk.Run(ctx, c, worker.Options{}) +``` + +### Queries + +**Before:** +```go +// In workflow +workflow.SetQueryHandler(ctx, "get-status", func() (string, error) { + return status, nil +}) + +// Querying +response, err := c.QueryWorkflow(ctx, workflowID, runID, "get-status") +var value string +err = response.Get(&value) +``` + +**After:** +```go +// Declare +var queryGetStatus = tempts.NewQueryHandler[struct{}, StatusResult]("get-status") + +// In workflow +queryGetStatus.SetHandler(ctx, func(_ struct{}) (StatusResult, error) { + return StatusResult{Status: status}, nil +}) + +// Querying (typed result) +result, err := queryGetStatus.Query(ctx, c, workflowID, runID, struct{}{}) +``` + +### Signals + +**Before:** +```go +// In workflow +ch := workflow.GetSignalChannel(ctx, "my-signal") +var param SignalParam +ch.Receive(ctx, ¶m) + +// Sending +err := c.SignalWorkflow(ctx, workflowID, runID, "my-signal", param) +``` + +**After:** +```go +// Declare (scoped to a workflow) +var mySignal = tempts.NewWorkflowSignal[SignalParam](&myWorkflowType, "my-signal") + +// In workflow +param := mySignal.Receive(ctx) + +// Sending (typed) +err := mySignal.Signal(ctx, c, workflowID, runID, SignalParam{Message: "hello"}) +``` + +### Child Workflows + +**Before:** +```go +var ret ReturnType +err := workflow.ExecuteChildWorkflow(ctx, "ChildWorkflow", param).Get(ctx, &ret) +``` + +**After:** +```go +ret, err := childWorkflowType.RunChild(ctx, workflow.ChildWorkflowOptions{}, ChildParam{}) +``` + +### Schedules + +**Before:** +```go +_, err = c.ScheduleClient().Create(ctx, client.ScheduleOptions{ + ID: "my-schedule", + Spec: client.ScheduleSpec{...}, + Action: &client.ScheduleWorkflowAction{ + ID: "workflow-id", + Workflow: "MyWorkflow", + TaskQueue: "main", + Args: []any{param}, + }, +}) +``` + +**After:** +```go +err = myWorkflowType.SetSchedule(ctx, c, client.ScheduleOptions{ + ID: "my-schedule", + Spec: client.ScheduleSpec{...}, +}, MyParam{Name: "test"}) +``` + +`SetSchedule` creates the schedule if it doesn't exist, or updates it to match. + +### SignalWithStart + +**Before:** +```go +_, err := c.SignalWithStartWorkflow(ctx, workflowID, "my-signal", signalParam, opts, "MyWorkflow", workflowParam) +``` + +**After:** +```go +run, err := mySignal.SignalWithStart(ctx, c, opts, workflowParam, signalParam) +``` + +Both the workflow parameter type and signal parameter type are checked at compile time. + +## Positional Arguments (Legacy Migration) + +If existing workflows or activities use multiple positional arguments instead of a single struct, use `NewWorkflowPositional` / `NewActivityPositional` for backward compatibility: + +```go +// Existing workflow that takes (string, int) as positional args +var legacyWorkflow = tempts.NewWorkflowPositional[LegacyParams, LegacyResult](queueMain, "LegacyWorkflow") + +type LegacyParams struct { + Name string + Count int +} +``` + +The struct fields are passed as positional arguments in the order they are defined. This maintains wire compatibility with existing workflow histories. + +Do not use `NewWorkflowPositional` / `NewActivityPositional` in new code. diff --git a/skill/references/nexus.md b/skill/references/nexus.md new file mode 100644 index 0000000..7cec9bb --- /dev/null +++ b/skill/references/nexus.md @@ -0,0 +1,127 @@ +# tempts Nexus Operations + +## Overview + +tempts provides type-safe wrappers for Temporal Nexus operations. There are three operation types: + +- **SyncOperation** -- Simple RPC-style request-response +- **AsyncOperation** -- Backed by a workflow with matching input/output types +- **AsyncHandlerOperation** -- Custom handler where operation input can differ from workflow input + +Operations are declared on a `Service`, and all declared operations must have implementations when creating a worker. + +## Declaring Services and Operations + +```go +// Declare a Nexus service (in a shared API package) +var myService = tempts.NewService("my-nexus-service") + +// Sync operation -- simple RPC +type EchoInput struct { Message string } +type EchoOutput struct { Message string } +var echoOp = tempts.NewSyncOperation[EchoInput, EchoOutput](myService, "echo") + +// Async operation -- workflow-backed, input/output types match the workflow +type ProcessInput struct { Data string } +type ProcessOutput struct { Result string } +var processOp = tempts.NewAsyncOperation[ProcessInput, ProcessOutput](myService, "process") + +// Async handler operation -- operation input can differ from workflow input +type TransformInput struct { RawData string } +type TransformOutput struct { Result string } +var transformOp = tempts.NewAsyncHandlerOperation[TransformInput, TransformOutput](myService, "transform") +``` + +## Implementing Operations + +### Sync Operation + +```go +func echoHandler(ctx context.Context, input EchoInput, opts nexus.StartOperationOptions) (EchoOutput, error) { + return EchoOutput{Message: "Echo: " + input.Message}, nil +} + +// Register: +echoOp.WithImplementation(echoHandler) +``` + +### Async Operation + +Requires a workflow function and a function that returns `client.StartWorkflowOptions`: + +```go +func processWorkflow(ctx workflow.Context, input ProcessInput) (ProcessOutput, error) { + return ProcessOutput{Result: "Processed: " + input.Data}, nil +} + +func processGetOptions(ctx context.Context, input ProcessInput, opts nexus.StartOperationOptions) (client.StartWorkflowOptions, error) { + return client.StartWorkflowOptions{ + ID: "process-" + opts.RequestID, + }, nil +} + +// Register: +processOp.WithImplementation(processWorkflow, processGetOptions) +``` + +### Async Handler Operation + +The handler receives the operation input and returns a `WorkflowHandle`. This allows the operation input type to differ from the workflow input type: + +```go +func transformHandler(ctx context.Context, input TransformInput, opts nexus.StartOperationOptions) (temporalnexus.WorkflowHandle[TransformOutput], error) { + return temporalnexus.ExecuteWorkflow(ctx, opts, client.StartWorkflowOptions{ + ID: "transform-" + opts.RequestID, + }, transformWorkflow, ProcessInput{Data: input.RawData}) +} + +// Register: +transformOp.WithImplementation(transformHandler) +``` + +## Registering with a Worker + +Nexus operation implementations are passed directly into `NewWorker` alongside activities and workflows. `NewWorker` groups them by service and validates completeness: + +```go +wrk, err := tempts.NewWorker(queueMain, []tempts.Registerable{ + // Activities and workflows + workflowType.WithImplementation(workflowFn), + activityType.WithImplementation(activityFn), + // Nexus operations + echoOp.WithImplementation(echoHandler), + processOp.WithImplementation(processWorkflow, processGetOptions), + transformOp.WithImplementation(transformHandler), +}) +``` + +`NewWorker` validates that all operations declared on `myService` have implementations and that no extra implementations are provided. + +## Calling Operations from a Workflow + +Create a `NexusClient` from the service declaration, then call operations: + +```go +func callerWorkflow(ctx workflow.Context, params CallerParams) (CallerResult, error) { + c := myService.NewClient("my-nexus-endpoint") + + // Synchronous call + echoResult, err := echoOp.Run(ctx, c, EchoInput{Message: "hello"}, workflow.NexusOperationOptions{}) + if err != nil { + return CallerResult{}, err + } + + // Asynchronous call + future := processOp.Execute(ctx, c, ProcessInput{Data: echoResult.Message}, workflow.NexusOperationOptions{}) + // ... do other work ... + var processResult ProcessOutput + err = future.Get(ctx, &processResult) + if err != nil { + return CallerResult{}, err + } + + return CallerResult{Output: processResult.Result}, nil +} +``` + +The client validates at runtime that the operation belongs to the correct service. Calling an operation with a client from a different service panics with a descriptive error. diff --git a/skill/references/patterns.md b/skill/references/patterns.md new file mode 100644 index 0000000..4e60226 --- /dev/null +++ b/skill/references/patterns.md @@ -0,0 +1,357 @@ +# tempts Patterns + +## Signals + +Signals in tempts are scoped to a specific workflow via `NewWorkflowSignal`. This enforces that SignalWithStart operations use matching workflow and signal types. + +### Declaring Signals + +```go +type UpdateSuffixParams struct { + Suffix string +} + +// Signal is scoped to workflowTypeFormatAndGreet +var signalUpdateSuffix = tempts.NewWorkflowSignal[UpdateSuffixParams]( + &workflowTypeFormatAndGreet, "update_suffix", +) +``` + +### Receiving Signals in a Workflow + +**Blocking receive (single signal):** +```go +func myWorkflow(ctx workflow.Context, params MyParams) (MyResult, error) { + // Blocks until signal is received + signalParams := signalUpdateSuffix.Receive(ctx) + // use signalParams.Suffix +} +``` + +**Non-blocking receive:** +```go +if params, ok := signalUpdateSuffix.TryReceive(ctx); ok { + // Signal was received + suffix = params.Suffix +} +``` + +**Multiple signals with Selector:** +```go +selector := workflow.NewSelector(ctx) + +signalUpdateSuffix.AddToSelector(ctx, selector, func(params UpdateSuffixParams) { + suffix = params.Suffix +}) +otherSignal.AddToSelector(ctx, selector, func(params OtherParams) { + // handle other signal +}) + +selector.Select(ctx) // blocks until one signal fires +``` + +### Sending Signals from External Code + +```go +err := signalUpdateSuffix.Signal(ctx, c, workflowID, runID, UpdateSuffixParams{Suffix: "!"}) +``` + +### SignalWithStart + +Atomically starts the workflow if it doesn't exist, or signals it if it does. Both the workflow parameter type and signal parameter type are checked at compile time. + +```go +run, err := signalUpdateSuffix.SignalWithStart( + ctx, c, + client.StartWorkflowOptions{ID: "my-workflow-id"}, + FormatAndGreetParams{Name: "Viktor"}, // workflow param (type-checked) + UpdateSuffixParams{Suffix: " (started)"}, // signal param (type-checked) +) +``` + +## Queries + +Queries allow read-only inspection of workflow state. There is no compile-time enforcement that a query is registered on a specific workflow -- the name and types must match by convention. + +### Declaring and Using Queries + +```go +type GetNameResult struct { + Name string +} + +var queryGetName = tempts.NewQueryHandler[struct{}, GetNameResult]("get_formatted_name") +``` + +**Setting the handler inside a workflow:** +```go +func myWorkflow(ctx workflow.Context, params MyParams) (MyResult, error) { + currentName := "unknown" + + queryGetName.SetHandler(ctx, func(_ struct{}) (GetNameResult, error) { + return GetNameResult{Name: currentName}, nil + }) + + // ... workflow logic that updates currentName ... +} +``` + +**Querying from external code:** +```go +result, err := queryGetName.Query(ctx, c, workflowID, runID, struct{}{}) +fmt.Println(result.Name) +``` + +## Child Workflows + +```go +// Synchronous -- blocks until child completes +result, err := workflowTypeJustGreet.RunChild( + ctx, + workflow.ChildWorkflowOptions{}, + JustGreetParams{Name: "World"}, +) + +// Asynchronous -- returns a ChildWorkflowFuture +future := workflowTypeJustGreet.ExecuteChild( + ctx, + workflow.ChildWorkflowOptions{}, + JustGreetParams{Name: "World"}, +) +var result JustGreetResult +err := future.Get(ctx, &result) +``` + +The queue is set automatically from the child workflow's declaration. + +### Child Workflow Options + +```go +import enumspb "go.temporal.io/api/enums/v1" + +result, err := childWorkflowType.RunChild(ctx, workflow.ChildWorkflowOptions{ + WorkflowID: "child-" + workflow.GetInfo(ctx).WorkflowExecution.ID, + ParentClosePolicy: enumspb.PARENT_CLOSE_POLICY_ABANDON, + WorkflowExecutionTimeout: 10 * time.Minute, +}, params) +``` + +## Schedules + +tempts provides `SetSchedule`, which creates the schedule if it doesn't exist or updates it if it does. + +```go +err = workflowTypeFormatAndGreet.SetSchedule(ctx, c, client.ScheduleOptions{ + ID: "every5s", + Spec: client.ScheduleSpec{ + Intervals: []client.ScheduleIntervalSpec{ + { + Every: time.Second * 5, + }, + }, + }, +}, FormatAndGreetParams{Name: "Viktor"}) +``` + +The queue and workflow name are set automatically. The parameter type is checked at compile time. + +## Parallel Execution + +Use `AppendFuture` to fan out multiple activities, then collect results: + +```go +func parallelWorkflow(ctx workflow.Context, params ParallelParams) (ParallelResult, error) { + ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 5 * time.Minute, + }) + + var futures []workflow.Future + for _, item := range params.Items { + activityTypeProcess.AppendFuture(ctx, &futures, ProcessParams{Item: item}) + } + + var results []string + for _, f := range futures { + var r ProcessResult + if err := f.Get(ctx, &r); err != nil { + return ParallelResult{}, err + } + results = append(results, r.Output) + } + return ParallelResult{Outputs: results}, nil +} +``` + +Or use `Execute` directly: + +```go +futures := make([]workflow.Future, len(items)) +for i, item := range items { + futures[i] = activityTypeProcess.Execute(ctx, ProcessParams{Item: item}) +} +``` + +## Selector Pattern + +`workflow.Selector` replaces Go's native `select`. Use it to wait on multiple channels, futures, and timers. Signals integrate via `AddToSelector`: + +```go +func approvalWorkflow(ctx workflow.Context, _ struct{}) (ApprovalResult, error) { + var outcome string + + timerCtx, cancelTimer := workflow.WithCancel(ctx) + timer := workflow.NewTimer(timerCtx, 24*time.Hour) + + selector := workflow.NewSelector(ctx) + + approveSignal.AddToSelector(ctx, selector, func(params ApproveParams) { + cancelTimer() + outcome = "approved" + }) + + selector.AddFuture(timer, func(f workflow.Future) { + if err := f.Get(ctx, nil); err == nil { + outcome = "timed-out" + } + }) + + selector.Select(ctx) + return ApprovalResult{Outcome: outcome}, nil +} +``` + +## Continue-as-New + +```go +func longRunningWorkflow(ctx workflow.Context, state WorkflowState) (WorkflowResult, error) { + for { + state = processBatch(ctx, state) + + if state.IsComplete { + return WorkflowResult{}, nil + } + + if workflow.GetInfo(ctx).GetContinueAsNewSuggested() { + return WorkflowResult{}, workflow.NewContinueAsNewError(ctx, longRunningWorkflow, state) + } + } +} +``` + +Drain signals before continue-as-new to avoid signal loss: + +```go +for { + if params, ok := mySignal.TryReceive(ctx); ok { + // process signal + } else { + break + } +} +return WorkflowResult{}, workflow.NewContinueAsNewError(ctx, longRunningWorkflow, state) +``` + +## Cancellation Handling + +Use `ctx.Done()` to detect cancellation and `workflow.NewDisconnectedContext` for cleanup: + +```go +func myWorkflow(ctx workflow.Context, params MyParams) (MyResult, error) { + err := longRunningActivity.Run(ctx, params) + if err != nil && temporal.IsCanceledError(ctx.Err()) { + disconnectedCtx, _ := workflow.NewDisconnectedContext(ctx) + disconnectedCtx = workflow.WithActivityOptions(disconnectedCtx, workflow.ActivityOptions{ + StartToCloseTimeout: 5 * time.Minute, + }) + _ = cleanupActivity.Run(disconnectedCtx, CleanupParams{}) + return MyResult{}, err + } + return MyResult{}, err +} +``` + +## Saga Pattern (Compensations) + +```go +func orderWorkflow(ctx workflow.Context, order OrderParams) (OrderResult, error) { + var compensations []func(ctx workflow.Context) error + + runCompensations := func() { + disconnectedCtx, _ := workflow.NewDisconnectedContext(ctx) + disconnectedCtx = workflow.WithActivityOptions(disconnectedCtx, workflow.ActivityOptions{ + StartToCloseTimeout: 5 * time.Minute, + }) + for i := len(compensations) - 1; i >= 0; i-- { + if err := compensations[i](disconnectedCtx); err != nil { + workflow.GetLogger(ctx).Error("Compensation failed", "error", err) + } + } + } + + // Register compensation BEFORE running the activity + compensations = append(compensations, func(ctx workflow.Context) error { + _, err := releaseInventoryActivity.Run(ctx, ReleaseParams{OrderID: order.ID}) + return err + }) + if _, err := reserveInventoryActivity.Run(ctx, ReserveParams{OrderID: order.ID}); err != nil { + runCompensations() + return OrderResult{}, err + } + + compensations = append(compensations, func(ctx workflow.Context) error { + _, err := refundPaymentActivity.Run(ctx, RefundParams{OrderID: order.ID}) + return err + }) + if _, err := chargePaymentActivity.Run(ctx, ChargeParams{OrderID: order.ID}); err != nil { + runCompensations() + return OrderResult{}, err + } + + if _, err := shipOrderActivity.Run(ctx, ShipParams{OrderID: order.ID}); err != nil { + runCompensations() + return OrderResult{}, err + } + + return OrderResult{Status: "completed"}, nil +} +``` + +## Activity Heartbeating + +Heartbeating is unchanged from the standard SDK -- use `activity.RecordHeartbeat` and `activity.HasHeartbeatDetails`: + +```go +func processLargeFile(ctx context.Context, params ProcessFileParams) (ProcessFileResult, error) { + startIdx := 0 + if activity.HasHeartbeatDetails(ctx) { + if err := activity.GetHeartbeatDetails(ctx, &startIdx); err == nil { + startIdx++ + } + } + + lines := readFileLines(params.FilePath) + for i := startIdx; i < len(lines); i++ { + processLine(lines[i]) + activity.RecordHeartbeat(ctx, i) + if ctx.Err() != nil { + return ProcessFileResult{}, ctx.Err() + } + } + + return ProcessFileResult{Status: "completed"}, nil +} +``` + +## Timers + +```go +// Simple sleep +err := workflow.Sleep(ctx, time.Hour) + +// Timer as a Future (for use with Selector) +timerCtx, cancelTimer := workflow.WithCancel(ctx) +timer := workflow.NewTimer(timerCtx, 30*time.Minute) + +// Cancel the timer when no longer needed +cancelTimer() +``` diff --git a/skill/references/testing.md b/skill/references/testing.md new file mode 100644 index 0000000..98bc8dd --- /dev/null +++ b/skill/references/testing.md @@ -0,0 +1,227 @@ +# tempts Testing + +## Overview + +tempts provides three testing approaches: +1. **Unit tests** -- Full integration test with all implementations registered in a test environment +2. **Mock tests** -- Mock specific activities/workflows and test workflow logic in isolation +3. **Replay tests** -- Fixture-based tests that verify workflow backward compatibility + +All approaches use the standard Temporal `testsuite` package. + +## Unit Tests + +Register all implementations via `tempts.NewWorker`, then use `wrk.Register(we)` to register them in the test environment. Use `ExecuteInTest` to run the workflow and get typed results. + +```go +package myapp + +import ( + "testing" + + "github.com/vikstrous/tempts" + "go.temporal.io/sdk/testsuite" +) + +func TestGreetWorkflow(t *testing.T) { + // Create worker with all implementations (same as production) + wf := workflowTypeGreet.WithImplementation(greetWorkflow) + wrk, err := tempts.NewWorker(queueMain, []tempts.Registerable{ + activityTypeGreet.WithImplementation(greetActivity), + wf, + }) + if err != nil { + t.Fatal(err) + } + + // Set up test environment + ts := testsuite.WorkflowTestSuite{} + ts.SetDisableRegistrationAliasing(true) + we := ts.NewTestWorkflowEnvironment() + wrk.Register(we) + + // Execute and get typed result + result, err := wf.ExecuteInTest(we, GreetParams{Name: "World"}) + if err != nil { + t.Fatal(err) + } + if result.Message != "Hello, World" { + t.Fatalf("Expected 'Hello, World', got %s", result.Message) + } +} +``` + +`SetDisableRegistrationAliasing(true)` is required for tempts to work correctly with the test environment. + +## Testing Signals + +Use `RegisterDelayedCallback` to send signals during workflow execution: + +```go +func TestSignal(t *testing.T) { + wf := workflowTypeFormatAndGreet.WithImplementation(workflowFormatAndGreet) + wrk, err := tempts.NewWorker(queueMain, []tempts.Registerable{ + activityTypeFormatName.WithImplementation(activityFormatName), + activityTypeGreet.WithImplementation(activityGreet), + wf, + workflowTypeJustGreet.WithImplementation(workflowJustGreet), + }) + if err != nil { + t.Fatal(err) + } + + ts := testsuite.WorkflowTestSuite{} + ts.SetDisableRegistrationAliasing(true) + we := ts.NewTestWorkflowEnvironment() + wrk.Register(we) + + // Send signal after workflow starts + we.RegisterDelayedCallback(func() { + we.SignalWorkflow(signalUpdateSuffix.Name(), UpdateSuffixParams{Suffix: "!!!"}) + }, 0) + + result, err := wf.ExecuteInTest(we, FormatAndGreetParams{Name: "viktor"}) + if err != nil { + t.Fatal(err) + } + if result.Name != "VIKTOR!!!" { + t.Fatalf("Expected VIKTOR!!!, got %s", result.Name) + } +} +``` + +Use `signalUpdateSuffix.Name()` to get the signal channel name for `SignalWorkflow`. + +## Mock Tests + +Use `RegisterMockFallbacks` to register panicking fallbacks for all activities and workflows on a queue, then mock specific ones with `OnActivity`/`OnWorkflow`: + +```go +package myapp + +import ( + "testing" + + "github.com/stretchr/testify/mock" + "go.temporal.io/sdk/testsuite" +) + +func TestMocks(t *testing.T) { + ts := testsuite.WorkflowTestSuite{} + ts.SetDisableRegistrationAliasing(true) + we := ts.NewTestWorkflowEnvironment() + + // Register panicking fallbacks for all queue types + queueMain.RegisterMockFallbacks(we) + + // Mock specific activities/workflows + we.OnActivity(activityTypeFormatName.Name, mock.Anything, mock.Anything). + Return(FormatNameResult{Name: "VIKTOR"}, nil) + we.OnWorkflow(workflowTypeJustGreet.Name(), mock.Anything, mock.Anything). + Return(JustGreetResult{Name: "Hello, VIKTOR"}, nil) + + wf := workflowTypeFormatAndGreet.WithImplementation(workflowFormatAndGreet) + result, err := wf.ExecuteInTest(we, FormatAndGreetParams{Name: "viktor"}) + if err != nil { + t.Fatal(err) + } + if result.Name != "VIKTOR" { + t.Fatal("Expected VIKTOR, got", result.Name) + } +} +``` + +Use `activityTypeFormatName.Name` (field, not method) for the activity name in mocks, and `workflowTypeJustGreet.Name()` (method) for the workflow name. + +## Replay / Fixture Tests + +Replay tests verify that workflow code changes are backward compatible with previous executions. tempts provides `GetWorkflowHistoriesBundle` to capture histories and `ReplayWorkflow` to replay them. + +### Recording Fixtures + +Connect to a Temporal server with existing workflow executions and capture their histories: + +```go +func TestRecordFixtures(t *testing.T) { + c, err := tempts.Dial(client.Options{}) + if err != nil { + t.Fatal(err) + } + historiesData, err := tempts.GetWorkflowHistoriesBundle(ctx, c, workflowTypeFormatAndGreet) + if err != nil { + t.Fatal(err) + } + err = os.WriteFile("histories/format_and_greet.json", historiesData, 0o644) + if err != nil { + t.Fatal(err) + } +} +``` + +`GetWorkflowHistoriesBundle` fetches up to 10 open and 10 closed executions for the given workflow type. + +### Replaying from Fixtures + +```go +func TestReplayFormatAndGreet(t *testing.T) { + historiesData, err := os.ReadFile("histories/format_and_greet.json") + if err != nil { + t.Fatal(err) + } + err = tempts.ReplayWorkflow(historiesData, workflowFormatAndGreet, worker.WorkflowReplayerOptions{}) + if err != nil { + t.Fatal(err) + } +} +``` + +### Recording via Test Flag + +A common pattern is to use a test flag to toggle between recording and replaying: + +```go +var record bool + +func init() { + flag.BoolVar(&record, "tempts.record", false, "set this to update temporal history fixtures") +} + +func TestFormatAndGreetReplayability(t *testing.T) { + filename := fmt.Sprintf("histories/%s.json", workflowTypeFormatAndGreet.Name()) + testReplayability(t, workflowTypeFormatAndGreet, workflowFormatAndGreet, filename) +} + +func testReplayability(t *testing.T, workflowDeclaration tempts.WorkflowDeclaration, fn any, filename string) { + var historiesData []byte + if record { + ctx := context.Background() + c, err := tempts.Dial(client.Options{}) + if err != nil { + t.Fatal(err) + } + historiesData, err = tempts.GetWorkflowHistoriesBundle(ctx, c, workflowDeclaration) + if err != nil { + t.Fatal(err) + } + err = os.WriteFile(filename, historiesData, 0o644) + if err != nil { + t.Fatal(err) + } + } else { + var err error + historiesData, err = os.ReadFile(filename) + if err != nil { + t.Fatal(err) + } + } + + err := tempts.ReplayWorkflow(historiesData, fn, worker.WorkflowReplayerOptions{}) + if err != nil { + t.Fatal(err) + } +} +``` + +Record: `go test -run TestFormatAndGreetReplayability -tempts.record` + +Replay (CI): `go test -run TestFormatAndGreetReplayability`