Intelligent Gmail automation with AI-powered email classification, response generation, and draft creation using FastAPI, Celery, and LLM integration.
AI Email Assistant is a production-grade email automation system that uses Large Language Models to process incoming Gmail messages. It classifies each email, generates a context-aware response, and creates a draft reply, while distributed task processing and monitoring keep the pipeline reliable.
Key Capabilities:
- Real-time email processing via Google Cloud Pub/Sub webhooks
- AI-powered email classification (needs_reply, spam, informational)
- Context-aware response generation using OpenAI GPT-4 or Anthropic Claude
- Automatic draft creation in Gmail
- Distributed task processing with Celery workers
- Production-ready observability stack (Prometheus, Grafana, Loki)
Table of Contents:
- Architecture
- Tech Stack
- Features
- Prerequisites
- Installation
- Configuration
- Usage
- Monitoring
- API Documentation
- Project Structure
- Development
- Testing
- Deployment
- Troubleshooting
- Contributing
- License
┌─────────────┐      ┌──────────────┐      ┌─────────────┐
│    Gmail    │─────▶│   Pub/Sub    │─────▶│   FastAPI   │
│    Inbox    │      │   Webhook    │      │   Webhook   │
└─────────────┘      └──────────────┘      └──────┬──────┘
                                                  │
                                                  ▼
                                          ┌───────────────┐
                                          │  PostgreSQL   │
                                          │ (Task Queue)  │
                                          └───────┬───────┘
                                                  │
                        ┌─────────────────────────┼─────────────────────────┐
                        │                         │                         │
                        ▼                         ▼                         ▼
                  ┌───────────┐             ┌───────────┐             ┌───────────┐
                  │  Celery   │             │  Celery   │             │  Celery   │
                  │  Worker   │             │  Worker   │             │  Worker   │
                  │ (classify)│             │ (generate)│             │  (send)   │
                  └─────┬─────┘             └─────┬─────┘             └─────┬─────┘
                        │                         │                         │
                        └─────────────────────────┼─────────────────────────┘
                                                  ▼
                                         ┌─────────────────┐
                                         │   OpenAI API    │
                                         │   Claude API    │
                                         │   Gmail API     │
                                         └─────────────────┘
Processing Pipeline:
- Webhook Reception: Gmail sends notification via Pub/Sub to FastAPI endpoint
- Task Creation: FastAPI validates webhook, stores email metadata in PostgreSQL
- Classification: Celery worker fetches email content, sends to LLM for categorization
- Response Generation: If email needs reply, separate worker generates AI response
- Draft Creation: Final worker creates Gmail draft using generated content
- Monitoring: All steps tracked via Prometheus metrics, logs aggregated in Loki
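The branching in the pipeline above (classify first, generate a reply only when one is needed) can be sketched in plain Python. This is a minimal illustration, not the project's Celery code: `run_pipeline`, `PipelineResult`, and the injected `classify` and `generate_reply` callables are hypothetical names standing in for the real workers.

```python
from dataclasses import dataclass
from typing import Callable, Optional

# Categories named in this README's classification step.
CATEGORIES = ("needs_reply", "informational", "spam")


@dataclass
class PipelineResult:
    email_id: str
    category: str
    draft: Optional[str] = None  # set only when a reply was generated


def run_pipeline(
    email_id: str,
    body: str,
    classify: Callable[[str], str],
    generate_reply: Callable[[str], str],
) -> PipelineResult:
    """Classify an email and generate a draft only if it needs a reply."""
    category = classify(body)
    if category not in CATEGORIES:
        raise ValueError(f"unexpected category: {category!r}")
    if category != "needs_reply":
        # Informational and spam emails stop here; no LLM reply is requested.
        return PipelineResult(email_id, category)
    return PipelineResult(email_id, category, draft=generate_reply(body))


if __name__ == "__main__":
    # Stub callables stand in for the LLM-backed workers.
    result = run_pipeline(
        "18f3a2b1c5d4e6f7",
        "Could you send the Q3 report?",
        classify=lambda body: "needs_reply",
        generate_reply=lambda body: "Thank you for your inquiry...",
    )
    print(result)
```

Passing the classifier and generator in as callables keeps the routing logic testable without calling an LLM provider.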
Backend & API:
Database & Storage:
- PostgreSQL 15+: Primary database with SQLAlchemy 2.0 ORM
- Alembic: Database migrations
- Redis 7+: Message broker and result backend (AOF persistence)
Task Processing:
- Celery 5.3+: Distributed task queue
- Retry logic with exponential backoff
- Dead-letter queue for failed tasks
AI & External APIs:
- OpenAI API: GPT-4 for email processing (primary)
- Anthropic Claude API: Fallback LLM provider
- Gmail API: Email fetching and draft creation
- Google Cloud Pub/Sub: Real-time webhook notifications
Observability:
- Prometheus: Metrics collection
- Grafana: Visualization and dashboards
- Loki: Log aggregation
- Promtail: Log shipping
- Alertmanager: Alert routing
- structlog: Structured JSON logging
Infrastructure:
- ✅ Real-time Email Processing: Webhook-triggered immediate processing
- ✅ AI Classification: Automatic categorization (needs_reply, informational, spam)
- ✅ Context-Aware Responses: LLM analyzes email content and generates replies
- ✅ Gmail Draft Creation: Automatic draft creation in Gmail UI
- ✅ Distributed Workers: Celery-based horizontal scaling
- ✅ Exponential Backoff: Automatic retry on API failures (5 attempts)
- ✅ Dead-Letter Queue: Failed tasks stored in failed_tasks table
- ✅ Deduplication: Prevents duplicate processing via email_id tracking
- ✅ Gmail Watch Renewal: Scheduled job maintains Pub/Sub subscription (every 6 days)
- ✅ Circuit Breaker: Fallback to Claude on OpenAI outage
- ✅ Custom Metrics: Task duration, API latency, queue depth tracking
- ✅ Structured Logs: JSON logs with correlation IDs across services
- ✅ Pre-built Dashboards: Grafana dashboards for system health
- ✅ Alerting: Configurable alerts for quota exhaustion, worker starvation
- ✅ OAuth 2.0 Integration: Secure Gmail authentication
- ✅ Rate Limiting: Protection against webhook abuse
- ✅ Input Sanitization: HTML stripping with BeautifulSoup
- ✅ Pub/Sub Token Verification: OIDC token validation
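The circuit-breaker item in the list above (fall back to Claude when OpenAI is unavailable) could look roughly like the sketch below. It assumes a simple consecutive-failure breaker; `CircuitBreaker`, `complete_with_fallback`, the threshold, and the cooldown are all hypothetical and are not taken from the project's code.

```python
import time
from typing import Callable, Optional


class CircuitBreaker:
    """Opens after consecutive failures, then allows a retry after a cooldown."""

    def __init__(
        self,
        failure_threshold: int = 3,
        reset_after: float = 30.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.failure_threshold = failure_threshold
        self.reset_after = reset_after
        self._clock = clock
        self._failures = 0
        self._opened_at: Optional[float] = None

    def is_open(self) -> bool:
        if self._opened_at is None:
            return False
        if self._clock() - self._opened_at >= self.reset_after:
            # Cooldown elapsed: let one trial call through; a single new
            # failure reopens the breaker immediately.
            self._opened_at = None
            self._failures = self.failure_threshold - 1
            return False
        return True

    def record_success(self) -> None:
        self._failures = 0
        self._opened_at = None

    def record_failure(self) -> None:
        self._failures += 1
        if self._failures >= self.failure_threshold:
            self._opened_at = self._clock()


def complete_with_fallback(
    prompt: str,
    primary: Callable[[str], str],
    fallback: Callable[[str], str],
    breaker: CircuitBreaker,
) -> str:
    """Try the primary provider unless the breaker is open, else use the fallback."""
    if not breaker.is_open():
        try:
            result = primary(prompt)
        except Exception:
            breaker.record_failure()
        else:
            breaker.record_success()
            return result
    return fallback(prompt)
```

While the breaker is open, the primary provider is skipped entirely, which avoids spending the retry budget on a provider that is known to be down.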
- Python 3.11 or higher
- Docker 20.10+ and Docker Compose 2.0+
- Gmail Account with API access enabled
- Google Cloud Project with Pub/Sub API enabled
- OpenAI API Key or Anthropic API Key
Optional for production:
- PostgreSQL 15+ (if not using Docker)
- Redis 7+ (if not using Docker)
git clone https://github.com/[YOUR_USERNAME]/ai-email-assistant.git
cd ai-email-assistant
- Go to Google Cloud Console
- Create new project or select existing
- Enable Gmail API and Cloud Pub/Sub API
- Create OAuth 2.0 credentials:
  - Application type: Desktop app
  - Download credentials.json and place it in the project root
- Set up Pub/Sub topic and subscription:
gcloud pubsub topics create gmail-notifications
gcloud pubsub subscriptions create gmail-push-subscription \
  --topic=gmail-notifications \
  --push-endpoint=https://[YOUR_DOMAIN]/api/webhook/gmail
python scripts/oauth_flow.py
Follow the browser prompts to authorize. This generates token.json with a refresh token.
cp .env.example .env
Edit .env:
# Database
DATABASE_URL=postgresql+asyncpg://postgres:password@postgres:5432/email_assistant
# Redis
REDIS_URL=redis://redis:6379/0
# Gmail API
GMAIL_CLIENT_ID=your-client-id.apps.googleusercontent.com
GMAIL_CLIENT_SECRET=your-client-secret
GMAIL_USER_EMAIL=your-email@gmail.com
# LLM Providers
OPENAI_API_KEY=sk-...
ANTHROPIC_API_KEY=sk-ant-... # Optional fallback
USE_ANTHROPIC_FALLBACK=true
# Google Cloud
PUBSUB_PROJECT_ID=your-gcp-project-id
PUBSUB_TOPIC_NAME=gmail-notifications
PUBSUB_SUBSCRIPTION_NAME=gmail-push-subscription
# Application
LOG_LEVEL=INFO
ENVIRONMENT=development
docker-compose up --build
Services will be available at:
- FastAPI API: http://localhost:8000
- API Docs: http://localhost:8000/docs
- Grafana: http://localhost:3000 (admin/admin)
- Prometheus: http://localhost:9090
docker-compose exec api alembic upgrade head
docker-compose exec api python scripts/setup_watch.py
| Variable | Description | Default | Required |
|---|---|---|---|
| DATABASE_URL | PostgreSQL connection string | (none) | Yes |
| REDIS_URL | Redis connection string | (none) | Yes |
| GMAIL_CLIENT_ID | OAuth 2.0 client ID | (none) | Yes |
| GMAIL_CLIENT_SECRET | OAuth 2.0 client secret | (none) | Yes |
| GMAIL_USER_EMAIL | Target Gmail account | (none) | Yes |
| OPENAI_API_KEY | OpenAI API key | (none) | Yes |
| ANTHROPIC_API_KEY | Anthropic API key (fallback) | (none) | No |
| USE_ANTHROPIC_FALLBACK | Enable Claude fallback | false | No |
| PUBSUB_PROJECT_ID | GCP project ID | (none) | Yes |
| PUBSUB_TOPIC_NAME | Pub/Sub topic | gmail-notifications | No |
| LOG_LEVEL | Logging verbosity | INFO | No |
| CELERY_CONCURRENCY | Worker concurrency | 4 | No |
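As a rough illustration of how the variables in the table above might be read at startup, the sketch below loads them from the environment with the listed defaults. It is an assumption-laden example, not the contents of `src/config.py`: `Settings`, `load_settings`, and `REQUIRED_VARS` are hypothetical names, and the required set assumes every variable without a default is mandatory except `ANTHROPIC_API_KEY`.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional

# Assumed required set: variables with no default, minus the optional fallback key.
REQUIRED_VARS = (
    "DATABASE_URL", "REDIS_URL", "GMAIL_CLIENT_ID", "GMAIL_CLIENT_SECRET",
    "GMAIL_USER_EMAIL", "OPENAI_API_KEY", "PUBSUB_PROJECT_ID",
)


@dataclass(frozen=True)
class Settings:
    database_url: str
    redis_url: str
    openai_api_key: str
    anthropic_api_key: Optional[str]
    use_anthropic_fallback: bool
    pubsub_project_id: str
    pubsub_topic_name: str
    log_level: str
    celery_concurrency: int


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    """Build Settings from an environment mapping, failing fast on gaps."""
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError("missing required variables: " + ", ".join(missing))
    return Settings(
        database_url=env["DATABASE_URL"],
        redis_url=env["REDIS_URL"],
        openai_api_key=env["OPENAI_API_KEY"],
        anthropic_api_key=env.get("ANTHROPIC_API_KEY") or None,
        # Any value other than "true" (case-insensitive) disables the fallback.
        use_anthropic_fallback=env.get("USE_ANTHROPIC_FALLBACK", "false").lower() == "true",
        pubsub_project_id=env["PUBSUB_PROJECT_ID"],
        pubsub_topic_name=env.get("PUBSUB_TOPIC_NAME", "gmail-notifications"),
        log_level=env.get("LOG_LEVEL", "INFO"),
        celery_concurrency=int(env.get("CELERY_CONCURRENCY", "4")),
    )
```

Failing at startup with the full list of missing names tends to be easier to debug than a `KeyError` raised later inside a worker.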
Edit src/services/ai_service.py to customize:
CLASSIFICATION_PROMPT = """
Analyze the email and categorize it:
- needs_reply: Requires response
- informational: FYI only
- spam: Unsolicited content
"""
REPLY_GENERATION_PROMPT = """
Generate a professional response to:
Subject: {subject}
From: {sender}
Content: {body}
"""
Modify src/utils/limiter.py:
gmail_limiter = RateLimiter(
max_requests=50, # Max requests per window
window_seconds=60 # Time window in seconds
)
Trigger processing for a specific email:
curl -X POST http://localhost:8000/api/process-email \
-H "Content-Type: application/json" \
  -d '{"email_id": "18f3a2b1c5d4e6f7"}'
curl http://localhost:8000/api/task-status/{task_id}
Response:
{
"task_id": "abc-123",
"email_id": "18f3a2b1c5d4e6f7",
"status": "completed",
"stage": "draft_created",
"created_at": "2026-04-16T10:30:00Z",
"completed_at": "2026-04-16T10:30:15Z",
"ai_response": {
"category": "needs_reply",
"confidence": 0.95,
"generated_reply": "Thank you for your inquiry..."
}
}
Query dead-letter queue:
curl http://localhost:8000/api/failed-tasks
Access pre-configured dashboards at http://localhost:3000:
- System Overview
  - API request rate and latency (p50, p95, p99)
  - Celery queue depth
  - Worker health status
- Email Pipeline
  - Emails processed per hour
  - Classification distribution
  - AI response time
- External APIs
  - Gmail API quota usage
  - OpenAI API latency
  - Error rate by provider
# Queue depth alert
celery_queue_length{queue="default"} > 100
# Gmail quota usage
gmail_api_quota_used / gmail_api_quota_limit > 0.8
# Worker starvation
rate(celery_worker_heartbeat[5m]) == 0
View structured logs:
docker-compose logs -f api
docker-compose logs -f celery-worker
Query Loki via Grafana Explore:
{service="email-assistant"} |= "error" | json
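The `| json` stage in the query above only works when each log line is a single JSON object. The project lists structlog for this; the sketch below uses only the standard `logging` module as a stand-in to show the shape of such a line, including a correlation ID. `JsonFormatter`, `build_logger`, and the field names are illustrative assumptions.

```python
import io
import json
import logging
import sys
import uuid


class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object per line."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "level": record.levelname.lower(),
            "event": record.getMessage(),
            # Supplied per call through the `extra` argument; None if absent.
            "correlation_id": getattr(record, "correlation_id", None),
        }
        return json.dumps(payload)


def build_logger(stream) -> logging.Logger:
    """Return a logger that writes JSON lines to the given stream."""
    logger = logging.getLogger("email_assistant.sketch")
    logger.handlers.clear()
    handler = logging.StreamHandler(stream)
    handler.setFormatter(JsonFormatter())
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    logger.propagate = False
    return logger


if __name__ == "__main__":
    log = build_logger(sys.stdout)
    log.error("gmail fetch failed", extra={"correlation_id": str(uuid.uuid4())})
```

Because the level is written in lowercase, a line logged with `log.error` contains the substring `error` and so matches the `|= "error"` filter.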
Interactive API docs available at http://localhost:8000/docs
POST /api/webhook/gmail: Receives Gmail Pub/Sub notifications.
Request:
{
"message": {
"data": "base64_encoded_payload",
"messageId": "123456"
}
}
Response: 204 No Content
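For Gmail push notifications, the `data` field in the request above is a base64-encoded JSON object carrying `emailAddress` and `historyId`. A minimal decoding sketch follows; `decode_gmail_notification` is a hypothetical helper, not the project's webhook handler, and it omits the OIDC token check that a real endpoint would perform first.

```python
import base64
import json
from typing import Any, Dict


def decode_gmail_notification(envelope: Dict[str, Any]) -> Dict[str, str]:
    """Extract the mailbox address and history ID from a Pub/Sub push envelope."""
    message = envelope["message"]
    data = message["data"]
    # Restore any stripped padding so the decoder accepts the string.
    padded = data + "=" * (-len(data) % 4)
    payload = json.loads(base64.urlsafe_b64decode(padded))
    return {
        "message_id": message["messageId"],
        "email_address": payload["emailAddress"],
        # Normalized to a string whether it arrives as a number or a string.
        "history_id": str(payload["historyId"]),
    }


if __name__ == "__main__":
    raw = json.dumps({"emailAddress": "user@example.com", "historyId": 9876543210})
    envelope = {
        "message": {
            "data": base64.urlsafe_b64encode(raw.encode()).decode(),
            "messageId": "123456",
        }
    }
    print(decode_gmail_notification(envelope))
```

The notification does not contain the email itself; the history ID is what a worker would use to ask the Gmail API which messages changed.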
GET /api/health: Health check endpoint.
Response:
{
"status": "healthy",
"database": "connected",
"redis": "connected",
"celery_workers": 3
}
Prometheus metrics endpoint.
ai-email-assistant/
├── docker-compose.yml          # Service orchestration
├── Dockerfile                  # Application container
├── alembic.ini                 # Database migration config
├── pyproject.toml              # Python dependencies
├── .env.example                # Environment template
│
├── monitoring/                 # Observability stack
│   ├── prometheus/
│   │   └── prometheus.yml      # Scrape configuration
│   ├── grafana/
│   │   ├── dashboards/         # Pre-built dashboards
│   │   └── datasources/        # Data source configs
│   ├── loki/
│   │   └── loki-config.yml
│   ├── promtail/
│   │   └── promtail-config.yml
│   └── alertmanager/
│       └── alertmanager.yml
│
├── scripts/
│   ├── oauth_flow.py           # OAuth token generation
│   └── setup_watch.py          # Gmail watch initialization
│
└── src/
    ├── main.py                 # FastAPI application
    ├── config.py               # Settings management
    ├── dependencies.py         # Dependency injection
    │
    ├── alembic/                # Database migrations
    │   └── versions/
    │       ├── 9617af0fa031_initial_schema.py
    │       └── [future migrations]
    │
    ├── api/                    # HTTP layer
    │   ├── router.py           # Route registration
    │   └── webhook.py          # Pub/Sub webhook handler
    │
    ├── models/                 # SQLAlchemy ORM
    │   ├── base.py             # Base model class
    │   ├── email.py            # Email metadata
    │   ├── task.py             # Processing tasks
    │   ├── response.py         # AI responses
    │   └── failed_task.py      # Dead-letter queue
    │
    ├── schemas/                # Pydantic schemas
    │   ├── webhook.py          # Pub/Sub payload
    │   └── ai.py               # AI request/response
    │
    ├── services/               # Business logic
    │   ├── email_service.py    # Email CRUD operations
    │   ├── ai_service.py       # LLM integration
    │   ├── webhook_service.py  # Webhook processing
    │   ├── watch_service.py    # Gmail watch management
    │   ├── worker_service.py   # Celery task orchestration
    │   └── storage_service.py  # File storage abstraction
    │
    ├── utils/                  # Shared utilities
    │   ├── gmail.py            # Gmail API client
    │   ├── limiter.py          # Rate limiting
    │   ├── logging.py          # Structured logging
    │   ├── metrics.py          # Prometheus metrics
    │   ├── pubsub.py           # Pub/Sub utilities
    │   └── sanitizer.py        # HTML sanitization
    │
    └── workers/                # Celery tasks
        ├── celery_app.py       # Celery configuration
        ├── tasks.py            # Task definitions
        └── callbacks.py        # Task failure handlers
# Install pre-commit hooks
pre-commit install
# Run linting
ruff check src/
ruff format src/
# Type checking
mypy src/
Create new migration:
alembic revision --autogenerate -m "description"
Apply migrations:
alembic upgrade head
Rollback:
alembic downgrade -1
- Define task in src/workers/tasks.py:
  @celery_app.task(bind=True, autoretry_for=(Exception,))
  def my_new_task(self, email_id: str):
      # Task logic
      pass
- Register in pipeline in src/services/worker_service.py
- Add tests in tests/workers/test_my_task.py
# Run all tests
pytest
# With coverage
pytest --cov=src --cov-report=html
# Specific test file
pytest tests/test_ai_service.py
# Integration tests
pytest tests/integration/ -v
Coverage Target: >80% (enforced in CI)
- Environment variables properly configured (not committed to git)
- ENVIRONMENT=production set
- Rate limiting enabled
- CORS restricted to production domains
- PostgreSQL automated backups configured
- Redis persistence (AOF) enabled
- Monitoring alerts configured
- Gmail watch renewal cron job scheduled
- TLS certificates for webhook endpoint
- Horizontal scaling tested (multiple workers)
docker build -t ai-email-assistant:latest .
docker-compose -f docker-compose.yml -f docker-compose.prod.yml up -d
Google Cloud Run (recommended for webhook):
gcloud run deploy email-assistant \
--image gcr.io/[PROJECT_ID]/email-assistant \
--platform managed \
--region us-central1 \
--allow-unauthenticated \
  --set-env-vars DATABASE_URL=[CLOUD_SQL_URL]
AWS ECS/Fargate:
[INSERT AWS-SPECIFIC DEPLOYMENT INSTRUCTIONS]
1. Webhook Not Receiving Events
Check Pub/Sub subscription status:
gcloud pubsub subscriptions describe gmail-push-subscription
Verify that the webhook endpoint is publicly accessible and returns a success status (the handler responds with 204 No Content).
2. OAuth Token Expired
Re-run OAuth flow:
python scripts/oauth_flow.py
Check that token.json has a valid refresh_token.
3. Celery Workers Not Picking Up Tasks
Check Redis connectivity:
redis-cli -h localhost ping
View worker logs:
docker-compose logs celery-worker
Verify queue:
celery -A src.workers.celery_app inspect active
4. Gmail API Quota Exceeded
Check quota in Google Cloud Console.
Failed requests are retried with exponential backoff (already included in gmail.py), so wait for the quota window to reset before retrying manually.
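For reference, exponential backoff with a capped delay and a little jitter can be sketched as follows. This is a generic illustration under assumed parameters (5 attempts, 1 second base delay); `retry_with_backoff` is a hypothetical helper and may not match what gmail.py does.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    func: Callable[[], T],
    max_attempts: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func, doubling the wait after each failure, up to max_attempts."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except Exception:
            if attempt == max_attempts:
                raise  # Out of attempts: surface the last error to the caller.
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            # Up to 10% jitter spreads out retries from concurrent workers.
            sleep(delay + random.uniform(0, delay / 10))
    raise RuntimeError("unreachable")
```

With the defaults, the waits between the five attempts are roughly 1, 2, 4, and 8 seconds. A production version would normally catch only retryable errors, such as rate-limit or transient server responses, rather than every exception.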
5. Database Connection Pool Exhausted
Increase pool size in config.py:
SQLALCHEMY_POOL_SIZE = 20
SQLALCHEMY_MAX_OVERFLOW = 10
Enable verbose logging:
export LOG_LEVEL=DEBUG
docker-compose restart api celery-worker
# API health
curl http://localhost:8000/api/health
# Celery workers
celery -A src.workers.celery_app inspect ping
# Database
# psql does not accept the +asyncpg driver suffix, so strip it first (bash syntax)
psql "${DATABASE_URL/+asyncpg/}" -c "SELECT 1"
Contributions welcome! Please follow these steps:
- Fork the repository
- Create feature branch (git checkout -b feature/amazing-feature)
- Commit changes (git commit -m 'Add amazing feature')
- Push to branch (git push origin feature/amazing-feature)
- Open Pull Request
Code Standards:
- Use Ruff for linting and formatting
- Add tests for new features (pytest)
- Update documentation for user-facing changes
- Follow semantic commit messages
[INSERT LICENSE TYPE]. See the LICENSE file for details.
- FastAPI team for excellent async framework
- Celery maintainers
- OpenAI and Anthropic for LLM APIs
- Google for Gmail API and Pub/Sub
- Issues: GitHub Issues
- Discussions: GitHub Discussions
- Email: [INSERT SUPPORT EMAIL]
Completed:
- ✅ Core email classification pipeline
- ✅ AI response generation
- ✅ Gmail draft creation
- ✅ Monitoring stack
In Progress:
- 🚧 Multi-user support
- 🚧 Web UI dashboard
Planned:
- Fine-tuned classification model
- Slack/Teams integration
- Email templates library
- A/B testing for prompts
- Analytics dashboard
Built with ❤️ using FastAPI, Celery, and LLMs