From 42edc2ccca3ad406f4024a0f9451a99ec3ee7db6 Mon Sep 17 00:00:00 2001 From: Wei Zang Date: Sat, 18 Apr 2026 15:05:14 +0100 Subject: [PATCH 1/3] Batch LinkedIn import and add rename endpoint Improve queue API: make LinkedIn CSV import robust and batched, add column-rename endpoints, and return more recent records. Changes include: switch import to linkedin.csv with dynamic header detection, skip blank rows, batch inserts (500) with an added 'group' value and imported count returned; update get route to return 10 most-recent records instead of one; add new rename/rename_column route handlers to ALTER TABLE RENAME COLUMN with error handling; wire up the rename router in queue __init__.py and add a sample queue_output.txt. These updates improve import reliability and provide a programmatic column-rename API. --- app/api/queue/__init__.py | 5 +- app/api/queue/routes/get.py | 14 ++--- app/api/queue/routes/import_linkedin.py | 79 ++++++++++++++++++------- app/api/queue/routes/rename.py | 56 ++++++++++++++++++ app/api/queue/routes/rename_column.py | 28 +++++++++ queue_output.txt | 11 ++++ 6 files changed, 163 insertions(+), 30 deletions(-) create mode 100644 app/api/queue/routes/rename.py create mode 100644 app/api/queue/routes/rename_column.py create mode 100644 queue_output.txt diff --git a/app/api/queue/__init__.py b/app/api/queue/__init__.py index 420be2c..92c379a 100644 --- a/app/api/queue/__init__.py +++ b/app/api/queue/__init__.py @@ -9,7 +9,9 @@ from .routes.create import router as create_router from .routes.import_linkedin import router as import_linkedin_router + from .routes.alter import router as alter_router +from .routes.rename_column import router as rename_router router = APIRouter() router.include_router(drop_router) @@ -17,4 +19,5 @@ router.include_router(get_router) router.include_router(create_router) router.include_router(import_linkedin_router) -router.include_router(alter_router) \ No newline at end of file +router.include_router(alter_router) +router.include_router(rename_router) \ No newline at end of file diff --git a/app/api/queue/routes/get.py b/app/api/queue/routes/get.py index d838a0e..2e1f0cb 100644 --- a/app/api/queue/routes/get.py +++ b/app/api/queue/routes/get.py @@ -27,20 +27,20 @@ def read_queue() -> dict: for row in cursor.fetchall() ] - # 3. Get most recently updated record - cursor.execute("SELECT * FROM queue ORDER BY updated DESC LIMIT 1;") + # 3. Get the 10 most recently updated records + cursor.execute("SELECT * FROM queue ORDER BY updated DESC LIMIT 10;") columns = [desc[0] for desc in cursor.description] if cursor.description else [] - row = cursor.fetchone() - most_recent = dict(zip(columns, row)) if row and columns else None + rows = cursor.fetchall() + most_recent = [dict(zip(columns, row)) for row in rows] if rows and columns else [] conn.close() return { "meta": make_meta("success", "Queue table info"), "data": { - "queued": record_count, - "most_recent": most_recent, - # "schema": schema + "queued": record_count, + "most_recent": most_recent, + # "schema": schema } } except Exception as e: diff --git a/app/api/queue/routes/import_linkedin.py b/app/api/queue/routes/import_linkedin.py index a8c97b9..f8a5830 100644 --- a/app/api/queue/routes/import_linkedin.py +++ b/app/api/queue/routes/import_linkedin.py @@ -9,38 +9,73 @@ @router.post("/queue/import/linkedin") def import_linkedin_csv() -> dict: - """POST /queue/import/linkedin: Import data from linkedin_sample.csv into the queue table.""" + """POST /queue/import/linkedin: Import data from linkedin.csv into the queue table, robust for large files.""" csv_path = os.path.join(os.path.dirname(__file__), "../csv/linkedin/linkedin_sample.csv") if not os.path.exists(csv_path): - raise HTTPException(status_code=404, detail="linkedin_sample.csv not found") + raise HTTPException(status_code=404, detail="linkedin.csv not found") try: conn = get_db_connection_direct() cursor = conn.cursor() with open(csv_path, newline='', encoding='utf-8') as csvfile: - reader = csv.DictReader(row for row in csvfile if not row.startswith('Notes:')) + # Find the header line dynamically + header_line = None + pre_data_lines = [] + while True: + pos = csvfile.tell() + line = csvfile.readline() + if not line: + break + if line.strip().startswith("First Name,Last Name,URL,Email Address,Company,Position,Connected On"): + header_line = line.strip() + break + pre_data_lines.append(line) + if not header_line: + raise HTTPException(status_code=400, detail="CSV header not found.") + # Use DictReader with the found header + fieldnames = header_line.split(",") + reader = csv.DictReader(csvfile, fieldnames=fieldnames) now = int(time.time()) + batch = [] + batch_size = 500 + first_row = None + imported_count = 0 for row in reader: - cursor.execute( - """ - INSERT INTO queue (first_name, last_name, url, email_address, company, position, connected_on, created, updated, hidden, collection) - VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) - """, - [ - row.get('First Name'), - row.get('Last Name'), - row.get('URL'), - row.get('Email Address'), - row.get('Company'), - row.get('Position'), - row.get('Connected On'), - now, - now, - False, - 'prospects' - ] + # Skip any rows that are just blank or not data + if not any(row.values()): + continue + if first_row is None: + first_row = row.copy() + print("DEBUG: First parsed row from CSV:", first_row) + batch.append([ + row.get('First Name'), + row.get('Last Name'), + row.get('URL'), + row.get('Email Address'), + row.get('Company'), + row.get('Position'), + row.get('Connected On'), + now, + now, + False, + 'prospects', + 'linkedin' + ]) + imported_count += 1 + if len(batch) >= batch_size: + cursor.executemany( + '''INSERT INTO queue (first_name, last_name, url, email_address, company, position, connected_on, created, updated, hidden, collection, "group") + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)''', + batch + ) + batch = [] + if batch: + cursor.executemany( + '''INSERT INTO queue (first_name, last_name, url, email_address, company, position, connected_on, created, updated, hidden, collection, "group") + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)''', + batch ) conn.commit() conn.close() - return {"meta": make_meta("success", "LinkedIn CSV imported")} + return {"meta": make_meta("success", f"LinkedIn CSV imported (batched): {imported_count} records imported"), "imported": imported_count} except Exception as e: raise HTTPException(status_code=500, detail=str(e)) diff --git a/app/api/queue/routes/rename.py b/app/api/queue/routes/rename.py new file mode 100644 index 0000000..baf339d --- /dev/null +++ b/app/api/queue/routes/rename.py @@ -0,0 +1,56 @@ +from fastapi import APIRouter, HTTPException, Body +from app.utils.make_meta import make_meta +from app.utils.db import get_db_connection_direct + +router = APIRouter() + +@router.post("/queue/alter/rename_column") +def rename_column( + old_name: str = Body(..., embed=True), + new_name: str = Body(..., embed=True), + column_type: str = Body(..., embed=True) +) -> dict: + """POST /queue/alter/rename-column: Rename a column in the queue table.""" + try: + conn = get_db_connection_direct() + cursor = conn.cursor() + sql = f'ALTER TABLE queue RENAME COLUMN "{old_name}" TO "{new_name}";' + cursor.execute(sql) + conn.commit() + conn.close() + return {"meta": make_meta("success", f"Column '{old_name}' renamed to '{new_name}'")} + except Exception as e: + msg = str(e) + if 'does not exist' in msg: + raise HTTPException(status_code=400, detail=f"Column '{old_name}' does not exist in queue table.") + if 'already exists' in msg: + raise HTTPException(status_code=400, detail=f"Column '{new_name}' already exists in queue table.") + raise HTTPException(status_code=500, detail=msg) +from fastapi import APIRouter, HTTPException, Body +from app.utils.make_meta import make_meta +from app.utils.db import get_db_connection_direct + +router = APIRouter() + +@router.post("/queue/alter/rename_column") +def rename_column( + old_name: str = Body(..., embed=True), + new_name: str = Body(..., embed=True), + column_type: str = Body(..., embed=True) +) -> dict: + """POST /queue/alter/rename-column: Rename a column in the queue table.""" + try: + conn = get_db_connection_direct() + cursor = conn.cursor() + sql = f'ALTER TABLE queue RENAME COLUMN "{old_name}" TO "{new_name}";' + cursor.execute(sql) + conn.commit() + conn.close() + return {"meta": make_meta("success", f"Column '{old_name}' renamed to '{new_name}'")} + except Exception as e: + msg = str(e) + if 'does not exist' in msg: + raise HTTPException(status_code=400, detail=f"Column '{old_name}' does not exist in queue table.") + if 'already exists' in msg: + raise HTTPException(status_code=400, detail=f"Column '{new_name}' already exists in queue table.") + raise HTTPException(status_code=500, detail=msg) diff --git a/app/api/queue/routes/rename_column.py b/app/api/queue/routes/rename_column.py new file mode 100644 index 0000000..f5ca376 --- /dev/null +++ b/app/api/queue/routes/rename_column.py @@ -0,0 +1,28 @@ +from fastapi import APIRouter, HTTPException, Body +from app.utils.make_meta import make_meta +from app.utils.db import get_db_connection_direct + +router = APIRouter() + +@router.post("/queue/alter/rename_column") +def rename_column( + old_name: str = Body(..., embed=True), + new_name: str = Body(..., embed=True), + column_type: str = Body(..., embed=True) +) -> dict: + """POST /queue/alter/rename_column: Rename a column in the queue table.""" + try: + conn = get_db_connection_direct() + cursor = conn.cursor() + sql = f'ALTER TABLE queue RENAME COLUMN "{old_name}" TO "{new_name}";' + cursor.execute(sql) + conn.commit() + conn.close() + return {"meta": make_meta("success", f"Column '{old_name}' renamed to '{new_name}'")} + except Exception as e: + msg = str(e) + if 'does not exist' in msg: + raise HTTPException(status_code=400, detail=f"Column '{old_name}' does not exist in queue table.") + if 'already exists' in msg: + raise HTTPException(status_code=400, detail=f"Column '{new_name}' already exists in queue table.") + raise HTTPException(status_code=500, detail=msg) diff --git a/queue_output.txt b/queue_output.txt new file mode 100644 index 0000000..224df1e --- /dev/null +++ b/queue_output.txt @@ -0,0 +1,11 @@ +id,first_name,last_name,url,email_address,company,position,connected_on,created,updated,hidden,collection,group +22,,,,,,,,1776513092,1776513092,False,prospects,linkedin +23,,,,,,,,1776513092,1776513092,False,prospects,linkedin +24,,,,,,,,1776513092,1776513092,False,prospects,linkedin +25,,,,,,,,1776513092,1776513092,False,prospects,linkedin +26,,,,,,,,1776513092,1776513092,False,prospects,linkedin +27,,,,,,,,1776513092,1776513092,False,prospects,linkedin +28,,,,,,,,1776513092,1776513092,False,prospects,linkedin +29,,,,,,,,1776513092,1776513092,False,prospects,linkedin +30,,,,,,,,1776513092,1776513092,False,prospects,linkedin +21,,,,,,,,1776513092,1776513092,False,prospects,linkedin From b5ee4da37061d29f36051ee928cd1321f32232f8 Mon Sep 17 00:00:00 2001 From: Wei Zang Date: Sat, 18 Apr 2026 15:25:47 +0100 Subject: [PATCH 2/3] Add collections/groups and update LinkedIn import Read queue: query DISTINCT collection and group values and include them in the response (collections, groups). Rename response keys for clarity (queued -> in_queue, most_recent -> example and return only the first example). Import LinkedIn CSV: normalize CSV-to-DB mapping (url -> linkedin, email_address -> email), update batch tuple order and INSERT column list to match the renamed columns. Delete rename.py: remove the /queue/alter/rename_column route (runtime column rename endpoint) for safety. These changes expose more queue metadata for the UI and standardize field names used by the LinkedIn importer. --- app/api/queue/routes/get.py | 14 +++++-- app/api/queue/routes/import_linkedin.py | 28 ++++++------- app/api/queue/routes/rename.py | 56 ------------------------- 3 files changed, 25 insertions(+), 73 deletions(-) delete mode 100644 app/api/queue/routes/rename.py diff --git a/app/api/queue/routes/get.py b/app/api/queue/routes/get.py index 2e1f0cb..4fa6044 100644 --- a/app/api/queue/routes/get.py +++ b/app/api/queue/routes/get.py @@ -33,14 +33,22 @@ def read_queue() -> dict: rows = cursor.fetchall() most_recent = [dict(zip(columns, row)) for row in rows] if rows and columns else [] + # 4. Get unique values from collection and group columns + cursor.execute("SELECT DISTINCT collection FROM queue WHERE collection IS NOT NULL;") + collections = [row[0] for row in cursor.fetchall()] + cursor.execute('SELECT DISTINCT "group" FROM queue WHERE "group" IS NOT NULL;') + groups = [row[0] for row in cursor.fetchall()] + conn.close() return { "meta": make_meta("success", "Queue table info"), "data": { - "queued": record_count, - "most_recent": most_recent, - # "schema": schema + "in_queue": record_count, + "collections": collections, + "groups": groups, + "example": most_recent[:1], + # "queue_schema": schema } } except Exception as e: diff --git a/app/api/queue/routes/import_linkedin.py b/app/api/queue/routes/import_linkedin.py index f8a5830..3605bf2 100644 --- a/app/api/queue/routes/import_linkedin.py +++ b/app/api/queue/routes/import_linkedin.py @@ -47,30 +47,30 @@ def import_linkedin_csv() -> dict: first_row = row.copy() print("DEBUG: First parsed row from CSV:", first_row) batch.append([ - row.get('First Name'), - row.get('Last Name'), - row.get('URL'), - row.get('Email Address'), - row.get('Company'), - row.get('Position'), - row.get('Connected On'), - now, - now, - False, - 'prospects', - 'linkedin' + row.get('First Name'), # first_name + row.get('Last Name'), # last_name + row.get('URL'), # linkedin + row.get('Email Address'), # email + row.get('Company'), # company + row.get('Position'), # position + row.get('Connected On'), # connected_on + now, # created + now, # updated + False, # hidden + 'prospects', # collection + 'linkedin' # group ]) imported_count += 1 if len(batch) >= batch_size: cursor.executemany( - '''INSERT INTO queue (first_name, last_name, url, email_address, company, position, connected_on, created, updated, hidden, collection, "group") + '''INSERT INTO queue (first_name, last_name, linkedin, email, company, position, connected_on, created, updated, hidden, collection, "group") VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)''', batch ) batch = [] if batch: cursor.executemany( - '''INSERT INTO queue (first_name, last_name, url, email_address, company, position, connected_on, created, updated, hidden, collection, "group") + '''INSERT INTO queue (first_name, last_name, linkedin, email, company, position, connected_on, created, updated, hidden, collection, "group") VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)''', batch ) diff --git a/app/api/queue/routes/rename.py b/app/api/queue/routes/rename.py deleted file mode 100644 index baf339d..0000000 --- a/app/api/queue/routes/rename.py +++ /dev/null @@ -1,56 +0,0 @@ -from fastapi import APIRouter, HTTPException, Body -from app.utils.make_meta import make_meta -from app.utils.db import get_db_connection_direct - -router = APIRouter() - -@router.post("/queue/alter/rename_column") -def rename_column( - old_name: str = Body(..., embed=True), - new_name: str = Body(..., embed=True), - column_type: str = Body(..., embed=True) -) -> dict: - """POST /queue/alter/rename-column: Rename a column in the queue table.""" - try: - conn = get_db_connection_direct() - cursor = conn.cursor() - sql = f'ALTER TABLE queue RENAME COLUMN "{old_name}" TO "{new_name}";' - cursor.execute(sql) - conn.commit() - conn.close() - return {"meta": make_meta("success", f"Column '{old_name}' renamed to '{new_name}'")} - except Exception as e: - msg = str(e) - if 'does not exist' in msg: - raise HTTPException(status_code=400, detail=f"Column '{old_name}' does not exist in queue table.") - if 'already exists' in msg: - raise HTTPException(status_code=400, detail=f"Column '{new_name}' already exists in queue table.") - raise HTTPException(status_code=500, detail=msg) -from fastapi import APIRouter, HTTPException, Body -from app.utils.make_meta import make_meta -from app.utils.db import get_db_connection_direct - -router = APIRouter() - -@router.post("/queue/alter/rename_column") -def rename_column( - old_name: str = Body(..., embed=True), - new_name: str = Body(..., embed=True), - column_type: str = Body(..., embed=True) -) -> dict: - """POST /queue/alter/rename-column: Rename a column in the queue table.""" - try: - conn = get_db_connection_direct() - cursor = conn.cursor() - sql = f'ALTER TABLE queue RENAME COLUMN "{old_name}" TO "{new_name}";' - cursor.execute(sql) - conn.commit() - conn.close() - return {"meta": make_meta("success", f"Column '{old_name}' renamed to '{new_name}'")} - except Exception as e: - msg = str(e) - if 'does not exist' in msg: - raise HTTPException(status_code=400, detail=f"Column '{old_name}' does not exist in queue table.") - if 'already exists' in msg: - raise HTTPException(status_code=400, detail=f"Column '{new_name}' already exists in queue table.") - raise HTTPException(status_code=500, detail=msg) From 6d6d7cfb792a8be95cfec779470dd6be381a4742 Mon Sep 17 00:00:00 2001 From: Wei Zang Date: Sat, 18 Apr 2026 16:26:45 +0100 Subject: [PATCH 3/3] Add queue delete endpoint and improve APIs Add a new DELETE /queue/delete route to remove queue records by id and register it in the queue router. Change the GET queue summary to return a random example record (ORDER BY RANDOM() LIMIT 1) instead of the most recently updated records. Update the LinkedIn import to point to linkedin.csv instead of the sample file. Add a global RequestValidationError handler in app.main that returns errors using the existing make_meta format, and add required imports for that handler. --- app/api/queue/__init__.py | 2 ++ app/api/queue/routes/delete.py | 24 ++++++++++++++++++++++++ app/api/queue/routes/get.py | 8 ++++---- app/api/queue/routes/import_linkedin.py | 2 +- app/main.py | 12 +++++++++++- 5 files changed, 42 insertions(+), 6 deletions(-) create mode 100644 app/api/queue/routes/delete.py diff --git a/app/api/queue/__init__.py b/app/api/queue/__init__.py index 92c379a..fba278a 100644 --- a/app/api/queue/__init__.py +++ b/app/api/queue/__init__.py @@ -7,6 +7,7 @@ from .routes.get import router as get_router from .routes.create import router as create_router +from .routes.delete import router as delete_router from .routes.import_linkedin import router as import_linkedin_router @@ -18,6 +19,7 @@ router.include_router(empty_router) router.include_router(get_router) router.include_router(create_router) +router.include_router(delete_router) router.include_router(import_linkedin_router) router.include_router(alter_router) router.include_router(rename_router) \ No newline at end of file diff --git a/app/api/queue/routes/delete.py b/app/api/queue/routes/delete.py new file mode 100644 index 0000000..cd410f7 --- /dev/null +++ b/app/api/queue/routes/delete.py @@ -0,0 +1,24 @@ +import os +from fastapi import APIRouter, HTTPException +from app.utils.make_meta import make_meta +from app.utils.db import get_db_connection_direct + +router = APIRouter() + +@router.delete("/queue/delete") +def delete_queue_record(id: int) -> dict: + """DELETE /queue/delete: Delete a record from the queue table by id.""" + try: + conn = get_db_connection_direct() + cursor = conn.cursor() + cursor.execute("DELETE FROM queue WHERE id = %s RETURNING id;", (id,)) + deleted = cursor.fetchone() + conn.commit() + conn.close() + if deleted: + return {"meta": make_meta("success", f"Record with id {id} deleted.")} + else: + return {"meta": make_meta("error", f"No record found with id {id}.")} + except Exception as e: + msg = str(e) + return {"meta": make_meta("error", msg)} diff --git a/app/api/queue/routes/get.py b/app/api/queue/routes/get.py index 4fa6044..0399d71 100644 --- a/app/api/queue/routes/get.py +++ b/app/api/queue/routes/get.py @@ -27,11 +27,11 @@ def read_queue() -> dict: for row in cursor.fetchall() ] - # 3. Get the 10 most recently updated records - cursor.execute("SELECT * FROM queue ORDER BY updated DESC LIMIT 10;") + # 3. Get a random record + cursor.execute("SELECT * FROM queue ORDER BY RANDOM() LIMIT 1;") columns = [desc[0] for desc in cursor.description] if cursor.description else [] rows = cursor.fetchall() - most_recent = [dict(zip(columns, row)) for row in rows] if rows and columns else [] + random_record = [dict(zip(columns, row)) for row in rows] if rows and columns else [] # 4. Get unique values from collection and group columns cursor.execute("SELECT DISTINCT collection FROM queue WHERE collection IS NOT NULL;") @@ -47,7 +47,7 @@ def read_queue() -> dict: "in_queue": record_count, "collections": collections, "groups": groups, - "example": most_recent[:1], + "example": random_record, # "queue_schema": schema } } diff --git a/app/api/queue/routes/import_linkedin.py b/app/api/queue/routes/import_linkedin.py index 3605bf2..84365c0 100644 --- a/app/api/queue/routes/import_linkedin.py +++ b/app/api/queue/routes/import_linkedin.py @@ -10,7 +10,7 @@ @router.post("/queue/import/linkedin") def import_linkedin_csv() -> dict: """POST /queue/import/linkedin: Import data from linkedin.csv into the queue table, robust for large files.""" - csv_path = os.path.join(os.path.dirname(__file__), "../csv/linkedin/linkedin_sample.csv") + csv_path = os.path.join(os.path.dirname(__file__), "../csv/linkedin/linkedin.csv") if not os.path.exists(csv_path): raise HTTPException(status_code=404, detail="linkedin.csv not found") try: diff --git a/app/main.py b/app/main.py index cc824ec..98bfb2c 100644 --- a/app/main.py +++ b/app/main.py @@ -1,5 +1,8 @@ from app import __version__ -from fastapi import FastAPI +from fastapi import FastAPI, Request +from fastapi.responses import JSONResponse +from fastapi.exceptions import RequestValidationError +from app.utils.make_meta import make_meta from fastapi.middleware.cors import CORSMiddleware from fastapi.staticfiles import StaticFiles from fastapi.responses import FileResponse @@ -26,6 +29,13 @@ allow_headers=["*"] ) + +# Global validation error handler for make_meta pattern +@app.exception_handler(RequestValidationError) +async def validation_exception_handler(request: Request, exc: RequestValidationError): + msg = exc.errors()[0]['msg'] if exc.errors() else str(exc) + return JSONResponse(status_code=422, content={"meta": make_meta("error", msg)}) + app.include_router(router) # Mount static directory