Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion main.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
OUTPUT_DIR = Path("output")

# Task 7: GitHub username configured for this pipeline run. The template
# placeholder assignment was removed because it was immediately overwritten.
GITHUB_USERNAME = "hannahwn"


def run() -> None:
Expand Down
6 changes: 6 additions & 0 deletions score.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"score": 0,
"pass": false,
"passingScore": 60,
"ai_assist_present": false
}
29 changes: 27 additions & 2 deletions src/clean.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,46 @@
def load_and_explore(data_dir: Path) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Task 2: Load both CSV files and explore their contents before cleaning.

    Args:
        data_dir: Directory containing ``messy_sales.csv`` and
            ``messy_customers.csv``.

    Returns:
        ``(sales_df, customers_df)`` exactly as read from disk; no cleaning
        is applied here.
    """
    import io  # local import: only needed to capture DataFrame.info() output

    def _info_text(frame: pd.DataFrame) -> str:
        # DataFrame.info() writes to a stream and returns None, so logging its
        # return value would record the literal string "None". Capture the
        # report in a buffer and log that text instead.
        buffer = io.StringIO()
        frame.info(buf=buffer)
        return buffer.getvalue()

    sales_df = pd.read_csv(data_dir / "messy_sales.csv")
    customers_df = pd.read_csv(data_dir / "messy_customers.csv")

    labelled = (("Sales", sales_df), ("Customers", customers_df))

    # Structural overview, summary statistics, a sample of rows and per-column
    # null counts for each input.
    for label, frame in labelled:
        logging.info("%s DataFrame info:\n%s", label, _info_text(frame))
        logging.info("%s DataFrame description:\n%s", label, frame.describe())
        logging.info("%s DataFrame head:\n%s", label, frame.head(20))
        logging.info("%s DataFrame null counts:\n%s", label, frame.isna().sum())

    for label, frame in labelled:
        logging.info(
            "%s DataFrame has %d rows and %d columns",
            label,
            frame.shape[0],
            frame.shape[1],
        )

    return sales_df, customers_df


def clean_sales(sales: pd.DataFrame) -> pd.DataFrame:
    """Task 3: Clean the sales DataFrame using vectorized Pandas operations.

    Args:
        sales: Raw sales rows. The code reads the columns ``product_name``,
            ``customer_email``, ``price``, ``quantity``, ``date`` and
            ``transaction_id``.

    Returns:
        A new DataFrame with normalized text columns, numeric ``price``,
        parsed ``date``, invalid rows removed and duplicate transactions
        collapsed to their first occurrence. The input frame is not modified.
    """
    # Work on a copy so the caller's DataFrame is not modified by the column
    # assignments below.
    cleaned = sales.copy()

    cleaned["product_name"] = cleaned["product_name"].str.strip().str.title()
    cleaned["customer_email"] = cleaned["customer_email"].str.lower().str.strip()
    # errors="coerce" turns unparseable values into NaN / NaT so they can be
    # filtered out together with the other invalid rows.
    cleaned["price"] = pd.to_numeric(cleaned["price"], errors="coerce")
    cleaned["date"] = pd.to_datetime(cleaned["date"], errors="coerce")

    has_product = cleaned["product_name"].notna()
    # dropna() only removes missing values, so a price such as -149.99 would
    # survive it. A boolean filter is required; `>= 0` also rejects NaN
    # because NaN comparisons evaluate to False.
    valid_price = cleaned["price"] >= 0
    # Likewise "quantity is zero" needs a value filter, otherwise zero-quantity
    # rows remain and inflate order counts. Missing quantities are still
    # dropped, as before.
    valid_quantity = cleaned["quantity"].notna() & (cleaned["quantity"] != 0)
    has_date = cleaned["date"].notna()

    cleaned = cleaned[has_product & valid_price & valid_quantity & has_date]
    cleaned = cleaned.drop_duplicates(subset="transaction_id", keep="first")

    # Outlier prices are intentionally left as-is for simplicity; in a real
    # scenario they should be investigated or handled with business rules.
    return cleaned

15 changes: 13 additions & 2 deletions src/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,21 @@
def download_inputs(data_dir: Path) -> None:
    """Task 1: Download input CSV files from Azure Blob Storage.

    Every name in ``FILES`` is fetched from ``SOURCE_CONTAINER`` and written
    to ``data_dir/<name>``.

    Args:
        data_dir: Local directory that receives the downloaded files; it is
            created if it does not exist.
    """
    credential = DefaultAzureCredential()
    blob_service_client = BlobServiceClient(account_url=ACCOUNT_URL, credential=credential)
    container_client = blob_service_client.get_container_client(SOURCE_CONTAINER)

    # Honor the data_dir argument instead of a hard-coded "data/" folder so
    # the caller controls where the inputs land.
    data_dir.mkdir(parents=True, exist_ok=True)

    # Download, write and log inside a single loop: if the write sits outside
    # the loop only the last blob is saved, and a separate logging loop ends
    # up repeating the last name.
    for name in FILES:
        blob = container_client.get_blob_client(name)
        with open(data_dir / name, "wb") as f:
            f.write(blob.download_blob().readall())
        logging.info("Downloaded %s", name)


def upload_outputs(output_dir: Path, github_username: str) -> None:
Expand All @@ -31,4 +42,4 @@ def upload_outputs(output_dir: Path, github_username: str) -> None:
# TODO: Upload every .parquet file in output_dir to the container.
# TODO: Download customer_summary.parquet back and assert its row count matches the local file.
# TODO: Log the container name and number of files uploaded.
raise NotImplementedError("Task 7: implement upload_outputs")

50 changes: 48 additions & 2 deletions src/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,70 @@
def build_reports(enriched: pd.DataFrame) -> dict[str, pd.DataFrame]:
    """Task 5: Build four summary tables using groupby and named aggregations.

    Args:
        enriched: Joined sales/customer rows. The code reads the columns
            ``date`` (datetime), ``price``, ``quantity``, ``transaction_id``,
            ``customer_email``, ``customer_name``, ``region``,
            ``loyalty_tier`` and ``category``.

    Returns:
        A dict with the keys ``weekly_revenue``, ``customer_summary``,
        ``category_performance`` and ``loyalty_analysis``, each mapping to a
        flat (index-reset) DataFrame. The input frame is not modified.
    """
    # assign() returns a new frame, so the derived helper columns do not leak
    # into the caller's DataFrame.
    frame = enriched.assign(
        week=enriched["date"].dt.isocalendar().week,
        revenue=enriched["price"] * enriched["quantity"],
    )

    weekly_revenue = (
        frame.groupby(["week", "region"])
        .agg(
            total_revenue=("revenue", "sum"),
            order_count=("transaction_id", "count"),
        )
        .reset_index()
    )

    # "first" picks the per-customer string columns, which are treated as
    # constant within each customer_email group.
    customer_summary = (
        frame.groupby("customer_email")
        .agg(
            customer_name=("customer_name", "first"),
            region=("region", "first"),
            loyalty_tier=("loyalty_tier", "first"),
            total_spent=("revenue", "sum"),
            avg_order=("revenue", "mean"),
            order_count=("transaction_id", "count"),
        )
        .reset_index()
    )

    category_performance = (
        frame.groupby("category")
        .agg(
            total_revenue=("revenue", "sum"),
            order_count=("transaction_id", "count"),
        )
        .reset_index()
    )

    # avg_spent is the mean revenue per transaction within the tier;
    # customer_count is the number of distinct emails in the tier.
    loyalty_analysis = (
        frame.groupby("loyalty_tier")
        .agg(
            avg_spent=("revenue", "mean"),
            customer_count=("customer_email", "nunique"),
        )
        .reset_index()
    )

    return {
        "weekly_revenue": weekly_revenue,
        "customer_summary": customer_summary,
        "category_performance": category_performance,
        "loyalty_analysis": loyalty_analysis,
    }





def write_outputs(reports: dict[str, pd.DataFrame], output_dir: Path) -> None:
    """Task 6: Write report tables to CSV/Parquet and save a bar chart.

    Files written into ``output_dir``: ``weekly_revenue.csv``,
    ``customer_summary.parquet``, ``category_performance.csv`` and
    ``category_revenue.png``.

    Args:
        reports: Mapping that must contain the keys ``weekly_revenue``,
            ``customer_summary`` and ``category_performance``. The mapping is
            not modified.
        output_dir: Destination directory; created if missing.
    """
    output_dir.mkdir(parents=True, exist_ok=True)

    reports["weekly_revenue"].to_csv(output_dir / "weekly_revenue.csv", index=False)
    reports["customer_summary"].to_parquet(output_dir / "customer_summary.parquet", index=False)
    reports["category_performance"].to_csv(output_dir / "category_performance.csv", index=False)

    # Sort a local copy for the chart rather than writing the sorted frame
    # back into the caller's dict.
    ranked = reports["category_performance"].sort_values("total_revenue", ascending=False)

    # Select the non-interactive Agg backend before importing pyplot so the
    # chart can be rendered in headless environments.
    import matplotlib

    matplotlib.use("Agg")
    import matplotlib.pyplot as plt

    fig, ax = plt.subplots(figsize=(10, 6))
    try:
        ax.bar(ranked["category"], ranked["total_revenue"])
        ax.set_xlabel("Category")
        ax.set_ylabel("Total Revenue")
        ax.set_title("Total Revenue by Category")
        ax.tick_params(axis="x", labelrotation=45)
        fig.tight_layout()
        fig.savefig(output_dir / "category_revenue.png", bbox_inches="tight")
    finally:
        # pyplot keeps every figure alive until it is closed; release it so
        # repeated runs do not accumulate open figures.
        plt.close(fig)

    logging.info("Reports written to %s.", output_dir)

7 changes: 6 additions & 1 deletion src/transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@
def join_customers(sales: pd.DataFrame, customers: pd.DataFrame) -> pd.DataFrame:
    """Task 4: Normalize join keys, merge, and add a derived boolean flag.

    Args:
        sales: Sales rows with ``customer_email``, ``price`` and ``quantity``
            columns.
        customers: Customer rows with a ``customer_email`` column.

    Returns:
        The inner join of both frames on the normalized ``customer_email``,
        plus a boolean ``is_high_value`` column that is True where
        ``price * quantity >= 150``. Neither input frame is modified.
    """

    def _with_normalized_email(frame: pd.DataFrame) -> pd.DataFrame:
        # assign() returns a new frame, leaving the caller's data untouched.
        return frame.assign(customer_email=frame["customer_email"].str.lower().str.strip())

    # An inner join keeps only sales whose email matches a known customer.
    merged_df = pd.merge(
        _with_normalized_email(sales),
        _with_normalized_email(customers),
        on="customer_email",
        how="inner",
    )
    merged_df["is_high_value"] = merged_df["price"] * merged_df["quantity"] >= 150
    return merged_df