perf: avoid FFI import/export between native subtree and ShuffleWriter#4507
Conversation
# Conflicts: # spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
|
Ran TPC-H SF1000 with Comet on Spark 3.5.8:
Looks to be a decent improvement. |
|
TPC-DS SF1000 is a little over 6% faster as well.
|
| /// nullability to `actual.nullable || expected.nullable`. See | ||
| /// <https://github.com/apache/datafusion-comet/issues/4515>. | ||
| #[derive(Debug)] | ||
| pub struct SchemaAlignExec { |
There was a problem hiding this comment.
@mbutrovich WDYT about adding some unit tests for SchemaAlignExec?
There was a problem hiding this comment.
I would love for this class to be short-lived and eventually be gone, but yeah I can add some tests in the meantime.
There was a problem hiding this comment.
Added Rust unit tests for SchemaAlignExec in schema_align.rs, one per concrete drift tracked in #4515:
aligns_width_bucket_int32_to_int64- top-levelInt32->Int64cast, values preserved.aligns_date_trunc_timestamp_timezone-Timestamp(us)->Timestamp(us, "UTC"), instants unchanged.aligns_collect_set_list_element_nullability- the interesting one:List(nullable Int32)->List(non-null Int32). This narrows the inner element nullability, and I wasn't sure arrow's list cast would do that rather than fail the re-stamp on DataType inequality. It does. The test confirms the cast reshapes the inner field and the batch re-stamps cleanly, so the operator genuinely covers the nestedcollect_setcase, not just the top-level ones.
All three pass. The module doc-comment also records the intent: each test is anchored to a drift pair, so when a function's upstream return type is corrected the pair collapses, try_new_or_passthrough returns the child unwrapped, and the matching test flips to a passthrough assertion. That is the signal the workaround for that function can be removed, which lines up with wanting this class to eventually go away.
andygrove
left a comment
There was a problem hiding this comment.
Thanks @mbutrovich. I've been wanting to see this optimization for a while now. Left a comment about adding unit tests but will leave that up to you.
andygrove
left a comment
There was a problem hiding this comment.
Thanks for adding the tests!




Which issue does this PR close?
Closes #3925. Replaces #3930.
Rationale for this change
When
CometShuffleExchangeExechas aCometNativeExecchild, the current code runs twoCometExecIterators per partition: one for the upstream subtree, one forScan("ShuffleWriterInput") -> ShuffleWriterconsuming the FFI'd batches back. The JVM doesn't read the data, so the round-trip is pure overhead.This PR collapses the two iterators into one
CometExecIteratorrooted atShuffleWriter(child = childNativeOp): no JVM intermediary, no Arrow FFI export/import, one fewercreatePlan/releasePlanpair per partition. The proto already supported this nested form.Although the Arrow C Data Interface is zero-copy, this native-to-JVM-and-back round-trip was not. The old
Scan("ShuffleWriterInput")leftarrow_ffi_safeat its proto default offalse, so on importScanExecrancopy_arrayon every batch: an unconditional deep copy into freshly-sized buffers. Collapsing the iterators removes that per-batch copy.Why this replaces #3930
#3930 also removed the FFI but kept the two-iterator structure, passing batches between them through a JVM-side
u64-keyedMutex<HashMap<u64, RecordBatch>>. This PR achieves the same result without the global registry, the batch-handle protocol, or the second iterator: the child plan is just a child operator in the same DataFusion plan as the writer.What changes are included in this PR?
CometNativeExecexposesNativeExecContext(buildNativeContext()/executeColumnarWithContext(ctx)) so the shuffle path reusesdoExecuteColumnar's per-partition setup (broadcast alignment, plan-data injection, subqueries, encryption).CometNativeShuffleInputRDDis a thin scheduling-anchor RDD that resolves leaf partitions on the driver and carries per-partition leaf/shuffle-block iterators.NativeShuffleSpec(child native op, metric node, exec context) is carried byCometShuffleDependencyand only populated on native-child shuffles.CometShuffleExchangeExecdispatches: native-child shuffles useprepareNativeShuffleDependencyand the thin RDD; everything else uses the existing 5-argprepareShuffleDependencywith a synthesizedScan("ShuffleWriterInput")placeholder. Behavior unchanged forCometSparkToColumnarExec,CometCollectLimitExec,CometTakeOrderedAndProjectExec.CometNativeShuffleWriterrewritten to one code path overShuffleWriter(child = spec.childNativeOp).CometNativeShuffleWritercallsreportScanInputMetricsso a native scan now executing inside the writer's plan still populates the task'sinputMetrics(bytesRead/recordsRead). The split-driver flow reported this from the scan's ownCometExecRDD.compute, which no longer runs once the scan is inlined.Removing the FFI boundary surfaces four issues the old deep-copy +
ScanExeccast was masking:copy_arraycompacted every batch into tightly-sized buffers, so the writer'sdata_sizemetric and spill reservation (both fromRecordBatch::get_array_memory_size) were accurate. With the child inlined, the writer sees the native subtree's buffers directly: a partialHashAggregateemits one group-values buffer sliced into batch_size chunks, andget_array_memory_sizecharges that buffer's capacity once per slice, overstating resident memory by the chunk count (TPC-H q17 reported a 91.2 TiBdata sizevs 177.9 GiB on main, with tens of thousands of spurious spills). The writer now charges each distinct backing allocation once viacount_new_buffers. The sum ofget_slice_memory_sizeis not a substitute: a slice pins its whole backing buffer, which the group-valuesVecrounds up to the next power of two, so live-row size under-counts resident memory.width_bucket,date_trunc,collect_set, ...). NewSchemaAlignExecbetween the inlined child and the writer casts per column and widens nullability toactual.nullable || expected.nullable. Each distinct drift logs one process-deduped warning pointing at DataFusion / DataFusion-Spark functions whose Arrow return type drifts from Spark catalyst's declared type #4515. NewShuffleWriter.expected_output_schemaproto field.HashAggregateExec.resultExpressions(EXISTS / row-existence-only subqueries) where the JVM-declaredoutput=[]disagrees with the native aggregate's natural shape. Fixed inCometBaseAggregate.doConvert: when natural shape differs from declared shape, emit an explicitProjectionproto op above the HashAgg. NativeOpStruct::HashAggalways returns its natural shape.HashAggregate.result_exprs(field 3) andapply_result_projection(field 8) reserved.ShuffleWriterExec. With the writer now the root operator for any stage that ends in a shuffle, an upstreamDataFusionError::External(SparkError)(e.g. decimal overflow in ANSI mode) was being re-wrapped byexternal_shuffle'sArrowError::ExternalErroradapter, hiding the typed exception from the JNI bridge's single-level downcast. Drop the wrap;try_flattenalready handles the matchingDataFusionErrortypes directly.How are these changes tested?
Existing tests, plus:
CometAggregateSuiteregression test for the catalyst-pruned aggregate path. The original Spark SQL test that surfaced the aggregate bug (subquery/exists-subquery/exists-orderby-limit.sqlquery 19) is also covered by the SQL-tests harness CI.multi_partitionRust test (shuffle_partitioner_charges_shared_buffer_once): a batch sliced into chunks that share one backing buffer is charged once, so it stays under the memory limit and reports adata_sizethat tracks the buffer rather than the chunk count.CometTaskMetricsSuitetest (native shuffle reports task input metrics for its scan child): a native shuffle over aCometNativeScanExecpopulates the ShuffleMapTask'sinputMetrics. It fails (zerorecordsRead) without thereportScanInputMetricscall.