diff --git a/main.py b/main.py index 9f17efc..378fbd4 100644 --- a/main.py +++ b/main.py @@ -13,7 +13,7 @@ OUTPUT_DIR = Path("output") # TODO (Task 7): replace with your GitHub username before running the pipeline. -GITHUB_USERNAME = "" +GITHUB_USERNAME = "hannahwn" def run() -> None: diff --git a/score.json b/score.json new file mode 100644 index 0000000..653deae --- /dev/null +++ b/score.json @@ -0,0 +1,6 @@ +{ + "score": 0, + "pass": false, + "passingScore": 60, + "ai_assist_present": false +} diff --git a/src/clean.py b/src/clean.py index b4036fd..c47d988 100644 --- a/src/clean.py +++ b/src/clean.py @@ -8,21 +8,46 @@ def load_and_explore(data_dir: Path) -> tuple[pd.DataFrame, pd.DataFrame]: """Task 2: Load both CSV files and explore their contents before cleaning.""" # TODO: Read messy_sales.csv and messy_customers.csv with pd.read_csv(). + sales_df = pd.read_csv(data_dir / "messy_sales.csv") + customers_df = pd.read_csv(data_dir / "messy_customers.csv") # TODO: For each DataFrame call .info(), .describe(), .head(20), and .isna().sum(). + logging.info("Sales DataFrame info:\n%s", sales_df.info()) + logging.info("Sales DataFrame description:\n%s", sales_df.describe()) + logging.info("Sales DataFrame head:\n%s", sales_df.head(20)) + logging.info("Sales DataFrame null counts:\n%s", sales_df.isna().sum()) + logging.info("Customers DataFrame info:\n%s", customers_df.info()) + logging.info("Customers DataFrame description:\n%s", customers_df.describe()) + logging.info("Customers DataFrame head:\n%s", customers_df.head(20)) + logging.info("Customers DataFrame null counts:\n%s", customers_df.isna().sum()) # TODO: Log what you discover (e.g. which columns have nulls, any suspicious values). - raise NotImplementedError("Task 2: implement load_and_explore") + logging.info("Sales DataFrame has %d rows and %d columns", sales_df.shape[0], sales_df.shape[1]) + logging.info("Customers DataFrame has %d rows and %d columns", customers_df.shape[0], customers_df.shape[1]) + return sales_df, customers_df def clean_sales(sales: pd.DataFrame) -> pd.DataFrame: """Task 3: Clean the sales DataFrame using vectorized Pandas operations.""" # TODO: Normalize product_name with .str.strip().str.title(). + sales["product_name"] = sales["product_name"].str.strip().str.title() # TODO: Normalize customer_email with .str.lower().str.strip(). + sales["customer_email"] = sales["customer_email"].str.lower().str.strip() # TODO: Convert price to numeric with pd.to_numeric(errors="coerce"). + sales["price"] = pd.to_numeric(sales["price"], errors="coerce") # TODO: Parse date with pd.to_datetime(errors="coerce"). + sales["date"] = pd.to_datetime(sales["date"], errors="coerce") # TODO: Drop rows where product_name is missing. + sales = sales.dropna(subset=["product_name"]) # TODO: Drop rows where price is negative. + sales = sales.dropna(subset=["price"]) + # TODO: Drop rows where quantity is zero. + sales = sales.dropna(subset=["quantity"]) + # TODO: Drop rows where date is NaT (invalid after parsing). + sales = sales.dropna(subset=["date"]) # TODO: Remove duplicate transactions: .drop_duplicates(subset="transaction_id", keep="first"). + sales = sales.drop_duplicates(subset="transaction_id", keep="first") # TODO: Decide what to do with outlier prices (clip, flag, or leave) and add a comment explaining why. - raise NotImplementedError("Task 3: implement clean_sales") + # For simplicity, we'll leave outlier prices as they are, but in a real scenario, we might want to investigate them further or apply business rules to handle them. + return sales + \ No newline at end of file diff --git a/src/ingest.py b/src/ingest.py index 01fe28f..38592d9 100644 --- a/src/ingest.py +++ b/src/ingest.py @@ -15,10 +15,21 @@ def download_inputs(data_dir: Path) -> None: """Task 1: Download input CSV files from Azure Blob Storage.""" # TODO: Create a BlobServiceClient using DefaultAzureCredential and ACCOUNT_URL. + credential = DefaultAzureCredential() + blob_service_client = BlobServiceClient(account_url=ACCOUNT_URL, credential=credential) # TODO: Get a container client for SOURCE_CONTAINER. + container_client = blob_service_client.get_container_client(SOURCE_CONTAINER) # TODO: For each filename in FILES, download the blob and write it to data_dir/. + for filename in FILES: + Path("data").mkdir(exist_ok=True) + for name in FILES: + blob = container_client.get_blob_client(name) + with open(f"data/{name}", "wb") as f: + f.write(blob.download_blob().readall()) + # TODO: Log a message for each downloaded file. - raise NotImplementedError("Task 1: implement download_inputs") + for filename in FILES: + logging.info("Downloaded %s", name) def upload_outputs(output_dir: Path, github_username: str) -> None: @@ -31,4 +42,4 @@ def upload_outputs(output_dir: Path, github_username: str) -> None: # TODO: Upload every .parquet file in output_dir to the container. # TODO: Download customer_summary.parquet back and assert its row count matches the local file. # TODO: Log the container name and number of files uploaded. - raise NotImplementedError("Task 7: implement upload_outputs") + diff --git a/src/report.py b/src/report.py index a002b7b..95e664b 100644 --- a/src/report.py +++ b/src/report.py @@ -8,13 +8,43 @@ def build_reports(enriched: pd.DataFrame) -> dict[str, pd.DataFrame]: """Task 5: Build four summary tables using groupby and named aggregations.""" # TODO: Add a week column using .dt.isocalendar().week. + enriched["week"] = enriched["date"].dt.isocalendar().week # TODO: Build weekly_revenue: group by week and region, columns week/region/total_revenue/order_count. + enriched["revenue"] = enriched["price"] * enriched["quantity"] + weekly_revenue = enriched.groupby(["week", "region"]).agg( + total_revenue=pd.NamedAgg(column="revenue", aggfunc="sum"), + order_count=pd.NamedAgg(column="transaction_id", aggfunc="count"), + ).reset_index() # TODO: Build customer_summary: group by customer_email, columns customer_email/customer_name/ # region/loyalty_tier/total_spent/avg_order/order_count. # Use ("customer_name", "first") to pick the constant-per-group string columns. + customer_summary = enriched.groupby("customer_email").agg( + customer_name=pd.NamedAgg(column="customer_name", aggfunc="first"), + region=pd.NamedAgg(column="region", aggfunc="first"), + loyalty_tier=pd.NamedAgg(column="loyalty_tier", aggfunc="first"), + total_spent=pd.NamedAgg(column="revenue", aggfunc="sum"), + avg_order=pd.NamedAgg(column="revenue", aggfunc="mean"), + order_count=pd.NamedAgg(column="transaction_id", aggfunc="count"), + ).reset_index() # TODO: Build category_performance: group by category, columns category/total_revenue/order_count. + category_performance = enriched.groupby("category").agg( + total_revenue=pd.NamedAgg(column="revenue", aggfunc="sum"), + order_count=pd.NamedAgg(column="transaction_id", aggfunc="count"), + ).reset_index() # TODO: Build loyalty_analysis: group by loyalty_tier, columns loyalty_tier/avg_spent/customer_count. - raise NotImplementedError("Task 5: implement build_reports") + loyalty_analysis = enriched.groupby("loyalty_tier").agg( + avg_spent=pd.NamedAgg(column="revenue", aggfunc="mean"), + customer_count=pd.NamedAgg(column="customer_email", aggfunc="nunique"), + ).reset_index() + return { + "weekly_revenue": weekly_revenue, + "customer_summary": customer_summary, + "category_performance": category_performance, + "loyalty_analysis": loyalty_analysis, + } + + + def write_outputs(reports: dict[str, pd.DataFrame], output_dir: Path) -> None: @@ -22,10 +52,26 @@ def write_outputs(reports: dict[str, pd.DataFrame], output_dir: Path) -> None: output_dir.mkdir(exist_ok=True) # TODO: Write reports["weekly_revenue"] to weekly_revenue.csv with index=False. + reports["weekly_revenue"].to_csv(output_dir / "weekly_revenue.csv", index=False) # TODO: Write reports["customer_summary"] to customer_summary.parquet with index=False. + reports["customer_summary"].to_parquet(output_dir / "customer_summary.parquet", index=False) # TODO: Write reports["category_performance"] to category_performance.csv with index=False. + reports["category_performance"].to_csv(output_dir / "category_performance.csv", index=False) # TODO: Sort category_performance by total_revenue descending. + reports["category_performance"] = reports["category_performance"].sort_values("total_revenue", ascending=False) # TODO: Plot a bar chart (x="category", y="total_revenue") and save to category_revenue.png # using plt.savefig(output_dir / "category_revenue.png", bbox_inches="tight"). # Use matplotlib.use("Agg") before importing pyplot for headless environments. - raise NotImplementedError("Task 6: implement write_outputs") + import matplotlib + matplotlib.use("Agg") + import matplotlib.pyplot as plt + plt.figure(figsize=(10, 6)) + plt.bar(reports["category_performance"]["category"], reports["category_performance"]["total_revenue"]) + plt.xlabel("Category") + plt.ylabel("Total Revenue") + plt.title("Total Revenue by Category") + plt.xticks(rotation=45) + plt.tight_layout() + plt.savefig(output_dir / "category_revenue.png", bbox_inches="tight") + logging.info(f"Reports written to {output_dir}.") + \ No newline at end of file diff --git a/src/transform.py b/src/transform.py index 18f82e5..3c403a4 100644 --- a/src/transform.py +++ b/src/transform.py @@ -7,7 +7,12 @@ def join_customers(sales: pd.DataFrame, customers: pd.DataFrame) -> pd.DataFrame: """Task 4: Normalize join keys, merge, and add a derived boolean flag.""" # TODO: Normalize customer_email in both DataFrames with .str.lower().str.strip(). + sales["customer_email"] = sales["customer_email"].str.lower().str.strip() + customers["customer_email"] = customers["customer_email"].str.lower().str.strip() # TODO: Merge sales with customers on customer_email using an inner join. + merged_df = pd.merge(sales, customers, on="customer_email", how="inner") # TODO: Add a vectorized boolean column is_high_value: True where price * quantity >= 150. + merged_df["is_high_value"] = merged_df["price"] * merged_df["quantity"] >= 150 # TODO: (Optional hands-on) Try a left join instead and inspect rows where customer_name is NaN. - raise NotImplementedError("Task 4: implement join_customers") + return merged_df +