diff --git a/AI_ASSIST.md b/AI_ASSIST.md index 9b31b94..878c87a 100644 --- a/AI_ASSIST.md +++ b/AI_ASSIST.md @@ -10,6 +10,8 @@ TODO: paste your prompt here. +() File "C:\Users\Bader\Desktop\w6\c55-data-week-6\src\pipeline.py", line 167, in run row_count = write_to_postgres(records, config["postgres_url"]) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "C:\Users\Bader\Desktop\w6\c55-data-week-6\src\pipeline.py", line 136, in write_to_postgres cur.execute(CREATE_WEATHER_READINGS_SQL) psycopg2.errors.SyntaxError: syntax error at or near "(" LINE 7: UNIQUE(station, timestamp) + ## The code or suggestion it returned @@ -18,8 +20,77 @@ TODO: paste your prompt here. TODO: paste the AI output here. ``` +The error is occurring when PostgreSQL parses your CREATE TABLE statement: + +psycopg2.errors.SyntaxError: syntax error at or near "(" +LINE 7: UNIQUE(station, timestamp) + +This usually means the line before UNIQUE(...) is missing a comma. + +For example, this is invalid: + +CREATE TABLE weather_readings ( + station TEXT NOT NULL, + timestamp TIMESTAMP NOT NULL, + temperature NUMERIC NOT NULL + UNIQUE(station, timestamp) +); + +Notice the missing comma after temperature NUMERIC NOT NULL. + +It should be: + +CREATE TABLE weather_readings ( + station TEXT NOT NULL, + timestamp TIMESTAMP NOT NULL, + temperature NUMERIC NOT NULL, + UNIQUE(station, timestamp) +); + +Other common causes: + +1. Missing comma before a table constraint + +❌ + +humidity NUMERIC +UNIQUE(station, timestamp) + +✅ + +humidity NUMERIC, +UNIQUE(station, timestamp) +2. Using a reserved keyword + +timestamp is a PostgreSQL type name and can sometimes cause confusion as a column name. + +Safer: + +recorded_at TIMESTAMP NOT NULL + +or quote it: + +"timestamp" TIMESTAMP NOT NULL +3. Trailing syntax issue earlier in the statement + +For example: + +❌ + +temperature NUMERIC NOT NULL, +humidity NUMERIC, +, +UNIQUE(station, timestamp) + +The parser may only complain when it reaches UNIQUE. + +Please paste the full value of CREATE_WEATHER_READINGS_SQL (the entire SQL string around line 136), and I can point to the exact syntax error. + ## What I changed after reviewing it TODO: describe your review here. + + +found it immediatly it was an extra comma. fixed and moved on \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index ee3c978..3d48dc6 100644 --- a/Dockerfile +++ b/Dockerfile @@ -16,6 +16,8 @@ WORKDIR /app # TODO Task 4: install dependencies with pip # TODO Task 4: copy the src/ folder - +COPY requirements.txt . +RUN pip install -r requirements.txt +COPY src/ ./src/ # TODO Task 4: set the CMD to run the pipeline (python -m src.pipeline) -CMD ["python", "-c", "raise SystemExit('Dockerfile not finished: Task 4 still pending')"] +CMD ["python", "-m", "src.pipeline"] diff --git a/README.md b/README.md index fa94600..b0c8765 100644 --- a/README.md +++ b/README.md @@ -108,3 +108,32 @@ repo (`Data Track/Week 6/week_6__8_assignment.md`). The auto-grader checks code shape, not live Azure deployment, because the GitHub Actions runner has no Azure credentials. To rebuild from a fresh scaffold, follow `.agents/workflows/build_assignment_repo.md` in the curriculum repo. + +## Verification + +run using python -m src.pipeline +verify using: + +cur = conn.cursor() + +# Count rows +cur.execute("SELECT COUNT(*) FROM weather_readings") +count = cur.fetchone()[0] +print(f"Total rows: {count}") + +# Sample recent rows +cur.execute(""" + SELECT station, timestamp, temperature_c + FROM weather_readings + ORDER BY ingested_at DESC + LIMIT 2 +""") +for row in cur.fetchall(): + print(row) + +cur.close() + +OR using databeaver/bash using postgres link +Blob can be found in azure + +expect 2 rows written to postgres \ No newline at end of file diff --git a/docs/execution_history.png b/docs/execution_history.png new file mode 100644 index 0000000..c59da93 Binary files /dev/null and b/docs/execution_history.png differ diff --git a/docs/execution_history.txt b/docs/execution_history.txt new file mode 100644 index 0000000..01e1f45 --- /dev/null +++ b/docs/execution_history.txt @@ -0,0 +1,3 @@ +Name StartTime Status +---------------------------- ------------------------- --------- +noneeeed-weather-job-17t4zca 2026-06-10T14:55:41+00:00 Succeeded diff --git a/requirements.txt b/requirements.txt index b5d18d7..6254d1c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,8 +6,5 @@ # The pipeline needs the Azure Blob SDK and a Postgres driver. Add them below # with explicit pins. -# TODO: pin azure-storage-blob (uncomment and add a version) -# azure-storage-blob== - -# TODO: pin psycopg2-binary (uncomment and add a version) -# psycopg2-binary== +azure-storage-blob==12.29.0 +psycopg2-binary==2.9.10 diff --git a/src/pipeline.py b/src/pipeline.py index ef1fc8a..2f76c21 100644 --- a/src/pipeline.py +++ b/src/pipeline.py @@ -5,7 +5,6 @@ Database for PostgreSQL. When you finish the assignment it will run as a Container App Job triggered from the Azure Portal or the CLI. -Replace every `raise NotImplementedError` below with a real implementation. Reference chapters: - Blob upload: Data Track/Week 6/week_6__3_azure_blob_storage.md @@ -13,12 +12,30 @@ - Container Job: Data Track/Week 6/week_6__5_container_apps_jobs.md """ +import json import logging import os from datetime import date +import psycopg2 +from azure.storage.blob import BlobServiceClient +from contextlib import closing + logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s") logger = logging.getLogger(__name__) +logging.getLogger("azure").setLevel(logging.WARNING) + +CREATE_WEATHER_READINGS_SQL = """ + CREATE TABLE IF NOT EXISTS weather_readings ( + id SERIAL PRIMARY KEY, + station TEXT NOT NULL, + timestamp TIMESTAMPTZ NOT NULL, + temperature_c DOUBLE PRECISION NOT NULL, + humidity_pct INTEGER NOT NULL, + ingested_at TIMESTAMPTZ DEFAULT NOW(), + UNIQUE(station, timestamp) + ) + """ # TASK 3 hint: quiet the Azure SDK so its DEBUG output does not drown your own # pipeline logs. The right call lives in Chapter 5 (Viewing logs). @@ -35,11 +52,27 @@ def get_config() -> dict: - SOURCE_NAME: logical source label, default "weather". - LOG_LEVEL: not parsed here; the orchestrator sets it via env var. - Raise RuntimeError with a clear message if a required variable is missing. """ - raise NotImplementedError( - "Task 3: read POSTGRES_URL and AZURE_STORAGE_CONNECTION_STRING from os.environ" - ) + POSTGRES_URL = os.environ.get("POSTGRES_URL") + AZURE_STORAGE_CONNECTION_STRING = os.environ.get("AZURE_STORAGE_CONNECTION_STRING") + SOURCE_NAME = os.environ.get("SOURCE_NAME", "weather") + + missing = [] + if not POSTGRES_URL: + missing.append("POSTGRES_URL") + if not AZURE_STORAGE_CONNECTION_STRING: + missing.append("AZURE_STORAGE_CONNECTION_STRING") + if not SOURCE_NAME: + missing.append("SOURCE_NAME") + if missing: + raise RuntimeError( + f"Error: missing environment variable(s): {', '.join(missing)}" + ) + return { + "postgres_url": POSTGRES_URL, + "azure_storage_connection_string": AZURE_STORAGE_CONNECTION_STRING, + "source_name": SOURCE_NAME, + } def fetch_records() -> list[dict]: @@ -47,9 +80,22 @@ def fetch_records() -> list[dict]: In a real pipeline you would call an API here. Return a list of at least one dict with a stable key set (for example: station, timestamp, - temperature_c, humidity_pct). + temperature_c). """ - raise NotImplementedError("Task 3: return a list of at least one record") + return [ + { + "station": "STATION_1", + "timestamp": "2024-06-01T12:00:00Z", + "temperature_c": 25.0, + "humidity_pct": 60, + }, + { + "station": "STATION_2", + "timestamp": "2024-06-01T12:05:00Z", + "temperature_c": 26.5, + "humidity_pct": 65, + }, + ] def upload_raw_to_blob(records: list[dict], blob_conn_str: str, source: str) -> str: @@ -62,7 +108,12 @@ def upload_raw_to_blob(records: list[dict], blob_conn_str: str, source: str) -> teacher has pre-created it). Overwrite if the blob already exists so same-day reruns succeed. """ - raise NotImplementedError("Task 1 + Task 3: upload records to blob storage") + service = BlobServiceClient.from_connection_string(blob_conn_str) + container = service.get_container_client("raw") + data = json.dumps(records) + blob_name = f"{source}/{date.today().isoformat()}.json" + container.upload_blob(name=blob_name, data=data, overwrite=True) + return blob_name def write_to_postgres(records: list[dict], postgres_url: str) -> int: @@ -78,7 +129,30 @@ def write_to_postgres(records: list[dict], postgres_url: str) -> int: See Chapter 4 for the connection-and-cursor pattern this is based on. """ - raise NotImplementedError("Task 2 + Task 3: insert rows into Azure Postgres") + with closing(psycopg2.connect(postgres_url)) as conn: + with conn.cursor() as cur: + cur.execute("CREATE SCHEMA IF NOT EXISTS dev_bader;") + cur.execute("SET search_path TO dev_bader;") + cur.execute(CREATE_WEATHER_READINGS_SQL) + for r in records: + cur.execute( + """ + INSERT INTO weather_readings (station, timestamp, temperature_c, humidity_pct) + VALUES (%s, %s, %s, %s) + ON CONFLICT (station, timestamp) DO UPDATE SET + temperature_c = EXCLUDED.temperature_c, + humidity_pct = EXCLUDED.humidity_pct, + ingested_at = NOW() + """, + ( + r["station"], + r["timestamp"], + r["temperature_c"], + r["humidity_pct"], + ), + ) + conn.commit() + return len(records) def run() -> None: