Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion app/api/queue/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,19 @@
from .routes.get import router as get_router

from .routes.create import router as create_router
from .routes.delete import router as delete_router

from .routes.import_linkedin import router as import_linkedin_router

from .routes.alter import router as alter_router
from .routes.rename_column import router as rename_router

router = APIRouter()
router.include_router(drop_router)
router.include_router(empty_router)
router.include_router(get_router)
router.include_router(create_router)
router.include_router(delete_router)
router.include_router(import_linkedin_router)
router.include_router(alter_router)
router.include_router(alter_router)
router.include_router(rename_router)
24 changes: 24 additions & 0 deletions app/api/queue/routes/delete.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import os
from fastapi import APIRouter, HTTPException
from app.utils.make_meta import make_meta
from app.utils.db import get_db_connection_direct

router = APIRouter()

@router.delete("/queue/delete")
def delete_queue_record(id: int) -> dict:
"""DELETE /queue/delete: Delete a record from the queue table by id."""
try:
conn = get_db_connection_direct()
cursor = conn.cursor()
cursor.execute("DELETE FROM queue WHERE id = %s RETURNING id;", (id,))
deleted = cursor.fetchone()
conn.commit()
conn.close()
if deleted:
return {"meta": make_meta("success", f"Record with id {id} deleted.")}
else:
return {"meta": make_meta("error", f"No record found with id {id}.")}
except Exception as e:
msg = str(e)
return {"meta": make_meta("error", msg)}
22 changes: 15 additions & 7 deletions app/api/queue/routes/get.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,28 @@ def read_queue() -> dict:
for row in cursor.fetchall()
]

# 3. Get most recently updated record
cursor.execute("SELECT * FROM queue ORDER BY updated DESC LIMIT 1;")
# 3. Get a random record
cursor.execute("SELECT * FROM queue ORDER BY RANDOM() LIMIT 1;")
columns = [desc[0] for desc in cursor.description] if cursor.description else []
row = cursor.fetchone()
most_recent = dict(zip(columns, row)) if row and columns else None
rows = cursor.fetchall()
random_record = [dict(zip(columns, row)) for row in rows] if rows and columns else []

# 4. Get unique values from collection and group columns
cursor.execute("SELECT DISTINCT collection FROM queue WHERE collection IS NOT NULL;")
collections = [row[0] for row in cursor.fetchall()]
cursor.execute('SELECT DISTINCT "group" FROM queue WHERE "group" IS NOT NULL;')
groups = [row[0] for row in cursor.fetchall()]

conn.close()

return {
"meta": make_meta("success", "Queue table info"),
"data": {
"queued": record_count,
"most_recent": most_recent,
# "schema": schema
"in_queue": record_count,
"collections": collections,
"groups": groups,
"example": random_record,
# "queue_schema": schema
}
}
except Exception as e:
Expand Down
81 changes: 58 additions & 23 deletions app/api/queue/routes/import_linkedin.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,38 +9,73 @@

@router.post("/queue/import/linkedin")
def import_linkedin_csv() -> dict:
"""POST /queue/import/linkedin: Import data from linkedin_sample.csv into the queue table."""
csv_path = os.path.join(os.path.dirname(__file__), "../csv/linkedin/linkedin_sample.csv")
"""POST /queue/import/linkedin: Import data from linkedin.csv into the queue table, robust for large files."""
csv_path = os.path.join(os.path.dirname(__file__), "../csv/linkedin/linkedin.csv")
if not os.path.exists(csv_path):
raise HTTPException(status_code=404, detail="linkedin_sample.csv not found")
raise HTTPException(status_code=404, detail="linkedin.csv not found")
try:
conn = get_db_connection_direct()
cursor = conn.cursor()
with open(csv_path, newline='', encoding='utf-8') as csvfile:
reader = csv.DictReader(row for row in csvfile if not row.startswith('Notes:'))
# Find the header line dynamically
header_line = None
pre_data_lines = []
while True:
pos = csvfile.tell()
line = csvfile.readline()
if not line:
break
if line.strip().startswith("First Name,Last Name,URL,Email Address,Company,Position,Connected On"):
header_line = line.strip()
break
pre_data_lines.append(line)
if not header_line:
raise HTTPException(status_code=400, detail="CSV header not found.")
# Use DictReader with the found header
fieldnames = header_line.split(",")
reader = csv.DictReader(csvfile, fieldnames=fieldnames)
now = int(time.time())
batch = []
batch_size = 500
first_row = None
imported_count = 0
for row in reader:
cursor.execute(
"""
INSERT INTO queue (first_name, last_name, url, email_address, company, position, connected_on, created, updated, hidden, collection)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
""",
[
row.get('First Name'),
row.get('Last Name'),
row.get('URL'),
row.get('Email Address'),
row.get('Company'),
row.get('Position'),
row.get('Connected On'),
now,
now,
False,
'prospects'
]
# Skip any rows that are just blank or not data
if not any(row.values()):
continue
if first_row is None:
first_row = row.copy()
print("DEBUG: First parsed row from CSV:", first_row)
batch.append([
row.get('First Name'), # first_name
row.get('Last Name'), # last_name
row.get('URL'), # linkedin
row.get('Email Address'), # email
row.get('Company'), # company
row.get('Position'), # position
row.get('Connected On'), # connected_on
now, # created
now, # updated
False, # hidden
'prospects', # collection
'linkedin' # group
])
imported_count += 1
if len(batch) >= batch_size:
cursor.executemany(
'''INSERT INTO queue (first_name, last_name, linkedin, email, company, position, connected_on, created, updated, hidden, collection, "group")
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)''',
batch
)
batch = []
if batch:
cursor.executemany(
'''INSERT INTO queue (first_name, last_name, linkedin, email, company, position, connected_on, created, updated, hidden, collection, "group")
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)''',
batch
)
conn.commit()
conn.close()
return {"meta": make_meta("success", "LinkedIn CSV imported")}
return {"meta": make_meta("success", f"LinkedIn CSV imported (batched): {imported_count} records imported"), "imported": imported_count}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
28 changes: 28 additions & 0 deletions app/api/queue/routes/rename_column.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from fastapi import APIRouter, HTTPException, Body
from app.utils.make_meta import make_meta
from app.utils.db import get_db_connection_direct

router = APIRouter()

@router.post("/queue/alter/rename_column")
def rename_column(
old_name: str = Body(..., embed=True),
new_name: str = Body(..., embed=True),
column_type: str = Body(..., embed=True)
) -> dict:
"""POST /queue/alter/rename_column: Rename a column in the queue table."""
try:
conn = get_db_connection_direct()
cursor = conn.cursor()
sql = f'ALTER TABLE queue RENAME COLUMN "{old_name}" TO "{new_name}";'
cursor.execute(sql)
conn.commit()
conn.close()
return {"meta": make_meta("success", f"Column '{old_name}' renamed to '{new_name}'")}
except Exception as e:
msg = str(e)
if 'does not exist' in msg:
raise HTTPException(status_code=400, detail=f"Column '{old_name}' does not exist in queue table.")
if 'already exists' in msg:
raise HTTPException(status_code=400, detail=f"Column '{new_name}' already exists in queue table.")
raise HTTPException(status_code=500, detail=msg)
12 changes: 11 additions & 1 deletion app/main.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
from app import __version__
from fastapi import FastAPI
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError
from app.utils.make_meta import make_meta
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
Expand All @@ -26,6 +29,13 @@
allow_headers=["*"]
)


# Global validation error handler for make_meta pattern
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
msg = exc.errors()[0]['msg'] if exc.errors() else str(exc)
return JSONResponse(status_code=422, content={"meta": make_meta("error", msg)})

app.include_router(router)

# Mount static directory
Expand Down
11 changes: 11 additions & 0 deletions queue_output.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
id,first_name,last_name,url,email_address,company,position,connected_on,created,updated,hidden,collection,group
22,,,,,,,,1776513092,1776513092,False,prospects,linkedin
23,,,,,,,,1776513092,1776513092,False,prospects,linkedin
24,,,,,,,,1776513092,1776513092,False,prospects,linkedin
25,,,,,,,,1776513092,1776513092,False,prospects,linkedin
26,,,,,,,,1776513092,1776513092,False,prospects,linkedin
27,,,,,,,,1776513092,1776513092,False,prospects,linkedin
28,,,,,,,,1776513092,1776513092,False,prospects,linkedin
29,,,,,,,,1776513092,1776513092,False,prospects,linkedin
30,,,,,,,,1776513092,1776513092,False,prospects,linkedin
21,,,,,,,,1776513092,1776513092,False,prospects,linkedin
Loading