PyDevDeep/G-Mind

AI Email Assistant 🤖📧

Intelligent Gmail automation with AI-powered email classification, response generation, and draft creation using FastAPI, Celery, and LLM integration.

🚀 Overview

AI Email Assistant is a production-grade email automation system that uses Large Language Models to process incoming Gmail messages. It classifies each email, generates a context-aware reply, and creates a draft in Gmail, relying on distributed task processing and monitoring for reliability.

Key Capabilities:

  • Real-time email processing via Google Cloud Pub/Sub webhooks
  • AI-powered email classification (needs_reply, spam, informational)
  • Context-aware response generation using OpenAI GPT-4 or Anthropic Claude
  • Automatic draft creation in Gmail
  • Distributed task processing with Celery workers
  • Production-ready observability stack (Prometheus, Grafana, Loki)

🏗 Architecture

┌─────────────┐      ┌──────────────┐      ┌─────────────┐
│   Gmail     │─────▶│  Pub/Sub     │─────▶│   FastAPI   │
│   Inbox     │      │  Webhook     │      │   Webhook   │
└─────────────┘      └──────────────┘      └──────┬──────┘
                                                  │
                                                  ▼
                                          ┌───────────────┐
                                          │  PostgreSQL   │
                                          │  (Task Queue) │
                                          └───────┬───────┘
                                                  │
                        ┌─────────────────────────┼─────────────────────────┐
                        │                         │                         │
                        ▼                         ▼                         ▼
                  ┌────────────┐            ┌────────────┐            ┌────────────┐
                  │   Celery   │            │   Celery   │            │   Celery   │
                  │   Worker   │            │   Worker   │            │   Worker   │
                  │ (classify) │            │ (generate) │            │   (send)   │
                  └─────┬──────┘            └─────┬──────┘            └─────┬──────┘
                        │                         │                         │
                        └─────────────────────────┼─────────────────────────┘
                                                  ▼
                                         ┌─────────────────┐
                                         │   OpenAI API    │
                                         │   Claude API    │
                                         │   Gmail API     │
                                         └─────────────────┘

Processing Pipeline:

  1. Webhook Reception: Gmail sends notification via Pub/Sub to FastAPI endpoint
  2. Task Creation: FastAPI validates webhook, stores email metadata in PostgreSQL
  3. Classification: Celery worker fetches email content, sends to LLM for categorization
  4. Response Generation: If email needs reply, separate worker generates AI response
  5. Draft Creation: Final worker creates Gmail draft using generated content
  6. Monitoring: All steps tracked via Prometheus metrics, logs aggregated in Loki
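
The branching in steps 3 to 5 can be pictured as a small state machine. The sketch below is plain Python for illustration only, not the project's Celery code: the stage names and the `next_stage` helper are hypothetical, and only the three category labels come from this README.

```python
from enum import Enum
from typing import Optional


class Category(str, Enum):
    NEEDS_REPLY = "needs_reply"
    INFORMATIONAL = "informational"
    SPAM = "spam"


# Hypothetical stage names, listed in pipeline order.
STAGES = ["received", "classified", "reply_generated", "draft_created"]


def next_stage(current: str, category: Optional[Category] = None) -> Optional[str]:
    """Return the stage that follows `current`, or None when processing stops."""
    if current == "classified" and category is not Category.NEEDS_REPLY:
        # Informational and spam emails never get a reply or a draft.
        return None
    index = STAGES.index(current)
    return STAGES[index + 1] if index + 1 < len(STAGES) else None


print(next_stage("classified", Category.NEEDS_REPLY))  # reply_generated
print(next_stage("classified", Category.SPAM))         # None
```

Keeping the routing decision in one pure function makes it easy to unit-test without a broker or an LLM call.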

🛠 Tech Stack

Backend & API:

  • FastAPI 0.104+: Async web framework
  • Python 3.11+: Core language
  • Uvicorn: ASGI server

Database & Storage:

  • PostgreSQL 15+: Primary database with SQLAlchemy 2.0 ORM
  • Alembic: Database migrations
  • Redis 7+: Message broker and result backend (AOF persistence)

Task Processing:

  • Celery 5.3+: Distributed task queue
  • Retry logic with exponential backoff
  • Dead-letter queue for failed tasks

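AI & External APIs:

  • OpenAI GPT-4 and Anthropic Claude (LLM providers)
  • Gmail API and Google Cloud Pub/Sub

Observability:

  • Prometheus, Grafana, and Loki
  • Promtail and Alertmanager (configured under monitoring/)

Infrastructure: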

  • Docker & Docker Compose: Containerization
  • Pre-commit hooks with Ruff: Code quality

✨ Features

Core Functionality

  • ✅ Real-time Email Processing: Webhook-triggered immediate processing
  • ✅ AI Classification: Automatic categorization (needs_reply, informational, spam)
  • ✅ Context-Aware Responses: LLM analyzes email content and generates replies
  • ✅ Gmail Draft Creation: Automatic draft creation in the Gmail UI
  • ✅ Distributed Workers: Celery-based horizontal scaling

Reliability & Error Handling

  • ✅ Exponential Backoff: Automatic retry on API failures (5 attempts)
  • ✅ Dead-Letter Queue: Failed tasks stored in the failed_tasks table
  • ✅ Deduplication: Prevents duplicate processing via email_id tracking
  • ✅ Gmail Watch Renewal: Scheduled job renews the Gmail watch every 6 days, before its 7-day expiry, so Pub/Sub notifications keep arriving
  • ✅ Circuit Breaker: Fallback to Claude on OpenAI outage
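
To make the retry schedule concrete, here is a minimal sketch of exponential backoff. Only the count of 5 attempts comes from this README; the 1-second base, the 60-second cap, the optional jitter, and the `backoff_delays` helper itself are assumptions for illustration, not the project's actual settings.

```python
import random


def backoff_delays(max_attempts=5, base=1.0, cap=60.0, jitter=False, rng=random.random):
    """Return the wait in seconds before each retry: base * 2**n, capped at `cap`."""
    delays = []
    for attempt in range(max_attempts):
        delay = min(cap, base * (2 ** attempt))
        if jitter:
            # "Full jitter": pick a random wait in [0, delay) to spread out retries.
            delay *= rng()
        delays.append(delay)
    return delays


print(backoff_delays())  # [1.0, 2.0, 4.0, 8.0, 16.0]
```

Jitter is worth enabling when many workers hit the same API, since it keeps them from retrying in lockstep.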

Monitoring & Observability

  • ✅ Custom Metrics: Task duration, API latency, queue depth tracking
  • ✅ Structured Logs: JSON logs with correlation IDs across services
  • ✅ Pre-built Dashboards: Grafana dashboards for system health
  • ✅ Alerting: Configurable alerts for quota exhaustion and worker starvation

Security

  • ✅ OAuth 2.0 Integration: Secure Gmail authentication
  • ✅ Rate Limiting: Protection against webhook abuse
  • ✅ Input Sanitization: HTML stripping with BeautifulSoup
  • ✅ Pub/Sub Token Verification: OIDC token validation

📦 Prerequisites

  • Python 3.11 or higher
  • Docker 20.10+ and Docker Compose 2.0+
  • Gmail Account with API access enabled
  • Google Cloud Project with Pub/Sub API enabled
  • OpenAI API Key or Anthropic API Key

Optional for production:

  • PostgreSQL 15+ (if not using Docker)
  • Redis 7+ (if not using Docker)

🔧 Installation

1. Clone Repository

git clone https://github.com/[YOUR_USERNAME]/ai-email-assistant.git
cd ai-email-assistant

2. Gmail API Setup

  1. Go to Google Cloud Console
  2. Create new project or select existing
  3. Enable Gmail API and Cloud Pub/Sub API
  4. Create OAuth 2.0 credentials:
    • Application type: Desktop app
    • Download credentials.json and place it in the project root
  5. Set up Pub/Sub topic and subscription:
    gcloud pubsub topics create gmail-notifications
    gcloud pubsub subscriptions create gmail-push-subscription \
      --topic=gmail-notifications \
      --push-endpoint=https://[YOUR_DOMAIN]/api/webhook/gmail

3. Configure OAuth Flow

python scripts/oauth_flow.py

Follow the browser prompts to authorize. The script then writes token.json, which contains the refresh token.

4. Environment Configuration

cp .env.example .env

Edit .env:

# Database
DATABASE_URL=postgresql+asyncpg://postgres:password@postgres:5432/email_assistant

# Redis
REDIS_URL=redis://redis:6379/0

# Gmail API
GMAIL_CLIENT_ID=your-client-id.apps.googleusercontent.com
GMAIL_CLIENT_SECRET=your-client-secret
GMAIL_USER_EMAIL=your-email@gmail.com

# LLM Providers
OPENAI_API_KEY=sk-...
# Optional fallback provider (comment kept on its own line so it cannot end up in the value)
ANTHROPIC_API_KEY=sk-ant-...
USE_ANTHROPIC_FALLBACK=true

# Google Cloud
PUBSUB_PROJECT_ID=your-gcp-project-id
PUBSUB_TOPIC_NAME=gmail-notifications
PUBSUB_SUBSCRIPTION_NAME=gmail-push-subscription

# Application
LOG_LEVEL=INFO
ENVIRONMENT=development

5. Start Services

docker-compose up --build

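Services will be available at:

  • API: http://localhost:8000
  • Interactive API docs: http://localhost:8000/docs
  • Grafana: http://localhost:3000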

6. Run Migrations

docker-compose exec api alembic upgrade head

7. Set Up Gmail Watch

docker-compose exec api python scripts/setup_watch.py

⚙️ Configuration

Environment Variables

| Variable | Description | Default | Required |
|---|---|---|---|
| DATABASE_URL | PostgreSQL connection string | - | ✅ |
| REDIS_URL | Redis connection string | - | ✅ |
| GMAIL_CLIENT_ID | OAuth 2.0 client ID | - | ✅ |
| GMAIL_CLIENT_SECRET | OAuth 2.0 client secret | - | ✅ |
| GMAIL_USER_EMAIL | Target Gmail account | - | ✅ |
| OPENAI_API_KEY | OpenAI API key | - | ✅ |
| ANTHROPIC_API_KEY | Anthropic API key (fallback) | - | ❌ |
| USE_ANTHROPIC_FALLBACK | Enable Claude fallback | false | ❌ |
| PUBSUB_PROJECT_ID | GCP project ID | - | ✅ |
| PUBSUB_TOPIC_NAME | Pub/Sub topic | gmail-notifications | ✅ |
| LOG_LEVEL | Logging verbosity | INFO | ❌ |
| CELERY_CONCURRENCY | Worker concurrency | 4 | ❌ |
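
As an illustration of how these variables might be validated at startup, here is a stdlib-only sketch. The `Settings` dataclass and `load_settings` function are hypothetical and are not the contents of src/config.py; the variable names and defaults are taken from the table, and PUBSUB_TOPIC_NAME is left out of the required list on the assumption that its default applies.

```python
import os
from dataclasses import dataclass

# Variables listed as required with no default.
REQUIRED = (
    "DATABASE_URL", "REDIS_URL", "GMAIL_CLIENT_ID", "GMAIL_CLIENT_SECRET",
    "GMAIL_USER_EMAIL", "OPENAI_API_KEY", "PUBSUB_PROJECT_ID",
)


@dataclass(frozen=True)
class Settings:
    database_url: str
    redis_url: str
    log_level: str = "INFO"
    celery_concurrency: int = 4
    use_anthropic_fallback: bool = False


def load_settings(env=os.environ) -> Settings:
    """Fail fast on missing required variables, then apply the documented defaults."""
    missing = [name for name in REQUIRED if not env.get(name)]
    if missing:
        raise RuntimeError("Missing required variables: " + ", ".join(missing))
    return Settings(
        database_url=env["DATABASE_URL"],
        redis_url=env["REDIS_URL"],
        log_level=env.get("LOG_LEVEL", "INFO"),
        celery_concurrency=int(env.get("CELERY_CONCURRENCY", "4")),
        use_anthropic_fallback=env.get("USE_ANTHROPIC_FALLBACK", "false").lower() == "true",
    )
```

Passing a plain dict instead of `os.environ` keeps the loader testable without touching the real environment.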

LLM Configuration

Edit src/services/ai_service.py to customize:

CLASSIFICATION_PROMPT = """
Analyze the email and categorize it:
- needs_reply: Requires response
- informational: FYI only
- spam: Unsolicited content
"""

REPLY_GENERATION_PROMPT = """
Generate a professional response to:
Subject: {subject}
From: {sender}
Content: {body}
"""
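
The placeholders in REPLY_GENERATION_PROMPT are filled with `str.format`. The sketch below shows one way to do that and to normalize the classifier's answer. `build_reply_prompt`, `parse_category`, the 4000-character body limit, and the fallback to informational are assumptions for illustration, not code from src/services/ai_service.py.

```python
REPLY_GENERATION_PROMPT = """
Generate a professional response to:
Subject: {subject}
From: {sender}
Content: {body}
"""

VALID_CATEGORIES = {"needs_reply", "informational", "spam"}


def build_reply_prompt(subject: str, sender: str, body: str, max_body_chars: int = 4000) -> str:
    # Truncate very long bodies so the prompt stays within a bounded size.
    return REPLY_GENERATION_PROMPT.format(
        subject=subject, sender=sender, body=body[:max_body_chars]
    )


def parse_category(raw: str) -> str:
    """Normalize the model's answer; unknown labels fall back to 'informational'."""
    label = raw.strip().lower().strip(".\"'")
    return label if label in VALID_CATEGORIES else "informational"


print(parse_category(" Needs_Reply.\n"))  # needs_reply
```

Falling back to informational means an unparseable answer never triggers an unwanted draft.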

Rate Limiting

Modify src/utils/limiter.py:

gmail_limiter = RateLimiter(
    max_requests=50,  # Max requests per window
    window_seconds=60  # Time window in seconds
)
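
The implementation of RateLimiter is not shown in this README. As a rough sketch of what a limiter with these two parameters could do, here is a sliding-window version. The `allow` method and the injectable `clock` argument are hypothetical, and the real class in src/utils/limiter.py may work differently.

```python
import time
from collections import deque


class RateLimiter:
    """Sliding-window limiter: at most `max_requests` calls per `window_seconds`."""

    def __init__(self, max_requests: int, window_seconds: float, clock=time.monotonic):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self._clock = clock
        self._timestamps = deque()

    def allow(self) -> bool:
        now = self._clock()
        # Drop timestamps that have left the window.
        while self._timestamps and now - self._timestamps[0] >= self.window_seconds:
            self._timestamps.popleft()
        if len(self._timestamps) < self.max_requests:
            self._timestamps.append(now)
            return True
        return False


gmail_limiter = RateLimiter(max_requests=50, window_seconds=60)
print(gmail_limiter.allow())  # True
```

This version keeps state in process memory, so each worker would enforce its own limit; sharing a limit across workers would need a shared store such as Redis.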

🚀 Usage

Manual Email Processing

Trigger processing for a specific email:

curl -X POST http://localhost:8000/api/process-email \
  -H "Content-Type: application/json" \
  -d '{"email_id": "18f3a2b1c5d4e6f7"}'

View Processing Status

curl http://localhost:8000/api/task-status/{task_id}

Response:

{
  "task_id": "abc-123",
  "email_id": "18f3a2b1c5d4e6f7",
  "status": "completed",
  "stage": "draft_created",
  "created_at": "2026-04-16T10:30:00Z",
  "completed_at": "2026-04-16T10:30:15Z",
  "ai_response": {
    "category": "needs_reply",
    "confidence": 0.95,
    "generated_reply": "Thank you for your inquiry..."
  }
}
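
A client can derive the end-to-end processing time from the two timestamps in this response. The helper below is a small illustrative sketch; `processing_seconds` is a hypothetical name, and the sample payload reuses the values shown above.

```python
import json
from datetime import datetime


def processing_seconds(payload: str) -> float:
    """Return the seconds between created_at and completed_at in a task-status response."""
    task = json.loads(payload)
    # Replace the trailing "Z" so fromisoformat also works on Python versions before 3.11.
    created = datetime.fromisoformat(task["created_at"].replace("Z", "+00:00"))
    completed = datetime.fromisoformat(task["completed_at"].replace("Z", "+00:00"))
    return (completed - created).total_seconds()


sample = (
    '{"status": "completed", '
    '"created_at": "2026-04-16T10:30:00Z", '
    '"completed_at": "2026-04-16T10:30:15Z"}'
)
print(processing_seconds(sample))  # 15.0
```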

Failed Tasks

Query the dead-letter queue:

curl http://localhost:8000/api/failed-tasks

📊 Monitoring

Grafana Dashboards

Access pre-configured dashboards at http://localhost:3000:

  1. System Overview
    • API request rate and latency (p50, p95, p99)
    • Celery queue depth
    • Worker health status
  2. Email Pipeline
    • Emails processed per hour
    • Classification distribution
    • AI response time
  3. External APIs
    • Gmail API quota usage
    • OpenAI API latency
    • Error rate by provider

Key Metrics

# Queue depth alert
celery_queue_length{queue="default"} > 100

# Gmail quota usage
gmail_api_quota_used / gmail_api_quota_limit > 0.8

# Worker starvation
rate(celery_worker_heartbeat[5m]) == 0

Logs

View structured logs:

docker-compose logs -f api
docker-compose logs -f celery-worker

Query Loki via Grafana Explore:

{service="email-assistant"} |= "error" | json

📚 API Documentation

Interactive API docs available at http://localhost:8000/docs

Core Endpoints

POST /api/webhook/gmail

Receives Gmail Pub/Sub notifications.

Request:

{
  "message": {
    "data": "base64_encoded_payload",
    "messageId": "123456"
  }
}

Response: 204 No Content
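
The `data` field is a base64-encoded JSON document. For Gmail push notifications it carries the mailbox address and a history ID. The sketch below shows the decoding step only; `decode_gmail_notification` is a hypothetical helper, not the handler in src/api/webhook.py, and the sample values are made up.

```python
import base64
import json


def decode_gmail_notification(envelope: dict) -> dict:
    """Decode the base64 `data` field of a Pub/Sub push envelope into a dict."""
    data = envelope["message"]["data"]
    # Restore any stripped padding before decoding.
    padded = data + "=" * (-len(data) % 4)
    return json.loads(base64.urlsafe_b64decode(padded))


raw = json.dumps({"emailAddress": "user@example.com", "historyId": "9876543210"}).encode()
envelope = {"message": {"data": base64.b64encode(raw).decode(), "messageId": "123456"}}
print(decode_gmail_notification(envelope))
# {'emailAddress': 'user@example.com', 'historyId': '9876543210'}
```

The notification does not include the message itself; the history ID is what a worker would use to ask the Gmail API which messages changed.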

GET /api/health

Health check endpoint.

Response:

{
  "status": "healthy",
  "database": "connected",
  "redis": "connected",
  "celery_workers": 3
}

GET /api/metrics

Prometheus metrics endpoint.

📁 Project Structure

ai-email-assistant/
├── docker-compose.yml          # Service orchestration
├── Dockerfile                  # Application container
├── alembic.ini                 # Database migration config
├── pyproject.toml              # Python dependencies
├── .env.example                # Environment template
│
├── monitoring/                 # Observability stack
│   ├── prometheus/
│   │   └── prometheus.yml      # Scrape configuration
│   ├── grafana/
│   │   ├── dashboards/         # Pre-built dashboards
│   │   └── datasources/        # Data source configs
│   ├── loki/
│   │   └── loki-config.yml
│   ├── promtail/
│   │   └── promtail-config.yml
│   └── alertmanager/
│       └── alertmanager.yml
│
├── scripts/
│   ├── oauth_flow.py           # OAuth token generation
│   └── setup_watch.py          # Gmail watch initialization
│
└── src/
    ├── main.py                 # FastAPI application
    ├── config.py               # Settings management
    ├── dependencies.py         # Dependency injection
    │
    ├── alembic/                # Database migrations
    │   └── versions/
    │       ├── 9617af0fa031_initial_schema.py
    │       └── [future migrations]
    │
    ├── api/                    # HTTP layer
    │   ├── router.py           # Route registration
    │   └── webhook.py          # Pub/Sub webhook handler
    │
    ├── models/                 # SQLAlchemy ORM
    │   ├── base.py             # Base model class
    │   ├── email.py            # Email metadata
    │   ├── task.py             # Processing tasks
    │   ├── response.py         # AI responses
    │   └── failed_task.py      # Dead-letter queue
    │
    ├── schemas/                # Pydantic schemas
    │   ├── webhook.py          # Pub/Sub payload
    │   └── ai.py               # AI request/response
    │
    ├── services/               # Business logic
    │   ├── email_service.py    # Email CRUD operations
    │   ├── ai_service.py       # LLM integration
    │   ├── webhook_service.py  # Webhook processing
    │   ├── watch_service.py    # Gmail watch management
    │   ├── worker_service.py   # Celery task orchestration
    │   └── storage_service.py  # File storage abstraction
    │
    ├── utils/                  # Shared utilities
    │   ├── gmail.py            # Gmail API client
    │   ├── limiter.py          # Rate limiting
    │   ├── logging.py          # Structured logging
    │   ├── metrics.py          # Prometheus metrics
    │   ├── pubsub.py           # Pub/Sub utilities
    │   └── sanitizer.py        # HTML sanitization
    │
    └── workers/                # Celery tasks
        ├── celery_app.py       # Celery configuration
        ├── tasks.py            # Task definitions
        └── callbacks.py        # Task failure handlers

🔨 Development

Setup Development Environment

# Install pre-commit hooks
pre-commit install

# Run linting
ruff check src/
ruff format src/

# Type checking
mypy src/

Database Migrations

Create new migration:

alembic revision --autogenerate -m "description"

Apply migrations:

alembic upgrade head

Rollback:

alembic downgrade -1

Adding New Tasks

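  1. Define the task in src/workers/tasks.py:

    # bind=True passes the task instance as `self`;
    # autoretry_for retries the task when any listed exception is raised
    @celery_app.task(bind=True, autoretry_for=(Exception,))
    def my_new_task(self, email_id: str):
        # Task logic
        pass
  2. Register the task in the pipeline in src/services/worker_service.py
  3. Add tests in tests/workers/test_my_task.py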

🧪 Testing

# Run all tests
pytest

# With coverage
pytest --cov=src --cov-report=html

# Specific test file
pytest tests/test_ai_service.py

# Integration tests
pytest tests/integration/ -v

Coverage Target: >80% (enforced in CI)

🚢 Deployment

Production Checklist

  • Environment variables properly configured (not committed to git)
  • ENVIRONMENT=production set
  • Rate limiting enabled
  • CORS restricted to production domains
  • PostgreSQL automated backups configured
  • Redis persistence (AOF) enabled
  • Monitoring alerts configured
  • Gmail watch renewal cron job scheduled
  • TLS certificates for webhook endpoint
  • Horizontal scaling tested (multiple workers)

Docker Production Build

docker build -t ai-email-assistant:latest .
docker-compose -f docker-compose.yml -f docker-compose.prod.yml up -d

Cloud Deployment

Google Cloud Run (recommended for webhook):

gcloud run deploy email-assistant \
  --image gcr.io/[PROJECT_ID]/email-assistant \
  --platform managed \
  --region us-central1 \
  --allow-unauthenticated \
  --set-env-vars DATABASE_URL=[CLOUD_SQL_URL]

AWS ECS/Fargate:

[INSERT AWS-SPECIFIC DEPLOYMENT INSTRUCTIONS]

🐛 Troubleshooting

Common Issues

1. Webhook Not Receiving Events

Check Pub/Sub subscription status:

gcloud pubsub subscriptions describe gmail-push-subscription

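Verify that the webhook endpoint is publicly accessible and returns a 2xx status (the handler responds with 204 No Content); Pub/Sub treats any other status as a failed delivery and retries.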

2. OAuth Token Expired

Re-run OAuth flow:

python scripts/oauth_flow.py

Check that token.json contains a valid refresh_token.

3. Celery Workers Not Picking Up Tasks

Check Redis connectivity:

redis-cli -h localhost ping

View worker logs:

docker-compose logs celery-worker

Verify queue:

celery -A src.workers.celery_app inspect active

4. Gmail API Quota Exceeded

Check quota in Google Cloud Console.

No extra retry code is needed: exponential backoff is already implemented in src/utils/gmail.py.

5. Database Connection Pool Exhausted

Increase pool size in config.py:

SQLALCHEMY_POOL_SIZE = 20
SQLALCHEMY_MAX_OVERFLOW = 10

Debug Mode

Enable verbose logging:

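# Set LOG_LEVEL=DEBUG in .env (or export it in the shell), then recreate the
# containers; "docker-compose restart" alone does not pick up environment changes
docker-compose up -d --force-recreate api celery-worker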

Health Checks

# API health
curl http://localhost:8000/api/health

# Celery workers
celery -A src.workers.celery_app inspect ping

# Database (psql rejects the "+asyncpg" driver suffix, so strip it; requires bash)
psql "${DATABASE_URL/+asyncpg/}" -c "SELECT 1"

🤝 Contributing

Contributions welcome! Please follow these steps:

  1. Fork the repository
  2. Create feature branch (git checkout -b feature/amazing-feature)
  3. Commit changes (git commit -m 'Add amazing feature')
  4. Push to branch (git push origin feature/amazing-feature)
  5. Open Pull Request

Code Standards:

  • Use Ruff for linting and formatting
  • Add tests for new features (pytest)
  • Update documentation for user-facing changes
  • Follow semantic commit messages

📄 License

[INSERT LICENSE TYPE]. See the LICENSE file for details.

🙏 Acknowledgments

  • The FastAPI team for an excellent async framework
  • Celery maintainers
  • OpenAI and Anthropic for LLM APIs
  • Google for Gmail API and Pub/Sub

📞 Support

🗺 Roadmap

Completed:

  • ✅ Core email classification pipeline
  • ✅ AI response generation
  • ✅ Gmail draft creation
  • ✅ Monitoring stack

In Progress:

  • 🚧 Multi-user support
  • 🚧 Web UI dashboard

Planned:

  • Fine-tuned classification model
  • Slack/Teams integration
  • Email templates library
  • A/B testing for prompts
  • Analytics dashboard

Built with ❤️ using FastAPI, Celery, and LLMs
