Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 18 additions & 8 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,19 +368,29 @@ def _build_partition_predicate(
"""
partition_fields = [schema.find_field(field.source_id).name for field in spec.fields]

expr: BooleanExpression = AlwaysFalse()
match_partition_expressions = []
for partition_record in partition_records:
match_partition_expression: BooleanExpression = AlwaysTrue()

for pos, partition_field in enumerate(partition_fields):
predicate = (
predicates = [
(
EqualTo(Reference(partition_field), partition_record[pos])
if partition_record[pos] is not None
else IsNull(Reference(partition_field))
)
match_partition_expression = And(match_partition_expression, predicate)
expr = Or(expr, match_partition_expression)
return expr
for pos, partition_field in enumerate(partition_fields)
]

if not predicates:
match_partition_expressions.append(AlwaysTrue())
elif len(predicates) == 1:
match_partition_expressions.append(predicates[0])
else:
match_partition_expressions.append(And(*predicates))

if not match_partition_expressions:
return AlwaysFalse()
if len(match_partition_expressions) == 1:
return match_partition_expressions[0]
return Or(*match_partition_expressions)

def _append_snapshot_producer(
self, snapshot_properties: dict[str, str], branch: str | None = MAIN_BRANCH
Expand Down
18 changes: 18 additions & 0 deletions tests/table/test_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
EqualTo,
In,
)
from pyiceberg.expressions.visitors import bind
from pyiceberg.io import PY_IO_IMPL, load_file_io
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
Expand Down Expand Up @@ -90,6 +91,7 @@
BucketTransform,
IdentityTransform,
)
from pyiceberg.typedef import Record
from pyiceberg.types import (
BinaryType,
BooleanType,
Expand Down Expand Up @@ -788,6 +790,22 @@ def test_apply_add_schema_update(table_v2: Table) -> None:
assert test_context.is_added_schema(2)


def test_build_partition_predicate_binds_many_partitions_without_recursion(table_v2: Table) -> None:
    """Build a partition predicate from 512 distinct records and bind it to the schema.

    The schema has one optional string column ``date``, identity-partitioned.
    The test passes when ``bind`` returns a truthy expression for the predicate
    produced by ``Transaction._build_partition_predicate``.
    """
    # NOTE(review): per the test name, this presumably guards against hitting the
    # recursion limit when the predicate covers many partitions - confirm.
    date_column = NestedField(field_id=1, name="date", field_type=StringType(), required=False)
    schema = Schema(date_column)

    identity_on_date = PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="date")
    partition_spec = PartitionSpec(identity_on_date)

    # One single-value record per index, collected into a set.
    partition_records = {Record(f"2026-02-{partition_idx:04d}T00") for partition_idx in range(512)}

    transaction = table_v2.transaction()
    predicate = transaction._build_partition_predicate(  # pylint: disable=W0212
        partition_records=partition_records,
        spec=partition_spec,
        schema=schema,
    )

    bound_predicate = bind(schema, predicate, case_sensitive=True)
    assert bound_predicate


def test_update_metadata_table_schema(table_v2: Table) -> None:
transaction = table_v2.transaction()
update = transaction.update_schema()
Expand Down
Loading