From 14ad435b85c7c381a0e95d03ad6dc4ad80be6c87 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Pek=C3=A1r?= <492788@mail.muni.cz> Date: Mon, 13 Apr 2026 15:32:51 +0000 Subject: [PATCH 1/6] feat: parquet sharding --- rationai/mlkit/data/shard_parquet.py | 43 ++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) create mode 100644 rationai/mlkit/data/shard_parquet.py diff --git a/rationai/mlkit/data/shard_parquet.py b/rationai/mlkit/data/shard_parquet.py new file mode 100644 index 0000000..c2d7e04 --- /dev/null +++ b/rationai/mlkit/data/shard_parquet.py @@ -0,0 +1,43 @@ +import os + +import pyarrow.parquet as pq + + +def shard_parquet(input_file: str, output_dir: str, rows_per_shard: int = 100_000, row_group_size: int = 5000) -> None: + os.makedirs(output_dir, exist_ok=True) + + # Open the file metadata (does not load data into RAM) + parquet_file = pq.ParquetFile(input_file) + print(f"Total rows in source: {parquet_file.metadata.num_rows}") + + shard_idx = 0 + current_shard_rows = 0 + writer = None + + # Stream the file in tiny, memory-safe batches + for batch in parquet_file.iter_batches(batch_size=row_group_size): + + # Initialize a new file writer if we don't have one open + if writer is None: + out_path = os.path.join(output_dir, f"shard_{shard_idx:05d}.parquet") + # We enforce the smaller row_group_size here for the new files + writer = pq.ParquetWriter(out_path, batch.schema) + + # Write the batch to the current shard + writer.write_batch(batch, row_group_size=row_group_size) + current_shard_rows += batch.num_rows + + # If we hit our row limit for this shard, close it and increment + if current_shard_rows >= rows_per_shard: + writer.close() + writer = None + print(f"Finished writing shard {shard_idx:05d}") + shard_idx += 1 + current_shard_rows = 0 + + # Clean up the last writer + if writer is not None: + writer.close() + print(f"Finished writing final shard {shard_idx:05d}") + + print("Sharding complete!") \ No newline at end of file From 8c246826dd38373342fb654c0a41b73c1d007009 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Pek=C3=A1r?= <492788@mail.muni.cz> Date: Mon, 13 Apr 2026 20:29:11 +0000 Subject: [PATCH 2/6] feat: shard parquet update --- rationai/mlkit/data/shard_parquet.py | 90 ++++++++++++++++------------ 1 file changed, 51 insertions(+), 39 deletions(-) diff --git a/rationai/mlkit/data/shard_parquet.py b/rationai/mlkit/data/shard_parquet.py index c2d7e04..125d505 100644 --- a/rationai/mlkit/data/shard_parquet.py +++ b/rationai/mlkit/data/shard_parquet.py @@ -1,43 +1,55 @@ -import os +import logging +from pathlib import Path import pyarrow.parquet as pq -def shard_parquet(input_file: str, output_dir: str, rows_per_shard: int = 100_000, row_group_size: int = 5000) -> None: - os.makedirs(output_dir, exist_ok=True) - - # Open the file metadata (does not load data into RAM) - parquet_file = pq.ParquetFile(input_file) - print(f"Total rows in source: {parquet_file.metadata.num_rows}") - - shard_idx = 0 - current_shard_rows = 0 - writer = None - - # Stream the file in tiny, memory-safe batches - for batch in parquet_file.iter_batches(batch_size=row_group_size): - - # Initialize a new file writer if we don't have one open - if writer is None: - out_path = os.path.join(output_dir, f"shard_{shard_idx:05d}.parquet") - # We enforce the smaller row_group_size here for the new files - writer = pq.ParquetWriter(out_path, batch.schema) - - # Write the batch to the current shard - writer.write_batch(batch, row_group_size=row_group_size) - current_shard_rows += batch.num_rows - - # If we hit our row limit for this shard, close it and increment - if current_shard_rows >= rows_per_shard: - writer.close() - writer = None - print(f"Finished writing shard {shard_idx:05d}") - shard_idx += 1 - current_shard_rows = 0 - - # Clean up the last writer - if writer is not None: - writer.close() - print(f"Finished writing final shard {shard_idx:05d}") - - print("Sharding complete!") \ No newline at end of file +_logger = logging.getLogger(__name__) + + +def shard_parquet( + input_file: str | Path, + output_dir: str | Path, + rows_per_shard: int = 100_000, + row_group_size: int = 5000, +) -> None: + + assert rows_per_shard > 0, "rows_per_shard must be grater than 0" + assert row_group_size > 0, "row_group_size must be grater than 0" + assert rows_per_shard % row_group_size > 0, ( + "rows_per_shard must be divisible by row_group_size" + ) + + output_dir = Path(output_dir) + output_dir.mkdir(parents=True) + + with pq.ParquetFile(input_file) as parquet_file: + _logger.info(f"Total rows in source: {parquet_file.metadata.num_rows}") + + shard_idx = 0 + current_shard_rows = 0 + writer = None + + try: + for batch in parquet_file.iter_batches(batch_size=row_group_size): + if writer is None: + out_path = output_dir / f"shard_{shard_idx:05d}.parquet" + writer = pq.ParquetWriter(out_path, batch.schema) + + writer.write_batch(batch) + current_shard_rows += batch.num_rows + + if current_shard_rows >= rows_per_shard: + writer.close() + writer = None + _logger.info(f"Finished writing shard {shard_idx:05d}") + shard_idx += 1 + current_shard_rows = 0 + + if writer is not None: + _logger.info(f"Finished writing final shard {shard_idx:05d}") + finally: + if writer is not None: + writer.close() + + _logger.info("Sharding complete!") From 20341af8d0731ef392f4de220d3163f4d4f341b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Pek=C3=A1r?= <492788@mail.muni.cz> Date: Mon, 13 Apr 2026 20:34:16 +0000 Subject: [PATCH 3/6] feat: docs --- rationai/mlkit/data/shard_parquet.py | 60 +++++++++++++++++++++++----- 1 file changed, 50 insertions(+), 10 deletions(-) diff --git a/rationai/mlkit/data/shard_parquet.py b/rationai/mlkit/data/shard_parquet.py index 125d505..39c1f6c 100644 --- a/rationai/mlkit/data/shard_parquet.py +++ b/rationai/mlkit/data/shard_parquet.py @@ -4,6 +4,7 @@ import pyarrow.parquet as pq +# Initialize a module-level logger _logger = logging.getLogger(__name__) @@ -13,43 +14,82 @@ def shard_parquet( rows_per_shard: int = 100_000, row_group_size: int = 5000, ) -> None: + """Splits a large Parquet file into smaller Parquet files (shards). - assert rows_per_shard > 0, "rows_per_shard must be grater than 0" - assert row_group_size > 0, "row_group_size must be grater than 0" - assert rows_per_shard % row_group_size > 0, ( + This function reads a single Parquet file in memory-efficient batches and writes + it out into multiple smaller files. Each output file will contain exactly + `rows_per_shard` rows, except potentially the final shard. + + Args: + input_file (str | Path): The path to the source Parquet file. + output_dir (str | Path): The directory where the output shards will be saved. + rows_per_shard (int, optional): The target number of rows per shard. + Defaults to 100,000. + row_group_size (int, optional): The number of rows to read/write per batch. + Defaults to 5,000. + + Raises: + AssertionError: If `rows_per_shard` or `row_group_size` are not strictly positive, + or if `rows_per_shard` is not perfectly divisible by `row_group_size`. + """ + # --- Input Validation --- + assert rows_per_shard > 0, "rows_per_shard must be greater than 0" + assert row_group_size > 0, "row_group_size must be greater than 0" + + # Ensure exact chunks can be written without remainder + assert rows_per_shard % row_group_size == 0, ( "rows_per_shard must be divisible by row_group_size" ) + # --- Setup Output Directory --- output_dir = Path(output_dir) - output_dir.mkdir(parents=True) + + # Create the target directory and any intermediate directories if they don't exist. + # exist_ok=True prevents crashes if you run the script multiple times. + output_dir.mkdir(parents=True, exist_ok=True) + # --- Read and Shard Process --- + # Open the Parquet file as a context manager so it safely closes when done with pq.ParquetFile(input_file) as parquet_file: _logger.info(f"Total rows in source: {parquet_file.metadata.num_rows}") - shard_idx = 0 - current_shard_rows = 0 - writer = None + # Initialize tracking variables + shard_idx = 0 # Tracks the current shard file number (e.g., 00000) + current_shard_rows = 0 # Tracks how many rows have been written to the current shard + writer = None # Holds the active pyarrow ParquetWriter instance try: + # Iterate through the source file in memory-efficient chunks (batches) for batch in parquet_file.iter_batches(batch_size=row_group_size): + + # If we don't have an active writer, create a new one for the current shard if writer is None: out_path = output_dir / f"shard_{shard_idx:05d}.parquet" writer = pq.ParquetWriter(out_path, batch.schema) + # Write the current batch and update the running row count writer.write_batch(batch) current_shard_rows += batch.num_rows + # Check if the current shard has reached its maximum capacity if current_shard_rows >= rows_per_shard: + # Finalize the current file writer.close() - writer = None + writer = None # Reset the writer so a new one spawns on the next loop iteration + _logger.info(f"Finished writing shard {shard_idx:05d}") + + # Prepare counters for the next shard shard_idx += 1 - current_shard_rows = 0 + current_shard_rows = 0 + # After the loop ends, check if there's a partially filled final shard left open if writer is not None: _logger.info(f"Finished writing final shard {shard_idx:05d}") + finally: + # Ensure the active writer is properly closed even if an unexpected error occurs if writer is not None: writer.close() - _logger.info("Sharding complete!") + _logger.info("Sharding complete!") \ No newline at end of file From e29dfb5d1156297539a3ddc7022096187b645227 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Pek=C3=A1r?= <492788@mail.muni.cz> Date: Mon, 13 Apr 2026 20:36:34 +0000 Subject: [PATCH 4/6] feat: format --- rationai/mlkit/data/shard_parquet.py | 33 ++++++++++++++-------------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/rationai/mlkit/data/shard_parquet.py b/rationai/mlkit/data/shard_parquet.py index 39c1f6c..1810862 100644 --- a/rationai/mlkit/data/shard_parquet.py +++ b/rationai/mlkit/data/shard_parquet.py @@ -16,26 +16,26 @@ def shard_parquet( ) -> None: """Splits a large Parquet file into smaller Parquet files (shards). - This function reads a single Parquet file in memory-efficient batches and writes - it out into multiple smaller files. Each output file will contain exactly + This function reads a single Parquet file in memory-efficient batches and writes + it out into multiple smaller files. Each output file will contain exactly `rows_per_shard` rows, except potentially the final shard. Args: input_file (str | Path): The path to the source Parquet file. output_dir (str | Path): The directory where the output shards will be saved. - rows_per_shard (int, optional): The target number of rows per shard. + rows_per_shard (int, optional): The target number of rows per shard. Defaults to 100,000. - row_group_size (int, optional): The number of rows to read/write per batch. + row_group_size (int, optional): The number of rows to read/write per batch. Defaults to 5,000. Raises: - AssertionError: If `rows_per_shard` or `row_group_size` are not strictly positive, + AssertionError: If `rows_per_shard` or `row_group_size` are not strictly positive, or if `rows_per_shard` is not perfectly divisible by `row_group_size`. """ # --- Input Validation --- assert rows_per_shard > 0, "rows_per_shard must be greater than 0" assert row_group_size > 0, "row_group_size must be greater than 0" - + # Ensure exact chunks can be written without remainder assert rows_per_shard % row_group_size == 0, ( "rows_per_shard must be divisible by row_group_size" @@ -43,7 +43,7 @@ def shard_parquet( # --- Setup Output Directory --- output_dir = Path(output_dir) - + # Create the target directory and any intermediate directories if they don't exist. # exist_ok=True prevents crashes if you run the script multiple times. output_dir.mkdir(parents=True, exist_ok=True) @@ -54,14 +54,15 @@ def shard_parquet( _logger.info(f"Total rows in source: {parquet_file.metadata.num_rows}") # Initialize tracking variables - shard_idx = 0 # Tracks the current shard file number (e.g., 00000) - current_shard_rows = 0 # Tracks how many rows have been written to the current shard - writer = None # Holds the active pyarrow ParquetWriter instance + shard_idx = 0 # Tracks the current shard file number (e.g., 00000) + current_shard_rows = ( + 0 # Tracks how many rows have been written to the current shard + ) + writer = None # Holds the active pyarrow ParquetWriter instance try: # Iterate through the source file in memory-efficient chunks (batches) for batch in parquet_file.iter_batches(batch_size=row_group_size): - # If we don't have an active writer, create a new one for the current shard if writer is None: out_path = output_dir / f"shard_{shard_idx:05d}.parquet" @@ -76,20 +77,20 @@ def shard_parquet( # Finalize the current file writer.close() writer = None # Reset the writer so a new one spawns on the next loop iteration - + _logger.info(f"Finished writing shard {shard_idx:05d}") - + # Prepare counters for the next shard shard_idx += 1 - current_shard_rows = 0 + current_shard_rows = 0 # After the loop ends, check if there's a partially filled final shard left open if writer is not None: _logger.info(f"Finished writing final shard {shard_idx:05d}") - + finally: # Ensure the active writer is properly closed even if an unexpected error occurs if writer is not None: writer.close() - _logger.info("Sharding complete!") \ No newline at end of file + _logger.info("Sharding complete!") From 47614bbefff20c3720c8b62f95e009699f205c42 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Pek=C3=A1r?= <492788@mail.muni.cz> Date: Mon, 13 Apr 2026 21:03:46 +0000 Subject: [PATCH 5/6] feat: init --- rationai/mlkit/data/__init__.py | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 rationai/mlkit/data/__init__.py diff --git a/rationai/mlkit/data/__init__.py b/rationai/mlkit/data/__init__.py new file mode 100644 index 0000000..86b308d --- /dev/null +++ b/rationai/mlkit/data/__init__.py @@ -0,0 +1,4 @@ +from rationai.mlkit.data.shard_parquet import shard_parquet + + +__all__ = ["shard_parquet"] From 55cff1a53d844ad9178aa389c7688b81fc31ba42 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Pek=C3=A1r?= <492788@mail.muni.cz> Date: Tue, 14 Apr 2026 08:59:08 +0000 Subject: [PATCH 6/6] feat: fix comments --- rationai/mlkit/data/shard_parquet.py | 23 ++++++----------------- 1 file changed, 6 insertions(+), 17 deletions(-) diff --git a/rationai/mlkit/data/shard_parquet.py b/rationai/mlkit/data/shard_parquet.py index 1810862..95d2654 100644 --- a/rationai/mlkit/data/shard_parquet.py +++ b/rationai/mlkit/data/shard_parquet.py @@ -4,7 +4,6 @@ import pyarrow.parquet as pq -# Initialize a module-level logger _logger = logging.getLogger(__name__) @@ -43,53 +42,43 @@ def shard_parquet( # --- Setup Output Directory --- output_dir = Path(output_dir) - - # Create the target directory and any intermediate directories if they don't exist. - # exist_ok=True prevents crashes if you run the script multiple times. output_dir.mkdir(parents=True, exist_ok=True) # --- Read and Shard Process --- - # Open the Parquet file as a context manager so it safely closes when done with pq.ParquetFile(input_file) as parquet_file: _logger.info(f"Total rows in source: {parquet_file.metadata.num_rows}") # Initialize tracking variables - shard_idx = 0 # Tracks the current shard file number (e.g., 00000) - current_shard_rows = ( - 0 # Tracks how many rows have been written to the current shard - ) - writer = None # Holds the active pyarrow ParquetWriter instance + shard_idx = 0 + current_shard_rows = 0 + writer = None try: # Iterate through the source file in memory-efficient chunks (batches) for batch in parquet_file.iter_batches(batch_size=row_group_size): - # If we don't have an active writer, create a new one for the current shard if writer is None: out_path = output_dir / f"shard_{shard_idx:05d}.parquet" writer = pq.ParquetWriter(out_path, batch.schema) - # Write the current batch and update the running row count + # Write the current batch writer.write_batch(batch) current_shard_rows += batch.num_rows # Check if the current shard has reached its maximum capacity if current_shard_rows >= rows_per_shard: - # Finalize the current file writer.close() - writer = None # Reset the writer so a new one spawns on the next loop iteration + writer = None _logger.info(f"Finished writing shard {shard_idx:05d}") - # Prepare counters for the next shard shard_idx += 1 current_shard_rows = 0 - # After the loop ends, check if there's a partially filled final shard left open if writer is not None: _logger.info(f"Finished writing final shard {shard_idx:05d}") finally: - # Ensure the active writer is properly closed even if an unexpected error occurs + # Ensure the active writer is properly closed if writer is not None: writer.close()