Skip to content

feat: parquet sharding#4

Merged
JakubPekar merged 6 commits into
masterfrom
feature/parquet-sharding
Apr 15, 2026
Merged

feat: parquet sharding#4
JakubPekar merged 6 commits into
masterfrom
feature/parquet-sharding

Conversation

@JakubPekar

@JakubPekar JakubPekar commented Apr 13, 2026

Copy link
Copy Markdown
Collaborator

Summary by CodeRabbit

  • New Features
    • Parquet sharding: split large Parquet files into smaller, configurable shards to ease processing and reduce memory use. Supports adjustable shard and batch sizes, reports progress and completion, and ensures writers are safely closed on errors.

@JakubPekar JakubPekar requested review from a team and Copilot April 13, 2026 15:40
@coderabbitai

coderabbitai Bot commented Apr 13, 2026

Copy link
Copy Markdown

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: ad741d7c-6da9-46a4-9e78-8a50992c53ae

📥 Commits

Reviewing files that changed from the base of the PR and between 8c24682 and 55cff1a.

📒 Files selected for processing (2)
  • rationai/mlkit/data/__init__.py
  • rationai/mlkit/data/shard_parquet.py
✅ Files skipped from review due to trivial changes (1)
  • rationai/mlkit/data/init.py
🚧 Files skipped from review as they are similar to previous changes (1)
  • rationai/mlkit/data/shard_parquet.py

📝 Walkthrough

Walkthrough

Adds a new function to split a Parquet file into multiple shard Parquet files by iterating input row-group batches, lazily creating per-shard writers, writing batches until a per-shard row limit is reached, and ensuring writers are closed.

Changes

Cohort / File(s) Summary
Parquet sharding implementation
rationai/mlkit/data/shard_parquet.py
New shard_parquet() function: validates rows_per_shard and row_group_size, ensures output dir, reads source with pyarrow.parquet.ParquetFile, iterates via iter_batches, lazily initializes ParquetWriter per shard, writes batches, rotates/closes writers when shard row limits reached, and uses try/finally to guarantee writer closure.
Package export
rationai/mlkit/data/__init__.py
Exports shard_parquet via __all__, enabling from rationai.mlkit.data import shard_parquet.

Sequence Diagram(s)

sequenceDiagram
    participant Caller
    participant ParquetFile
    participant ShardWriter as ParquetWriter
    participant FS as Filesystem

    Caller->>ParquetFile: open(input_file)
    ParquetFile-->>Caller: metadata (row_count)
    loop for each row_group batch
        ParquetFile->>Caller: iter_batches -> batch
        alt shard writer not created
            Caller->>FS: ensure output_dir exists
            Caller->>ShardWriter: create writer (shard_x.parquet, schema)
        end
        Caller->>ShardWriter: write(batch)
        Caller->>Caller: accumulate rows_written
        alt rows_written >= rows_per_shard
            Caller->>ShardWriter: close()
            Caller->>Caller: shard_idx++, reset rows_written
        end
    end
    opt open writer remaining
        Caller->>ShardWriter: close()
    end
    Caller-->>FS: log completion
Loading

Estimated code review effort

🎯 2 (Simple) | ⏱️ ~10 minutes

Poem

🐇 I hop through rows and split the stack,
Carrots of Parquet packed in a pack,
Writers blink awake, then softly close,
Shards in neat lines where the data goes,
A rabbit's nibble—tidy files that stack.

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'feat: parquet sharding' directly and concisely summarizes the main change: adding a new parquet sharding feature with the shard_parquet function.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feature/parquet-sharding

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a shard_parquet utility designed to split large Parquet files into smaller, manageable shards using memory-efficient streaming. The review feedback suggests several improvements to align with standard practices and enhance robustness: transitioning from os and print to pathlib and logging, refining the function signature to support Path objects, and implementing better resource management using try...finally blocks to ensure file writers are properly closed in case of errors. Additionally, a redundant parameter in the write_batch call was identified for removal.

Comment thread rationai/mlkit/data/shard_parquet.py Outdated
Comment thread rationai/mlkit/data/shard_parquet.py Outdated
Comment thread rationai/mlkit/data/shard_parquet.py Outdated

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@rationai/mlkit/data/shard_parquet.py`:
- Line 7: The code currently uses os.makedirs(output_dir, exist_ok=True) and
deterministic shard filenames like shard_00000.parquet which lets reruns
silently overwrite existing shards; change this by adding a pre-check for
existing shard files in output_dir (e.g., glob for "shard_*.parquet") and
fail/raise if any are found or create a uniquely-named output directory
(timestamp/suffix) instead, and remove or set exist_ok=False on
os.makedirs(output_dir, ...) so the guard triggers; apply the same protection
around the shard-writing loop that emits shard_*.parquet (lines that write
shards) to prevent accidental overwrites.
- Line 6: Validate sharding parameters at the start of shard_parquet: check that
rows_per_shard and row_group_size are positive integers (greater than 0) and
raise a clear ValueError if not; include the parameter name and offending value
in the error message so callers immediately see which argument is invalid and
why before any I/O or processing occurs.
- Around line 18-41: The loop over parquet_file.iter_batches currently can skip
writer.close() if iter_batches or writer.write_batch raises; wrap the entire
batching logic in a try/finally (or use a context manager for pq.ParquetWriter)
so that writer.close() is always called when writer is not None; ensure you
reference the same writer variable used when creating
out_path/shard_{shard_idx:05d}.parquet and that current_shard_rows/shard_idx
semantics remain unchanged so partial shards are closed in the finally block on
any exception.
- Around line 27-36: The code currently writes whole batches with
writer.write_batch(batch) then checks current_shard_rows, allowing a shard to
exceed rows_per_shard by a full batch; change write logic in the loop that
handles `batch` so that before calling writer.write_batch you check whether
batch.num_rows + current_shard_rows > rows_per_shard and, if so, split the batch
into smaller pieces and write only up to the remaining rows for the current
shard, close the writer (writer.close(); writer = None), increment shard_idx and
reset current_shard_rows, then create/open a new writer and continue writing the
rest of the batch until the entire batch is consumed; use the existing symbols
`batch`, `batch.num_rows`, `current_shard_rows`, `rows_per_shard`,
`writer.write_batch`, `writer.close`, and `shard_idx` to implement the looped
slicing/writing until all rows are written and no shard exceeds rows_per_shard.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 731d9160-ed15-40c7-b918-46097ee49135

📥 Commits

Reviewing files that changed from the base of the PR and between 91156fa and 14ad435.

📒 Files selected for processing (1)
  • rationai/mlkit/data/shard_parquet.py

Comment thread rationai/mlkit/data/shard_parquet.py Outdated
Comment thread rationai/mlkit/data/shard_parquet.py Outdated
Comment thread rationai/mlkit/data/shard_parquet.py Outdated
Comment thread rationai/mlkit/data/shard_parquet.py Outdated

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds a utility for splitting (“sharding”) a large Parquet file into multiple smaller Parquet files by streaming record batches from the source and writing them into sequential shard files.

Changes:

  • Introduces shard_parquet(...) to stream-read a Parquet file and write out multiple shard_XXXXX.parquet outputs.
  • Adds basic progress reporting and shard rollover logic based on rows_per_shard.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread rationai/mlkit/data/shard_parquet.py Outdated
Comment thread rationai/mlkit/data/shard_parquet.py Outdated
Comment thread rationai/mlkit/data/shard_parquet.py Outdated
Comment thread rationai/mlkit/data/shard_parquet.py Outdated
Comment thread rationai/mlkit/data/shard_parquet.py Outdated

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 1 out of 1 changed files in this pull request and generated 5 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread rationai/mlkit/data/shard_parquet.py Outdated
Comment thread rationai/mlkit/data/shard_parquet.py Outdated
Comment thread rationai/mlkit/data/shard_parquet.py Outdated
Comment thread rationai/mlkit/data/shard_parquet.py Outdated
Comment thread rationai/mlkit/data/shard_parquet.py
@JakubPekar JakubPekar requested a review from Copilot April 13, 2026 20:38
@JakubPekar

Copy link
Copy Markdown
Collaborator Author

\gemini review

@JakubPekar

Copy link
Copy Markdown
Collaborator Author

@CodeRabbit review

@coderabbitai

coderabbitai Bot commented Apr 13, 2026

Copy link
Copy Markdown
✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 1 out of 1 changed files in this pull request and generated 4 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread rationai/mlkit/data/shard_parquet.py
Comment thread rationai/mlkit/data/shard_parquet.py
Comment thread rationai/mlkit/data/shard_parquet.py Outdated
Comment thread rationai/mlkit/data/shard_parquet.py
Comment thread rationai/mlkit/data/shard_parquet.py
@JakubPekar JakubPekar requested a review from Adames4 April 14, 2026 09:00
@JakubPekar JakubPekar merged commit cdf212b into master Apr 15, 2026
2 of 3 checks passed
@JakubPekar JakubPekar deleted the feature/parquet-sharding branch April 15, 2026 09:01
@coderabbitai coderabbitai Bot mentioned this pull request Jun 24, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants