Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions AI_ASSIST.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

TODO: paste your prompt here.

() File "C:\Users\Bader\Desktop\w6\c55-data-week-6\src\pipeline.py", line 167, in run row_count = write_to_postgres(records, config["postgres_url"]) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "C:\Users\Bader\Desktop\w6\c55-data-week-6\src\pipeline.py", line 136, in write_to_postgres cur.execute(CREATE_WEATHER_READINGS_SQL) psycopg2.errors.SyntaxError: syntax error at or near "(" LINE 7: UNIQUE(station, timestamp)

## The code or suggestion it returned

<!-- Paste the suggestion verbatim — code, shell commands, or both. -->
Expand All @@ -18,8 +20,77 @@ TODO: paste your prompt here.
TODO: paste the AI output here.
```

The error is occurring when PostgreSQL parses your CREATE TABLE statement:

psycopg2.errors.SyntaxError: syntax error at or near "("
LINE 7: UNIQUE(station, timestamp)

This usually means the line before UNIQUE(...) is missing a comma.

For example, this is invalid:

CREATE TABLE weather_readings (
station TEXT NOT NULL,
timestamp TIMESTAMP NOT NULL,
temperature NUMERIC NOT NULL
UNIQUE(station, timestamp)
);

Notice the missing comma after temperature NUMERIC NOT NULL.

It should be:

CREATE TABLE weather_readings (
station TEXT NOT NULL,
timestamp TIMESTAMP NOT NULL,
temperature NUMERIC NOT NULL,
UNIQUE(station, timestamp)
);

Other common causes:

1. Missing comma before a table constraint


humidity NUMERIC
UNIQUE(station, timestamp)


humidity NUMERIC,
UNIQUE(station, timestamp)
2. Using a reserved keyword

timestamp is a PostgreSQL type name and can sometimes cause confusion as a column name.

Safer:

recorded_at TIMESTAMP NOT NULL

or quote it:

"timestamp" TIMESTAMP NOT NULL
3. Trailing syntax issue earlier in the statement

For example:


temperature NUMERIC NOT NULL,
humidity NUMERIC,
,
UNIQUE(station, timestamp)

The parser may only complain when it reaches UNIQUE.

Please paste the full value of CREATE_WEATHER_READINGS_SQL (the entire SQL string around line 136), and I can point to the exact syntax error.

## What I changed after reviewing it

<!-- Describe what you accepted, rejected, or modified, and why. -->

TODO: describe your review here.


found it immediatly it was an extra comma. fixed and moved on
6 changes: 4 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ WORKDIR /app
# TODO Task 4: install dependencies with pip

# TODO Task 4: copy the src/ folder

COPY requirements.txt .
RUN pip install -r requirements.txt
COPY src/ ./src/
# TODO Task 4: set the CMD to run the pipeline (python -m src.pipeline)
CMD ["python", "-c", "raise SystemExit('Dockerfile not finished: Task 4 still pending')"]
CMD ["python", "-m", "src.pipeline"]
29 changes: 29 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,32 @@ repo (`Data Track/Week 6/week_6__8_assignment.md`). The auto-grader checks
code shape, not live Azure deployment, because the GitHub Actions runner has
no Azure credentials. To rebuild from a fresh scaffold, follow
`.agents/workflows/build_assignment_repo.md` in the curriculum repo.

## Verification

run using python -m src.pipeline
verify using:

cur = conn.cursor()

# Count rows
cur.execute("SELECT COUNT(*) FROM weather_readings")
count = cur.fetchone()[0]
print(f"Total rows: {count}")

# Sample recent rows
cur.execute("""
SELECT station, timestamp, temperature_c
FROM weather_readings
ORDER BY ingested_at DESC
LIMIT 2
""")
for row in cur.fetchall():
print(row)

cur.close()

OR using databeaver/bash using postgres link
Blob can be found in azure

expect 2 rows written to postgres
Binary file added docs/execution_history.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
3 changes: 3 additions & 0 deletions docs/execution_history.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Name StartTime Status
---------------------------- ------------------------- ---------
noneeeed-weather-job-17t4zca 2026-06-10T14:55:41+00:00 Succeeded
7 changes: 2 additions & 5 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,5 @@
# The pipeline needs the Azure Blob SDK and a Postgres driver. Add them below
# with explicit pins.

# TODO: pin azure-storage-blob (uncomment and add a version)
# azure-storage-blob==

# TODO: pin psycopg2-binary (uncomment and add a version)
# psycopg2-binary==
azure-storage-blob==12.29.0
psycopg2-binary==2.9.10
92 changes: 83 additions & 9 deletions src/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,37 @@
Database for PostgreSQL. When you finish the assignment it will run as a
Container App Job triggered from the Azure Portal or the CLI.

Replace every `raise NotImplementedError` below with a real implementation.

Reference chapters:
- Blob upload: Data Track/Week 6/week_6__3_azure_blob_storage.md
- Postgres connect: Data Track/Week 6/week_6__4_azure_postgresql.md
- Container Job: Data Track/Week 6/week_6__5_container_apps_jobs.md
"""

import json
import logging
import os
from datetime import date
import psycopg2
from azure.storage.blob import BlobServiceClient
from contextlib import closing


logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
logger = logging.getLogger(__name__)
logging.getLogger("azure").setLevel(logging.WARNING)

CREATE_WEATHER_READINGS_SQL = """
CREATE TABLE IF NOT EXISTS weather_readings (
id SERIAL PRIMARY KEY,
station TEXT NOT NULL,
timestamp TIMESTAMPTZ NOT NULL,
temperature_c DOUBLE PRECISION NOT NULL,
humidity_pct INTEGER NOT NULL,
ingested_at TIMESTAMPTZ DEFAULT NOW(),
UNIQUE(station, timestamp)
)
"""

# TASK 3 hint: quiet the Azure SDK so its DEBUG output does not drown your own
# pipeline logs. The right call lives in Chapter 5 (Viewing logs).
Expand All @@ -35,21 +52,50 @@ def get_config() -> dict:
- SOURCE_NAME: logical source label, default "weather".
- LOG_LEVEL: not parsed here; the orchestrator sets it via env var.

Raise RuntimeError with a clear message if a required variable is missing.
"""
raise NotImplementedError(
"Task 3: read POSTGRES_URL and AZURE_STORAGE_CONNECTION_STRING from os.environ"
)
POSTGRES_URL = os.environ.get("POSTGRES_URL")
AZURE_STORAGE_CONNECTION_STRING = os.environ.get("AZURE_STORAGE_CONNECTION_STRING")
SOURCE_NAME = os.environ.get("SOURCE_NAME", "weather")

missing = []
if not POSTGRES_URL:
missing.append("POSTGRES_URL")
if not AZURE_STORAGE_CONNECTION_STRING:
missing.append("AZURE_STORAGE_CONNECTION_STRING")
if not SOURCE_NAME:

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SOURCE_NAME always has default "weather", so this never fires

missing.append("SOURCE_NAME")
if missing:
raise RuntimeError(
f"Error: missing environment variable(s): {', '.join(missing)}"
)
Comment on lines +67 to +70

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this runtime error should be returned earlier in the code or removed from this position

return {
"postgres_url": POSTGRES_URL,
"azure_storage_connection_string": AZURE_STORAGE_CONNECTION_STRING,
"source_name": SOURCE_NAME,
}


def fetch_records() -> list[dict]:
"""Return a small batch of records to ingest.

In a real pipeline you would call an API here. Return a list of at least
one dict with a stable key set (for example: station, timestamp,
temperature_c, humidity_pct).
temperature_c).
"""
raise NotImplementedError("Task 3: return a list of at least one record")
return [
{
"station": "STATION_1",
"timestamp": "2024-06-01T12:00:00Z",
"temperature_c": 25.0,
"humidity_pct": 60,
},
{
"station": "STATION_2",
"timestamp": "2024-06-01T12:05:00Z",
"temperature_c": 26.5,
"humidity_pct": 65,
},
]
Comment on lines +83 to +98

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Uses date not timestamp, you can align it with the assignment example



def upload_raw_to_blob(records: list[dict], blob_conn_str: str, source: str) -> str:
Expand All @@ -62,7 +108,12 @@ def upload_raw_to_blob(records: list[dict], blob_conn_str: str, source: str) ->
teacher has pre-created it). Overwrite if the blob already exists so
same-day reruns succeed.
"""
raise NotImplementedError("Task 1 + Task 3: upload records to blob storage")
service = BlobServiceClient.from_connection_string(blob_conn_str)
container = service.get_container_client("raw")
data = json.dumps(records)
blob_name = f"{source}/{date.today().isoformat()}.json"
container.upload_blob(name=blob_name, data=data, overwrite=True)
return blob_name


def write_to_postgres(records: list[dict], postgres_url: str) -> int:
Expand All @@ -78,7 +129,30 @@ def write_to_postgres(records: list[dict], postgres_url: str) -> int:

See Chapter 4 for the connection-and-cursor pattern this is based on.
"""
raise NotImplementedError("Task 2 + Task 3: insert rows into Azure Postgres")
with closing(psycopg2.connect(postgres_url)) as conn:
with conn.cursor() as cur:
cur.execute("CREATE SCHEMA IF NOT EXISTS dev_bader;")
cur.execute("SET search_path TO dev_bader;")
cur.execute(CREATE_WEATHER_READINGS_SQL)
for r in records:
cur.execute(
"""
INSERT INTO weather_readings (station, timestamp, temperature_c, humidity_pct)
VALUES (%s, %s, %s, %s)
ON CONFLICT (station, timestamp) DO UPDATE SET
temperature_c = EXCLUDED.temperature_c,
humidity_pct = EXCLUDED.humidity_pct,
ingested_at = NOW()
""",
(
r["station"],
r["timestamp"],
r["temperature_c"],
r["humidity_pct"],
),
)
conn.commit()
return len(records)


def run() -> None:
Expand Down