diff --git a/fuzz/src/array/mod.rs b/fuzz/src/array/mod.rs index 90943085d1d..0b101b91d8f 100644 --- a/fuzz/src/array/mod.rs +++ b/fuzz/src/array/mod.rs @@ -59,7 +59,11 @@ use vortex_array::scalar_fn::fns::operators::Operator; use vortex_array::search_sorted::SearchResult; use vortex_array::search_sorted::SearchSorted; use vortex_array::search_sorted::SearchSortedSide; -use vortex_btrblocks::{BtrBlocksCompressor, BtrBlocksCompressorBuilder, FloatCode, IntCode, StringCode}; +use vortex_btrblocks::BtrBlocksCompressor; +use vortex_btrblocks::BtrBlocksCompressorBuilder; +use vortex_btrblocks::FloatCode; +use vortex_btrblocks::IntCode; +use vortex_btrblocks::StringCode; use vortex_error::VortexExpect; use vortex_error::vortex_panic; use vortex_mask::Mask; diff --git a/vortex-array/public-api.lock b/vortex-array/public-api.lock index 8a973d9503e..b9f15b58729 100644 --- a/vortex-array/public-api.lock +++ b/vortex-array/public-api.lock @@ -1082,6 +1082,10 @@ impl vortex_array::arrays::filter::FilterKernel for vortex_array::arrays::Chunke pub fn vortex_array::arrays::Chunked::filter(array: &vortex_array::arrays::ChunkedArray, mask: &vortex_mask::Mask, _ctx: &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult> +impl vortex_array::arrays::reversed::ReverseReduce for vortex_array::arrays::Chunked + +pub fn vortex_array::arrays::Chunked::reverse(array: &vortex_array::arrays::ChunkedArray) -> vortex_error::VortexResult> + impl vortex_array::arrays::slice::SliceKernel for vortex_array::arrays::Chunked pub fn vortex_array::arrays::Chunked::slice(array: &Self::Array, range: core::ops::range::Range, _ctx: &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult> @@ -1682,6 +1686,10 @@ impl vortex_array::arrays::filter::FilterReduce for vortex_array::arrays::dict:: pub fn vortex_array::arrays::dict::Dict::filter(array: &vortex_array::arrays::dict::DictArray, mask: &vortex_mask::Mask) -> vortex_error::VortexResult> +impl vortex_array::arrays::reversed::ReverseReduce 
for vortex_array::arrays::dict::Dict + +pub fn vortex_array::arrays::dict::Dict::reverse(array: &vortex_array::arrays::dict::DictArray) -> vortex_error::VortexResult> + impl vortex_array::arrays::slice::SliceReduce for vortex_array::arrays::dict::Dict pub fn vortex_array::arrays::dict::Dict::slice(array: &Self::Array, range: core::ops::range::Range) -> vortex_error::VortexResult> @@ -1792,6 +1800,10 @@ impl vortex_array::arrays::filter::FilterReduce for vortex_array::arrays::dict:: pub fn vortex_array::arrays::dict::Dict::filter(array: &vortex_array::arrays::dict::DictArray, mask: &vortex_mask::Mask) -> vortex_error::VortexResult> +impl vortex_array::arrays::reversed::ReverseReduce for vortex_array::arrays::dict::Dict + +pub fn vortex_array::arrays::dict::Dict::reverse(array: &vortex_array::arrays::dict::DictArray) -> vortex_error::VortexResult> + impl vortex_array::arrays::slice::SliceReduce for vortex_array::arrays::dict::Dict pub fn vortex_array::arrays::dict::Dict::slice(array: &Self::Array, range: core::ops::range::Range) -> vortex_error::VortexResult> @@ -3570,6 +3582,160 @@ pub fn vortex_array::arrays::primitive::chunk_range(chunk_idx: usize, offset: us pub fn vortex_array::arrays::primitive::patch_chunk(decoded_values: &mut [T], patches_indices: &[I], patches_values: &[T], patches_offset: usize, chunk_offsets_slice: &[C], chunk_idx: usize, offset_within_chunk: usize) where T: vortex_array::dtype::NativePType, I: vortex_array::dtype::UnsignedPType, C: vortex_array::dtype::UnsignedPType +pub mod vortex_array::arrays::reversed + +pub struct vortex_array::arrays::reversed::ReverseReduceAdaptor(pub V) + +impl core::default::Default for vortex_array::arrays::reversed::ReverseReduceAdaptor + +pub fn vortex_array::arrays::reversed::ReverseReduceAdaptor::default() -> vortex_array::arrays::reversed::ReverseReduceAdaptor + +impl core::fmt::Debug for vortex_array::arrays::reversed::ReverseReduceAdaptor + +pub fn 
vortex_array::arrays::reversed::ReverseReduceAdaptor::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +impl vortex_array::optimizer::rules::ArrayParentReduceRule for vortex_array::arrays::reversed::ReverseReduceAdaptor + +pub type vortex_array::arrays::reversed::ReverseReduceAdaptor::Parent = vortex_array::arrays::Reversed + +pub fn vortex_array::arrays::reversed::ReverseReduceAdaptor::reduce_parent(&self, array: &::Array, _parent: ::Match, child_idx: usize) -> vortex_error::VortexResult> + +pub struct vortex_array::arrays::reversed::Reversed + +impl vortex_array::arrays::Reversed + +pub const vortex_array::arrays::Reversed::ID: vortex_array::vtable::ArrayId + +impl core::clone::Clone for vortex_array::arrays::Reversed + +pub fn vortex_array::arrays::Reversed::clone(&self) -> vortex_array::arrays::Reversed + +impl core::fmt::Debug for vortex_array::arrays::Reversed + +pub fn vortex_array::arrays::Reversed::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +impl vortex_array::vtable::OperationsVTable for vortex_array::arrays::Reversed + +pub fn vortex_array::arrays::Reversed::scalar_at(array: &vortex_array::arrays::ReversedArray, index: usize) -> vortex_error::VortexResult + +impl vortex_array::vtable::VTable for vortex_array::arrays::Reversed + +pub type vortex_array::arrays::Reversed::Array = vortex_array::arrays::ReversedArray + +pub type vortex_array::arrays::Reversed::Metadata = vortex_array::EmptyMetadata + +pub type vortex_array::arrays::Reversed::OperationsVTable = vortex_array::arrays::Reversed + +pub type vortex_array::arrays::Reversed::ValidityVTable = vortex_array::arrays::Reversed + +pub fn vortex_array::arrays::Reversed::append_to_builder(array: &Self::Array, builder: &mut dyn vortex_array::builders::ArrayBuilder, ctx: &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult<()> + +pub fn vortex_array::arrays::Reversed::array_eq(array: &vortex_array::arrays::ReversedArray, other: 
&vortex_array::arrays::ReversedArray, precision: vortex_array::Precision) -> bool + +pub fn vortex_array::arrays::Reversed::array_hash(array: &vortex_array::arrays::ReversedArray, state: &mut H, precision: vortex_array::Precision) + +pub fn vortex_array::arrays::Reversed::buffer(_array: &vortex_array::arrays::ReversedArray, idx: usize) -> vortex_array::buffer::BufferHandle + +pub fn vortex_array::arrays::Reversed::buffer_name(_array: &vortex_array::arrays::ReversedArray, _idx: usize) -> core::option::Option + +pub fn vortex_array::arrays::Reversed::build(dtype: &vortex_array::dtype::DType, len: usize, _metadata: &Self::Metadata, _buffers: &[vortex_array::buffer::BufferHandle], children: &dyn vortex_array::serde::ArrayChildren) -> vortex_error::VortexResult + +pub fn vortex_array::arrays::Reversed::child(array: &vortex_array::arrays::ReversedArray, idx: usize) -> vortex_array::ArrayRef + +pub fn vortex_array::arrays::Reversed::child_name(_array: &vortex_array::arrays::ReversedArray, idx: usize) -> alloc::string::String + +pub fn vortex_array::arrays::Reversed::deserialize(_bytes: &[u8], _dtype: &vortex_array::dtype::DType, _len: usize, _buffers: &[vortex_array::buffer::BufferHandle], _session: &vortex_session::VortexSession) -> vortex_error::VortexResult + +pub fn vortex_array::arrays::Reversed::dtype(array: &vortex_array::arrays::ReversedArray) -> &vortex_array::dtype::DType + +pub fn vortex_array::arrays::Reversed::execute(array: alloc::sync::Arc, ctx: &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult + +pub fn vortex_array::arrays::Reversed::execute_parent(array: &Self::Array, parent: &vortex_array::ArrayRef, child_idx: usize, ctx: &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult> + +pub fn vortex_array::arrays::Reversed::id(&self) -> vortex_array::vtable::ArrayId + +pub fn vortex_array::arrays::Reversed::len(array: &vortex_array::arrays::ReversedArray) -> usize + +pub fn vortex_array::arrays::Reversed::metadata(_array: 
&vortex_array::arrays::ReversedArray) -> vortex_error::VortexResult + +pub fn vortex_array::arrays::Reversed::nbuffers(_array: &vortex_array::arrays::ReversedArray) -> usize + +pub fn vortex_array::arrays::Reversed::nchildren(_array: &vortex_array::arrays::ReversedArray) -> usize + +pub fn vortex_array::arrays::Reversed::reduce(array: &Self::Array) -> vortex_error::VortexResult> + +pub fn vortex_array::arrays::Reversed::reduce_parent(array: &Self::Array, parent: &vortex_array::ArrayRef, child_idx: usize) -> vortex_error::VortexResult> + +pub fn vortex_array::arrays::Reversed::serialize(_metadata: Self::Metadata) -> vortex_error::VortexResult>> + +pub fn vortex_array::arrays::Reversed::stats(array: &vortex_array::arrays::ReversedArray) -> vortex_array::stats::StatsSetRef<'_> + +pub fn vortex_array::arrays::Reversed::vtable(_array: &Self::Array) -> &Self + +pub fn vortex_array::arrays::Reversed::with_children(array: &mut Self::Array, children: alloc::vec::Vec) -> vortex_error::VortexResult<()> + +impl vortex_array::vtable::ValidityVTable for vortex_array::arrays::Reversed + +pub fn vortex_array::arrays::Reversed::validity(array: &vortex_array::arrays::ReversedArray) -> vortex_error::VortexResult + +pub struct vortex_array::arrays::reversed::ReversedArray + +impl vortex_array::arrays::ReversedArray + +pub fn vortex_array::arrays::ReversedArray::child(&self) -> &vortex_array::ArrayRef + +pub fn vortex_array::arrays::ReversedArray::new(child: vortex_array::ArrayRef) -> Self + +pub unsafe fn vortex_array::arrays::ReversedArray::new_unchecked(child: vortex_array::ArrayRef) -> Self + +pub fn vortex_array::arrays::ReversedArray::try_new(child: vortex_array::ArrayRef) -> vortex_error::VortexResult + +pub fn vortex_array::arrays::ReversedArray::validate(child: &vortex_array::ArrayRef, dtype: &vortex_array::dtype::DType, len: usize) -> vortex_error::VortexResult<()> + +impl vortex_array::arrays::ReversedArray + +pub fn vortex_array::arrays::ReversedArray::into_array_ref(self: 
alloc::sync::Arc) -> vortex_array::ArrayRef + +pub fn vortex_array::arrays::ReversedArray::to_array(&self) -> vortex_array::ArrayRef + +impl core::clone::Clone for vortex_array::arrays::ReversedArray + +pub fn vortex_array::arrays::ReversedArray::clone(&self) -> vortex_array::arrays::ReversedArray + +impl core::convert::AsRef for vortex_array::arrays::ReversedArray + +pub fn vortex_array::arrays::ReversedArray::as_ref(&self) -> &dyn vortex_array::DynArray + +impl core::convert::From for vortex_array::ArrayRef + +pub fn vortex_array::ArrayRef::from(value: vortex_array::arrays::ReversedArray) -> vortex_array::ArrayRef + +impl core::fmt::Debug for vortex_array::arrays::ReversedArray + +pub fn vortex_array::arrays::ReversedArray::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +impl core::ops::deref::Deref for vortex_array::arrays::ReversedArray + +pub type vortex_array::arrays::ReversedArray::Target = dyn vortex_array::DynArray + +pub fn vortex_array::arrays::ReversedArray::deref(&self) -> &Self::Target + +impl vortex_array::IntoArray for vortex_array::arrays::ReversedArray + +pub fn vortex_array::arrays::ReversedArray::into_array(self) -> vortex_array::ArrayRef + +pub trait vortex_array::arrays::reversed::ReverseReduce: vortex_array::vtable::VTable + +pub fn vortex_array::arrays::reversed::ReverseReduce::reverse(array: &Self::Array) -> vortex_error::VortexResult> + +impl vortex_array::arrays::reversed::ReverseReduce for vortex_array::arrays::Chunked + +pub fn vortex_array::arrays::Chunked::reverse(array: &vortex_array::arrays::ChunkedArray) -> vortex_error::VortexResult> + +impl vortex_array::arrays::reversed::ReverseReduce for vortex_array::arrays::dict::Dict + +pub fn vortex_array::arrays::dict::Dict::reverse(array: &vortex_array::arrays::dict::DictArray) -> vortex_error::VortexResult> + pub mod vortex_array::arrays::scalar_fn pub struct vortex_array::arrays::scalar_fn::AnyScalarFn @@ -5304,6 +5470,10 @@ impl 
vortex_array::arrays::filter::FilterKernel for vortex_array::arrays::Chunke pub fn vortex_array::arrays::Chunked::filter(array: &vortex_array::arrays::ChunkedArray, mask: &vortex_mask::Mask, _ctx: &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult> +impl vortex_array::arrays::reversed::ReverseReduce for vortex_array::arrays::Chunked + +pub fn vortex_array::arrays::Chunked::reverse(array: &vortex_array::arrays::ChunkedArray) -> vortex_error::VortexResult> + impl vortex_array::arrays::slice::SliceKernel for vortex_array::arrays::Chunked pub fn vortex_array::arrays::Chunked::slice(array: &Self::Array, range: core::ops::range::Range, _ctx: &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult> @@ -5806,6 +5976,10 @@ impl vortex_array::arrays::filter::FilterReduce for vortex_array::arrays::dict:: pub fn vortex_array::arrays::dict::Dict::filter(array: &vortex_array::arrays::dict::DictArray, mask: &vortex_mask::Mask) -> vortex_error::VortexResult> +impl vortex_array::arrays::reversed::ReverseReduce for vortex_array::arrays::dict::Dict + +pub fn vortex_array::arrays::dict::Dict::reverse(array: &vortex_array::arrays::dict::DictArray) -> vortex_error::VortexResult> + impl vortex_array::arrays::slice::SliceReduce for vortex_array::arrays::dict::Dict pub fn vortex_array::arrays::dict::Dict::slice(array: &Self::Array, range: core::ops::range::Range) -> vortex_error::VortexResult> @@ -7212,6 +7386,130 @@ impl vortex_array::accessor::ArrayAccessor< pub fn vortex_array::arrays::PrimitiveArray::with_iterator(&self, f: F) -> R where F: for<'a> core::ops::function::FnOnce(&mut dyn core::iter::traits::iterator::Iterator>) -> R +pub struct vortex_array::arrays::Reversed + +impl vortex_array::arrays::Reversed + +pub const vortex_array::arrays::Reversed::ID: vortex_array::vtable::ArrayId + +impl core::clone::Clone for vortex_array::arrays::Reversed + +pub fn vortex_array::arrays::Reversed::clone(&self) -> vortex_array::arrays::Reversed + +impl core::fmt::Debug for 
vortex_array::arrays::Reversed + +pub fn vortex_array::arrays::Reversed::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +impl vortex_array::vtable::OperationsVTable for vortex_array::arrays::Reversed + +pub fn vortex_array::arrays::Reversed::scalar_at(array: &vortex_array::arrays::ReversedArray, index: usize) -> vortex_error::VortexResult + +impl vortex_array::vtable::VTable for vortex_array::arrays::Reversed + +pub type vortex_array::arrays::Reversed::Array = vortex_array::arrays::ReversedArray + +pub type vortex_array::arrays::Reversed::Metadata = vortex_array::EmptyMetadata + +pub type vortex_array::arrays::Reversed::OperationsVTable = vortex_array::arrays::Reversed + +pub type vortex_array::arrays::Reversed::ValidityVTable = vortex_array::arrays::Reversed + +pub fn vortex_array::arrays::Reversed::append_to_builder(array: &Self::Array, builder: &mut dyn vortex_array::builders::ArrayBuilder, ctx: &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult<()> + +pub fn vortex_array::arrays::Reversed::array_eq(array: &vortex_array::arrays::ReversedArray, other: &vortex_array::arrays::ReversedArray, precision: vortex_array::Precision) -> bool + +pub fn vortex_array::arrays::Reversed::array_hash(array: &vortex_array::arrays::ReversedArray, state: &mut H, precision: vortex_array::Precision) + +pub fn vortex_array::arrays::Reversed::buffer(_array: &vortex_array::arrays::ReversedArray, idx: usize) -> vortex_array::buffer::BufferHandle + +pub fn vortex_array::arrays::Reversed::buffer_name(_array: &vortex_array::arrays::ReversedArray, _idx: usize) -> core::option::Option + +pub fn vortex_array::arrays::Reversed::build(dtype: &vortex_array::dtype::DType, len: usize, _metadata: &Self::Metadata, _buffers: &[vortex_array::buffer::BufferHandle], children: &dyn vortex_array::serde::ArrayChildren) -> vortex_error::VortexResult + +pub fn vortex_array::arrays::Reversed::child(array: &vortex_array::arrays::ReversedArray, idx: usize) -> 
vortex_array::ArrayRef + +pub fn vortex_array::arrays::Reversed::child_name(_array: &vortex_array::arrays::ReversedArray, idx: usize) -> alloc::string::String + +pub fn vortex_array::arrays::Reversed::deserialize(_bytes: &[u8], _dtype: &vortex_array::dtype::DType, _len: usize, _buffers: &[vortex_array::buffer::BufferHandle], _session: &vortex_session::VortexSession) -> vortex_error::VortexResult + +pub fn vortex_array::arrays::Reversed::dtype(array: &vortex_array::arrays::ReversedArray) -> &vortex_array::dtype::DType + +pub fn vortex_array::arrays::Reversed::execute(array: alloc::sync::Arc, ctx: &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult + +pub fn vortex_array::arrays::Reversed::execute_parent(array: &Self::Array, parent: &vortex_array::ArrayRef, child_idx: usize, ctx: &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult> + +pub fn vortex_array::arrays::Reversed::id(&self) -> vortex_array::vtable::ArrayId + +pub fn vortex_array::arrays::Reversed::len(array: &vortex_array::arrays::ReversedArray) -> usize + +pub fn vortex_array::arrays::Reversed::metadata(_array: &vortex_array::arrays::ReversedArray) -> vortex_error::VortexResult + +pub fn vortex_array::arrays::Reversed::nbuffers(_array: &vortex_array::arrays::ReversedArray) -> usize + +pub fn vortex_array::arrays::Reversed::nchildren(_array: &vortex_array::arrays::ReversedArray) -> usize + +pub fn vortex_array::arrays::Reversed::reduce(array: &Self::Array) -> vortex_error::VortexResult> + +pub fn vortex_array::arrays::Reversed::reduce_parent(array: &Self::Array, parent: &vortex_array::ArrayRef, child_idx: usize) -> vortex_error::VortexResult> + +pub fn vortex_array::arrays::Reversed::serialize(_metadata: Self::Metadata) -> vortex_error::VortexResult>> + +pub fn vortex_array::arrays::Reversed::stats(array: &vortex_array::arrays::ReversedArray) -> vortex_array::stats::StatsSetRef<'_> + +pub fn vortex_array::arrays::Reversed::vtable(_array: &Self::Array) -> &Self + +pub fn 
vortex_array::arrays::Reversed::with_children(array: &mut Self::Array, children: alloc::vec::Vec) -> vortex_error::VortexResult<()> + +impl vortex_array::vtable::ValidityVTable for vortex_array::arrays::Reversed + +pub fn vortex_array::arrays::Reversed::validity(array: &vortex_array::arrays::ReversedArray) -> vortex_error::VortexResult + +pub struct vortex_array::arrays::ReversedArray + +impl vortex_array::arrays::ReversedArray + +pub fn vortex_array::arrays::ReversedArray::child(&self) -> &vortex_array::ArrayRef + +pub fn vortex_array::arrays::ReversedArray::new(child: vortex_array::ArrayRef) -> Self + +pub unsafe fn vortex_array::arrays::ReversedArray::new_unchecked(child: vortex_array::ArrayRef) -> Self + +pub fn vortex_array::arrays::ReversedArray::try_new(child: vortex_array::ArrayRef) -> vortex_error::VortexResult + +pub fn vortex_array::arrays::ReversedArray::validate(child: &vortex_array::ArrayRef, dtype: &vortex_array::dtype::DType, len: usize) -> vortex_error::VortexResult<()> + +impl vortex_array::arrays::ReversedArray + +pub fn vortex_array::arrays::ReversedArray::into_array_ref(self: alloc::sync::Arc) -> vortex_array::ArrayRef + +pub fn vortex_array::arrays::ReversedArray::to_array(&self) -> vortex_array::ArrayRef + +impl core::clone::Clone for vortex_array::arrays::ReversedArray + +pub fn vortex_array::arrays::ReversedArray::clone(&self) -> vortex_array::arrays::ReversedArray + +impl core::convert::AsRef for vortex_array::arrays::ReversedArray + +pub fn vortex_array::arrays::ReversedArray::as_ref(&self) -> &dyn vortex_array::DynArray + +impl core::convert::From for vortex_array::ArrayRef + +pub fn vortex_array::ArrayRef::from(value: vortex_array::arrays::ReversedArray) -> vortex_array::ArrayRef + +impl core::fmt::Debug for vortex_array::arrays::ReversedArray + +pub fn vortex_array::arrays::ReversedArray::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +impl core::ops::deref::Deref for vortex_array::arrays::ReversedArray + +pub type 
vortex_array::arrays::ReversedArray::Target = dyn vortex_array::DynArray + +pub fn vortex_array::arrays::ReversedArray::deref(&self) -> &Self::Target + +impl vortex_array::IntoArray for vortex_array::arrays::ReversedArray + +pub fn vortex_array::arrays::ReversedArray::into_array(self) -> vortex_array::ArrayRef + pub struct vortex_array::arrays::ScalarFnArray impl vortex_array::arrays::scalar_fn::ScalarFnArray @@ -14968,6 +15266,12 @@ pub type vortex_array::arrays::primitive::PrimitiveMaskedValidityRule::Parent = pub fn vortex_array::arrays::primitive::PrimitiveMaskedValidityRule::reduce_parent(&self, array: &vortex_array::arrays::PrimitiveArray, parent: &vortex_array::arrays::MaskedArray, _child_idx: usize) -> vortex_error::VortexResult> +impl vortex_array::optimizer::rules::ArrayParentReduceRule for vortex_array::arrays::reversed::ReverseReduceAdaptor + +pub type vortex_array::arrays::reversed::ReverseReduceAdaptor::Parent = vortex_array::arrays::Reversed + +pub fn vortex_array::arrays::reversed::ReverseReduceAdaptor::reduce_parent(&self, array: &::Array, _parent: ::Match, child_idx: usize) -> vortex_error::VortexResult> + impl vortex_array::optimizer::rules::ArrayParentReduceRule for vortex_array::arrays::dict::TakeReduceAdaptor where V: vortex_array::arrays::dict::TakeReduce pub type vortex_array::arrays::dict::TakeReduceAdaptor::Parent = vortex_array::arrays::dict::Dict @@ -20780,6 +21084,10 @@ impl vortex_array::vtable::OperationsVTable for pub fn vortex_array::arrays::Primitive::scalar_at(array: &vortex_array::arrays::PrimitiveArray, index: usize) -> vortex_error::VortexResult +impl vortex_array::vtable::OperationsVTable for vortex_array::arrays::Reversed + +pub fn vortex_array::arrays::Reversed::scalar_at(array: &vortex_array::arrays::ReversedArray, index: usize) -> vortex_error::VortexResult + impl vortex_array::vtable::OperationsVTable for vortex_array::arrays::Shared pub fn vortex_array::arrays::Shared::scalar_at(array: &vortex_array::arrays::SharedArray, 
index: usize) -> vortex_error::VortexResult @@ -21492,6 +21800,62 @@ pub fn vortex_array::arrays::Primitive::vtable(_array: &Self::Array) -> &Self pub fn vortex_array::arrays::Primitive::with_children(array: &mut Self::Array, children: alloc::vec::Vec) -> vortex_error::VortexResult<()> +impl vortex_array::vtable::VTable for vortex_array::arrays::Reversed + +pub type vortex_array::arrays::Reversed::Array = vortex_array::arrays::ReversedArray + +pub type vortex_array::arrays::Reversed::Metadata = vortex_array::EmptyMetadata + +pub type vortex_array::arrays::Reversed::OperationsVTable = vortex_array::arrays::Reversed + +pub type vortex_array::arrays::Reversed::ValidityVTable = vortex_array::arrays::Reversed + +pub fn vortex_array::arrays::Reversed::append_to_builder(array: &Self::Array, builder: &mut dyn vortex_array::builders::ArrayBuilder, ctx: &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult<()> + +pub fn vortex_array::arrays::Reversed::array_eq(array: &vortex_array::arrays::ReversedArray, other: &vortex_array::arrays::ReversedArray, precision: vortex_array::Precision) -> bool + +pub fn vortex_array::arrays::Reversed::array_hash(array: &vortex_array::arrays::ReversedArray, state: &mut H, precision: vortex_array::Precision) + +pub fn vortex_array::arrays::Reversed::buffer(_array: &vortex_array::arrays::ReversedArray, idx: usize) -> vortex_array::buffer::BufferHandle + +pub fn vortex_array::arrays::Reversed::buffer_name(_array: &vortex_array::arrays::ReversedArray, _idx: usize) -> core::option::Option + +pub fn vortex_array::arrays::Reversed::build(dtype: &vortex_array::dtype::DType, len: usize, _metadata: &Self::Metadata, _buffers: &[vortex_array::buffer::BufferHandle], children: &dyn vortex_array::serde::ArrayChildren) -> vortex_error::VortexResult + +pub fn vortex_array::arrays::Reversed::child(array: &vortex_array::arrays::ReversedArray, idx: usize) -> vortex_array::ArrayRef + +pub fn vortex_array::arrays::Reversed::child_name(_array: 
&vortex_array::arrays::ReversedArray, idx: usize) -> alloc::string::String + +pub fn vortex_array::arrays::Reversed::deserialize(_bytes: &[u8], _dtype: &vortex_array::dtype::DType, _len: usize, _buffers: &[vortex_array::buffer::BufferHandle], _session: &vortex_session::VortexSession) -> vortex_error::VortexResult + +pub fn vortex_array::arrays::Reversed::dtype(array: &vortex_array::arrays::ReversedArray) -> &vortex_array::dtype::DType + +pub fn vortex_array::arrays::Reversed::execute(array: alloc::sync::Arc, ctx: &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult + +pub fn vortex_array::arrays::Reversed::execute_parent(array: &Self::Array, parent: &vortex_array::ArrayRef, child_idx: usize, ctx: &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult> + +pub fn vortex_array::arrays::Reversed::id(&self) -> vortex_array::vtable::ArrayId + +pub fn vortex_array::arrays::Reversed::len(array: &vortex_array::arrays::ReversedArray) -> usize + +pub fn vortex_array::arrays::Reversed::metadata(_array: &vortex_array::arrays::ReversedArray) -> vortex_error::VortexResult + +pub fn vortex_array::arrays::Reversed::nbuffers(_array: &vortex_array::arrays::ReversedArray) -> usize + +pub fn vortex_array::arrays::Reversed::nchildren(_array: &vortex_array::arrays::ReversedArray) -> usize + +pub fn vortex_array::arrays::Reversed::reduce(array: &Self::Array) -> vortex_error::VortexResult> + +pub fn vortex_array::arrays::Reversed::reduce_parent(array: &Self::Array, parent: &vortex_array::ArrayRef, child_idx: usize) -> vortex_error::VortexResult> + +pub fn vortex_array::arrays::Reversed::serialize(_metadata: Self::Metadata) -> vortex_error::VortexResult>> + +pub fn vortex_array::arrays::Reversed::stats(array: &vortex_array::arrays::ReversedArray) -> vortex_array::stats::StatsSetRef<'_> + +pub fn vortex_array::arrays::Reversed::vtable(_array: &Self::Array) -> &Self + +pub fn vortex_array::arrays::Reversed::with_children(array: &mut Self::Array, children: alloc::vec::Vec) -> 
vortex_error::VortexResult<()> + impl vortex_array::vtable::VTable for vortex_array::arrays::Shared pub type vortex_array::arrays::Shared::Array = vortex_array::arrays::SharedArray @@ -22076,6 +22440,10 @@ impl vortex_array::vtable::ValidityVTable for vort pub fn vortex_array::arrays::Filter::validity(array: &vortex_array::arrays::FilterArray) -> vortex_error::VortexResult +impl vortex_array::vtable::ValidityVTable for vortex_array::arrays::Reversed + +pub fn vortex_array::arrays::Reversed::validity(array: &vortex_array::arrays::ReversedArray) -> vortex_error::VortexResult + impl vortex_array::vtable::ValidityVTable for vortex_array::arrays::Shared pub fn vortex_array::arrays::Shared::validity(array: &vortex_array::arrays::SharedArray) -> vortex_error::VortexResult @@ -22454,6 +22822,8 @@ pub fn vortex_array::ArrayAdapter::is_valid(&self, index: usize) -> vortex_er pub fn vortex_array::ArrayAdapter::len(&self) -> usize +pub fn vortex_array::ArrayAdapter::reverse(&self) -> vortex_error::VortexResult + pub fn vortex_array::ArrayAdapter::scalar_at(&self, index: usize) -> vortex_error::VortexResult pub fn vortex_array::ArrayAdapter::slice(&self, range: core::ops::range::Range) -> vortex_error::VortexResult @@ -22860,6 +23230,8 @@ pub fn vortex_array::DynArray::is_valid(&self, index: usize) -> vortex_error::Vo pub fn vortex_array::DynArray::len(&self) -> usize +pub fn vortex_array::DynArray::reverse(&self) -> vortex_error::VortexResult + pub fn vortex_array::DynArray::scalar_at(&self, index: usize) -> vortex_error::VortexResult pub fn vortex_array::DynArray::slice(&self, range: core::ops::range::Range) -> vortex_error::VortexResult @@ -22910,6 +23282,8 @@ pub fn alloc::sync::Arc::is_valid(&self, index: usiz pub fn alloc::sync::Arc::len(&self) -> usize +pub fn alloc::sync::Arc::reverse(&self) -> vortex_error::VortexResult + pub fn alloc::sync::Arc::scalar_at(&self, index: usize) -> vortex_error::VortexResult pub fn alloc::sync::Arc::slice(&self, range: 
core::ops::range::Range) -> vortex_error::VortexResult @@ -22960,6 +23334,8 @@ pub fn vortex_array::ArrayAdapter::is_valid(&self, index: usize) -> vortex_er pub fn vortex_array::ArrayAdapter::len(&self) -> usize +pub fn vortex_array::ArrayAdapter::reverse(&self) -> vortex_error::VortexResult + pub fn vortex_array::ArrayAdapter::scalar_at(&self, index: usize) -> vortex_error::VortexResult pub fn vortex_array::ArrayAdapter::slice(&self, range: core::ops::range::Range) -> vortex_error::VortexResult @@ -23142,6 +23518,10 @@ impl vortex_array::IntoArray for vortex_array::arrays::PrimitiveArray pub fn vortex_array::arrays::PrimitiveArray::into_array(self) -> vortex_array::ArrayRef +impl vortex_array::IntoArray for vortex_array::arrays::ReversedArray + +pub fn vortex_array::arrays::ReversedArray::into_array(self) -> vortex_array::ArrayRef + impl vortex_array::IntoArray for vortex_array::arrays::SharedArray pub fn vortex_array::arrays::SharedArray::into_array(self) -> vortex_array::ArrayRef diff --git a/vortex-array/src/arrays/reversed/array.rs b/vortex-array/src/arrays/reversed/array.rs index bfc97c81141..1b4bf208ffb 100644 --- a/vortex-array/src/arrays/reversed/array.rs +++ b/vortex-array/src/arrays/reversed/array.rs @@ -5,9 +5,10 @@ use vortex_error::VortexExpect as _; use vortex_error::VortexResult; use vortex_error::vortex_ensure; +use crate::ArrayRef; +use crate::DynArray; use crate::dtype::DType; use crate::stats::ArrayStats; -use crate::{ArrayRef, DynArray}; #[derive(Clone, Debug)] pub struct ReversedArray { diff --git a/vortex-datafusion/public-api.lock b/vortex-datafusion/public-api.lock index ac5a03b3c42..0a883275e2c 100644 --- a/vortex-datafusion/public-api.lock +++ b/vortex-datafusion/public-api.lock @@ -190,6 +190,8 @@ pub fn vortex_datafusion::VortexSource::new(table_schema: datafusion_datasource: pub fn vortex_datafusion::VortexSource::options(&self) -> &vortex_datafusion::VortexTableOptions +pub fn vortex_datafusion::VortexSource::reversed(&self) -> bool + 
pub fn vortex_datafusion::VortexSource::with_expression_convertor(self, expr_convertor: alloc::sync::Arc) -> Self pub fn vortex_datafusion::VortexSource::with_file_metadata_cache(self, file_metadata_cache: alloc::sync::Arc) -> Self @@ -198,6 +200,8 @@ pub fn vortex_datafusion::VortexSource::with_options(self, opts: vortex_datafusi pub fn vortex_datafusion::VortexSource::with_projection_pushdown(self, enabled: bool) -> Self +pub fn vortex_datafusion::VortexSource::with_reversed(self, reversed: bool) -> Self + pub fn vortex_datafusion::VortexSource::with_scan_concurrency(self, scan_concurrency: usize) -> Self pub fn vortex_datafusion::VortexSource::with_segment_cache_builder(self, builder: alloc::sync::Arc) -> Self @@ -232,6 +236,8 @@ pub fn vortex_datafusion::VortexSource::try_pushdown_filters(&self, filters: all pub fn vortex_datafusion::VortexSource::try_pushdown_projection(&self, projection: &datafusion_physical_expr::projection::ProjectionExprs) -> datafusion_common::error::Result>> +pub fn vortex_datafusion::VortexSource::try_reverse_output(&self, order: &[datafusion_physical_expr_common::sort_expr::PhysicalSortExpr], eq_properties: &datafusion_physical_expr::equivalence::properties::EquivalenceProperties) -> datafusion_common::error::Result>> + pub fn vortex_datafusion::VortexSource::with_batch_size(&self, batch_size: usize) -> alloc::sync::Arc pub struct vortex_datafusion::VortexTableOptions diff --git a/vortex-datafusion/src/persistent/mod.rs b/vortex-datafusion/src/persistent/mod.rs index 558032ebd01..7c9958a3746 100644 --- a/vortex-datafusion/src/persistent/mod.rs +++ b/vortex-datafusion/src/persistent/mod.rs @@ -204,6 +204,272 @@ mod tests { Ok(()) } + /// Helper: plan the given SQL and return the physical plan rendered as a tree string, so + /// callers can assert on the presence or absence of plan nodes such as SortExec. 
+ async fn plan_tree(ctx: &TestSessionContext, sql: &str) -> anyhow::Result { + let df = ctx.session.sql(sql).await?; + let physical_plan = ctx + .session + .state() + .create_physical_plan(df.logical_plan()) + .await?; + Ok(DisplayableExecutionPlan::new(physical_plan.as_ref()) + .tree_render() + .to_string()) + } + + #[tokio::test] + async fn reverse_pushdown_swaps_file_order() -> anyhow::Result<()> { + let ctx = TestSessionContext::default(); + + // Each insert produces a separate file. ts is monotonically increasing across files, + // so DataFusion's planner accepts `ts ASC` as the table's output ordering. + ctx.session + .sql( + "CREATE EXTERNAL TABLE ts_tbl \ + (ts INT NOT NULL) \ + STORED AS vortex \ + WITH ORDER (ts ASC) \ + LOCATION '/ts/'", + ) + .await?; + + let data = [vec![1, 2, 3], vec![10, 11, 12], vec![100, 101, 102]]; + + for chunk in data { + let values = chunk + .iter() + .map(|v| v.to_string()) + .collect::>() + .join("),("); + ctx.session + .sql(&format!("INSERT INTO ts_tbl VALUES ({values})")) + .await? + .collect() + .await?; + } + + // We return Inexact, so DataFusion keeps a SortExec on top, but the source itself + // is reversed (advertised via fmt_extra). The TopK below the SortPreservingMergeExec + // also confirms LIMIT pushed down through the new plan. + let tree = plan_tree(&ctx, "SELECT ts FROM ts_tbl ORDER BY ts DESC LIMIT 2").await?; + assert!( + tree.contains("reversed: true"), + "expected the source to be reversed, plan was:\n{tree}", + ); + assert!( + tree.contains("SortExec(TopK)"), + "expected a TopK sort after reverse pushdown, plan was:\n{tree}", + ); + + let rows = ctx + .session + .sql("SELECT ts FROM ts_tbl ORDER BY ts DESC LIMIT 2") + .await? 
+ .collect() + .await?; + let formatted = pretty_format_batches(&rows)?.to_string(); + assert_snapshot!(formatted, @r" + +-----+ + | ts | + +-----+ + | 102 | + | 101 | + +-----+ + "); + + Ok(()) + } + + #[tokio::test] + async fn reverse_pushdown_rejected_for_unrelated_ordering() -> anyhow::Result<()> { + let ctx = TestSessionContext::default(); + + ctx.session + .sql( + "CREATE EXTERNAL TABLE ab_tbl \ + (a INT NOT NULL, b INT NOT NULL) \ + STORED AS vortex \ + WITH ORDER (a ASC) \ + LOCATION '/ab/'", + ) + .await?; + + ctx.session + .sql("INSERT INTO ab_tbl VALUES (1, 10), (2, 20)") + .await? + .collect() + .await?; + + // ORDER BY b DESC is unrelated to the table's declared ordering on `a`. The source + // must refuse to push down, leaving a SortExec in the plan and `reversed: true` + // absent from the source. + let tree = plan_tree(&ctx, "SELECT a, b FROM ab_tbl ORDER BY b DESC LIMIT 1").await?; + assert!( + tree.contains("Sort"), + "expected a Sort node when the requested ordering is unrelated, plan was:\n{tree}", + ); + assert!( + !tree.contains("reversed: true"), + "expected the source to not be reversed, plan was:\n{tree}", + ); + + Ok(()) + } + + #[tokio::test] + async fn reverse_pushdown_no_ordering_declared() -> anyhow::Result<()> { + let ctx = TestSessionContext::default(); + + ctx.session + .sql( + "CREATE EXTERNAL TABLE noord_tbl \ + (ts INT NOT NULL) \ + STORED AS vortex \ + LOCATION '/noord/'", + ) + .await?; + + ctx.session + .sql("INSERT INTO noord_tbl VALUES (3), (1), (2)") + .await? 
+ .collect() + .await?; + + let tree = plan_tree(&ctx, "SELECT ts FROM noord_tbl ORDER BY ts DESC LIMIT 1").await?; + assert!( + tree.contains("Sort"), + "expected a Sort node when no WITH ORDER is declared, plan was:\n{tree}", + ); + assert!( + !tree.contains("reversed: true"), + "expected the source to not be reversed, plan was:\n{tree}", + ); + + Ok(()) + } + + #[tokio::test] + async fn reverse_pushdown_then_filter() -> anyhow::Result<()> { + let ctx = TestSessionContext::default(); + + ctx.session + .sql( + "CREATE EXTERNAL TABLE filt_tbl \ + (ts INT NOT NULL) \ + STORED AS vortex \ + WITH ORDER (ts ASC) \ + LOCATION '/filt/'", + ) + .await?; + + for chunk in [vec![1, 2, 3], vec![10, 11, 12], vec![100, 101, 102]] { + let values = chunk + .iter() + .map(|v| v.to_string()) + .collect::>() + .join("),("); + ctx.session + .sql(&format!("INSERT INTO filt_tbl VALUES ({values})")) + .await? + .collect() + .await?; + } + + // Filter pushdown and reverse sort pushdown should coexist. + let rows = ctx + .session + .sql("SELECT ts FROM filt_tbl WHERE ts < 50 ORDER BY ts DESC LIMIT 2") + .await? + .collect() + .await?; + + let formatted = pretty_format_batches(&rows)?.to_string(); + assert_snapshot!(formatted, @r" + +----+ + | ts | + +----+ + | 12 | + | 11 | + +----+ + "); + + Ok(()) + } + + #[tokio::test] + async fn order_by_desc_limit_returns_largest_rows() -> anyhow::Result<()> { + // Synergy test: combines Part 1 (limit pushdown with filter) and Part 2 (reverse + // sort pushdown). With files holding non-overlapping ts ranges, `ORDER BY ts DESC + // LIMIT k` should produce the largest k ts values, reading them from the last file + // (after reverse pushdown swaps file order, the last file becomes the first scanned). 
+ let ctx = TestSessionContext::default(); + + ctx.session + .sql( + "CREATE EXTERNAL TABLE big_tbl \ + (ts INT NOT NULL, payload INT NOT NULL) \ + STORED AS vortex \ + WITH ORDER (ts ASC) \ + LOCATION '/big/'", + ) + .await?; + + // Five files, each with 100 rows. ts values across all files run 0..500 + // monotonically increasing, with the largest (400..500) in the last file. + for file_idx in 0..5 { + let base = file_idx * 100; + let values = (0..100) + .map(|i| format!("({}, {})", base + i, (base + i) * 2)) + .collect::>() + .join(", "); + ctx.session + .sql(&format!("INSERT INTO big_tbl VALUES {values}")) + .await? + .collect() + .await?; + } + + // Filter rules out ts < 50, then ORDER BY ts DESC LIMIT 10 should yield ts in + // [490..500], coming entirely from the last file. + let rows = ctx + .session + .sql("SELECT ts FROM big_tbl WHERE ts >= 50 ORDER BY ts DESC LIMIT 10") + .await? + .collect() + .await?; + let formatted = pretty_format_batches(&rows)?.to_string(); + assert_snapshot!(formatted, @r" + +-----+ + | ts | + +-----+ + | 499 | + | 498 | + | 497 | + | 496 | + | 495 | + | 494 | + | 493 | + | 492 | + | 491 | + | 490 | + +-----+ + "); + + // Confirm the source was marked reversed. + let tree = plan_tree( + &ctx, + "SELECT ts FROM big_tbl WHERE ts >= 50 ORDER BY ts DESC LIMIT 10", + ) + .await?; + assert!( + tree.contains("reversed: true"), + "expected reverse pushdown to engage end-to-end, plan was:\n{tree}", + ); + + Ok(()) + } + /// Doc example: demonstrates creating, writing, reading, and filtering a Vortex table. 
#[tokio::test] async fn doc_example() -> anyhow::Result<()> { diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index 96ba313c062..ff12083846c 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -40,6 +40,9 @@ use vortex::error::VortexError; use vortex::file::OpenOptionsSessionExt; use vortex::io::InstrumentedReadAt; use vortex::layout::LayoutReader; +use vortex::layout::segments::FileIdentity; +use vortex::layout::segments::FileVersion; +use vortex::layout::segments::SegmentCacheBuilder; use vortex::metrics::Label; use vortex::metrics::MetricsRegistry; use vortex::scan::ScanBuilder; @@ -54,10 +57,6 @@ use crate::convert::exprs::make_vortex_predicate; use crate::convert::schema::calculate_physical_schema; use crate::metrics::PARTITION_LABEL; use crate::metrics::PATH_LABEL; -use vortex::layout::segments::FileIdentity; -use vortex::layout::segments::FileVersion; -use vortex::layout::segments::SegmentCacheBuilder; - use crate::persistent::cache::CachedVortexMetadata; use crate::persistent::reader::VortexReaderFactory; use crate::persistent::stream::PrunableStream; @@ -360,9 +359,7 @@ impl FileOpener for VortexOpener { }) .transpose()?; - if let Some(limit) = limit - && filter.is_none() - { + if let Some(limit) = limit { scan_builder = scan_builder.with_limit(limit); } diff --git a/vortex-datafusion/src/persistent/source.rs b/vortex-datafusion/src/persistent/source.rs index cba47d0dcc4..531bd2669f0 100644 --- a/vortex-datafusion/src/persistent/source.rs +++ b/vortex-datafusion/src/persistent/source.rs @@ -13,13 +13,17 @@ use datafusion_datasource::file::FileSource; use datafusion_datasource::file_scan_config::FileScanConfig; use datafusion_datasource::file_stream::FileOpener; use datafusion_execution::cache::cache_manager::FileMetadataCache; +use datafusion_physical_expr::EquivalenceProperties; use datafusion_physical_expr::PhysicalExprRef; use 
datafusion_physical_expr::conjunction; use datafusion_physical_expr::projection::ProjectionExprs; use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory; use datafusion_physical_expr_common::physical_expr::fmt_sql; +use datafusion_physical_expr_common::sort_expr::LexOrdering; +use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; use datafusion_physical_plan::DisplayFormatType; use datafusion_physical_plan::PhysicalExpr; +use datafusion_physical_plan::SortOrderPushdownResult; use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation; use datafusion_physical_plan::filter_pushdown::PushedDown; use datafusion_physical_plan::filter_pushdown::PushedDownPredicate; @@ -266,12 +270,18 @@ impl FileSource for VortexSource { if let Some(ref predicate) = self.vortex_predicate { write!(f, ", predicate: {predicate}")?; } + if self.reversed { + write!(f, ", reversed: true")?; + } } // Use TreeRender style key=value formatting to display the predicate DisplayFormatType::TreeRender => { if let Some(ref predicate) = self.vortex_predicate { writeln!(f, "predicate={}", fmt_sql(predicate.as_ref()))?; }; + if self.reversed { + writeln!(f, "reversed=true")?; + } } } Ok(()) @@ -359,6 +369,62 @@ impl FileSource for VortexSource { Ok(Some(Arc::new(source))) } + /// Attempt to push down a reversed scan when the requested ordering is the strict reverse + /// of one of the source's declared output orderings. + /// + /// Mental model: the rebuilt source emits rows **already in the target order**, and the + /// atomic limit counter of the Vortex scan that `VortexOpener` configures (through + /// `ScanBuilder::with_limit`) caps emission once the LIMIT is satisfied. This is fundamentally different from the + /// dynamic-filter-pushdown shape `ParquetSource` uses for unsorted `ORDER BY x LIMIT k`, + /// where TopK above the scan feeds bounds back as a dynamic filter. 
Here the source is + /// the correctness guarantor for ordering; the `SortExec(TopK)` left above the rebuilt + /// source by the planner is just a passthrough cap plus a cross-split merge. + /// + /// The reversal itself is **exact** at the Vortex layer: `ScanBuilder::with_reversed` + /// flips intra-file split order *and* reverses rows within each chunk (via + /// `ArrayRef::reverse` on every produced array), so each file emits its rows in true + /// reverse order — not just reversed at chunk granularity. Combined with + /// `FileScanConfig::rebuild_with_source` reversing the `file_groups` order, the + /// overall stream is fully reversed. We could in principle return `Exact` and let the + /// planner drop the upstream sort entirely, but DataFusion 52's `rebuild_with_source` + /// only clears the stale `output_ordering` on the `Inexact` branch — returning `Exact` + /// would leave the rebuilt config still advertising the original (now-violated) + /// ordering and trip the sanity checker. So the `Inexact` return here is a + /// DataFusion-52 plumbing concession, not a Vortex correctness limitation. + fn try_reverse_output( + &self, + order: &[PhysicalSortExpr], + eq_properties: &EquivalenceProperties, + ) -> DFResult>> { + let Some(requested) = LexOrdering::new(order.iter().cloned()) else { + return Ok(SortOrderPushdownResult::Unsupported); + }; + + // The source advertises one or more candidate orderings via its equivalence + // properties. We can satisfy the request by reading in reverse only if reversing + // some declared ordering yields a prefix that matches the request. + let can_reverse = eq_properties + .oeq_class() + .iter() + .any(|candidate| candidate.is_reverse(&requested)); + + if !can_reverse { + return Ok(SortOrderPushdownResult::Unsupported); + } + + let reversed = self.clone().with_reversed(!self.reversed); + // TODO(df53): switch to `SortOrderPushdownResult::Exact` once the workspace bumps to + // DataFusion 53+. 
The reversal is exact at the Vortex layer; only DF 52's + // `rebuild_with_source` (which keeps the stale `output_ordering` on the `Exact` + // branch and trips `SanityCheckPlan`) forces us to return `Inexact` here. The DF 53 + // rewrite renames this hook to `try_pushdown_sort` and lets the source publish the + // new ordering, so `Exact` becomes correct and lets the planner drop the upstream + // `SortExec` entirely. + Ok(SortOrderPushdownResult::Inexact { + inner: Arc::new(reversed) as Arc, + }) + } + fn projection(&self) -> Option<&ProjectionExprs> { Some(&self.projection) } diff --git a/vortex-file/src/tests.rs b/vortex-file/src/tests.rs index e477146e5cd..e336d6fbbec 100644 --- a/vortex-file/src/tests.rs +++ b/vortex-file/src/tests.rs @@ -1696,3 +1696,107 @@ async fn timestamp_unit_mismatch_errors_with_constant_children() Ok(()) } + +#[tokio::test] +#[cfg_attr(miri, ignore)] +async fn limit_with_filter_truncates_globally() -> VortexResult<()> { + // A chunked array exposes multiple splits so the scan exercises the atomic-limit + // reservation across concurrent split tasks. + let chunks = (0..5) + .map(|chunk_idx| { + let base = chunk_idx * 100; + (base..base + 100).collect::().into_array() + }) + .collect::>(); + let numbers = ChunkedArray::from_iter(chunks).into_array(); + let array = StructArray::from_fields(&[("n", numbers)])?.into_array(); + + let mut buf = ByteBufferMut::empty(); + SESSION + .write_options() + .write(&mut buf, array.to_array_stream()) + .await?; + let file = SESSION.open_options().open_buffer(buf)?; + + // 500 rows total, 250 match the filter (n >= 250), but the limit caps the output at 7. + let result = file + .scan()? + .with_filter(gt_eq(get_item("n", root()), lit(250_i32))) + .with_limit(7) + .into_array_stream()? 
+ .read_all() + .await?; + + assert_eq!( + result.len(), + 7, + "limit should be honored even when a filter is set", + ); + + Ok(()) +} + +#[tokio::test] +#[cfg_attr(miri, ignore)] +async fn limit_with_filter_straddles_split_boundary() -> VortexResult<()> { + // Three splits, filter matches every row, so total matching (300) exceeds the + // shared limit (250). At least one split must observe `reserved < true_count` and + // trim its mask via `Mask::limit(reserved)` — the most subtle correctness path in + // the atomic-limit design. Earlier-contributing splits exercise the + // `reserved == true_count` (no-trim) branch; later splits short-circuit at the + // top-of-function `remaining == 0` check. + let chunks = (0..3) + .map(|chunk_idx| { + let base = chunk_idx * 100; + (base..base + 100).collect::().into_array() + }) + .collect::>(); + let numbers = ChunkedArray::from_iter(chunks).into_array(); + let array = StructArray::from_fields(&[("n", numbers)])?.into_array(); + + let mut buf = ByteBufferMut::empty(); + SESSION + .write_options() + .write(&mut buf, array.to_array_stream()) + .await?; + let file = SESSION.open_options().open_buffer(buf)?; + + let result = file + .scan()? + .with_filter(gt_eq(get_item("n", root()), lit(0_i32))) + .with_limit(250) + .into_array_stream()? + .read_all() + .await?; + + // Exact cap: the filter accepts all 300 rows, so exactly `limit` rows must be + // emitted even though the limit straddles a split boundary. + assert_eq!(result.len(), 250); + + Ok(()) +} + +#[tokio::test] +#[cfg_attr(miri, ignore)] +async fn limit_zero_with_filter_yields_no_rows() -> VortexResult<()> { + let numbers = (0..200).collect::().into_array(); + let array = StructArray::from_fields(&[("n", numbers)])?.into_array(); + + let mut buf = ByteBufferMut::empty(); + SESSION + .write_options() + .write(&mut buf, array.to_array_stream()) + .await?; + let file = SESSION.open_options().open_buffer(buf)?; + + let result = file + .scan()? 
+ .with_filter(gt_eq(get_item("n", root()), lit(0_i32))) + .with_limit(0) + .into_array_stream()? + .read_all() + .await?; + + assert_eq!(result.len(), 0); + Ok(()) +} diff --git a/vortex-io/public-api.lock b/vortex-io/public-api.lock index 2037e272e3b..70f5d23f254 100644 --- a/vortex-io/public-api.lock +++ b/vortex-io/public-api.lock @@ -804,13 +804,13 @@ pub async fn vortex_io::object_store::ObjectStoreWrite::shutdown(&mut self) -> s pub async fn vortex_io::object_store::ObjectStoreWrite::write_all(&mut self, buffer: B) -> std::io::error::Result -impl vortex_io::VortexWrite for std::io::cursor::Cursor where std::io::cursor::Cursor: std::io::Write +impl vortex_io::VortexWrite for core::io::cursor::Cursor where core::io::cursor::Cursor: std::io::Write -pub fn std::io::cursor::Cursor::flush(&mut self) -> impl core::future::future::Future> +pub fn core::io::cursor::Cursor::flush(&mut self) -> impl core::future::future::Future> -pub fn std::io::cursor::Cursor::shutdown(&mut self) -> impl core::future::future::Future> +pub fn core::io::cursor::Cursor::shutdown(&mut self) -> impl core::future::future::Future> -pub fn std::io::cursor::Cursor::write_all(&mut self, buffer: B) -> impl core::future::future::Future> +pub fn core::io::cursor::Cursor::write_all(&mut self, buffer: B) -> impl core::future::future::Future> impl vortex_io::VortexWrite for vortex_io::AsyncWriteAdapter diff --git a/vortex-python/src/lib.rs b/vortex-python/src/lib.rs index f6a312855b6..62429f3cbb8 100644 --- a/vortex-python/src/lib.rs +++ b/vortex-python/src/lib.rs @@ -25,12 +25,14 @@ mod scan; mod serde; mod store; +use std::ops::Deref; +use std::sync::LazyLock; + use log::LevelFilter; use pyo3::exceptions::PyRuntimeError; use pyo3::prelude::*; -use pyo3_log::{Caching, Logger}; -use std::ops::Deref; -use std::sync::LazyLock; +use pyo3_log::Caching; +use pyo3_log::Logger; use tokio::runtime::Runtime; use vortex::VortexSessionDefault; use vortex::error::VortexError; diff --git 
a/vortex-scan/public-api.lock b/vortex-scan/public-api.lock index 7456d12de14..7c87e86f169 100644 --- a/vortex-scan/public-api.lock +++ b/vortex-scan/public-api.lock @@ -300,6 +300,8 @@ pub fn vortex_scan::ScanBuilder::ordered(&self) -> bool pub fn vortex_scan::ScanBuilder::prepare(self) -> vortex_error::VortexResult> +pub fn vortex_scan::ScanBuilder::reversed(&self) -> bool + pub fn vortex_scan::ScanBuilder::session(&self) -> &vortex_session::VortexSession pub fn vortex_scan::ScanBuilder::with_concurrency(self, concurrency: usize) -> Self @@ -314,6 +316,8 @@ pub fn vortex_scan::ScanBuilder::with_ordered(self, ordered: bool) -> Self pub fn vortex_scan::ScanBuilder::with_projection(self, projection: vortex_array::expr::expression::Expression) -> Self +pub fn vortex_scan::ScanBuilder::with_reversed(self, reversed: bool) -> Self + pub fn vortex_scan::ScanBuilder::with_row_indices(self, row_indices: vortex_buffer::buffer::Buffer) -> Self pub fn vortex_scan::ScanBuilder::with_row_offset(self, row_offset: u64) -> Self diff --git a/vortex-scan/src/repeated_scan.rs b/vortex-scan/src/repeated_scan.rs index 05df2bc4bcd..45a570ea7fa 100644 --- a/vortex-scan/src/repeated_scan.rs +++ b/vortex-scan/src/repeated_scan.rs @@ -5,6 +5,8 @@ use std::cmp; use std::iter; use std::ops::Range; use std::sync::Arc; +use std::sync::atomic::AtomicU64; +use std::sync::atomic::Ordering; use futures::Stream; use futures::future::BoxFuture; @@ -168,7 +170,10 @@ impl RepeatedScan { }), }; - let mut limit = self.limit; + // Promote the scalar limit to a shared atomic counter so that the limit can be applied + // safely across both the synchronous no-filter path and the asynchronous filter path. + // All tasks share this counter and reserve from it atomically. 
+ let limit = self.limit.map(|l| Arc::new(AtomicU64::new(l))); let mut tasks = Vec::new(); let ranges = if self.reversed { @@ -182,11 +187,17 @@ impl RepeatedScan { continue; } - if limit.is_some_and(|l| l == 0) { + // The counter only ever decreases, so if a previous split has already drained it + // we can stop building tasks. Splits whose IO is already in flight will still + // observe the empty counter and short-circuit. + if limit + .as_ref() + .is_some_and(|l| l.load(Ordering::Acquire) == 0) + { break; } - tasks.push(split_exec(ctx.clone(), range, limit.as_mut())?); + tasks.push(split_exec(ctx.clone(), range, limit.clone())?); } Ok(tasks) diff --git a/vortex-scan/src/scan_builder.rs b/vortex-scan/src/scan_builder.rs index 84dcb55c2b1..f99f30784b6 100644 --- a/vortex-scan/src/scan_builder.rs +++ b/vortex-scan/src/scan_builder.rs @@ -30,7 +30,6 @@ use vortex_array::stream::ArrayStreamAdapter; use vortex_buffer::Buffer; use vortex_error::VortexExpect; use vortex_error::VortexResult; -use vortex_error::vortex_bail; use vortex_io::runtime::BlockingRuntime; use vortex_io::runtime::Handle; use vortex_io::runtime::Task; @@ -260,10 +259,6 @@ impl ScanBuilder { pub fn prepare(self) -> VortexResult> { let dtype = self.dtype()?; - if self.filter.is_some() && self.limit.is_some() { - vortex_bail!("Vortex doesn't support scans with both a filter and a limit") - } - // Spin up the root layout reader, and wrap it in a FilterLayoutReader to perform // conjunction splitting if a filter is provided. 
let mut layout_reader = self.layout_reader; diff --git a/vortex-scan/src/tasks.rs b/vortex-scan/src/tasks.rs index 9ba1e309948..9c447630951 100644 --- a/vortex-scan/src/tasks.rs +++ b/vortex-scan/src/tasks.rs @@ -6,6 +6,8 @@ use std::ops::BitAnd; use std::ops::Range; use std::sync::Arc; +use std::sync::atomic::AtomicU64; +use std::sync::atomic::Ordering; use bit_vec::BitVec; use futures::FutureExt; @@ -23,6 +25,39 @@ use crate::selection::Selection; pub type TaskFuture = BoxFuture<'static, VortexResult>; +/// Atomically reserve up to `available` rows from the shared `remaining` counter. +/// +/// Returns the number of rows actually reserved (which may be less than `available` if +/// the counter ran out). Uses a CAS loop so that concurrent splits cannot underflow the +/// counter or oversubscribe past the requested limit. +/// +/// The reservation is bounded by `available` so the returned value always fits back into +/// the same type. +fn reserve_up_to(remaining: &AtomicU64, available: usize) -> usize { + // Saturate the available rows to u64 so we can CAS on the shared atomic. Since the + // reservation is bounded by `available`, the returned value is bounded by it too, + // and we can cast back to usize without losing precision. + let available_u64 = u64::try_from(available).unwrap_or(u64::MAX); + let mut current = remaining.load(Ordering::Acquire); + let reserved_u64 = loop { + if current == 0 { + return 0; + } + let take = current.min(available_u64); + match remaining.compare_exchange_weak( + current, + current - take, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => break take, + Err(observed) => current = observed, + } + }; + // `reserved_u64 <= available_u64 <= available`, so it fits in usize. + usize::try_from(reserved_u64).unwrap_or(available) +} + /// Logic for executing a single split reading task. 
/// /// # Task execution flow @@ -38,7 +73,7 @@ pub type TaskFuture = BoxFuture<'static, VortexResult>; pub(super) fn split_exec( ctx: Arc>, split: Range, - limit: Option<&mut u64>, + limit: Option>, ) -> VortexResult>> { // Apply the selection to calculate a read mask let read_mask = ctx.selection.row_mask(&split); @@ -48,19 +83,25 @@ pub(super) fn split_exec( return Ok(ok(None).boxed()); } + // Early exit: if a shared limit is set and already exhausted, skip IO entirely. + if let Some(remaining) = limit.as_ref() + && remaining.load(Ordering::Acquire) == 0 + { + return Ok(ok(None).boxed()); + } + let filter_mask = match ctx.filter.as_ref() { // No filter == immediate mask None => { - let row_mask = match limit { - Some(l) if *l == 0 => Mask::new_false(row_mask.len()), - Some(l) => { + let row_mask = match limit.as_ref() { + Some(remaining) => { let true_count = row_mask.true_count(); - let mask_limit = usize::try_from(*l) - .map(|l| l.min(true_count)) - .unwrap_or(true_count); - let row_mask = row_mask.limit(mask_limit); - *l -= mask_limit as u64; - row_mask + let reserved = reserve_up_to(remaining, true_count); + if reserved == 0 { + Mask::new_false(row_mask.len()) + } else { + row_mask.limit(reserved) + } } None => row_mask, }; @@ -131,6 +172,20 @@ pub(super) fn split_exec( mask = conjunct_mask; } + // Apply the shared limit. We do this after the filter is fully evaluated so we + // only reserve from the global counter what this split would actually emit. + if let Some(remaining) = limit.as_ref() { + let true_count = mask.true_count(); + let reserved = reserve_up_to(remaining, true_count); + mask = if reserved == 0 { + Mask::new_false(mask.len()) + } else if reserved < true_count { + mask.limit(reserved) + } else { + mask + }; + } + Ok(mask) }) } @@ -168,3 +223,58 @@ pub(super) struct TaskContext { /// Function that maps into an A. 
pub(super) mapper: Arc VortexResult + Send + Sync>, } + +#[cfg(test)] +mod tests { + use std::sync::Arc; + use std::sync::atomic::AtomicU64; + use std::sync::atomic::Ordering; + use std::thread; + + use super::reserve_up_to; + + #[test] + fn reserve_up_to_drains_counter() { + let remaining = AtomicU64::new(10); + assert_eq!(reserve_up_to(&remaining, 4), 4); + assert_eq!(remaining.load(Ordering::Acquire), 6); + assert_eq!(reserve_up_to(&remaining, 10), 6); + assert_eq!(remaining.load(Ordering::Acquire), 0); + assert_eq!(reserve_up_to(&remaining, 10), 0); + } + + #[test] + fn reserve_up_to_concurrent_never_oversubscribes() { + // Even under heavy contention the CAS loop must never let the counter underflow or + // hand out more than the initial budget across all racing threads. + const LIMIT: u64 = 1024; + const THREADS: usize = 32; + const PER_THREAD_REQUEST: usize = 64; + + let remaining = Arc::new(AtomicU64::new(LIMIT)); + let handles: Vec<_> = (0..THREADS) + .map(|_| { + let remaining = remaining.clone(); + thread::spawn(move || { + let mut total: usize = 0; + while total < PER_THREAD_REQUEST { + let take = reserve_up_to(&remaining, PER_THREAD_REQUEST - total); + if take == 0 { + break; + } + total += take; + } + total + }) + }) + .collect(); + + let granted: usize = handles.into_iter().map(|h| h.join().unwrap()).sum(); + let granted_u64 = u64::try_from(granted).unwrap(); + assert!( + granted_u64 <= LIMIT, + "granted={granted_u64} exceeded limit={LIMIT}" + ); + assert_eq!(granted_u64 + remaining.load(Ordering::Acquire), LIMIT); + } +}