Skip to content

kingwill101/stem

Folders and files

NameName
Last commit message
Last commit date

Latest commit

Β 

History

555 Commits
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 

Repository files navigation

Stem Logo

Dart-native background job platform
Queues, retries, scheduling, workflows, and observability β€” all in pure Dart.

Pub Version CI Status License: MIT


Why Stem?

  • Pure Dart β€” No external worker processes, no FFI bindings. Runs anywhere Dart runs.
  • Pluggable backends β€” Swap between SQLite, Redis, or Postgres with a single line.
  • Battle-tested patterns β€” Retries with backoff, rate limiting, dead-letter queues, and priority scheduling.
  • Workflows β€” Durable, checkpointed execution for complex multi-step processes.
  • Canvas API β€” Compose tasks into groups, chains, and chords.
  • Observability β€” Built-in OpenTelemetry integration for traces and metrics.

Quick Start

import 'dart:async';

import 'package:stem/stem.dart';

class EmailTask extends TaskHandler<String> {
  @override
  String get name => 'email.send';

  @override
  Future<String> call(TaskContext ctx, Map<String, Object?> args) async {
    final to = args['to'] as String;
    return 'sent to $to';
  }
}

Future<void> main() async {
  final client = await StemClient.inMemory(tasks: [EmailTask()]);
  final worker = await client.createWorker();
  unawaited(worker.start());

  final taskId = await client.stem.enqueue(
    'email.send',
    args: {'to': 'hello@example.com'},
  );
  final result = await client.stem.waitForTask<String>(taskId);
  print('Result: ${result?.value}');

  await worker.shutdown();
  await client.close();
}

Architecture

                               PRODUCERS
            β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
            β”‚                   β”‚                   β”‚                  β”‚
            v                   v                   v                  v
       β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”        β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”        β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”        β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
       β”‚  Stem   β”‚        β”‚  Canvas  β”‚        β”‚ Workflow  β”‚        β”‚  Client  β”‚
       β”‚ Client  β”‚        β”‚ (chains, β”‚        β”‚   API     β”‚        β”‚  SDKs    β”‚
       β””β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”˜        β”‚ groups)  β”‚        β””β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”˜        β””β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”˜
            β”‚             β””β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”˜              β”‚                    β”‚
            β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                               β”‚
                               v
        β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
        β”‚                         BROKER                           β”‚
        β”‚ queues / leases / acks / nack / delayed / dlq             β”‚
        β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                        β”‚                           β”‚
                        v                           v
               β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”         β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
               β”‚  Workflow Engine β”‚         β”‚       Workers       β”‚
               β”‚  claim runs &    β”‚         β”‚  (many, independent)β”‚
               β”‚  schedule steps  β”‚         β””β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”˜
               β””β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                 β”‚       β”‚
                       β”‚                            β”‚       β”‚
            enqueue steps β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜       β”‚
                       β”‚                                    β”‚
                       v                                    v
            β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
            β”‚  Workflow Store  β”‚                  β”‚  Task Registry   β”‚
            β”‚  (runs/steps)    β”‚                  β”‚   & Handlers     β”‚
            β””β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                  β””β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                     β”‚                                     β”‚
                     v                                     v
            β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
            β”‚    Event Bus     β”‚                  β”‚  Result Backend  β”‚
            β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

        ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─

                              SCHEDULING

            β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”     β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
            β”‚ Beat Scheduler β”‚---->β”‚ Schedule Store β”‚
            β”‚     (cron)     β”‚     β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
            β””β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”˜             β”‚
                    β”‚                      β”‚
                    v                      v
                 β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”          β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
                 β”‚ Broker β”‚<---------β”‚ Lock Store β”‚
                 β””β”€β”€β”€β”€β”€β”€β”€β”€β”˜          β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
            (enqueues scheduled tasks / lease guards)

       ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─

                             ADAPTERS

         β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
         β”‚   SQLite   β”‚   β”‚   Redis    β”‚   β”‚  Postgres  β”‚
         β”‚  (local)   β”‚   β”‚ (streams)  β”‚   β”‚ (durable)  β”‚
         β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Packages

Package Description pub.dev
stem Core runtime: contracts, worker, scheduler, in-memory adapters, signals, Canvas, workflows pub
stem_cli Command-line tooling (stem executable) and CLI utilities pub
stem_memory In-memory adapter package (broker/backend/workflow/scheduler factories) pub
stem_sqlite SQLite broker and result backend for local dev/testing pub
stem_redis Redis Streams broker, result backend, and watchdog helpers pub
stem_postgres Postgres broker, result backend, and scheduler stores pub
stem_flutter Adapter-neutral Flutter helpers for mobile worker isolates and queue monitoring pub
stem_flutter_sqlite Flutter SQLite runtime/storage helpers for mobile Stem apps pub
stem_builder Build-time code generator for annotated tasks and workflows pub
stem_adapter_tests Shared contract test suites for adapter implementations pub
stem_dashboard Hotwire-based operations dashboard (experimental) β€”

Features

Task Options

TaskOptions(
  queue: 'high-priority',      // Target queue
  maxRetries: 5,               // Retry on failure
  priority: 10,                // Higher = processed first
  rateLimit: '100/m',          // Rate limiting
  softTimeLimit: Duration(seconds: 30),
  hardTimeLimit: Duration(minutes: 2),
  visibilityTimeout: Duration(minutes: 5),
)

Canvas β€” Task Composition

// Chain: sequential execution, results flow forward
await canvas.chain([
  task('resize', args: {'file': 'img.png'}),
  task('upload', args: {'bucket': 's3://photos'}),
  task('notify', args: {'channel': 'slack'}),
]);

// Group: parallel execution
await canvas.group([
  task('resize', args: {'size': 'small'}),
  task('resize', args: {'size': 'medium'}),
  task('resize', args: {'size': 'large'}),
]);

// Chord: parallel tasks + callback when all complete
await canvas.chord(
  [task('fetch.a'), task('fetch.b'), task('fetch.c')],
  callback: task('aggregate'),
);

Durable Workflows

final workflow = WorkflowScript('order.process', (wf) async {
  final validated = await wf.activity('validate', args: order);
  final charged = await wf.activity('charge', args: validated);
  
  await wf.sleep(Duration(hours: 24)); // Durable sleep!
  
  await wf.activity('ship', args: charged);
});

Cron Scheduling

final beat = BeatScheduler(
  broker: broker,
  scheduleStore: store,
  entries: [
    ScheduleEntry(
      name: 'daily-report',
      cron: '0 9 * * *',  // Every day at 9 AM
      task: task('reports.generate'),
    ),
  ],
);

CLI

# Run a worker
stem worker --queue default --concurrency 8

# Run the beat scheduler
stem schedule list

# Inspect dead-letter queue
stem dlq list
stem dlq retry <task-id>

# List registered tasks
stem tasks ls

# Health check
stem health

Development

Prerequisites

  • Dart 3.9.2+
  • Docker (for adapter integration tests)

Setup

# Clone the repository
git clone https://github.com/kingwill101/stem.git
cd stem

# Install dependencies
dart pub get

# Run quality gates
dart format --output=none --set-exit-if-changed .
dart analyze
task test:no-env

Adapter Tests

Integration tests require the Docker test stack:

# Run all package tests with Docker-backed integration env
task test

# Run coverage workflow for core adapters/runtime packages
task coverage

# Run targeted adapter suites (auto-bootstraps integration env)
task test:contract
task test:redis
task test:postgres

Targeted adapter tasks now bootstrap integration environment automatically. If bootstrap still fails (for example Docker unavailable), run:

source ./packages/stem_cli/_init_test_env

Capability flags and skip behavior for adapter contract suites are documented in packages/stem_adapter_tests/README.md.


Contributing

Contributions are welcome! Please read the contribution guidelines before submitting a PR.

  1. Fork the repository
  2. Create a feature branch (git checkout -b feature/amazing-feature)
  3. Commit your changes (git commit -m 'Add amazing feature')
  4. Push to the branch (git push origin feature/amazing-feature)
  5. Open a Pull Request

License

MIT License β€” see LICENSE for details.