keploy/orderflow
OrderFlow — Real-time Order Processing Pipeline

A production-grade Go application demonstrating a real-time e-commerce order processing system with:

  • Producer API β†’ writes to sharded Postgres + S3 + publishes to Kafka
  • Consumer Service β†’ reads from Kafka only (no S3/Postgres access by design)
  • Frontend Dashboard β†’ real-time order placement and monitoring

Architecture

Frontend (index.html)
       │
       │  POST /api/orders
       ▼
┌─────────────────┐
│  Producer API   │  (Go HTTP Server, :8080)
│  (main.go)      │
└──┬────┬────┬────┘
   │    │    │
   │    │    │ 1. INSERT
   │    │    ▼
   │    │  ┌─────────────────┐  ┌─────────────────┐
   │    │  │ Postgres Shard1 │  │ Postgres Shard2 │
   │    │  │ Users A-M :5432 │  │ Users N-Z :5433 │
   │    │  └─────────────────┘  └─────────────────┘
   │    │
   │    │ 2. PUT (receipt .txt)
   │    ▼
   │  ┌────────────────────────┐
   │  │ S3 (LocalStack)        │
   │  │ bucket: order-receipts │
   │  │ port :4566             │
   │  └────────────────────────┘
   │
   │ 3. PUBLISH (order.created event)
   ▼
┌─────────────────┐     CONSUME    ┌───────────────────┐
│  Kafka          │ ─────────────► │  Consumer         │
│  topic: orders  │                │  (Go Service)     │
│  port :9092     │                │  Kafka only       │
└─────────────────┘                │  ❌ No S3/Postgres │
                                   └───────────────────┘

Sharding Strategy

Orders are sharded by the first letter of user_id:

  • Shard 1 (port 5432): Users whose ID starts with A–M
  • Shard 2 (port 5433): Users whose ID starts with N–Z

Folder Structure

orderflow/
├── docker-compose.yml          # All services orchestration
├── frontend/
│   └── index.html              # Dashboard UI
├── producer/                   # Producer API (Go)
│   ├── main.go                 # Entry point, HTTP server
│   ├── Dockerfile
│   ├── go.mod
│   ├── config/
│   │   └── config.go           # ENV-based config
│   ├── models/
│   │   └── order.go            # Order structs + Kafka event
│   ├── handlers/
│   │   └── order.go            # HTTP handlers (create, list)
│   ├── kafka/
│   │   └── producer.go         # Kafka producer client
│   └── storage/
│       ├── postgres.go         # Sharded Postgres (shard1 + shard2)
│       └── s3.go               # S3 receipt upload
├── consumer/                   # Consumer Service (Go)
│   ├── main.go                 # Kafka consumer loop
│   ├── Dockerfile
│   ├── go.mod
│   ├── config/
│   │   └── config.go
│   └── handlers/
│       └── processor.go        # Event processor (Kafka only, no S3/PG)
└── scripts/
    ├── init_shard1.sql         # Postgres schema for shard 1
    ├── init_shard2.sql         # Postgres schema for shard 2
    └── init_localstack.sh      # Creates S3 bucket on startup

Prerequisites

  • Docker and Docker Compose installed
  • Go 1.21+ (only for local dev without Docker)
  • librdkafka (only for local dev β€” brew install librdkafka on Mac)

Running with Docker Compose (Recommended)

Step 1 — Clone and navigate

cd orderflow

Step 2 — Make the LocalStack init script executable

chmod +x scripts/init_localstack.sh

Step 3 — Start all services

docker-compose up --build

Wait ~30 seconds for all services to initialize. You'll see:

producer  | ✓ Connected to sharded Postgres (shard1: A-M, shard2: N-Z)
producer  | ✓ S3 client initialized (bucket: order-receipts)
producer  | ✓ Kafka producer connected to kafka:9092
producer  | ✓ Producer API listening on :8080
consumer  | ✓ Subscribed to topic 'orders' with group 'order-consumer-group'
consumer  | ✓ Waiting for messages...

Step 4 — Open the Frontend

Open frontend/index.html in your browser (just double-click the file), or:

open frontend/index.html   # Mac
xdg-open frontend/index.html  # Linux

Running Locally (Without Docker)

Start dependencies only

docker-compose up zookeeper kafka postgres_shard1 postgres_shard2 localstack

Run Producer

cd producer
go mod tidy
export PG_SHARD1_DSN="postgres://orderuser:orderpass@localhost:5432/orders_shard1?sslmode=disable"
export PG_SHARD2_DSN="postgres://orderuser:orderpass@localhost:5433/orders_shard2?sslmode=disable"
export S3_ENDPOINT="http://localhost:4566"
export KAFKA_BROKERS="localhost:9092"
CGO_ENABLED=1 go run main.go

Run Consumer (separate terminal)

cd consumer
go mod tidy
export KAFKA_BROKERS="localhost:9092"
CGO_ENABLED=1 go run main.go

API Reference

Health Check

curl http://localhost:8080/health

Create Order

curl -X POST http://localhost:8080/api/orders \
  -H "Content-Type: application/json" \
  -d '{
    "user_id": "alice",
    "product_name": "Mechanical Keyboard",
    "quantity": 2,
    "price": 149.99
  }'

Response:

{
  "order": {
    "id": "uuid-here",
    "user_id": "alice",
    "product_name": "Mechanical Keyboard",
    "quantity": 2,
    "price": 149.99,
    "status": "created"
  },
  "shard": "shard1",
  "shard_info": "shard1 (A-M)",
  "receipt": "receipts/alice/uuid-here.txt"
}

List All Orders

curl http://localhost:8080/api/orders

List Orders by User

curl "http://localhost:8080/api/orders?user_id=alice"

Test Sharding

# This goes to Shard 1 (A-M)
curl -X POST http://localhost:8080/api/orders \
  -H "Content-Type: application/json" \
  -d '{"user_id":"alice","product_name":"Monitor","quantity":1,"price":399.99}'

# This goes to Shard 2 (N-Z)
curl -X POST http://localhost:8080/api/orders \
  -H "Content-Type: application/json" \
  -d '{"user_id":"zara","product_name":"Keyboard","quantity":2,"price":99.99}'

# Verify directly in Postgres
docker exec -it orderflow-postgres_shard1-1 psql -U orderuser -d orders_shard1 -c "SELECT id, user_id, product_name FROM orders;"
docker exec -it orderflow-postgres_shard2-1 psql -U orderuser -d orders_shard2 -c "SELECT id, user_id, product_name FROM orders;"

Verify S3 Receipts

aws --endpoint-url=http://localhost:4566 s3 ls s3://order-receipts/receipts/ --recursive
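The keys listed follow the receipts/<user_id>/<order_id>.txt pattern seen in the create-order response. A throwaway helper for building such a key (the function name is ours, not from producer/storage/s3.go):

```go
package main

import (
	"fmt"
	"path"
)

// receiptKey builds the S3 object key for an order receipt, following
// the receipts/<user_id>/<order_id>.txt pattern shown in the API
// response. Illustrative only.
func receiptKey(userID, orderID string) string {
	return path.Join("receipts", userID, orderID+".txt")
}

func main() {
	fmt.Println(receiptKey("alice", "1234")) // receipts/alice/1234.txt
}
```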

Watch Kafka Events

docker exec -it orderflow-kafka-1 kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic orders \
  --from-beginning

Environment Variables

Producer

| Variable | Default | Description |
|----------|---------|-------------|
| KAFKA_BROKERS | localhost:9092 | Kafka broker addresses |
| PG_SHARD1_DSN | local shard1 | Postgres DSN for shard 1 |
| PG_SHARD2_DSN | local shard2 | Postgres DSN for shard 2 |
| S3_ENDPOINT | http://localhost:4566 | S3/LocalStack endpoint |
| S3_BUCKET | order-receipts | S3 bucket name |
| PORT | 8080 | HTTP server port |

Consumer

| Variable | Default | Description |
|----------|---------|-------------|
| KAFKA_BROKERS | localhost:9092 | Kafka broker addresses |
| KAFKA_GROUP_ID | order-consumer-group | Consumer group ID |
| KAFKA_TOPIC | orders | Topic to consume |

Stop Everything

docker-compose down -v
