diff --git a/README.md b/README.md index ed61994..eac66c7 100644 --- a/README.md +++ b/README.md @@ -53,7 +53,8 @@ FastAPI auto-generates interactive docs: #### Notable Endpoints - `GET /health` — Health check -- `GET/POST /prompt` — LLM prompt completion (formerly `/llm`) +- `GET /prompt` or `GET /prompts` — Prompt table metadata (`record_count`, `columns`) +- `POST /prompt` — LLM prompt completion (formerly `/llm`) - `GET/POST /resend` — Send email via Resend API (see implementation in `app/utils/notify/resend.py`) - `GET /prospects` — Paginated prospects - `POST /prospects/process` — Bulk CSV ingestion diff --git a/app/api/prompt/__init__.py b/app/api/prompt/__init__.py index 76637df..b35910c 100644 --- a/app/api/prompt/__init__.py +++ b/app/api/prompt/__init__.py @@ -1,6 +1,5 @@ """Prompt Routes""" - from .prompt import router as prompt_router from .linkedin import router as linkedin_router -from .drop import router as drop_router +from .empty import router as empty_router diff --git a/app/api/prompt/drop.py b/app/api/prompt/empty.py similarity index 68% rename from app/api/prompt/drop.py rename to app/api/prompt/empty.py index f016e33..a3a698c 100644 --- a/app/api/prompt/drop.py +++ b/app/api/prompt/empty.py @@ -1,6 +1,4 @@ -import os - -from fastapi import APIRouter, Depends, HTTPException +from fastapi import APIRouter, Depends from app.utils.api_key_auth import get_api_key from app.utils.db import get_db_connection_direct @@ -8,10 +6,10 @@ router = APIRouter() -# PATCH /prompt/drop: empties the prompt table -@router.patch("/prompt/drop") -def drop_prompt_table(api_key: str = Depends(get_api_key)) -> dict: - """PATCH /prompt/drop: empties the prompt table.""" +# PATCH /prompt/empty: empties the prompt table +@router.patch("/prompt/empty") +def empty_prompt_table(api_key: str = Depends(get_api_key)) -> dict: + """PATCH /prompt/empty: empties the prompt table.""" try: conn = get_db_connection_direct() cur = conn.cursor() diff --git a/app/api/prompt/prompt.py b/app/api/prompt/prompt.py index 90a8117..9e5f7a4 100644 --- a/app/api/prompt/prompt.py +++ b/app/api/prompt/prompt.py @@ -1,5 +1,5 @@ import os -from fastapi import APIRouter, HTTPException, Query, Request, Depends +from fastapi import APIRouter, HTTPException, Depends from app.utils.make_meta import make_meta from app.utils.db import get_db_connection_direct from app.utils.api_key_auth import get_api_key @@ -7,49 +7,32 @@ router = APIRouter() @router.get("/prompt") -def get_prompt_records( - request: Request, - page: int = Query(1, ge=1, description="Page number (1-based)"), - page_size: int = Query(10, ge=1, le=100, description="Records per page"), - api_key: str = Depends(get_api_key) -) -> dict: - """GET /prompt: Paginated list of prompt completions.""" +@router.get("/prompts") +def get_prompt_table_metadata(api_key: str = Depends(get_api_key)) -> dict: + """GET /prompt: Return prompt table metadata.""" try: conn = get_db_connection_direct() cur = conn.cursor() - offset = (page - 1) * page_size cur.execute("SELECT COUNT(*) FROM prompt;") count_row = cur.fetchone() - total = count_row[0] if count_row and count_row[0] is not None else 0 - cur.execute(""" - SELECT id, prompt, completion, duration, time, data, model - FROM prompt - ORDER BY id DESC - LIMIT %s OFFSET %s; - """, (page_size, offset)) - records = [ - { - "id": row[0], - "prompt": row[1], - "completion": row[2], - "duration": row[3], - "time": row[4].isoformat() if row[4] else None, - "data": row[5], - "model": row[6], - } - for row in cur.fetchall() - ] + record_count = count_row[0] if count_row and count_row[0] is not None else 0 + cur.execute( + """ + SELECT column_name + FROM information_schema.columns + WHERE table_schema = 'public' AND table_name = 'prompt' + ORDER BY ordinal_position; + """ + ) + columns = [row[0] for row in cur.fetchall()] cur.close() conn.close() - meta = make_meta("success", f"Prompt {len(records)} records (page {page})") + meta = make_meta("success", "Prompt table metadata") return { "meta": meta, "data": { - "page": page, - "page_size": page_size, - "total": total, - "pages": (total + page_size - 1) // page_size, - "data": records, + "record_count": record_count, + "columns": columns, }, } except Exception as e: diff --git a/app/api/queue/__init__.py b/app/api/queue/__init__.py index 9705e10..57782d6 100644 --- a/app/api/queue/__init__.py +++ b/app/api/queue/__init__.py @@ -6,7 +6,6 @@ from .routes.empty import router as empty_router from .routes.get import router as get_router -from .routes.next import router as next_router from .routes.create import router as create_router from .routes.delete import router as delete_router @@ -19,7 +18,6 @@ router.include_router(drop_router) router.include_router(empty_router) router.include_router(get_router) -router.include_router(next_router) router.include_router(create_router) router.include_router(delete_router) router.include_router(linkedin_import_router.router) diff --git a/app/api/queue/csv/apollo.py b/app/api/queue/csv/apollo.py index 65c7349..39e9069 100644 --- a/app/api/queue/csv/apollo.py +++ b/app/api/queue/csv/apollo.py @@ -10,20 +10,61 @@ @router.post("/queue/csv/apollo") def import_apollo_csv() -> dict: """POST /queue/csv/apollo: Import data from apollo.csv into the queue table (template).""" - csv_path = os.path.join(os.path.dirname(__file__), "../csv/apollo/seed.csv") + csv_path = os.path.join(os.path.dirname(__file__), "../csv/apollo/sample.csv") if not os.path.exists(csv_path): - raise HTTPException(status_code=404, detail="seed.csv not found") + raise HTTPException(status_code=404, detail="sample.csv not found") try: conn = get_db_connection_direct() cursor = conn.cursor() - # TODO: Implement CSV parsing and DB insertion logic for Apollo format - # Example placeholder for batch import logic: - # with open(csv_path, newline='', encoding='utf-8') as csvfile: - # reader = csv.DictReader(csvfile) - # for row in reader: - # pass # Process each row + with open(csv_path, newline='', encoding='utf-8') as csvfile: + reader = csv.DictReader(csvfile) + now = int(time.time()) + batch = [] + batch_size = 500 + imported_count = 0 + for row in reader: + if not any(row.values()): + continue + batch.append([ + now, # updated + False, # hidden + now, # created + row.get('Email', ''), # email + row.get('Company Name', ''), # company + row.get('Title', ''), # job + row.get('Person Linkedin Url', ''), # linkedin + row.get('First Name', ''), # first_name + row.get('Last Name', ''), # last_name + row.get('Seniority', None), # seniority + row.get('Sub Departments', None), # department + row.get('Corporate Phone', None), # phone + row.get('Country', None), # country + None, # connected (not present, set None) + 'apollo', # collection + 'magento' # group + ]) + imported_count += 1 + if len(batch) >= batch_size: + cursor.executemany( + '''INSERT INTO queue ( + updated, hidden, created, email, company, job, linkedin, first_name, last_name, seniority, department, phone, country, connected, collection, "group" + ) VALUES ( + %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s + )''', + batch + ) + batch = [] + if batch: + cursor.executemany( + '''INSERT INTO queue ( + updated, hidden, created, email, company, job, linkedin, first_name, last_name, seniority, department, phone, country, connected, collection, "group" + ) VALUES ( + %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s + )''', + batch + ) conn.commit() conn.close() - return {"meta": make_meta("success", "Apollo CSV import template executed"), "imported": 0} + return {"meta": make_meta("success", f"Apollo CSV imported: {imported_count} records imported"), "imported": imported_count} except Exception as e: raise HTTPException(status_code=500, detail=str(e)) diff --git a/app/api/queue/csv/apollo/seed.csv b/app/api/queue/csv/apollo/sample.csv similarity index 100% rename from app/api/queue/csv/apollo/seed.csv rename to app/api/queue/csv/apollo/sample.csv diff --git a/app/api/queue/csv/linkedin.py b/app/api/queue/csv/linkedin.py index 77759ed..5d1c43a 100644 --- a/app/api/queue/csv/linkedin.py +++ b/app/api/queue/csv/linkedin.py @@ -10,14 +10,13 @@ @router.post("/queue/csv/linkedin") def import_linkedin_csv() -> dict: """POST /queue/csv/linkedin: Import data from linkedin.csv into the queue table, robust for large files.""" - csv_path = os.path.join(os.path.dirname(__file__), "../csv/linkedin/linkedin.csv") + csv_path = os.path.join(os.path.dirname(__file__), "../csv/linkedin/linkedin_sample.csv") if not os.path.exists(csv_path): raise HTTPException(status_code=404, detail="linkedin.csv not found") try: conn = get_db_connection_direct() cursor = conn.cursor() with open(csv_path, newline='', encoding='utf-8') as csvfile: - # Find the header line dynamically header_line = None pre_data_lines = [] while True: @@ -25,14 +24,17 @@ def import_linkedin_csv() -> dict: line = csvfile.readline() if not line: break - if line.strip().startswith("First Name,Last Name,URL,Email Address,Company,Position,Connected On"): + if line.strip().startswith("First Name,Last Name,URL,Email Address,Company,Job,Connected On") or \ + line.strip().startswith("First Name,Last Name,URL,Email Address,Company,Position,Connected On"): header_line = line.strip() break pre_data_lines.append(line) if not header_line: raise HTTPException(status_code=400, detail="CSV header not found.") - # Use DictReader with the found header + fieldnames = header_line.split(",") + # Map 'Position' to 'Job' for backward compatibility + fieldnames = [fn if fn != 'Position' else 'Job' for fn in fieldnames] reader = csv.DictReader(csvfile, fieldnames=fieldnames) now = int(time.time()) batch = [] @@ -40,37 +42,37 @@ def import_linkedin_csv() -> dict: first_row = None imported_count = 0 for row in reader: - # Skip any rows that are just blank or not data + if not any(row.values()): continue if first_row is None: first_row = row.copy() print("DEBUG: First parsed row from CSV:", first_row) batch.append([ - row.get('First Name'), # first_name - row.get('Last Name'), # last_name - row.get('URL'), # linkedin - row.get('Email Address'), # email - row.get('Company'), # company - row.get('Position'), # position - row.get('Connected On'), # connected_on - now, # created - now, # updated - False, # hidden - 'prospects', # collection - 'linkedin' # group - ]) + now, # updated + False, # hidden + now, # created + row.get('Email Address'), # email + row.get('Company'), # company + row.get('Job') or row.get('Position'),# job (support both) + row.get('Connected On'), # connected + 'prospects', # collection + 'linkedin', # group + row.get('First Name'), # first_name + row.get('Last Name'), # last_name + row.get('URL') # linkedin + ]) imported_count += 1 if len(batch) >= batch_size: cursor.executemany( - '''INSERT INTO queue (first_name, last_name, linkedin, email, company, position, connected_on, created, updated, hidden, collection, "group") + '''INSERT INTO queue (updated, hidden, created, email, company, job, connected, collection, "group", first_name, last_name, linkedin) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)''', batch ) batch = [] if batch: cursor.executemany( - '''INSERT INTO queue (first_name, last_name, linkedin, email, company, position, connected_on, created, updated, hidden, collection, "group") + '''INSERT INTO queue (updated, hidden, created, email, company, job, connected, collection, "group", first_name, last_name, linkedin) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)''', batch ) diff --git a/app/api/queue/routes/get.py b/app/api/queue/routes/get.py index 9ad55c5..5f08743 100644 --- a/app/api/queue/routes/get.py +++ b/app/api/queue/routes/get.py @@ -57,9 +57,13 @@ def read_queue( "data": { "total": total_count, "filtered": filtered_count, - "collections": collections, - "groups": groups, - "next": next_record + "filters": { + "collectionFilter": collection, + "groupFilter": group, + "collections": collections, + "groups": groups, + }, + "next": next_record, } } except Exception as e: diff --git a/app/api/queue/routes/next.py b/app/api/queue/routes/next.py deleted file mode 100644 index add3078..0000000 --- a/app/api/queue/routes/next.py +++ /dev/null @@ -1,78 +0,0 @@ -import os -from fastapi import APIRouter, HTTPException, Query -from app.utils.make_meta import make_meta -from app.utils.db import get_db_connection_direct - -router = APIRouter() - - -# Route: /queue/next?collection=prospects&group=linkedin -@router.get("/queue/next") -def get_next_queue( - collection: str = Query(None, description="Filter by collection name"), - group: str = Query(None, description="Filter by group name") -) -> dict: - """Return the next queue record filtered by collection/group, ordered by latest updated.""" - try: - conn = get_db_connection_direct() - cursor = conn.cursor() - - # Build query with optional filters - base_query = "SELECT * FROM queue" - filters = [] - params = [] - if collection: - filters.append("collection = %s") - params.append(collection) - if group: - filters.append('"group" = %s') - params.append(group) - where_clause = (" WHERE " + " AND ".join(filters)) if filters else "" - - # 1. Get the next record - query = base_query + where_clause + " ORDER BY updated DESC LIMIT 1;" - cursor.execute(query, params) - row = cursor.fetchone() - columns = [desc[0] for desc in cursor.description] if cursor.description else [] - record = dict(zip(columns, row)) if row and columns else None - - # 2. Get count of records matching filters - count_query = "SELECT COUNT(*) FROM queue" + where_clause + ";" - cursor.execute(count_query, params) - filtered_row = cursor.fetchone() - filtered_count = filtered_row[0] if filtered_row else 0 - - # 3. Get total count - cursor.execute("SELECT COUNT(*) FROM queue;") - total_row = cursor.fetchone() - total_count = total_row[0] if total_row else 0 - - # 4. Get table schema - cursor.execute("SELECT column_name, data_type, is_nullable FROM information_schema.columns WHERE table_name = 'queue';") - schema = [ - {"name": row[0], "type": row[1], "nullable": row[2]} for row in cursor.fetchall() - ] - - conn.close() - - # Build a dynamic title with filters - filter_labels = [] - if collection: - filter_labels.append(f"collection='{collection}'") - if group: - filter_labels.append(f"group='{group}'") - filter_str = f" (filtered by {', '.join(filter_labels)})" if filter_labels else "" - title = f"Next queue record found{filter_str}" if record else "No queue record to show" - - return { - "meta": make_meta("success" if record else "info", title), - "data": { - "record": record, - "filtered_count": filtered_count, - "total_count": total_count, - "schema": schema, - "message": None if record else "Nothing to show for the given filters." - } - } - except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) diff --git a/app/api/routes.py b/app/api/routes.py index b10e210..e933002 100644 --- a/app/api/routes.py +++ b/app/api/routes.py @@ -10,7 +10,7 @@ from app.utils.notify.resend import router as resend_router from app.api.prompt.prompt import router as prompt_router from app.api.prompt.linkedin import router as linkedin_router -from app.api.prompt.drop import router as drop_router +from app.api.prompt.empty import router as prompts_empty_router from app.api.prospects.prospects import router as prospects_router from app.api.orders.orders import router as orders_router from app.api.queue import router as queue_router @@ -20,7 +20,7 @@ router.include_router(health_router) router.include_router(prompt_router) router.include_router(linkedin_router) -router.include_router(drop_router) +router.include_router(prompts_empty_router) router.include_router(prospects_router) router.include_router(orders_router) router.include_router(queue_router) \ No newline at end of file diff --git a/tests/test_queue.py b/tests/test_queue.py index 574efe8..4f2cd33 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -15,7 +15,9 @@ def test_get_queue(): assert "in_queue" in queue_data assert "collections" in queue_data assert "groups" in queue_data - assert "example" in queue_data + assert "filtered" in queue_data + assert "total" in queue_data + assert "next" in queue_data meta = data["meta"] assert meta["severity"] == "success" assert meta["title"] == "Queue table info"