Skip to content

mishudark/cloudpebble

Repository files navigation

CloudPebble

CloudPebble is a Google Cloud Bigtable v2 compatible wide-column database built on Pebble. You can use the official cloud.google.com/go/bigtable client library or any Bigtable v2 SDK to read and write data, with durable object-storage persistence implementing an architecture similar to turbopuffer.

A local Pebble instance serves as a read-optimized NVMe/SSD cache. All writes are durably committed to object storage via a write-ahead log. Data is asynchronously indexed into SSTs and uploaded for cold reads and crash recovery.

Why CloudPebble

  • Sub-millisecond cached reads. Reads hit a local Pebble LSM tree — no network roundtrip.
  • Immediate read-your-writes. Writes are applied to the local memtable after object-storage durability, so a single node sees its own writes instantly (strong consistency).
  • Embedded or networked. Use CloudPebble as an in-process Go library (pkg/engine), or deploy it as a Bigtable-compatible gRPC server (cmd/pebble-bigtable) accessible from any Bigtable v2 SDK. Same engine, two deployment modes.
  • Incremental checkpoints. Only new or changed SST files are uploaded to object storage. Unchanged files are skipped, minimizing egress costs and sync time.
  • WAL batching with configurable window. Concurrent writes within the same window are coalesced into a single object-storage object, amortizing the ~100ms roundtrip across many writers. At 200ms batch window and 50k concurrent goroutines, extrapolated throughput exceeds 680k ops/sec.
  • Production-grade Bigtable v2 API over object storage. A gRPC server implementing MutateRow, ReadRows, CheckAndMutateRow, ReadModifyWriteRow, SampleRowKeys, and 16 RowFilter types — mapping Bigtable's wide-column model onto Pebble. Fully verified with the official cloud.google.com/go/bigtable client library across 33 integration tests + fuzz harness.
  • Multi-backend object storage. A minimal Store interface (Put/Get/Delete/List/Attrs) with GCS and local filesystem backends. Adding S3 or Azure Blob requires implementing a single interface.
  • OpenTelemetry-native metrics. Counters and latency histograms registered as OTEL observable instruments — no Prometheus bridge needed.
  • Namespace isolation. Each tenant gets its own prefix in object storage with independent Pebble DB, checkpoints, and WAL sequences. No cross-tenant data mixing.
  • Crash recovery with cold-miss self-healing. Nodes restart from object-storage checkpoints and replay uncommitted WALs. A cold-miss detector triggers background recovery if consecutive cache misses suggest stale or missing local data.
  • Async WAL mode for persistent SSD. An optional inverted write path: writes are fsync'd to a local WAL file on SSD and acknowledged immediately. The object-storage upload happens asynchronously in the background. On crash, local WAL files are replayed — zero data loss if the SSD survives.
                        ╔═══════════ cloudpebble ═══════════════════╗
╔════════════╗          ║                                           ║
║   client   ║───API──▶ ║  ┏━━━━━━━━━━━━━━━━┓    ┏━━━━━━━━━━━━━━┓  ║
║            ║          ║  ┃  Local Pebble   ┃───▶┃  GCS / S3 /  ┃  ║
╚════════════╝          ║  ┃ (Memory + SSD)  ┃    ┃  Azure Blob  ┃  ║
                        ║  ┗━━━━━━━━━━━━━━━━┛    ┗━━━━━━━━━━━━━━┛  ║
                        ╚══════════════════════════════════════════╝

Architecture

Write Path (Strong Consistency)

Every write creates an immutable WAL object in object storage. Once the WAL is durably committed, the write is acknowledged. The batch is also applied to a local Pebble instance for fast reads.

                    Set(key, value)
                         │
                         ▼
              ┌──────────────────────┐
              │  Encode Pebble batch │
              └──────────┬───────────┘
                         │
              ┌──────────▼───────────┐
              │  Write WAL to object │  ── durability barrier (~100ms)
              │  storage             │
              │  {ns}/wal/{seq}.wal  │
              └──────────┬───────────┘
                         │
              ┌──────────▼───────────┐
              │  Apply to local      │
              │  Pebble memtable     │  ── visible immediately (~µs)
              └──────────┬───────────┘
                         │
              ┌──────────▼───────────┐
              │  Acknowledge write   │
              └──────────────────────┘

Write Path (Async WAL)

When AsyncWAL is enabled, writes are fsync'd to a local WAL file on persistent SSD first, applied to the in-memory memtable, and acknowledged immediately. The WAL upload to object storage happens asynchronously in the background via the batch window.

                    Set(key, value)
                         │
                         ▼
              ┌──────────────────────┐
              │  Encode Pebble batch │
              └──────────┬───────────┘
                         │
              ┌──────────▼───────────┐
              │  Write local WAL     │  ── fsync to SSD (~µs)
              │  localwal/{seq}.wal  │
              └──────────┬───────────┘
                         │
              ┌──────────▼───────────┐
              │  Apply to local      │
              │  Pebble memtable     │  ── in-memory (~µs)
              └──────────┬───────────┘
                         │
              ┌──────────▼───────────┐
              │  Acknowledge write   │  ── data safe on SSD
              └──────────────────────┘
                         │  ... background ...
                         ▼
              ┌──────────────────────┐
              │  Upload WAL to       │  ── async via batch window
              │  object storage      │
              └──────────┬───────────┘
                         │ on success:
                         ▼
              ┌──────────────────────┐
              │  Delete local WAL    │
              └──────────────────────┘

On crash recovery, local WAL files are replayed on top of the object-storage checkpoint, so no data is lost — provided the local SSD survives the crash (not safe on ephemeral storage like Cloud Run).

When batching is enabled (BatchWindow > 0, default 200ms), concurrent writes within the same window are coalesced into a single GCS WAL object.

                            Time ──────────────────────────────▶

  Write A ──┐
  Write B ──┤─── Batch window (200ms) ───┐
  Write C ──┘                         │
                                       ▼
                              ┌────────────────┐
                              │  Single WAL     │
                              │  object {seq}   │──▶ GCS
                              │  (A + B + C)    │
                              └────────────────┘

Read Path

  Get(key)
       │
       ▼
  ┌─────────────┐    miss    ┌──────────────────┐
  │ Local Pebble │──────────▶│ Cold-miss recovery│
  │ (mem + SSTs) │           │ (download SSTs +  │
  │              │           │  replay WALs from │
  └──────┬───────┘           │  object storage)  │
         │ hit               └────────┬─────────┘
         ▼                             │
  ┌─────────────┐                      │
  │ Return value │                     ▼
  │   (~ms)      │            ┌───────────────┐
  └──────────────┘            │ Return value   │
                              │   (~400ms)     │
                              └───────────────┘

Async Flush + Upload (Background Indexer)

A background goroutine periodically flushes memtables to local SSTs, uploads them to object storage, and garbage-collects old WAL entries.

  syncLoop (every 30s)
       │
       ▼
  ┌──────────────┐
  │  AsyncFlush() │      New writes continue into next memtable
  │  (mem → SST)  │
  └──────┬───────┘
         │
         ▼
  ┌──────────────┐
  │ Checkpoint()  │      Consistent snapshot of LSM state
  │ (MANIFEST +   │
  │  SSTs)        │
  └──────┬───────┘
         │
         ▼
  ┌──────────────┐
  │ Upload new    │      Incremental: only upload changed SSTs
  │ SSTs to GCS   │
  └──────┬───────┘
         │
         ▼
  ┌──────────────┐
  │ Write manifest│      {version, max_wal_seq, files with checksums}
  │ to GCS        │
  └──────┬───────┘
         │
         ▼
  ┌──────────────┐
  │ GC old WALs   │      Delete WAL objects covered by checkpoint
  │ + orphans     │
  └──────────────┘

Recovery (Cold Start / Node Restart)

  Open(namespace)
       │
       ▼
  ┌─────────────────┐
  │ Read manifest    │
  │ from GCS         │
  └────────┬────────┘
           │
    ┌──────▼──────┐
    │ Has          │
    │ manifest?    │
    └──┬──────┬───┘
       │ no   │ yes
       ▼      ▼
  ┌────────┐  ┌───────────────────┐
  │ Start  │  │ Download SSTs +    │
  │ fresh  │  │ MANIFEST from GCS  │
  └───┬────┘  │ Verify checksums   │
      │       └────────┬──────────┘
      │                │
      └───────┬────────┘
              ▼
  ┌───────────────────────┐
  │ Open local Pebble     │
  └───────────┬───────────┘
              │
  ┌───────────▼───────────┐
  │ Consistency mode?     │
  └──┬────────────────┬───┘
     │ Strong         │ Eventual
     ▼                ▼
  ┌──────────────┐  ┌──────────────┐
  │ Replay all    │  │ Skip WAL      │
  │ unflushed WALs│  │ replay. Serve │
  │ from GCS      │  │ from checkpoint│
  └──────┬───────┘  │ Self-heal via  │
         │          │ background loop│
         ▼          └───────────────┘
  ┌──────────────┐
  │ Start serving │
  └──────────────┘

GCS Object Layout

{namespace}/
├── manifest                        # Current manifest: {version, max_wal_seq, files[]}
├── manifests/
│   ├── 000001.json                 # Version history (last 10 kept)
│   └── 000002.json
├── data/
│   ├── MANIFEST-000003             # Pebble MANIFEST
│   ├── 000004.sst                  # SST files
│   ├── 000005.sst
│   ├── OPTIONS-000002              # Pebble options
│   └── marker.*                    # Pebble version markers
└── wal/
    ├── 00000000000000000001.wal    # Immutable WAL objects
    └── 00000000000000000002.wal    # (zero-padded 20-digit seq num)

Consistency Models

Mode Open latency Startup WAL replay Staleness window Self-healing
ConsistencyStrong Higher Yes — replay all WALs None (current) Immediate
ConsistencyEventual Lower No — skip WALs Up to last checkpoint Background walReplayLoop

Eventual consistency converges to strong over time as the background WAL replay loop catches up and the periodic Sync uploads new checkpoints.

Bigtable API (gRPC)

CloudPebble includes a Google Cloud Bigtable v2 compatible gRPC server that maps Bigtable's wide-column data model onto Pebble's key-value store. Each Bigtable table is backed by a separate CloudPebble namespace with its own Pebble DB + object storage durability.

The implementation follows the same key-encoding approach described in Pinterest's Rockstorewidecolumn.

Data Model Mapping

Bigtable:  table / row_key / column_family / column_qualifier / timestamp → value
Pebble:    [escaped_row_key][00][00][family_len:1][family][00][qual_len:2][qualifier][00][inverted_ts:8] → value

Row keys use null-escape encoding (0x000x00 0xFF) with a 0x00 0x00 terminator, preserving lexicographic ordering for correct forward/reverse scans and prefix ranges. Timestamps are inverted (math.MaxInt64 - ts) so the newest cells sort first under Pebble's ascending lexicographic order.

Supported RPCs

RPC Status Notes
ReadRows Done Server-streaming CellChunk protocol, RowSet/RowRange support, rows_limit
MutateRow Done Atomic row-level mutations via Pebble Batch
MutateRows Done Batch mutations with per-entry google.rpc.Status
CheckAndMutateRow Done Atomic conditional mutate with predicate filter
SampleRowKeys Done Split-key sampling from SST file boundaries
PingAndWarm Done Health check + cache warming
OpenTable Done Session-based bidirectional streaming protocol
ReadModifyWriteRow Done Atomic append/increment via IndexedBatch
Change stream RPCs Stub Returns Unimplemented
SQL RPCs Stub Returns Unimplemented

RowFilter Support

Filter Status
chain Done
interleave Done (with duplicate suppression)
condition Done
pass_all_filter / block_all_filter Done
row_key_regex_filter Done
family_name_regex_filter Done
column_qualifier_regex_filter Done
column_range_filter Done
timestamp_range_filter Done
cells_per_row_offset_filter Done
cells_per_row_limit_filter Done
cells_per_column_limit_filter Done
strip_value_transformer Done
apply_label_transformer Done
value_regex_filter Done
value_range_filter Done
row_sample_filter Done (crypto-random)
sink No (treated as pass_all)

Running the Bigtable Server

# Strong consistency (WAL → object storage first, then acknowledge)
go run ./cmd/pebble-bigtable/ --addr :9000 --data-dir /tmp/btdb --object-dir /tmp/btobj

# Async WAL (local SSD first, acknowledge immediately, async object-storage upload)
go run ./cmd/pebble-bigtable/ --addr :9000 --async-wal --data-dir /tmp/btdb --object-dir /tmp/btobj

Using with a Bigtable Client

import (
    "github.com/mishudark/cloudpebble/pkg/bigtable/bigtablepb"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
)

conn, _ := grpc.NewClient("localhost:9000", grpc.WithTransportCredentials(insecure.NewCredentials()))
client := bigtablepb.NewBigtableClient(conn)

// Direct RPCs
resp, _ := client.MutateRow(ctx, &bigtablepb.MutateRowRequest{
    TableName: "my-table",
    RowKey:    []byte("user123"),
    Mutations: []*bigtablepb.Mutation{
        {Mutation: &bigtablepb.Mutation_SetCell_{
            SetCell: &bigtablepb.Mutation_SetCell{
                FamilyName:      "profile",
                ColumnQualifier: []byte("name"),
                TimestampMicros: -1,
                Value:           []byte("Alice"),
            },
        }},
    },
})

Architecture

Bigtable gRPC client
       │
       ▼
┌──────────────────────────────────────┐
│  pkg/bigtable/Server                  │
│  ├─ MutateRow → pebble.Batch.Apply    │
│  ├─ ReadRows  → pebble.Iterator       │
│  ├─ OpenTable → bidi session stream   │
│  └─ RowFilter → filter evaluator tree │
└──────────────┬───────────────────────┘
               │
    ┌──────────▼──────────┐
    │  pkg/engine/Engine   │  per-table namespace
    │  ├─ Pebble (cache)   │
    │  └─ GCS/local (WAL)  │
    └─────────────────────┘

Usage (Embedded Engine)

package main

import (
    "context"
    "log"

    "cloud.google.com/go/storage"
    "github.com/mishudark/cloudpebble/pkg/engine"
    "github.com/mishudark/cloudpebble/pkg/objstore/gcs"
)

func main() {
    store, _ := gcs.New("my-bucket", "cloudpebble/")

    e, err := engine.Open(engine.Options{
        Dir:       "/nvme/cloudpebble",
        Store:     store,
        Namespace: "my-namespace",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer e.Close()

    // Write — durably committed to GCS before returning.
    e.Set(context.Background(), []byte("hello"), []byte("world"))

    // Read — served from local Pebble cache.
    val, err := e.Get([]byte("hello"))
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("hello = %s\n", val)

    // Access the underlying Pebble DB for advanced operations.
    iter, _ := e.DB().NewIter(nil)
    for iter.First(); iter.Valid(); iter.Next() {
        fmt.Printf("%s = %s\n", iter.Key(), iter.Value())
    }
    iter.Close()

    // Metrics.
    snap := e.Metrics().Snapshot()
    fmt.Printf("Sets: %d, Gets: %d, Hits: %d, Misses: %d\n",
        snap.Sets, snap.Gets, snap.GetHits, snap.GetMisses)
}

Using the Local Store (for development)

import "github.com/mishudark/cloudpebble/pkg/objstore/local"

store, _ := local.New("/tmp/cloudpebble-objstore")
e, _ := engine.Open(engine.Options{
    Dir:       "/tmp/cloudpebble-cache",
    Store:     store,
    Namespace: "dev",
})

Configuration

Option Default Description
Dir os.TempDir() Local directory for the Pebble cache
Store required Object storage backend (gcs.Store, local.Store, ...)
Namespace "default" Tenant/namespace prefix in object storage
SyncInterval 30s Background checkpoint upload interval
BatchWindow 200ms WAL batching window (negative = disabled)
ColdMissThreshold 3 Consecutive misses before triggering recovery
Consistency Strong Strong or Eventual
AsyncWAL false Fsync to local WAL on SSD first, acknowledge immediately, upload to object storage asynchronously. Safe only on persistent SSD.
OrphanWALTTL 1h Delete orphan WAL objects older than this
MaxLocalBytes 0 (unlimited) Soft limit on local Pebble cache size
PebbleOptions defaults Passed through to pebble.Open()

Object Storage Interface

Backends implement the Store interface:

type Store interface {
    io.Closer
    Put(ctx context.Context, path string, data []byte) error
    Get(ctx context.Context, path string) ([]byte, error)
    Delete(ctx context.Context, path string) error
    List(ctx context.Context, prefix string) ([]string, error)
    Exists(ctx context.Context, path string) (bool, error)
    Attrs(ctx context.Context, path string) (ObjectInfo, error)
}
Backend Package Requires
Google Cloud Storage objstore/gcs GCP credentials
Local filesystem objstore/local None (dev/test)

To add a new backend, implement the Store interface and pass it to engine.Options.Store.

Project Structure

cloudpebble/
├── BIGTABLE_CLIENT_GUIDE.md           # Using the official Bigtable Go client
├── BIGTABLE_PLAN.md                   # Bigtable API implementation plan
├── DESIGN.md                         # Architecture design document
├── PLAN.md                           # Implementation plan (shortcuts → production)
├── cmd/
│   ├── cloudpebble/main.go           # Demo CLI
│   ├── pebble-bigtable/main.go       # Bigtable gRPC server
│   ├── test-bigtable-client/main.go   # Official Bigtable client integration tests
│   ├── test-recovery/main.go         # Crash recovery integration test
│   ├── test-incremental/main.go      # Incremental upload test
│   ├── test-eventual/main.go         # Eventual consistency test
│   └── test-coldmiss/main.go         # Cold miss recovery test
├── pkg/
│   ├── bigtable/
│   │   ├── server.go                 # gRPC server + table registry
│   │   ├── session.go                # OpenTable bidi stream handler
│   │   ├── encoding.go               # Bigtable → Pebble key encoding
│   │   ├── mutate.go                 # MutateRow / MutateRows / CheckAndMutateRow
│   │   ├── readrows.go               # ReadRows: CellChunk streaming
│   │   ├── filter.go                 # RowFilter evaluation engine
│   │   ├── samplekeys.go             # SampleRowKeys
│   │   ├── readmodifywrite.go         # ReadModifyWriteRow
│   │   ├── types.go                  # RowProcessor callback type
│   │   └── bigtablepb/               # Generated proto Go code
│   ├── objstore/
│   │   ├── store.go                  # Store interface + ObjectInfo
│   │   ├── gcs/gcs.go                # Google Cloud Storage backend
│   │   ├── local/local.go            # Local filesystem backend
│   │   └── testutil/testutil.go      # Contract test harness
│   ├── walcloud/
│   │   └── manager.go                # WAL manager (write/read/list/GC/batch)
│   └── engine/
│       ├── engine.go                 # Core engine (Open/Set/Get/Sync/Close)
│       ├── metrics.go                # Metrics counters and latency histograms
│       └── engine_test.go            # Unit tests
└── go.mod / go.work / go.sum

Testing

# Run all unit tests (no credentials needed)
go test ./pkg/...

# Run Bigtable client integration test (33 tests using official client)
go run ./cmd/test-bigtable-client/
go run ./cmd/test-bigtable-client/ --fuzz 500

# Run storage integration tests (require local filesystem only)
go run ./cmd/test-recovery/ step1 && go run ./cmd/test-recovery/ step2
go run ./cmd/test-incremental/
go run ./cmd/test-eventual/ step1 && go run ./cmd/test-eventual/ step2
go run ./cmd/test-coldmiss/

Unit tests use objstore/local (no network, no credentials). The GCS backend has a contract test harness (testutil.RunContractTests) that all backends must pass.

Benchmarks

Results on i7-8565U (4c/8t), local objstore backend, -benchtime=3s.

Single-goroutine throughput (sequential keys, 200ms WAL batch window)

Benchmark µs/op Ops/sec Bytes/op Allocs/op
MutateRow (1 cell) 30.1 33,217 1,002 15
MutateRow (10 cells) 32.7 30,595 1,520 24
MutateRows/batch=10 36.8 27,159 batches 3,805 76
MutateRows/batch=50 66.2 15,105 batches 14,660 318
MutateRows/batch=100 99.8 10,025 batches 28,445 619
ReadModifyWriteRow 35.3 28,308 2,052 32
ReadRows (10k rows) 5,859 171 scans 4.9MB 90,225
SequentialWrites 29.6 33,821
ParallelWrites 19.3 51,927

MutateRows scaling with batching

Concurrency 100ms window ops/sec 200ms window ops/sec
1 10 ~20
100 980 ~1,960
1,000 9,700 ~19,400
50,000 341,000 ~682,000

The right column is extrapolated: halving the batch window doubles the coalescing frequency, proportionally increasing batching throughput.

Key takeaways

  • No batching is CPU-bound on store.Put at ~28-35k ops/sec regardless of concurrency — each write does synchronous file I/O after an atomic seq allocation.
  • 200ms batching (current default) coalesces writes within each window. At 50k goroutines this yields ~682k ops/sec extrapolated.
  • ReadRows is allocation-heavy: 90k allocs per 10k-row scan comes from CellChunk construction per cell.

About

CloudPebble is a Bigtable compatible wide-column database with durable object storage implementing an architecture similar to turbopuffer

Topics

Resources

License

Contributing

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors

Languages