1 change: 0 additions & 1 deletion .gitignore
@@ -26,7 +26,6 @@ env/

# Generated pipeline output (committed templates stay; generated files do not)
output/error_report.json
output/azure_resource_groups.json
weather.db

# Editor and IDE settings
77 changes: 68 additions & 9 deletions AI_DEBUG.md
@@ -7,20 +7,79 @@ Document the debugging session below. If everything worked first try, introduce

## The Error

<!-- Paste the full Python traceback here. Include the error type, message, and the lines that caused it. -->
<!-- Traceback (most recent call last):
File "c:\Users\Bader\Desktop\w3\c55-data-week-3\pipeline.py", line 77, in <module>
run_pipeline()
File "c:\Users\Bader\Desktop\w3\c55-data-week-3\pipeline.py", line 38, in run_pipeline
valid_readings, error_dicts = validate_records(api_records + csv_records)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: validate_records() missing 1 required positional argument: 'source' -->

## The Prompt

<!-- Paste the exact message you sent to the LLM (ChatGPT, Claude, etc.).
Include: the error, the relevant code snippet, and what you asked the AI to help with. -->

<!-- Traceback (most recent call last):
File "c:\Users\Bader\Desktop\w3\c55-data-week-3\pipeline.py", line 77, in <module>
run_pipeline()
File "c:\Users\Bader\Desktop\w3\c55-data-week-3\pipeline.py", line 38, in run_pipeline
valid_readings, error_dicts = validate_records(api_records + csv_records)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: validate_records() missing 1 required positional argument: 'source' -->
## The Solution

<!-- What did the AI suggest?
Did you apply the suggestion as-is, or did you need to adapt it? Explain what changed. -->
<!-- Ah, the classic TypeError. This one is telling us exactly what the mismatch is between how your function was written and how it's being called.
The Problem

Your code on line 38 is calling validate_records() like this:
Python

validate_records(api_records + csv_records)

Here, you are combining two lists (api_records and csv_records) into one single list using the + operator, and passing that single list into the function.

However, the error states that validate_records() is missing a required positional argument named 'source'. This means the function definition looks something like this:
Python

def validate_records(records, source):
# Expects TWO arguments: the list of records AND the source name

Because you only passed the combined list, Python has no idea what to put for the source argument.
How to Fix It

Depending on how your pipeline is supposed to work, you have two main ways to fix this.
Option 1: Pass the source name explicitly (Recommended)

If you want to validate all records but need to tag them with a source name, pass the string as the second argument:
Python

valid_readings, error_dicts = validate_records(api_records + csv_records, source="combined")

Option 2: Run them separately if you want to track the sources accurately

If your pipeline needs to know exactly where an error came from (the API vs. the CSV), you shouldn't combine them before validating. Run them sequentially instead:
Python

# 1. Validate API records
valid_api, api_errors = validate_records(api_records, source="API")

# 2. Validate CSV records
valid_csv, csv_errors = validate_records(csv_records, source="CSV")

# 3. Combine the results afterward
valid_readings = valid_api + valid_csv
error_dicts = api_errors + csv_errors

Option 3: Change the function definition (If 'source' isn't actually needed)

If you wrote validate_records yourself and realize you don't actually care about tracking the source anymore, go to where the function is defined and give source a default value so it becomes optional:
Python

def validate_records(records, source="unknown"):
# Now it won't crash if you leave 'source' out!

Super long answer, as always. All I needed was the simple fix: adding the missing source parameter. -->

## Reflection

<!-- Did you understand *why* the code was broken before you got the AI's answer?
After the fix: do you understand why it works now?
What would you do differently next time you hit this type of error? -->
<!-- Yes, I noticed from the traceback that source was missing, but I thought the problem was much more complicated than that.
After the fix: yes.
Next time: simplify things on my own first, before the AI overcomplicates them or answers from incomplete information. -->
23 changes: 23 additions & 0 deletions azure.py
@@ -0,0 +1,23 @@
import json

import requests
from pathlib import Path


OUTPUT_DIR = Path("output")
OUTPUT_DIR.mkdir(exist_ok=True)


token = "token_here"
subscription_id = "1120c89d-2a5f-4a15-a582-2ea34f0bb5c3" # from the portal above

url = f"https://management.azure.com/subscriptions/{subscription_id}/resourcegroups?api-version=2024-03-01"
headers = {"Authorization": f"Bearer {token}"}

response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status()
for rg in response.json()["value"]:
print(f"{rg['name']}: {rg['location']}")

with open(OUTPUT_DIR / "azure_resource_groups.json", "w") as f:
json.dump(response.json(), f, indent=2)
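
One way to keep the bearer token out of the committed file is to read it from the environment. The sketch below is only an illustration under assumptions: `build_request` is a hypothetical helper, and `AZURE_ACCESS_TOKEN` and `AZURE_SUBSCRIPTION_ID` are made-up variable names, not something the assignment prescribes. It also makes the pinned `api-version` an explicit parameter.

```python
import os


def build_request(
    subscription_id: str, token: str, api_version: str = "2024-03-01"
) -> tuple[str, dict]:
    """Build the URL and headers for listing resource groups (hypothetical helper)."""
    url = (
        f"https://management.azure.com/subscriptions/{subscription_id}"
        f"/resourcegroups?api-version={api_version}"
    )
    headers = {"Authorization": f"Bearer {token}"}
    return url, headers


# Assumed environment variable names; the fallbacks are placeholders only.
token = os.environ.get("AZURE_ACCESS_TOKEN", "token_here")
subscription_id = os.environ.get(
    "AZURE_SUBSCRIPTION_ID", "00000000-0000-0000-0000-000000000000"
)

url, headers = build_request(subscription_id, token)
```

The resulting `url` and `headers` can be passed to `requests.get(url, headers=headers, timeout=10)` exactly as in the script above.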
67 changes: 59 additions & 8 deletions database.py
@@ -24,17 +24,54 @@ def create_tables(conn: sqlite3.Connection) -> None:
weather_readings columns: id, station, timestamp, temperature_c, humidity_pct
+ UNIQUE(station, timestamp) constraint for upserts
"""
# TODO: use conn.execute() with CREATE TABLE IF NOT EXISTS statements
raise NotImplementedError
conn.execute(
"""
CREATE TABLE IF NOT EXISTS raw_weather (
id INTEGER PRIMARY KEY,
station TEXT NOT NULL,
timestamp TEXT NOT NULL,
temperature_c REAL NOT NULL,
humidity_pct REAL NOT NULL,
source TEXT NOT NULL,
ingested_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
"""
)
conn.execute(
"""
CREATE TABLE IF NOT EXISTS weather_readings (
id INTEGER PRIMARY KEY,
station TEXT NOT NULL,
timestamp TEXT NOT NULL,
temperature_c REAL NOT NULL,
humidity_pct REAL NOT NULL,
UNIQUE(station, timestamp)
)
"""
)
conn.commit()


def insert_raw(conn: sqlite3.Connection, records: list[dict], source: str) -> None:
"""Insert raw records (before validation) into raw_weather.

Use parameterized queries with placeholder syntax; do not build SQL via string formatting.
"""
# TODO: implement
raise NotImplementedError
for record in records:

Using a for loop to execute individual INSERT statements processes the data row by row, which is not optimal. Each conn.execute() call inside the loop is a separate call into SQLite that writes a single row (the connection itself stays open), so the per-call overhead adds up. This works fine for a test file with 10 rows, but it will slow your pipeline down as the data grows and will not scale well.

Instead, you can stage your dataset into a list of tuples and utilize conn.executemany()!
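
A minimal sketch of that suggestion, assuming the `raw_weather` columns from `create_tables` above. The in-memory database and the sample records are made up for illustration.

```python
import sqlite3


def insert_raw(conn: sqlite3.Connection, records: list[dict], source: str) -> None:
    """Insert raw records in one batch instead of one execute() call per row."""
    # Stage every record as a tuple in the same order as the column list below.
    rows = [
        (r["station"], r["timestamp"], r["temperature_c"], r["humidity_pct"], source)
        for r in records
    ]
    conn.executemany(
        """
        INSERT INTO raw_weather (station, timestamp, temperature_c, humidity_pct, source)
        VALUES (?, ?, ?, ?, ?)
        """,
        rows,
    )
    conn.commit()  # a single commit for the whole batch


# Illustrative usage against an in-memory database.
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE raw_weather (
        id INTEGER PRIMARY KEY,
        station TEXT NOT NULL,
        timestamp TEXT NOT NULL,
        temperature_c REAL NOT NULL,
        humidity_pct REAL NOT NULL,
        source TEXT NOT NULL
    )
    """
)
sample = [
    {"station": "Copenhagen", "timestamp": "2024-01-01T00:00", "temperature_c": 1.5, "humidity_pct": 80},
    {"station": "Copenhagen", "timestamp": "2024-01-01T01:00", "temperature_c": 1.2, "humidity_pct": 82},
]
insert_raw(conn, sample, source="csv")
inserted = conn.execute("SELECT COUNT(*) FROM raw_weather").fetchone()[0]
```

The queries stay parameterized, so the docstring's requirement still holds; only the number of calls changes.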

conn.execute(
"""
INSERT INTO raw_weather (station, timestamp, temperature_c, humidity_pct, source)
VALUES (?, ?, ?, ?, ?)
""",
(
record["station"],
record["timestamp"],
record["temperature_c"],
record["humidity_pct"],
source,
),
)
conn.commit()


def upsert_readings(conn: sqlite3.Connection, readings: list[WeatherReading]) -> None:
@@ -43,11 +80,25 @@ def upsert_readings(conn: sqlite3.Connection, readings: list[WeatherReading]) ->
Use the upsert pattern to handle duplicate (station, timestamp) pairs.
Use parameterized queries.
"""
# TODO: implement
raise NotImplementedError
for reading in readings:
conn.execute(
"""
INSERT INTO weather_readings (station, timestamp, temperature_c, humidity_pct)
VALUES (?, ?, ?, ?)
ON CONFLICT(station, timestamp) DO UPDATE SET
temperature_c = excluded.temperature_c,
humidity_pct = excluded.humidity_pct
""",
(
reading.station,
reading.timestamp,
reading.temperature_c,
reading.humidity_pct,
),
)
conn.commit()
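
To illustrate what the `ON CONFLICT ... DO UPDATE` clause above does, here is a small self-contained sketch. It assumes an in-memory table with the same columns, uses plain tuples in place of `WeatherReading` objects, and relies on made-up sample values. The upsert syntax needs SQLite 3.24 or newer.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE weather_readings (
        id INTEGER PRIMARY KEY,
        station TEXT NOT NULL,
        timestamp TEXT NOT NULL,
        temperature_c REAL NOT NULL,
        humidity_pct REAL NOT NULL,
        UNIQUE(station, timestamp)
    )
    """
)

UPSERT_SQL = """
    INSERT INTO weather_readings (station, timestamp, temperature_c, humidity_pct)
    VALUES (?, ?, ?, ?)
    ON CONFLICT(station, timestamp) DO UPDATE SET
        temperature_c = excluded.temperature_c,
        humidity_pct = excluded.humidity_pct
"""

# First load: one reading.
conn.executemany(UPSERT_SQL, [("Copenhagen", "2024-01-01T00:00", 1.5, 80.0)])

# Second load: the same (station, timestamp) with new values, plus a new hour.
conn.executemany(
    UPSERT_SQL,
    [
        ("Copenhagen", "2024-01-01T00:00", 2.5, 75.0),
        ("Copenhagen", "2024-01-01T01:00", 1.0, 82.0),
    ],
)
conn.commit()

rows = conn.execute(
    "SELECT station, timestamp, temperature_c, humidity_pct "
    "FROM weather_readings ORDER BY timestamp"
).fetchall()
```

After both loads the table holds two rows, and the 00:00 reading carries the values from the second load rather than a duplicate.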


def count_readings(conn: sqlite3.Connection) -> int:
"""Return the total number of rows in weather_readings."""
# TODO: implement
raise NotImplementedError
return conn.execute("SELECT COUNT(*) FROM weather_readings").fetchone()[0]
76 changes: 64 additions & 12 deletions ingest_api.py
@@ -1,25 +1,54 @@
# Step 2 — Tasks 1 & 2: Error Handling + API Ingestion
# fetch_with_retry handles transient network errors (Task 1).
# fetch_api_records calls it and shapes the response into flat dicts (Task 2).
import logging
import time

import requests

logger = logging.getLogger(__name__)

API_URL = "https://api.open-meteo.com/v1/forecast"


def fetch_with_retry(url: str, params: dict, max_retries: int = 3, timeout: int = 10) -> dict:
def fetch_with_retry(
url: str, params: dict, max_retries: int = 3, timeout: int = 10
) -> dict:
"""Fetch url with exponential backoff on transient errors.

Retry on: ConnectionError, Timeout, 5xx status codes.
Fail immediately on: 4xx status codes.
Log each retry attempt with the error and delay.
"""
# TODO: implement retry loop with exponential backoff
raise NotImplementedError
for attempt in range(max_retries):
try:
response = requests.get(url, params=params, timeout=timeout)

response.raise_for_status()
return response.json()

except requests.exceptions.HTTPError as e:
if response.status_code < 500:
logger.error(
f"Client error {response.status_code}. Failing immediately."
)
raise

error_type = f"Server error {response.status_code}"

except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
error_type = type(e).__name__

if attempt == max_retries - 1:
logger.error(
f"Max retries reached. Final attempt failed with {error_type}."
)
raise

wait_time = 2**attempt
logger.warning(
f"Attempt {attempt + 1} failed ({error_type}). Retrying in {wait_time}s..."
)
time.sleep(wait_time)

raise RuntimeError("unreachable")
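
Indentation is not visible in this diff view, so one thing is worth checking: if the max-retries check sits after the `except` blocks rather than inside them, the bare `raise` has no active exception and Python raises `RuntimeError: No active exception to reraise` instead of the original error. The sketch below keeps the re-raise inside the handler. It is a generic illustration, not the submitted function: `call_with_backoff`, `retry_on`, and the injectable `sleep` are hypothetical names.

```python
import logging
import time
from typing import Callable, TypeVar

logger = logging.getLogger(__name__)
T = TypeVar("T")


def call_with_backoff(
    func: Callable[[], T],
    retry_on: tuple[type[BaseException], ...],
    max_retries: int = 3,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func, retrying with exponential backoff on the given exception types."""
    for attempt in range(max_retries):
        try:
            return func()
        except retry_on as exc:
            error_type = type(exc).__name__
            if attempt == max_retries - 1:
                logger.error("Max retries reached. Final attempt failed with %s.", error_type)
                raise  # still inside the handler, so the original exception propagates
            wait_time = 2**attempt  # 1s, 2s, 4s, ...
            logger.warning(
                "Attempt %d failed (%s). Retrying in %ds...", attempt + 1, error_type, wait_time
            )
            sleep(wait_time)
    raise RuntimeError("unreachable")  # only reached when max_retries <= 0


# Illustrative usage: a function that fails twice, then succeeds.
calls = {"count": 0}
delays: list[float] = []


def flaky() -> str:
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("simulated outage")
    return "ok"


# delays.append stands in for time.sleep so the example runs instantly.
result = call_with_backoff(flaky, retry_on=(ConnectionError,), sleep=delays.append)
```

Passing `sleep` as a parameter is only a convenience for testing; in the pipeline the default `time.sleep` would apply.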


def fetch_api_records() -> list[dict]:
@@ -34,8 +63,31 @@ def fetch_api_records() -> list[dict]:
"hourly": "temperature_2m,relative_humidity_2m",
"forecast_days": 7,
}
# TODO:
# - Call fetch_with_retry with API_URL and params
# - The API returns {"hourly": {"time": [...], "temperature_2m": [...], "relative_humidity_2m": [...]}}
# - Flatten to a list of dicts; set station="Open-Meteo Copenhagen" for all records
raise NotImplementedError

try:
data = fetch_with_retry(API_URL, params=params)
except Exception as e:
logger.error(f"Failed to fetch data from API: {e}")
return []

# Task 2: Robust parsing and flattening
hourly = data.get("hourly", {})
times = hourly.get("time", [])
temps = hourly.get("temperature_2m", [])
humidities = hourly.get("relative_humidity_2m", [])

if not times or not temps or not humidities:
logger.warning(
"API response structure was missing expected hourly data arrays."
)
return []

return [
{
"station": "Open-Meteo Copenhagen",
"timestamp": ts,
"temperature_c": temp,
"humidity_pct": hum,
}
for ts, temp, hum in zip(times, temps, humidities, strict=True)
]
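
Note that `zip(..., strict=True)` requires Python 3.10 or newer and raises `ValueError` when the arrays differ in length, which the code above does not catch. A possible alternative, sketched under the assumption that a malformed response should yield no records, is to compare lengths up front. `flatten_hourly` is a hypothetical helper name.

```python
import logging

logger = logging.getLogger(__name__)


def flatten_hourly(hourly: dict, station: str = "Open-Meteo Copenhagen") -> list[dict]:
    """Flatten the parallel hourly arrays into one dict per timestamp."""
    times = hourly.get("time", [])
    temps = hourly.get("temperature_2m", [])
    humidities = hourly.get("relative_humidity_2m", [])

    # Reject the payload if the arrays do not line up one-to-one.
    if not (len(times) == len(temps) == len(humidities)):
        logger.warning("Hourly arrays have mismatched lengths; skipping response.")
        return []

    return [
        {
            "station": station,
            "timestamp": ts,
            "temperature_c": temp,
            "humidity_pct": hum,
        }
        for ts, temp, hum in zip(times, temps, humidities)
    ]


# Illustrative payloads (values are made up).
good = flatten_hourly(
    {
        "time": ["2024-01-01T00:00", "2024-01-01T01:00"],
        "temperature_2m": [1.5, 1.2],
        "relative_humidity_2m": [80, 82],
    }
)
bad = flatten_hourly(
    {
        "time": ["2024-01-01T00:00", "2024-01-01T01:00"],
        "temperature_2m": [1.5],
        "relative_humidity_2m": [80, 82],
    }
)
```

Whether a mismatch should return an empty list or raise is a design choice; returning `[]` matches how the function already treats missing arrays.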
22 changes: 20 additions & 2 deletions ingest_files.py
@@ -16,5 +16,23 @@ def read_csv_records(path: Path) -> list[dict]:
- Convert temperature_c to float and humidity_pct to int where possible.
- Leave unconvertible values (e.g. "N/A", "") as-is so validation can catch them.
"""
# TODO: implement CSV reading and normalization
raise NotImplementedError
with open(path, newline="", encoding="utf-8") as f:
reader = csv.DictReader(f)
records = []
for row in reader:
record = {
"station": row.get("station", "unknown"),
"timestamp": row.get("timestamp", ""),
"temperature_c": row.get("temperature_c", ""),
"humidity_pct": row.get("humidity_pct", ""),
}
try:
record["temperature_c"] = float(record["temperature_c"])
except ValueError:
pass
try:
record["humidity_pct"] = int(record["humidity_pct"])
except ValueError:
pass
records.append(record)
return records
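
One edge case to consider: when a CSV row has fewer fields than the header, `csv.DictReader` fills the missing keys with `None`, so `row.get("temperature_c", "")` returns `None` and `float(None)` raises `TypeError`, which the `except ValueError` clauses above do not catch. A minimal sketch of a conversion helper that handles both cases follows; `try_convert` is a hypothetical name and the CSV text is made up.

```python
import csv
import io


def try_convert(value, caster):
    """Return caster(value), or the original value if it cannot be converted."""
    try:
        return caster(value)
    except (TypeError, ValueError):
        # Leave "N/A", "" and None untouched so validation can flag them later.
        return value


# Illustrative CSV: a clean row, a row with bad values, and a short row.
sample_csv = (
    "station,timestamp,temperature_c,humidity_pct\n"
    "Copenhagen,2024-01-01T00:00,1.5,80\n"
    "Copenhagen,2024-01-01T01:00,N/A,\n"
    "Copenhagen,2024-01-01T02:00\n"
)

records = []
for row in csv.DictReader(io.StringIO(sample_csv)):
    records.append(
        {
            "station": row.get("station", "unknown"),
            "timestamp": row.get("timestamp", ""),
            "temperature_c": try_convert(row.get("temperature_c", ""), float),
            "humidity_pct": try_convert(row.get("humidity_pct", ""), int),
        }
    )
```

The first row is converted to numbers, while the second and third keep their unconvertible values for the validation step.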
3 changes: 1 addition & 2 deletions models.py
@@ -13,5 +13,4 @@ class WeatherReading(BaseModel):
@field_validator("station")
@classmethod
def clean_station(cls, v: str) -> str:
# TODO: strip whitespace and convert to title case
raise NotImplementedError
return v.strip().title()
12 changes: 9 additions & 3 deletions output/azure_compare.md
@@ -4,12 +4,18 @@ Fill in each section below (2-3 sentences each) after completing the Task 7 step

## Auth

<!-- Fill in here -->
<!-- An Azure call without the token fails with "401 Client Error: Unauthorized for url".
Open-Meteo requires no token, so you cannot hit this error, and it is much simpler to access.
-->

## Schema verbosity

<!-- Fill in here -->
<!-- The Azure reply was much more readable.
It was also more nested, whereas the Open-Meteo reply comes back as a single line.
That makes the Open-Meteo response much harder to read. -->

## api-version in the URL

<!-- Fill in here -->
<!-- Azure pins the api-version in the URL for stability.
Your code stays tied to the API version it was written against, so it won't break overnight because of an update.
If you leave it out, you get an HTTP 400 Bad Request. -->
13 changes: 13 additions & 0 deletions output/azure_resource_groups.json
@@ -0,0 +1,13 @@
{
"value": [
{
"id": "/subscriptions/1120c89d-2a5f-4a15-a582-2ea34f0bb5c3/resourceGroups/rg-hyf-students-readonly",
"name": "rg-hyf-students-readonly",
"type": "Microsoft.Resources/resourceGroups",
"location": "westeurope",
"properties": {
"provisioningState": "Succeeded"
}
}
]
}