From a4e08f90915441bb33ffeec02ddbb95a5b55000a Mon Sep 17 00:00:00 2001 From: Nicolas Vandeginste Date: Wed, 22 Apr 2026 17:46:22 +0200 Subject: [PATCH] Fix partition overwrite predicate recursion --- pyiceberg/table/__init__.py | 26 ++++++++++++++++++-------- tests/table/test_init.py | 18 ++++++++++++++++++ 2 files changed, 36 insertions(+), 8 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index bb8765b651..b5fefd8842 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -368,19 +368,29 @@ def _build_partition_predicate( """ partition_fields = [schema.find_field(field.source_id).name for field in spec.fields] - expr: BooleanExpression = AlwaysFalse() + match_partition_expressions = [] for partition_record in partition_records: - match_partition_expression: BooleanExpression = AlwaysTrue() - - for pos, partition_field in enumerate(partition_fields): - predicate = ( + predicates = [ + ( EqualTo(Reference(partition_field), partition_record[pos]) if partition_record[pos] is not None else IsNull(Reference(partition_field)) ) - match_partition_expression = And(match_partition_expression, predicate) - expr = Or(expr, match_partition_expression) - return expr + for pos, partition_field in enumerate(partition_fields) + ] + + if not predicates: + match_partition_expressions.append(AlwaysTrue()) + elif len(predicates) == 1: + match_partition_expressions.append(predicates[0]) + else: + match_partition_expressions.append(And(*predicates)) + + if not match_partition_expressions: + return AlwaysFalse() + if len(match_partition_expressions) == 1: + return match_partition_expressions[0] + return Or(*match_partition_expressions) def _append_snapshot_producer( self, snapshot_properties: dict[str, str], branch: str | None = MAIN_BRANCH diff --git a/tests/table/test_init.py b/tests/table/test_init.py index 30c4a3a45a..717b006a22 100644 --- a/tests/table/test_init.py +++ b/tests/table/test_init.py @@ -32,6 +32,7 @@ EqualTo, In, ) +from pyiceberg.expressions.visitors import bind from pyiceberg.io import PY_IO_IMPL, load_file_io from pyiceberg.partitioning import PartitionField, PartitionSpec from pyiceberg.schema import Schema @@ -90,6 +91,7 @@ BucketTransform, IdentityTransform, ) +from pyiceberg.typedef import Record from pyiceberg.types import ( BinaryType, BooleanType, @@ -788,6 +790,22 @@ def test_apply_add_schema_update(table_v2: Table) -> None: assert test_context.is_added_schema(2) +def test_build_partition_predicate_binds_many_partitions_without_recursion(table_v2: Table) -> None: + schema = Schema(NestedField(field_id=1, name="date", field_type=StringType(), required=False)) + partition_spec = PartitionSpec( + PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="date"), + ) + partition_records = {Record(f"2026-02-{partition_idx:04d}T00") for partition_idx in range(512)} + + predicate = table_v2.transaction()._build_partition_predicate( # pylint: disable=W0212 + partition_records=partition_records, + spec=partition_spec, + schema=schema, + ) + + assert bind(schema, predicate, case_sensitive=True) + + def test_update_metadata_table_schema(table_v2: Table) -> None: transaction = table_v2.transaction() update = transaction.update_schema()