-
Notifications
You must be signed in to change notification settings - Fork 8
Mohamad Bader AA #2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
35938c8
9594854
cdcc7d4
4d7fecc
f95da11
1a0c607
eba70d8
f57d525
01bedf6
ff81f18
05bfd68
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,3 @@ | ||
| Name StartTime Status | ||
| ---------------------------- ------------------------- --------- | ||
| noneeeed-weather-job-17t4zca 2026-06-10T14:55:41+00:00 Succeeded |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,20 +5,37 @@ | |
| Database for PostgreSQL. When you finish the assignment it will run as a | ||
| Container App Job triggered from the Azure Portal or the CLI. | ||
|
|
||
| Replace every `raise NotImplementedError` below with a real implementation. | ||
|
|
||
| Reference chapters: | ||
| - Blob upload: Data Track/Week 6/week_6__3_azure_blob_storage.md | ||
| - Postgres connect: Data Track/Week 6/week_6__4_azure_postgresql.md | ||
| - Container Job: Data Track/Week 6/week_6__5_container_apps_jobs.md | ||
| """ | ||
|
|
||
| import json | ||
| import logging | ||
| import os | ||
| from datetime import date | ||
| import psycopg2 | ||
| from azure.storage.blob import BlobServiceClient | ||
| from contextlib import closing | ||
|
|
||
|
|
||
| logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s") | ||
| logger = logging.getLogger(__name__) | ||
| logging.getLogger("azure").setLevel(logging.WARNING) | ||
|
|
||
| CREATE_WEATHER_READINGS_SQL = """ | ||
| CREATE TABLE IF NOT EXISTS weather_readings ( | ||
| id SERIAL PRIMARY KEY, | ||
| station TEXT NOT NULL, | ||
| timestamp TIMESTAMPTZ NOT NULL, | ||
| temperature_c DOUBLE PRECISION NOT NULL, | ||
| humidity_pct INTEGER NOT NULL, | ||
| ingested_at TIMESTAMPTZ DEFAULT NOW(), | ||
| UNIQUE(station, timestamp) | ||
| ) | ||
| """ | ||
|
|
||
| # TASK 3 hint: quiet the Azure SDK so its DEBUG output does not drown your own | ||
| # pipeline logs. The right call lives in Chapter 5 (Viewing logs). | ||
|
|
@@ -35,21 +52,50 @@ def get_config() -> dict: | |
| - SOURCE_NAME: logical source label, default "weather". | ||
| - LOG_LEVEL: not parsed here; the orchestrator sets it via env var. | ||
|
|
||
| Raise RuntimeError with a clear message if a required variable is missing. | ||
| """ | ||
| raise NotImplementedError( | ||
| "Task 3: read POSTGRES_URL and AZURE_STORAGE_CONNECTION_STRING from os.environ" | ||
| ) | ||
| POSTGRES_URL = os.environ.get("POSTGRES_URL") | ||
| AZURE_STORAGE_CONNECTION_STRING = os.environ.get("AZURE_STORAGE_CONNECTION_STRING") | ||
| SOURCE_NAME = os.environ.get("SOURCE_NAME", "weather") | ||
|
|
||
| missing = [] | ||
| if not POSTGRES_URL: | ||
| missing.append("POSTGRES_URL") | ||
| if not AZURE_STORAGE_CONNECTION_STRING: | ||
| missing.append("AZURE_STORAGE_CONNECTION_STRING") | ||
| if not SOURCE_NAME: | ||
| missing.append("SOURCE_NAME") | ||
| if missing: | ||
| raise RuntimeError( | ||
| f"Error: missing environment variable(s): {', '.join(missing)}" | ||
| ) | ||
|
Comment on lines
+67
to
+70
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this runtime error should be returned earlier in the code or removed from this position |
||
| return { | ||
| "postgres_url": POSTGRES_URL, | ||
| "azure_storage_connection_string": AZURE_STORAGE_CONNECTION_STRING, | ||
| "source_name": SOURCE_NAME, | ||
| } | ||
|
|
||
|
|
||
| def fetch_records() -> list[dict]: | ||
| """Return a small batch of records to ingest. | ||
|
|
||
| In a real pipeline you would call an API here. Return a list of at least | ||
| one dict with a stable key set (for example: station, timestamp, | ||
| temperature_c, humidity_pct). | ||
| temperature_c). | ||
| """ | ||
| raise NotImplementedError("Task 3: return a list of at least one record") | ||
| return [ | ||
| { | ||
| "station": "STATION_1", | ||
| "timestamp": "2024-06-01T12:00:00Z", | ||
| "temperature_c": 25.0, | ||
| "humidity_pct": 60, | ||
| }, | ||
| { | ||
| "station": "STATION_2", | ||
| "timestamp": "2024-06-01T12:05:00Z", | ||
| "temperature_c": 26.5, | ||
| "humidity_pct": 65, | ||
| }, | ||
| ] | ||
|
Comment on lines
+83
to
+98
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Uses date not timestamp, you can align it with the assignment example |
||
|
|
||
|
|
||
| def upload_raw_to_blob(records: list[dict], blob_conn_str: str, source: str) -> str: | ||
|
|
@@ -62,7 +108,12 @@ def upload_raw_to_blob(records: list[dict], blob_conn_str: str, source: str) -> | |
| teacher has pre-created it). Overwrite if the blob already exists so | ||
| same-day reruns succeed. | ||
| """ | ||
| raise NotImplementedError("Task 1 + Task 3: upload records to blob storage") | ||
| service = BlobServiceClient.from_connection_string(blob_conn_str) | ||
| container = service.get_container_client("raw") | ||
| data = json.dumps(records) | ||
| blob_name = f"{source}/{date.today().isoformat()}.json" | ||
| container.upload_blob(name=blob_name, data=data, overwrite=True) | ||
| return blob_name | ||
|
|
||
|
|
||
| def write_to_postgres(records: list[dict], postgres_url: str) -> int: | ||
|
|
@@ -78,7 +129,30 @@ def write_to_postgres(records: list[dict], postgres_url: str) -> int: | |
|
|
||
| See Chapter 4 for the connection-and-cursor pattern this is based on. | ||
| """ | ||
| raise NotImplementedError("Task 2 + Task 3: insert rows into Azure Postgres") | ||
| with closing(psycopg2.connect(postgres_url)) as conn: | ||
| with conn.cursor() as cur: | ||
| cur.execute("CREATE SCHEMA IF NOT EXISTS dev_bader;") | ||
| cur.execute("SET search_path TO dev_bader;") | ||
| cur.execute(CREATE_WEATHER_READINGS_SQL) | ||
| for r in records: | ||
| cur.execute( | ||
| """ | ||
| INSERT INTO weather_readings (station, timestamp, temperature_c, humidity_pct) | ||
| VALUES (%s, %s, %s, %s) | ||
| ON CONFLICT (station, timestamp) DO UPDATE SET | ||
| temperature_c = EXCLUDED.temperature_c, | ||
| humidity_pct = EXCLUDED.humidity_pct, | ||
| ingested_at = NOW() | ||
| """, | ||
| ( | ||
| r["station"], | ||
| r["timestamp"], | ||
| r["temperature_c"], | ||
| r["humidity_pct"], | ||
| ), | ||
| ) | ||
| conn.commit() | ||
| return len(records) | ||
|
|
||
|
|
||
| def run() -> None: | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SOURCE_NAME always has default "weather", so this never fires