Skip to content

Commit

Permalink
ContextRef = Arc<Context> (#1802)
Browse files Browse the repository at this point in the history
Seems to follow the convention
  • Loading branch information
gatesn authored Jan 3, 2025
1 parent a69030c commit 830f49d
Show file tree
Hide file tree
Showing 15 changed files with 44 additions and 46 deletions.
2 changes: 1 addition & 1 deletion bench-vortex/src/clickbench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ pub async fn register_vortex_files(
.try_collect::<Vec<_>>()
.await?;

let format = Arc::new(VortexFormat::new(&CTX));
let format = Arc::new(VortexFormat::new(CTX.clone()));
let table_path = vortex_dir
.to_str()
.ok_or_else(|| vortex_err!("Path is not valid UTF-8"))?;
Expand Down
4 changes: 2 additions & 2 deletions bench-vortex/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use vortex::compress::CompressionStrategy;
use vortex::dtype::DType;
use vortex::fastlanes::DeltaEncoding;
use vortex::sampling_compressor::SamplingCompressor;
use vortex::{ArrayData, Context, IntoArrayData};
use vortex::{ArrayData, Context, ContextRef, IntoArrayData};

use crate::data_downloads::FileType;
use crate::reader::BATCH_SIZE;
Expand All @@ -45,7 +45,7 @@ pub mod vortex_utils;
const TARGET_BLOCK_BYTESIZE: usize = 16 * (1 << 20);
const TARGET_BLOCK_SIZE: usize = 64 * (1 << 10);

pub static CTX: LazyLock<Arc<Context>> = LazyLock::new(|| {
pub static CTX: LazyLock<ContextRef> = LazyLock::new(|| {
Arc::new(
Context::default()
.with_encodings(SamplingCompressor::default().used_encodings())
Expand Down
2 changes: 1 addition & 1 deletion bench-vortex/src/tpch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ async fn register_vortex_file(
})
.await?;

let format = Arc::new(VortexFormat::new(&CTX));
let format = Arc::new(VortexFormat::new(CTX.clone()));
let table_url = ListingTableUrl::parse(vtx_file.to_str().unwrap())?;
let config = ListingTableConfig::new(table_url)
.with_listing_options(ListingOptions::new(format as _))
Expand Down
5 changes: 5 additions & 0 deletions vortex-array/src/context.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

use crate::aliases::hash_map::HashMap;
use crate::array::{
BoolEncoding, ChunkedEncoding, ConstantEncoding, ExtensionEncoding, ListEncoding, NullEncoding,
Expand All @@ -11,6 +13,9 @@ pub struct Context {
encodings: HashMap<u16, EncodingRef>,
}

/// An atomic shared reference to a [`Context`].
pub type ContextRef = Arc<Context>;

impl Context {
pub fn with_encoding(mut self, encoding: EncodingRef) -> Self {
self.encodings.insert(encoding.id().code(), encoding);
Expand Down
4 changes: 2 additions & 2 deletions vortex-array/src/data/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::stats::{ArrayStatistics, Stat, Statistics, StatsSet};
use crate::stream::{ArrayStream, ArrayStreamAdapter};
use crate::validity::{ArrayValidity, LogicalValidity, ValidityVTable};
use crate::{
ArrayChildrenIterator, ArrayDType, ArrayLen, ArrayMetadata, Context, NamedChildrenCollector,
ArrayChildrenIterator, ArrayDType, ArrayLen, ArrayMetadata, ContextRef, NamedChildrenCollector,
TryDeserializeArrayMetadata,
};

Expand Down Expand Up @@ -79,7 +79,7 @@ impl ArrayData {
}

pub fn try_new_viewed<F>(
ctx: Arc<Context>,
ctx: ContextRef,
dtype: DType,
len: usize,
// TODO(ngates): use ConstByteBuffer
Expand Down
4 changes: 2 additions & 2 deletions vortex-array/src/data/viewed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use vortex_scalar::{Scalar, ScalarValue};
use crate::encoding::opaque::OpaqueEncoding;
use crate::encoding::EncodingRef;
use crate::stats::{Stat, Statistics, StatsSet};
use crate::{flatbuffers as fb, ArrayData, ArrayMetadata, ChildrenCollector, Context};
use crate::{flatbuffers as fb, ArrayData, ArrayMetadata, ChildrenCollector, ContextRef};

/// Zero-copy view over flatbuffer-encoded array data, created without eager serialization.
#[derive(Clone)]
Expand All @@ -24,7 +24,7 @@ pub(super) struct ViewedArrayData {
pub(super) flatbuffer: ByteBuffer,
pub(super) flatbuffer_loc: usize,
pub(super) buffers: Arc<[ByteBuffer]>,
pub(super) ctx: Arc<Context>,
pub(super) ctx: ContextRef,
#[cfg(feature = "canonical_counter")]
pub(super) canonical_counter: Arc<std::sync::atomic::AtomicUsize>,
}
Expand Down
6 changes: 3 additions & 3 deletions vortex-datafusion/src/persistent/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
use itertools::Itertools;
use vortex_array::Context;
use vortex_array::ContextRef;

use super::cache::InitialReadCache;
use crate::persistent::opener::VortexFileOpener;
Expand All @@ -23,7 +23,7 @@ pub struct VortexExec {
predicate: Option<Arc<dyn PhysicalExpr>>,
plan_properties: PlanProperties,
projected_statistics: Statistics,
ctx: Arc<Context>,
ctx: ContextRef,
initial_read_cache: InitialReadCache,
}

Expand All @@ -32,7 +32,7 @@ impl VortexExec {
file_scan_config: FileScanConfig,
metrics: ExecutionPlanMetricsSet,
predicate: Option<Arc<dyn PhysicalExpr>>,
ctx: Arc<Context>,
ctx: ContextRef,
initial_read_cache: InitialReadCache,
) -> DFResult<Self> {
let projected_schema = project_schema(
Expand Down
8 changes: 4 additions & 4 deletions vortex-datafusion/src/persistent/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use futures::{stream, StreamExt as _, TryStreamExt as _};
use object_store::{ObjectMeta, ObjectStore};
use vortex_array::array::StructArray;
use vortex_array::arrow::infer_schema;
use vortex_array::Context;
use vortex_array::ContextRef;
use vortex_error::VortexResult;
use vortex_file::metadata::fetch_metadata;
use vortex_file::{
Expand All @@ -35,7 +35,7 @@ use crate::can_be_pushed_down;

#[derive(Debug)]
pub struct VortexFormat {
context: Arc<Context>,
context: ContextRef,
initial_read_cache: InitialReadCache,
opts: VortexFormatOptions,
}
Expand Down Expand Up @@ -68,10 +68,10 @@ impl Default for VortexFormat {
}

impl VortexFormat {
pub fn new(context: &Context) -> Self {
pub fn new(context: ContextRef) -> Self {
let opts = VortexFormatOptions::default();
Self {
context: Arc::new(context.clone()),
context,
initial_read_cache: InitialReadCache::new(opts.cache_size_mb),
opts,
}
Expand Down
4 changes: 2 additions & 2 deletions vortex-datafusion/src/persistent/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use datafusion_common::Result as DFResult;
use datafusion_physical_expr::{split_conjunction, PhysicalExpr};
use futures::{FutureExt as _, StreamExt, TryStreamExt};
use object_store::ObjectStore;
use vortex_array::Context;
use vortex_array::ContextRef;
use vortex_expr::datafusion::convert_expr_to_vortex;
use vortex_file::{LayoutContext, LayoutDeserializer, Projection, RowFilter, VortexReadBuilder};
use vortex_io::{IoDispatcher, ObjectStoreReadAt};
Expand All @@ -20,7 +20,7 @@ static IO_DISPATCHER: LazyLock<Arc<IoDispatcher>> =

#[derive(Clone)]
pub struct VortexFileOpener {
pub ctx: Arc<Context>,
pub ctx: ContextRef,
pub object_store: Arc<dyn ObjectStore>,
pub projection: Option<Vec<usize>>,
pub predicate: Option<Arc<dyn PhysicalExpr>>,
Expand Down
8 changes: 4 additions & 4 deletions vortex-file/src/read/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::fmt::{Debug, Display, Formatter};
use std::sync::Arc;

use vortex_array::aliases::hash_map::HashMap;
use vortex_array::Context;
use vortex_array::ContextRef;
use vortex_error::{vortex_err, VortexResult};
use vortex_flatbuffers::footer as fb;

Expand Down Expand Up @@ -61,12 +61,12 @@ impl Default for LayoutContext {

#[derive(Default, Debug, Clone)]
pub struct LayoutDeserializer {
ctx: Arc<Context>,
ctx: ContextRef,
layout_ctx: Arc<LayoutContext>,
}

impl LayoutDeserializer {
pub fn new(ctx: Arc<Context>, layout_ctx: Arc<LayoutContext>) -> Self {
pub fn new(ctx: ContextRef, layout_ctx: Arc<LayoutContext>) -> Self {
Self { ctx, layout_ctx }
}

Expand All @@ -84,7 +84,7 @@ impl LayoutDeserializer {
.reader(path, layout, dtype, scan, self.clone())
}

pub(crate) fn ctx(&self) -> Arc<Context> {
pub(crate) fn ctx(&self) -> ContextRef {
self.ctx.clone()
}
}
4 changes: 2 additions & 2 deletions vortex-file/src/read/layouts/flat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::collections::BTreeSet;
use std::sync::Arc;

use bytes::Bytes;
use vortex_array::{ArrayData, Context};
use vortex_array::{ArrayData, ContextRef};
use vortex_error::{vortex_bail, VortexResult};
use vortex_flatbuffers::footer;
use vortex_ipc::messages::{BufMessageReader, DecoderMessage};
Expand Down Expand Up @@ -52,7 +52,7 @@ pub struct FlatLayoutReader {
range: ByteRange,
scan: Scan,
dtype: Arc<LazyDType>,
ctx: Arc<Context>,
ctx: ContextRef,
}

impl FlatLayoutReader {
Expand Down
13 changes: 5 additions & 8 deletions vortex-ipc/src/iterator.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use std::io::{Read, Write};
use std::sync::Arc;

use bytes::{Bytes, BytesMut};
use itertools::Itertools;
use vortex_array::iter::ArrayIterator;
use vortex_array::{ArrayDType, ArrayData, Context};
use vortex_array::{ArrayDType, ArrayData, ContextRef};
use vortex_dtype::DType;
use vortex_error::{vortex_bail, vortex_err, VortexResult};

Expand All @@ -13,12 +12,12 @@ use crate::messages::{DecoderMessage, EncoderMessage, MessageEncoder, SyncMessag
/// An [`ArrayIterator`] for reading messages off an IPC stream.
pub struct SyncIPCReader<R: Read> {
reader: SyncMessageReader<R>,
ctx: Arc<Context>,
ctx: ContextRef,
dtype: DType,
}

impl<R: Read> SyncIPCReader<R> {
pub fn try_new(read: R, ctx: Arc<Context>) -> VortexResult<Self> {
pub fn try_new(read: R, ctx: ContextRef) -> VortexResult<Self> {
let mut reader = SyncMessageReader::new(read);
match reader.next().transpose()? {
Some(msg) => match msg {
Expand Down Expand Up @@ -152,11 +151,10 @@ impl Iterator for ArrayIteratorIPCBytes {
#[cfg(test)]
mod test {
use std::io::Cursor;
use std::sync::Arc;

use vortex_array::array::PrimitiveArray;
use vortex_array::iter::{ArrayIterator, ArrayIteratorExt};
use vortex_array::{ArrayDType, Context, IntoArrayVariant, ToArrayData};
use vortex_array::{ArrayDType, IntoArrayVariant, ToArrayData};

use super::*;

Expand All @@ -170,8 +168,7 @@ mod test {
.collect_to_buffer()
.unwrap();

let reader =
SyncIPCReader::try_new(Cursor::new(ipc_buffer), Arc::new(Context::default())).unwrap();
let reader = SyncIPCReader::try_new(Cursor::new(ipc_buffer), Default::default()).unwrap();

assert_eq!(reader.dtype(), array.dtype());
let result = reader.into_array_data().unwrap().into_primitive().unwrap();
Expand Down
9 changes: 4 additions & 5 deletions vortex-ipc/src/messages/decoder.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

use bytes::Buf;
use flatbuffers::{root, root_unchecked, Follow};
use itertools::Itertools;
use vortex_array::{flatbuffers as fba, ArrayData, Context};
use vortex_array::{flatbuffers as fba, ArrayData, ContextRef};
use vortex_buffer::{AlignedBuf, Alignment, ByteBuffer};
use vortex_dtype::DType;
use vortex_error::{vortex_bail, vortex_err, VortexExpect, VortexResult};
Expand All @@ -15,7 +14,7 @@ use crate::ALIGNMENT;

/// A message decoded from an IPC stream.
///
/// Note that the `Array` variant cannot fully decode into an [`ArrayData`] without a [`Context`]
/// Note that the `Array` variant cannot fully decode into an [`ArrayData`] without a [`ContextRef`]
/// and a [`DType`]. As such, we partially decode into an [`ArrayParts`] and allow the caller to
/// finish the decoding.
#[derive(Debug)]
Expand Down Expand Up @@ -47,7 +46,7 @@ impl Debug for ArrayParts {
}

impl ArrayParts {
pub fn into_array_data(self, ctx: Arc<Context>, dtype: DType) -> VortexResult<ArrayData> {
pub fn into_array_data(self, ctx: ContextRef, dtype: DType) -> VortexResult<ArrayData> {
ArrayData::try_new_viewed(
ctx,
dtype,
Expand Down Expand Up @@ -290,7 +289,7 @@ mod test {

// Decode the array parts with the context
let actual = array_parts
.into_array_data(Arc::new(Context::default()), expected.dtype().clone())
.into_array_data(Default::default(), expected.dtype().clone())
.unwrap();

assert_eq!(expected.len(), actual.len());
Expand Down
13 changes: 5 additions & 8 deletions vortex-ipc/src/stream.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{ready, Poll};

use bytes::{Bytes, BytesMut};
use futures_util::{AsyncRead, AsyncWrite, AsyncWriteExt, Stream, StreamExt, TryStreamExt};
use pin_project_lite::pin_project;
use vortex_array::stream::ArrayStream;
use vortex_array::{ArrayDType, ArrayData, Context};
use vortex_array::{ArrayDType, ArrayData, ContextRef};
use vortex_dtype::DType;
use vortex_error::{vortex_bail, vortex_err, VortexResult};

Expand All @@ -18,13 +17,13 @@ pin_project! {
pub struct AsyncIPCReader<R> {
#[pin]
reader: AsyncMessageReader<R>,
ctx: Arc<Context>,
ctx: ContextRef,
dtype: DType,
}
}

impl<R: AsyncRead + Unpin> AsyncIPCReader<R> {
pub async fn try_new(read: R, ctx: Arc<Context>) -> VortexResult<Self> {
pub async fn try_new(read: R, ctx: ContextRef) -> VortexResult<Self> {
let mut reader = AsyncMessageReader::new(read);

let dtype = match reader.next().await.transpose()? {
Expand Down Expand Up @@ -186,12 +185,10 @@ impl Stream for ArrayStreamIPCBytes {

#[cfg(test)]
mod test {
use std::sync::Arc;

use futures_util::io::Cursor;
use vortex_array::array::PrimitiveArray;
use vortex_array::stream::{ArrayStream, ArrayStreamExt};
use vortex_array::{ArrayDType, Context, IntoArrayVariant, ToArrayData};
use vortex_array::{ArrayDType, IntoArrayVariant, ToArrayData};

use super::*;

Expand All @@ -206,7 +203,7 @@ mod test {
.await
.unwrap();

let reader = AsyncIPCReader::try_new(Cursor::new(ipc_buffer), Arc::new(Context::default()))
let reader = AsyncIPCReader::try_new(Cursor::new(ipc_buffer), Default::default())
.await
.unwrap();

Expand Down
4 changes: 2 additions & 2 deletions vortex-sampling-compressor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use vortex_array::array::{
VarBinViewEncoding,
};
use vortex_array::encoding::EncodingRef;
use vortex_array::Context;
use vortex_array::{Context, ContextRef};
use vortex_bytebool::ByteBoolEncoding;
use vortex_datetime_parts::DateTimePartsEncoding;
use vortex_dict::DictEncoding;
Expand Down Expand Up @@ -121,7 +121,7 @@ pub const ALL_COMPRESSORS: [CompressorRef; 17] = [
&ZigZagCompressor,
];

pub static ALL_ENCODINGS_CONTEXT: LazyLock<Arc<Context>> = LazyLock::new(|| {
pub static ALL_ENCODINGS_CONTEXT: LazyLock<ContextRef> = LazyLock::new(|| {
Arc::new(Context::default().with_encodings([
&ALPEncoding as EncodingRef,
&ALPRDEncoding,
Expand Down

0 comments on commit 830f49d

Please sign in to comment.