A transport-only PostgreSQL logical replication client for receiving raw pgoutput payloads in Ruby.
pgoutput-client connects to PostgreSQL using logical replication, starts a pgoutput replication stream, receives CopyData messages, handles keepalives, sends standby feedback, and yields raw pgoutput payload bytes to downstream gems such as pgoutput-parser and pgoutput-decoder.
It intentionally does not parse row-change messages or decode PostgreSQL values.
- Ruby 3.4+
- PostgreSQL 10+
- pg gem
- PostgreSQL publication and logical replication slot
PostgreSQL logical replication
│
▼
pgoutput-client
│
▼
CopyData / pgoutput payloads
│
▼
pgoutput-parser
│
▼
Protocol messages
│
▼
pgoutput-decoder
│
▼
Decoded row events
pgoutput-client is the transport layer only.
- Opens PostgreSQL logical replication connections
- Builds replication commands
- Supports CREATE_REPLICATION_SLOT
- Supports DROP_REPLICATION_SLOT
- Supports START_REPLICATION SLOT ... LOGICAL ...
- Parses XLogData envelopes
- Parses primary keepalive messages
- Builds standby feedback messages
- Provides LSN parse/format helpers
- Yields raw pgoutput payload bytes
- Includes RBS signatures
- Includes Minitest coverage
- No audit, parser, or decoder concerns
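The replication commands in the list above are plain text sent over a replication connection. As a minimal sketch of how they could be assembled, the module below uses hypothetical helper names (`ReplicationCommands`, `create_slot`, `drop_slot`, `start_replication`, `format_lsn`) that are assumptions for illustration and not necessarily the gem's own API. The command grammar follows the PostgreSQL streaming replication protocol.

```ruby
# Hypothetical helpers for illustration; not pgoutput-client's actual API.
module ReplicationCommands
  module_function

  # Formats a 64-bit WAL position in PostgreSQL's "high/low" hex text form.
  def format_lsn(lsn)
    format("%X/%X", lsn >> 32, lsn & 0xFFFFFFFF)
  end

  # Double-quotes an identifier so unusual names cannot break the command.
  def quote_ident(name)
    %("#{name.gsub('"', '""')}")
  end

  def create_slot(slot_name, temporary: false)
    parts = ["CREATE_REPLICATION_SLOT", quote_ident(slot_name)]
    parts << "TEMPORARY" if temporary
    parts << "LOGICAL pgoutput"
    parts.join(" ")
  end

  def drop_slot(slot_name)
    "DROP_REPLICATION_SLOT #{quote_ident(slot_name)}"
  end

  def start_replication(slot_name, publication_names:, start_lsn: 0, proto_version: 1)
    # Publication names travel inside a single-quoted option value.
    pubs = publication_names.map { |name| quote_ident(name) }.join(",").gsub("'", "''")
    "START_REPLICATION SLOT #{quote_ident(slot_name)} LOGICAL #{format_lsn(start_lsn)} " \
      "(proto_version '#{proto_version}', publication_names '#{pubs}')"
  end
end
```

Starting from LSN 0 (`0/0`) asks the server to resume from the slot's own confirmed position, which is the usual choice when the slot already tracks progress.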
gem "pgoutput-client"
Then:
bundle install
Require:
require "pgoutput-client"

require "pgoutput-client"
client =
Pgoutput::Client::Runner.new(
database_url: ENV.fetch("DATABASE_URL"),
slot_name: "my_slot",
publication_names: ["my_publication"],
auto_create_slot: true
)
client.start do |payload, metadata|
puts "WAL end: #{metadata.wal_end_lsn}"
puts "Raw pgoutput payload bytes: #{payload.bytesize}"
end

require "pgoutput-client"
require "pgoutput"
client = Pgoutput::Client::Runner.new(
database_url: ENV.fetch("DATABASE_URL"),
slot_name: "my_slot",
publication_names: ["my_publication"]
)
tracker = Pgoutput::RelationTracker.new
client.start do |payload, metadata|
message = tracker.process(payload)
p [metadata.wal_end_lsn, message]
end

require "pgoutput-client"
require "pgoutput"
require "pgoutput/decoder"
tracker = Pgoutput::RelationTracker.new
decoder = Pgoutput::Decoder.new
# client is a Pgoutput::Client::Runner built as in the previous example
client.start do |payload, metadata|
protocol_message = tracker.process(payload)
event = decoder.decode(protocol_message)
p [metadata.wal_end_lsn, event]
end

PostgreSQL replication connection
│
▼
CopyData stream
│
▼
XLogData / Keepalive handling
│
▼
Raw pgoutput payloads
It owns:
- Replication connection setup
- Replication command generation
- CopyData reading
- XLogData envelope parsing
- Keepalive handling
- Standby status feedback
- LSN conversion
It does not:
- Parse pgoutput row messages
- Decode PostgreSQL OIDs
- Build application events
- Group transactions
- Run processor pipelines
- Manage Ractor worker pools
- Store audit records
- Own replay, checkpointing, deduplication, or sink ordering
Those responsibilities belong to higher layers, especially cdc-core and the sink that materializes downstream state.
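To make the transport responsibilities above concrete, here is a minimal sketch of splitting a CopyData body into an XLogData envelope or a primary keepalive. The byte layout follows the PostgreSQL streaming replication protocol; the struct and method names (`XLogDataEnvelope`, `KeepaliveMessage`, `parse_copy_data`) are assumptions for illustration and may not match the gem's classes.

```ruby
# Illustrative structs; the gem's real classes may differ.
XLogDataEnvelope = Struct.new(:wal_start_lsn, :wal_end_lsn, :server_clock, :payload)
KeepaliveMessage = Struct.new(:wal_end_lsn, :server_clock, :reply_requested)

# Parses the body of one CopyData message from a replication stream.
def parse_copy_data(bytes)
  bytes = bytes.b # work on a binary copy
  case bytes[0]
  when "w"
    # 'w' + WAL start (8 bytes) + WAL end (8 bytes) + server clock (8 bytes) + payload
    raise ArgumentError, "truncated XLogData" if bytes.bytesize < 25

    wal_start, wal_end, clock = bytes.unpack("x Q> Q> Q>")
    XLogDataEnvelope.new(wal_start, wal_end, clock, bytes.byteslice(25..))
  when "k"
    # 'k' + WAL end (8 bytes) + server clock (8 bytes) + reply-requested flag (1 byte)
    raise ArgumentError, "truncated keepalive" if bytes.bytesize < 18

    wal_end, clock, reply = bytes.unpack("x Q> Q> C")
    KeepaliveMessage.new(wal_end, clock, reply == 1)
  else
    raise ArgumentError, "unknown replication message type #{bytes[0].inspect}"
  end
end
```

The payload of an XLogData envelope is handed on untouched, which matches the transport-only boundary: nothing here inspects pgoutput message types.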
If the live replication stream loses its connection, pgoutput-client retries a small number of times with a backoff and resumes from the latest confirmed WAL position.
It does not decide replay policy, deduplication strategy, checkpoint storage, or exactly-once delivery. Those concerns belong to the downstream CDC runtime and sink layer.
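The retry behaviour described above can be pictured with a small, self-contained sketch. Everything here is an assumption for illustration: the method name `stream_with_retries`, the retry count, the delays, and the rescued error classes are hypothetical and do not describe the gem's actual defaults or internals.

```ruby
# Hypothetical retry helper; names and defaults are assumptions, not the gem's API.
# The block receives the LSN to resume from and a callable used to record progress.
def stream_with_retries(start_lsn:, max_retries: 3, base_delay: 0.5, sleeper: Kernel.method(:sleep))
  confirmed_lsn = start_lsn
  attempts = 0
  # Recording progress also resets the retry budget (a design choice of this sketch).
  confirm = lambda do |lsn|
    confirmed_lsn = lsn
    attempts = 0
  end

  begin
    yield confirmed_lsn, confirm
  rescue IOError, SystemCallError
    # A real client would also rescue its driver's connection errors here.
    attempts += 1
    raise if attempts > max_retries

    sleeper.call(base_delay * (2**(attempts - 1))) # exponential backoff
    retry # re-enters the block with the latest confirmed LSN
  end
end
```

Because the helper only resumes from the last confirmed position, a downstream consumer may still see a payload more than once after a reconnect; handling that remains the job of the layers named above.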
Example PostgreSQL setup:
-- Changing wal_level only takes effect after a PostgreSQL server restart.
ALTER SYSTEM SET wal_level = logical;
CREATE PUBLICATION my_publication FOR TABLE users, posts;
Create a slot automatically:
Pgoutput::Client::Runner.new(
database_url: ENV.fetch("DATABASE_URL"),
slot_name: "my_slot",
publication_names: ["my_publication"],
auto_create_slot: true
)
Or create the slot yourself:
SELECT * FROM pg_create_logical_replication_slot('my_slot', 'pgoutput');

High-level facade.
client = Pgoutput::Client::Runner.new(...)
client.start { |payload, metadata| ... }

Immutable configuration object.
Thin wrapper around PG::Connection for replication commands.
Consumes CopyData messages and yields pgoutput payloads.
Pgoutput::Client::LSN.parse("0/16B6C50")
Pgoutput::Client::LSN.format(23_817_296)

Represents a WAL data envelope.
Represents a primary keepalive message.
Builds standby status update payloads.
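For reference, a standby status update is a fixed 34-byte message in the PostgreSQL streaming replication protocol. The sketch below shows one way to build it; the method name `build_standby_status` and its keyword arguments are assumptions for illustration, not the gem's API.

```ruby
# Microseconds between the Unix epoch and 2000-01-01 00:00:00 UTC,
# the epoch PostgreSQL uses for replication timestamps.
PG_EPOCH_OFFSET_USEC = 946_684_800 * 1_000_000

# Hypothetical builder: 'r' + written, flushed, applied LSNs (8 bytes each)
# + client clock (8 bytes) + reply-requested flag (1 byte).
def build_standby_status(written_lsn:, flushed_lsn: written_lsn, applied_lsn: written_lsn,
                         now: Time.now, reply_requested: false)
  clock = (now.to_r * 1_000_000).to_i - PG_EPOCH_OFFSET_USEC
  ["r", written_lsn, flushed_lsn, applied_lsn, clock, reply_requested ? 1 : 0]
    .pack("a Q> Q> Q> q> C")
end
```

Reporting the same position for written, flushed, and applied is a simplification. A consumer that persists data asynchronously would normally report the flushed position only once the data is durable, since PostgreSQL may discard WAL up to that point.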
The replication connection itself is stateful and ordered. It should normally run as a single reader.
Downstream parsing, decoding, and processing can be parallelized with Ractors:
pgoutput-client reader
│
▼
Ractor-safe queue
│
▼
parser / decoder / processor pools
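A minimal sketch of that shape, using only core Ruby: one reader sends frozen payload bytes into a worker Ractor's inbox, which plays the role of the queue. The payloads and the "processing" step (taking `bytesize`) are placeholders. Ractor is still marked experimental and its API for collecting results differs between Ruby releases, so the last line checks for `Ractor#value` and falls back to `Ractor#take`.

```ruby
# Worker: drains its inbox until it sees :done, then returns the results.
worker = Ractor.new do
  sizes = []
  loop do
    message = Ractor.receive
    break if message == :done

    # Stand-in for parsing or decoding; a real worker would call the parser here.
    sizes << message.bytesize
  end
  sizes
end

# Reader side: in a real pipeline these would arrive from the replication stream.
payloads = ["BEGIN".b, "INSERT".b, "COMMIT".b]
payloads.each { |payload| worker.send(payload.freeze) } # frozen strings are shared, not copied
worker.send(:done)

# Newer Rubies expose Ractor#value; older ones use Ractor#take.
results = worker.respond_to?(:value) ? worker.value : worker.take
```

With a single worker, ordering is preserved. Once several workers run in parallel, results can complete out of order, so any ordering guarantee has to be restored downstream.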
Run them all:
bundle exec rake
bundle exec rake rubocop
bundle exec rake test
With coverage:
COVERAGE=true bundle exec rake test
bundle exec rbs:validate
bundle exec rake yard
Run the full Docker-backed E2E flow and clean up afterward:
script/test-e2e
Keep PostgreSQL running after the test for debugging:
KEEP_E2E_POSTGRES=1 script/test-e2e
You can also run the steps manually:
script/e2e-up
PGOUTPUT_CLIENT_E2E=1 bundle exec rake test:e2e
script/e2e-down
Equivalent Rake task:
bundle exec rake e2e:run

pgoutput-client owns PostgreSQL logical replication transport and lifecycle
management. It opens the replication connection, optionally creates the logical
replication slot, starts streaming, sends standby status feedback, and retries
reconnectable failures.
Long-running replication streams can be quiet for long periods when no WAL changes are produced. During those idle periods the client wakes periodically and sends standby status feedback so PostgreSQL does not terminate the walsender for replication timeout.
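One way to picture the idle wake-up is a small timer that tells the reader how long it may block before feedback is due. This is a sketch under stated assumptions: `FeedbackTimer` and its methods are hypothetical names, and the gem's internal scheduling may work differently.

```ruby
# Hypothetical helper; pgoutput-client's internal scheduling may differ.
class FeedbackTimer
  def initialize(interval:, clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    @interval = interval
    @clock = clock
    @last_sent_at = @clock.call
  end

  # Seconds the reader may block waiting for data before feedback is due.
  def wait_timeout
    [@interval - (@clock.call - @last_sent_at), 0.0].max
  end

  def due?
    wait_timeout.zero?
  end

  # Call after a standby status update has been sent.
  def mark_sent
    @last_sent_at = @clock.call
  end
end
```

A reader loop could pass `wait_timeout` as the timeout to `IO.select` on the connection's socket, send feedback when `due?` is true, and then call `mark_sent`. The monotonic clock keeps the cadence stable if the system time is adjusted.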
Control the feedback cadence with feedback_interval:
runner = Pgoutput::Client::Runner.new(
database_url: ENV.fetch("DATABASE_URL"),
slot_name: "mammoth_live",
publication_names: ["mammoth_publication"],
feedback_interval: 10.0
)
When auto_create_slot is enabled, the client treats slot creation as
"ensure this slot exists". Missing slots are created before streaming; existing
slots are reused and do not cause startup failure.
runner = Pgoutput::Client::Runner.new(
database_url: ENV.fetch("DATABASE_URL"),
slot_name: "mammoth_live",
publication_names: ["mammoth_publication"],
auto_create_slot: true,
temporary_slot: false
)
Publication creation remains outside this gem. Create publications through application migrations, database bootstrap SQL, or infrastructure tooling.
After a stream has connected successfully, transient PostgreSQL outages are retried through the reconnect lifecycle. This includes ordinary container or process restart windows where PostgreSQL temporarily refuses connections or reports that the database system is starting up.
MIT.