diff --git a/daft/daft/__init__.pyi b/daft/daft/__init__.pyi index 3ad38f6885..798c91ce42 100644 --- a/daft/daft/__init__.pyi +++ b/daft/daft/__init__.pyi @@ -586,8 +586,8 @@ class IOConfig: """Replaces values if provided, returning a new IOConfig.""" ... -class NativeStorageConfig: - """Storage configuration for the Rust-native I/O layer.""" +class StorageConfig: + """Configuration for interacting with a particular storage backend.""" # Whether or not to use a multithreaded tokio runtime for processing I/O multithreaded_io: bool @@ -595,17 +595,6 @@ class NativeStorageConfig: def __init__(self, multithreaded_io: bool, io_config: IOConfig): ... -class StorageConfig: - """Configuration for interacting with a particular storage backend, using a particular I/O layer implementation.""" - - @staticmethod - def native(config: NativeStorageConfig) -> StorageConfig: - """Create from a native storage config.""" - ... - - @property - def config(self) -> NativeStorageConfig: ... - class ScanTask: """A batch of scan tasks for reading data from an external source.""" diff --git a/daft/delta_lake/delta_lake_scan.py b/daft/delta_lake/delta_lake_scan.py index 20dd2e93ae..b81b0128e9 100644 --- a/daft/delta_lake/delta_lake_scan.py +++ b/daft/delta_lake/delta_lake_scan.py @@ -41,7 +41,7 @@ def __init__( # Thus, if we don't detect any credentials being available, we attempt to detect it from the environment using our Daft credentials chain. # # See: https://github.com/delta-io/delta-rs/issues/2117 - deltalake_sdk_io_config = storage_config.config.io_config + deltalake_sdk_io_config = storage_config.io_config scheme = urlparse(table_uri).scheme if scheme == "s3" or scheme == "s3a": # Try to get region from boto3 diff --git a/daft/hudi/hudi_scan.py b/daft/hudi/hudi_scan.py index d68cfd99bb..2f342e4a6a 100644 --- a/daft/hudi/hudi_scan.py +++ b/daft/hudi/hudi_scan.py @@ -25,7 +25,7 @@ class HudiScanOperator(ScanOperator): def __init__(self, table_uri: str, storage_config: StorageConfig) -> None: super().__init__() - resolved_path, self._resolved_fs = _resolve_paths_and_filesystem(table_uri, storage_config.config.io_config) + resolved_path, self._resolved_fs = _resolve_paths_and_filesystem(table_uri, storage_config.io_config) self._table = HudiTable(table_uri, self._resolved_fs, resolved_path[0]) self._storage_config = storage_config self._schema = Schema.from_pyarrow_schema(self._table.schema) diff --git a/daft/io/_csv.py b/daft/io/_csv.py index dab07ef452..5c57c21918 100644 --- a/daft/io/_csv.py +++ b/daft/io/_csv.py @@ -8,7 +8,6 @@ CsvSourceConfig, FileFormatConfig, IOConfig, - NativeStorageConfig, StorageConfig, ) from daft.dataframe import DataFrame @@ -87,7 +86,7 @@ def read_csv( chunk_size=_chunk_size, ) file_format_config = FileFormatConfig.from_csv_config(csv_config) - storage_config = StorageConfig.native(NativeStorageConfig(True, io_config)) + storage_config = StorageConfig(True, io_config) builder = get_tabular_files_scan( path=path, diff --git a/daft/io/_deltalake.py b/daft/io/_deltalake.py index 24ba15ee43..8cbf6a6cf5 100644 --- a/daft/io/_deltalake.py +++ b/daft/io/_deltalake.py @@ -4,7 +4,7 @@ from daft import context from daft.api_annotations import PublicAPI -from daft.daft import IOConfig, NativeStorageConfig, ScanOperatorHandle, StorageConfig +from daft.daft import IOConfig, ScanOperatorHandle, StorageConfig from daft.dataframe import DataFrame from daft.dependencies import unity_catalog from daft.io.catalog import DataCatalogTable @@ -60,7 +60,7 @@ def read_deltalake( ) io_config = 
context.get_context().daft_planning_config.default_io_config if io_config is None else io_config - storage_config = StorageConfig.native(NativeStorageConfig(multithreaded_io, io_config)) + storage_config = StorageConfig(multithreaded_io, io_config) if isinstance(table, str): table_uri = table @@ -72,7 +72,7 @@ def read_deltalake( # Override the storage_config with the one provided by Unity catalog table_io_config = table.io_config if table_io_config is not None: - storage_config = StorageConfig.native(NativeStorageConfig(multithreaded_io, table_io_config)) + storage_config = StorageConfig(multithreaded_io, table_io_config) else: raise ValueError( f"table argument must be a table URI string, DataCatalogTable or UnityCatalogTable instance, but got: {type(table)}, {table}" diff --git a/daft/io/_hudi.py b/daft/io/_hudi.py index a8d4c6f999..2a70188f65 100644 --- a/daft/io/_hudi.py +++ b/daft/io/_hudi.py @@ -4,7 +4,7 @@ from daft import context from daft.api_annotations import PublicAPI -from daft.daft import IOConfig, NativeStorageConfig, ScanOperatorHandle, StorageConfig +from daft.daft import IOConfig, ScanOperatorHandle, StorageConfig from daft.dataframe import DataFrame from daft.logical.builder import LogicalPlanBuilder @@ -33,7 +33,7 @@ def read_hudi( io_config = context.get_context().daft_planning_config.default_io_config if io_config is None else io_config multithreaded_io = context.get_context().get_or_create_runner().name != "ray" - storage_config = StorageConfig.native(NativeStorageConfig(multithreaded_io, io_config)) + storage_config = StorageConfig(multithreaded_io, io_config) hudi_operator = HudiScanOperator(table_uri, storage_config=storage_config) diff --git a/daft/io/_iceberg.py b/daft/io/_iceberg.py index 62f47babba..dbf94dd76d 100644 --- a/daft/io/_iceberg.py +++ b/daft/io/_iceberg.py @@ -4,7 +4,7 @@ from daft import context from daft.api_annotations import PublicAPI -from daft.daft import IOConfig, NativeStorageConfig, ScanOperatorHandle, StorageConfig +from daft.daft import IOConfig, ScanOperatorHandle, StorageConfig from daft.dataframe import DataFrame from daft.logical.builder import LogicalPlanBuilder @@ -123,7 +123,7 @@ def read_iceberg( io_config = context.get_context().daft_planning_config.default_io_config if io_config is None else io_config multithreaded_io = context.get_context().get_or_create_runner().name != "ray" - storage_config = StorageConfig.native(NativeStorageConfig(multithreaded_io, io_config)) + storage_config = StorageConfig(multithreaded_io, io_config) iceberg_operator = IcebergScanOperator(pyiceberg_table, snapshot_id=snapshot_id, storage_config=storage_config) diff --git a/daft/io/_json.py b/daft/io/_json.py index b99620c566..62bb02ac15 100644 --- a/daft/io/_json.py +++ b/daft/io/_json.py @@ -8,7 +8,6 @@ FileFormatConfig, IOConfig, JsonSourceConfig, - NativeStorageConfig, StorageConfig, ) from daft.dataframe import DataFrame @@ -64,7 +63,7 @@ def read_json( json_config = JsonSourceConfig(_buffer_size, _chunk_size) file_format_config = FileFormatConfig.from_json_config(json_config) - storage_config = StorageConfig.native(NativeStorageConfig(True, io_config)) + storage_config = StorageConfig(True, io_config) builder = get_tabular_files_scan( path=path, diff --git a/daft/io/_parquet.py b/daft/io/_parquet.py index 22ca1f6ac3..e133f2a505 100644 --- a/daft/io/_parquet.py +++ b/daft/io/_parquet.py @@ -7,7 +7,6 @@ from daft.daft import ( FileFormatConfig, IOConfig, - NativeStorageConfig, ParquetSourceConfig, StorageConfig, ) @@ -84,7 +83,7 @@ def 
read_parquet( file_format_config = FileFormatConfig.from_parquet_config( ParquetSourceConfig(coerce_int96_timestamp_unit=pytimeunit, row_groups=row_groups, chunk_size=_chunk_size) ) - storage_config = StorageConfig.native(NativeStorageConfig(multithreaded_io, io_config)) + storage_config = StorageConfig(multithreaded_io, io_config) builder = get_tabular_files_scan( path=path, diff --git a/daft/table/table_io.py b/daft/table/table_io.py index dc022c4375..2992cf6709 100644 --- a/daft/table/table_io.py +++ b/daft/table/table_io.py @@ -17,7 +17,6 @@ JsonConvertOptions, JsonParseOptions, JsonReadOptions, - NativeStorageConfig, StorageConfig, ) from daft.dependencies import pa, pacsv, pads, pq @@ -90,7 +89,7 @@ def read_json( Returns: MicroPartition: Parsed MicroPartition from JSON """ - config = storage_config.config if storage_config is not None else NativeStorageConfig(True, IOConfig()) + config = storage_config if storage_config is not None else StorageConfig(True, IOConfig()) assert isinstance(file, (str, pathlib.Path)), "Native downloader only works on string inputs to read_json" json_convert_options = JsonConvertOptions( limit=read_options.num_rows, @@ -126,7 +125,7 @@ def read_parquet( Returns: MicroPartition: Parsed MicroPartition from Parquet """ - config = storage_config.config if storage_config is not None else NativeStorageConfig(True, IOConfig()) + config = storage_config if storage_config is not None else StorageConfig(True, IOConfig()) assert isinstance( file, (str, pathlib.Path) ), "Native downloader only works on string or Path inputs to read_parquet" @@ -211,7 +210,7 @@ def read_csv( Returns: MicroPartition: Parsed MicroPartition from CSV """ - config = storage_config.config if storage_config is not None else NativeStorageConfig(True, IOConfig()) + config = storage_config if storage_config is not None else StorageConfig(True, IOConfig()) assert isinstance(file, (str, pathlib.Path)), "Native downloader only works on string or Path inputs to read_csv" has_header = csv_options.header_index is not None csv_convert_options = CsvConvertOptions( diff --git a/src/daft-local-execution/src/sources/scan_task.rs b/src/daft-local-execution/src/sources/scan_task.rs index 21be8d1610..6e4a76e1fe 100644 --- a/src/daft-local-execution/src/sources/scan_task.rs +++ b/src/daft-local-execution/src/sources/scan_task.rs @@ -15,7 +15,7 @@ use daft_io::IOStatsRef; use daft_json::{JsonConvertOptions, JsonParseOptions, JsonReadOptions}; use daft_micropartition::MicroPartition; use daft_parquet::read::{read_parquet_bulk_async, ParquetSchemaInferenceOptions}; -use daft_scan::{storage_config::StorageConfig, ChunkSpec, ScanTask}; +use daft_scan::{ChunkSpec, ScanTask}; use futures::{Stream, StreamExt, TryStreamExt}; use snafu::ResultExt; use tracing::instrument; @@ -241,14 +241,14 @@ async fn stream_scan_task( } let source = scan_task.sources.first().unwrap(); let url = source.get_path(); - let (io_config, multi_threaded_io) = match scan_task.storage_config.as_ref() { - StorageConfig::Native(native_storage_config) => ( - native_storage_config.io_config.as_ref(), - native_storage_config.multithreaded_io, - ), - }; - let io_config = Arc::new(io_config.cloned().unwrap_or_default()); - let io_client = daft_io::get_io_client(multi_threaded_io, io_config)?; + let io_config = Arc::new( + scan_task + .storage_config + .io_config + .clone() + .unwrap_or_default(), + ); + let io_client = daft_io::get_io_client(scan_task.storage_config.multithreaded_io, io_config)?; let table_stream = match 
scan_task.file_format_config.as_ref() { FileFormatConfig::Parquet(ParquetSourceConfig { coerce_int96_timestamp_unit, diff --git a/src/daft-micropartition/src/micropartition.rs b/src/daft-micropartition/src/micropartition.rs index 7451c28a5d..5de8f40494 100644 --- a/src/daft-micropartition/src/micropartition.rs +++ b/src/daft-micropartition/src/micropartition.rs @@ -19,10 +19,7 @@ use daft_json::{JsonConvertOptions, JsonParseOptions, JsonReadOptions}; use daft_parquet::read::{ read_parquet_bulk, read_parquet_metadata_bulk, ParquetSchemaInferenceOptions, }; -use daft_scan::{ - storage_config::{NativeStorageConfig, StorageConfig}, - ChunkSpec, DataSource, ScanTask, -}; +use daft_scan::{storage_config::StorageConfig, ChunkSpec, DataSource, ScanTask}; use daft_stats::{PartitionSpec, TableMetadata, TableStatistics}; use daft_table::Table; use parquet2::metadata::FileMetaData; @@ -111,193 +108,193 @@ fn materialize_scan_task( .iter() .map(daft_scan::DataSource::get_path); - let mut table_values = match scan_task.storage_config.as_ref() { - StorageConfig::Native(native_storage_config) => { - let multithreaded_io = native_storage_config.multithreaded_io; - let io_config = Arc::new(native_storage_config.io_config.clone().unwrap_or_default()); - let io_client = daft_io::get_io_client(multithreaded_io, io_config).unwrap(); - - match scan_task.file_format_config.as_ref() { - // ******************** - // Native Parquet Reads - // ******************** - FileFormatConfig::Parquet(ParquetSourceConfig { - coerce_int96_timestamp_unit, - field_id_mapping, - chunk_size, - .. - }) => { - let inference_options = - ParquetSchemaInferenceOptions::new(Some(*coerce_int96_timestamp_unit)); + let multithreaded_io = scan_task.storage_config.multithreaded_io; + let io_config = Arc::new( + scan_task + .storage_config + .io_config + .clone() + .unwrap_or_default(), + ); + let io_client = daft_io::get_io_client(multithreaded_io, io_config).unwrap(); + + let mut table_values = match scan_task.file_format_config.as_ref() { + // ******************** + // Native Parquet Reads + // ******************** + FileFormatConfig::Parquet(ParquetSourceConfig { + coerce_int96_timestamp_unit, + field_id_mapping, + chunk_size, + .. 
+ }) => { + let inference_options = + ParquetSchemaInferenceOptions::new(Some(*coerce_int96_timestamp_unit)); - // TODO: This is a hardcoded magic value but should be configurable - let num_parallel_tasks = 8; + // TODO: This is a hardcoded magic value but should be configurable + let num_parallel_tasks = 8; - let urls = urls.collect::>(); + let urls = urls.collect::>(); - // Create vec of all unique delete files in the scan task - let iceberg_delete_files = scan_task - .sources - .iter() - .filter_map(|s| s.get_iceberg_delete_files()) - .flatten() - .map(String::as_str) - .collect::>() - .into_iter() - .collect::>(); - - let delete_map = _read_delete_files( - iceberg_delete_files.as_slice(), - urls.as_slice(), - io_client.clone(), - io_stats.clone(), - num_parallel_tasks, - multithreaded_io, - &inference_options, - ) - .context(DaftCoreComputeSnafu)?; + // Create vec of all unique delete files in the scan task + let iceberg_delete_files = scan_task + .sources + .iter() + .filter_map(|s| s.get_iceberg_delete_files()) + .flatten() + .map(String::as_str) + .collect::>() + .into_iter() + .collect::>(); - let row_groups = parquet_sources_to_row_groups(scan_task.sources.as_slice()); - let metadatas = scan_task - .sources - .iter() - .map(|s| s.get_parquet_metadata().cloned()) - .collect::>>(); - daft_parquet::read::read_parquet_bulk( - urls.as_slice(), - file_column_names.as_deref(), - None, - scan_task.pushdowns.limit, - row_groups, - scan_task.pushdowns.filters.clone(), - io_client, - io_stats, - num_parallel_tasks, - multithreaded_io, - &inference_options, - field_id_mapping.clone(), - metadatas, - Some(delete_map), - *chunk_size, - ) - .context(DaftCoreComputeSnafu)? - } + let delete_map = _read_delete_files( + iceberg_delete_files.as_slice(), + urls.as_slice(), + io_client.clone(), + io_stats.clone(), + num_parallel_tasks, + multithreaded_io, + &inference_options, + ) + .context(DaftCoreComputeSnafu)?; - // **************** - // Native CSV Reads - // **************** - FileFormatConfig::Csv(cfg) => { - let schema_of_file = scan_task.schema.clone(); - let col_names = if !cfg.has_headers { - Some( - schema_of_file - .fields - .values() - .map(|f| f.name.as_str()) - .collect::>(), - ) - } else { - None - }; - let convert_options = CsvConvertOptions::new_internal( - scan_task.pushdowns.limit, - file_column_names - .as_ref() - .map(|cols| cols.iter().map(|col| (*col).to_string()).collect()), - col_names - .as_ref() - .map(|cols| cols.iter().map(|col| (*col).to_string()).collect()), - Some(schema_of_file), - scan_task.pushdowns.filters.clone(), - ); - let parse_options = CsvParseOptions::new_with_defaults( - cfg.has_headers, - cfg.delimiter, - cfg.double_quote, - cfg.quote, - cfg.allow_variable_columns, - cfg.escape_char, - cfg.comment, - ) - .context(DaftCSVSnafu)?; - let read_options = - CsvReadOptions::new_internal(cfg.buffer_size, cfg.chunk_size); - let uris = urls.collect::>(); - daft_csv::read_csv_bulk( - uris.as_slice(), - Some(convert_options), - Some(parse_options), - Some(read_options), - io_client, - io_stats, - native_storage_config.multithreaded_io, - None, - 8, - ) - .context(DaftCoreComputeSnafu)? 
- } + let row_groups = parquet_sources_to_row_groups(scan_task.sources.as_slice()); + let metadatas = scan_task + .sources + .iter() + .map(|s| s.get_parquet_metadata().cloned()) + .collect::>>(); + daft_parquet::read::read_parquet_bulk( + urls.as_slice(), + file_column_names.as_deref(), + None, + scan_task.pushdowns.limit, + row_groups, + scan_task.pushdowns.filters.clone(), + io_client, + io_stats, + num_parallel_tasks, + multithreaded_io, + &inference_options, + field_id_mapping.clone(), + metadatas, + Some(delete_map), + *chunk_size, + ) + .context(DaftCoreComputeSnafu)? + } - // **************** - // Native JSON Reads - // **************** - FileFormatConfig::Json(cfg) => { - let convert_options = JsonConvertOptions::new_internal( - scan_task.pushdowns.limit, - file_column_names - .as_ref() - .map(|cols| cols.iter().map(|col| (*col).to_string()).collect()), - Some(scan_task.schema.clone()), - scan_task.pushdowns.filters.clone(), - ); - let parse_options = JsonParseOptions::new_internal(); - let read_options = - JsonReadOptions::new_internal(cfg.buffer_size, cfg.chunk_size); - let uris = urls.collect::>(); - daft_json::read_json_bulk( - uris.as_slice(), - Some(convert_options), - Some(parse_options), - Some(read_options), - io_client, - io_stats, - native_storage_config.multithreaded_io, - None, - 8, - ) - .context(DaftCoreComputeSnafu)? - } - #[cfg(feature = "python")] - FileFormatConfig::Database(DatabaseSourceConfig { sql, conn }) => { - let predicate = scan_task + // **************** + // Native CSV Reads + // **************** + FileFormatConfig::Csv(cfg) => { + let schema_of_file = scan_task.schema.clone(); + let col_names = if !cfg.has_headers { + Some( + schema_of_file + .fields + .values() + .map(|f| f.name.as_str()) + .collect::>(), + ) + } else { + None + }; + let convert_options = CsvConvertOptions::new_internal( + scan_task.pushdowns.limit, + file_column_names + .as_ref() + .map(|cols| cols.iter().map(|col| (*col).to_string()).collect()), + col_names + .as_ref() + .map(|cols| cols.iter().map(|col| (*col).to_string()).collect()), + Some(schema_of_file), + scan_task.pushdowns.filters.clone(), + ); + let parse_options = CsvParseOptions::new_with_defaults( + cfg.has_headers, + cfg.delimiter, + cfg.double_quote, + cfg.quote, + cfg.allow_variable_columns, + cfg.escape_char, + cfg.comment, + ) + .context(DaftCSVSnafu)?; + let read_options = CsvReadOptions::new_internal(cfg.buffer_size, cfg.chunk_size); + let uris = urls.collect::>(); + daft_csv::read_csv_bulk( + uris.as_slice(), + Some(convert_options), + Some(parse_options), + Some(read_options), + io_client, + io_stats, + scan_task.storage_config.multithreaded_io, + None, + 8, + ) + .context(DaftCoreComputeSnafu)? + } + + // **************** + // Native JSON Reads + // **************** + FileFormatConfig::Json(cfg) => { + let convert_options = JsonConvertOptions::new_internal( + scan_task.pushdowns.limit, + file_column_names + .as_ref() + .map(|cols| cols.iter().map(|col| (*col).to_string()).collect()), + Some(scan_task.schema.clone()), + scan_task.pushdowns.filters.clone(), + ); + let parse_options = JsonParseOptions::new_internal(); + let read_options = JsonReadOptions::new_internal(cfg.buffer_size, cfg.chunk_size); + let uris = urls.collect::>(); + daft_json::read_json_bulk( + uris.as_slice(), + Some(convert_options), + Some(parse_options), + Some(read_options), + io_client, + io_stats, + scan_task.storage_config.multithreaded_io, + None, + 8, + ) + .context(DaftCoreComputeSnafu)? 
+ } + #[cfg(feature = "python")] + FileFormatConfig::Database(DatabaseSourceConfig { sql, conn }) => { + let predicate = scan_task + .pushdowns + .filters + .as_ref() + .map(|p| (*p.as_ref()).clone().into()); + pyo3::Python::with_gil(|py| { + let table = crate::python::read_sql_into_py_table( + py, + sql, + conn, + predicate.clone(), + scan_task.schema.clone().into(), + scan_task .pushdowns - .filters + .columns .as_ref() - .map(|p| (*p.as_ref()).clone().into()); - pyo3::Python::with_gil(|py| { - let table = crate::python::read_sql_into_py_table( - py, - sql, - conn, - predicate.clone(), - scan_task.schema.clone().into(), - scan_task - .pushdowns - .columns - .as_ref() - .map(|cols| cols.as_ref().clone()), - scan_task.pushdowns.limit, - ) - .map(std::convert::Into::into) - .context(crate::PyIOSnafu)?; - Ok(vec![table]) - })? - } - #[cfg(feature = "python")] - FileFormatConfig::PythonFunction => { - let tables = crate::python::read_pyfunc_into_table_iter(&scan_task)?; - tables.collect::>>()? - } - } + .map(|cols| cols.as_ref().clone()), + scan_task.pushdowns.limit, + ) + .map(std::convert::Into::into) + .context(crate::PyIOSnafu)?; + Ok(vec![table]) + })? + } + #[cfg(feature = "python")] + FileFormatConfig::PythonFunction => { + let tables = crate::python::read_pyfunc_into_table_iter(&scan_task)?; + tables.collect::>>()? } }; @@ -385,11 +382,10 @@ impl MicroPartition { &scan_task.metadata, &scan_task.statistics, scan_task.file_format_config.as_ref(), - scan_task.storage_config.as_ref(), ) { // CASE: ScanTask provides all required metadata. // If the scan_task provides metadata (e.g. retrieved from a catalog) we can use it to create an unloaded MicroPartition - (Some(metadata), Some(statistics), _, _) if scan_task.pushdowns.filters.is_none() => { + (Some(metadata), Some(statistics), _) if scan_task.pushdowns.filters.is_none() => { Ok(Self::new_unloaded( scan_task.clone(), scan_task @@ -414,7 +410,6 @@ impl MicroPartition { chunk_size, .. 
}), - StorageConfig::Native(cfg), ) => { let uris = scan_task .sources @@ -450,10 +445,15 @@ impl MicroPartition { row_groups, scan_task.pushdowns.filters.clone(), scan_task.partition_spec(), - cfg.io_config.clone().map(Arc::new).unwrap_or_default(), + scan_task + .storage_config + .io_config + .clone() + .map(Arc::new) + .unwrap_or_default(), Some(io_stats), if scan_task.sources.len() == 1 { 1 } else { 128 }, // Hardcoded for to 128 bulk reads - cfg.multithreaded_io, + scan_task.storage_config.multithreaded_io, &ParquetSchemaInferenceOptions { coerce_int96_timestamp_unit, ..Default::default() @@ -1111,14 +1111,7 @@ pub fn read_parquet_into_micropartition>( }) .into(), scan_task_daft_schema, - StorageConfig::Native( - NativeStorageConfig::new_internal( - multithreaded_io, - Some(io_config.as_ref().clone()), - ) - .into(), - ) - .into(), + StorageConfig::new_internal(multithreaded_io, Some(io_config.as_ref().clone())).into(), Pushdowns::new( None, None, diff --git a/src/daft-micropartition/src/python.rs b/src/daft-micropartition/src/python.rs index 06c59c97fd..e062abb2b5 100644 --- a/src/daft-micropartition/src/python.rs +++ b/src/daft-micropartition/src/python.rs @@ -12,7 +12,7 @@ use daft_io::{python::IOConfig, IOStatsContext}; use daft_json::{JsonConvertOptions, JsonParseOptions, JsonReadOptions}; use daft_parquet::read::ParquetSchemaInferenceOptions; use daft_scan::{ - python::pylib::PyScanTask, storage_config::PyStorageConfig, DataSource, ScanTask, ScanTaskRef, + python::pylib::PyScanTask, storage_config::StorageConfig, DataSource, ScanTask, ScanTaskRef, }; use daft_stats::{TableMetadata, TableStatistics}; use daft_table::{python::PyTable, Table}; @@ -534,7 +534,7 @@ impl PyMicroPartition { py: Python, uri: &str, schema: PySchema, - storage_config: PyStorageConfig, + storage_config: StorageConfig, include_columns: Option>, num_rows: Option, ) -> PyResult { @@ -801,7 +801,7 @@ pub fn read_json_into_py_table( py: Python, uri: &str, schema: PySchema, - storage_config: PyStorageConfig, + storage_config: StorageConfig, include_columns: Option>, num_rows: Option, ) -> PyResult { @@ -831,7 +831,7 @@ pub fn read_csv_into_py_table( delimiter: Option, double_quote: bool, schema: PySchema, - storage_config: PyStorageConfig, + storage_config: StorageConfig, include_columns: Option>, num_rows: Option, ) -> PyResult { @@ -863,7 +863,7 @@ pub fn read_parquet_into_py_table( uri: &str, schema: PySchema, coerce_int96_timestamp_unit: PyTimeUnit, - storage_config: PyStorageConfig, + storage_config: StorageConfig, include_columns: Option>, num_rows: Option, ) -> PyResult { diff --git a/src/daft-scan/src/builder.rs b/src/daft-scan/src/builder.rs index 9f51425fbf..b154f20d92 100644 --- a/src/daft-scan/src/builder.rs +++ b/src/daft-scan/src/builder.rs @@ -10,10 +10,7 @@ use daft_schema::{field::Field, schema::SchemaRef}; #[cfg(feature = "python")] use {crate::python::pylib::ScanOperatorHandle, pyo3::prelude::*}; -use crate::{ - glob::GlobScanOperator, - storage_config::{NativeStorageConfig, StorageConfig}, -}; +use crate::{glob::GlobScanOperator, storage_config::StorageConfig}; pub struct ParquetScanBuilder { pub glob_paths: Vec, @@ -109,9 +106,10 @@ impl ParquetScanBuilder { GlobScanOperator::try_new( self.glob_paths, Arc::new(FileFormatConfig::Parquet(cfg)), - Arc::new(StorageConfig::Native(Arc::new( - NativeStorageConfig::new_internal(self.multithreaded, self.io_config), - ))), + Arc::new(StorageConfig::new_internal( + self.multithreaded, + self.io_config, + )), self.infer_schema, self.schema, 
self.file_path_column, @@ -252,9 +250,7 @@ impl CsvScanBuilder { GlobScanOperator::try_new( self.glob_paths, Arc::new(FileFormatConfig::Csv(cfg)), - Arc::new(StorageConfig::Native(Arc::new( - NativeStorageConfig::new_internal(false, self.io_config), - ))), + Arc::new(StorageConfig::new_internal(false, self.io_config)), self.infer_schema, self.schema, self.file_path_column, @@ -273,25 +269,22 @@ pub fn delta_scan>( io_config: Option, multithreaded_io: bool, ) -> DaftResult { - use crate::storage_config::{NativeStorageConfig, PyStorageConfig, StorageConfig}; + use crate::storage_config::StorageConfig; Python::with_gil(|py| { let io_config = io_config.unwrap_or_default(); - let native_storage_config = NativeStorageConfig { + let storage_config = StorageConfig { io_config: Some(io_config), multithreaded_io, }; - let py_storage_config: PyStorageConfig = - Arc::new(StorageConfig::Native(Arc::new(native_storage_config))).into(); - // let py_io_config = PyIOConfig { config: io_config }; let delta_lake_scan = PyModule::import_bound(py, "daft.delta_lake.delta_lake_scan")?; let delta_lake_scan_operator = delta_lake_scan.getattr(pyo3::intern!(py, "DeltaLakeScanOperator"))?; let delta_lake_operator = delta_lake_scan_operator - .call1((glob_path.as_ref(), py_storage_config))? + .call1((glob_path.as_ref(), storage_config))? .to_object(py); let scan_operator_handle = ScanOperatorHandle::from_python_scan_operator(delta_lake_operator, py)?; diff --git a/src/daft-scan/src/lib.rs b/src/daft-scan/src/lib.rs index 8070d7b627..2f984dc213 100644 --- a/src/daft-scan/src/lib.rs +++ b/src/daft-scan/src/lib.rs @@ -840,11 +840,7 @@ mod test { use daft_schema::{schema::Schema, time_unit::TimeUnit}; use itertools::Itertools; - use crate::{ - glob::GlobScanOperator, - storage_config::{NativeStorageConfig, StorageConfig}, - DataSource, ScanTask, - }; + use crate::{glob::GlobScanOperator, storage_config::StorageConfig, DataSource, ScanTask}; fn make_scan_task(num_sources: usize) -> ScanTask { let sources = (0..num_sources) @@ -871,9 +867,7 @@ mod test { sources, Arc::new(file_format_config), Arc::new(Schema::empty()), - Arc::new(StorageConfig::Native(Arc::new( - NativeStorageConfig::new_internal(false, None), - ))), + Arc::new(StorageConfig::new_internal(false, None)), Pushdowns::default(), None, ) @@ -896,9 +890,7 @@ mod test { let glob_scan_operator: GlobScanOperator = GlobScanOperator::try_new( sources, Arc::new(file_format_config), - Arc::new(StorageConfig::Native(Arc::new( - NativeStorageConfig::new_internal(false, None), - ))), + Arc::new(StorageConfig::new_internal(false, None)), false, Some(Arc::new(Schema::empty())), None, diff --git a/src/daft-scan/src/python.rs b/src/daft-scan/src/python.rs index 29df2fa90d..d9588d80a2 100644 --- a/src/daft-scan/src/python.rs +++ b/src/daft-scan/src/python.rs @@ -4,7 +4,7 @@ use common_py_serde::{deserialize_py_object, serialize_py_object}; use pyo3::{prelude::*, types::PyTuple}; use serde::{Deserialize, Serialize}; -use crate::storage_config::{NativeStorageConfig, PyStorageConfig}; +use crate::storage_config::StorageConfig; #[derive(Debug, Clone, Serialize, Deserialize)] struct PyObjectSerializableWrapper( @@ -87,9 +87,7 @@ pub mod pylib { use super::PythonTablesFactoryArgs; use crate::{ - anonymous::AnonymousScanOperator, - glob::GlobScanOperator, - storage_config::{PyStorageConfig, StorageConfig}, + anonymous::AnonymousScanOperator, glob::GlobScanOperator, storage_config::StorageConfig, DataSource, ScanTask, }; #[pyclass(module = "daft.daft", frozen)] @@ -110,7 +108,7 @@ pub 
mod pylib { files: Vec, schema: PySchema, file_format_config: PyFileFormatConfig, - storage_config: PyStorageConfig, + storage_config: StorageConfig, ) -> PyResult { py.allow_threads(|| { let schema = schema.schema; @@ -132,7 +130,7 @@ pub mod pylib { py: Python, glob_path: Vec, file_format_config: PyFileFormatConfig, - storage_config: PyStorageConfig, + storage_config: StorageConfig, hive_partitioning: bool, infer_schema: bool, schema: Option, @@ -346,7 +344,7 @@ pub mod pylib { file: String, file_format: PyFileFormatConfig, schema: PySchema, - storage_config: PyStorageConfig, + storage_config: StorageConfig, num_rows: Option, size_bytes: Option, iceberg_delete_files: Option>, @@ -429,7 +427,7 @@ pub mod pylib { vec![data_source], file_format.into(), schema.schema, - Arc::new(StorageConfig::Native(Arc::new(Default::default()))), // read SQL doesn't actually use the storage config + Arc::new(Default::default()), // read SQL doesn't actually use the storage config pushdowns.map(|p| p.0.as_ref().clone()).unwrap_or_default(), None, ); @@ -472,9 +470,7 @@ pub mod pylib { schema.schema, // HACK: StorageConfig isn't used when running the Python function but this is a non-optional arg for // ScanTask creation, so we just put in a placeholder here - Arc::new(crate::storage_config::StorageConfig::Native(Arc::new( - Default::default(), - ))), + Arc::new(Default::default()), pushdowns.map(|p| p.0.as_ref().clone()).unwrap_or_default(), None, ); @@ -562,9 +558,7 @@ pub mod pylib { vec![data_source], Arc::new(FileFormatConfig::Parquet(default::Default::default())), Arc::new(schema), - Arc::new(crate::storage_config::StorageConfig::Native(Arc::new( - default::Default::default(), - ))), + Arc::new(Default::default()), Pushdowns::new(None, None, columns.map(Arc::new), None), None, ); @@ -573,8 +567,7 @@ pub mod pylib { } pub fn register_modules(parent: &Bound) -> PyResult<()> { - parent.add_class::()?; - parent.add_class::()?; + parent.add_class::()?; parent.add_class::()?; parent.add_class::()?; diff --git a/src/daft-scan/src/scan_task_iters.rs b/src/daft-scan/src/scan_task_iters.rs index 3ee2a18ccd..8c38c1ecec 100644 --- a/src/daft-scan/src/scan_task_iters.rs +++ b/src/daft-scan/src/scan_task_iters.rs @@ -8,9 +8,7 @@ use daft_io::IOStatsContext; use daft_parquet::read::read_parquet_metadata; use parquet2::metadata::RowGroupList; -use crate::{ - storage_config::StorageConfig, ChunkSpec, DataSource, Pushdowns, ScanTask, ScanTaskRef, -}; +use crate::{ChunkSpec, DataSource, Pushdowns, ScanTask, ScanTaskRef}; pub(crate) type BoxScanTaskIter<'a> = Box> + 'a>; @@ -206,13 +204,11 @@ pub(crate) fn split_by_row_groups( FileFormatConfig::Parquet(ParquetSourceConfig { field_id_mapping, .. }), - StorageConfig::Native(_), [source], Some(None), None, ) = ( t.file_format_config.as_ref(), - t.storage_config.as_ref(), &t.sources[..], t.sources.first().map(DataSource::get_chunk_spec), t.pushdowns.limit, diff --git a/src/daft-scan/src/storage_config.rs b/src/daft-scan/src/storage_config.rs index f72f2521a3..b2ab043e57 100644 --- a/src/daft-scan/src/storage_config.rs +++ b/src/daft-scan/src/storage_config.rs @@ -9,59 +9,20 @@ use serde::{Deserialize, Serialize}; #[cfg(feature = "python")] use { common_io_config::python, - pyo3::{pyclass, pymethods, IntoPy, PyObject, PyResult, Python}, + pyo3::{pyclass, pymethods, PyObject, PyResult, Python}, std::hash::Hash, }; /// Configuration for interacting with a particular storage backend, using a particular /// I/O layer implementation. 
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Hash)] -pub enum StorageConfig { - Native(Arc<NativeStorageConfig>), -} - -impl StorageConfig { - pub fn get_io_client_and_runtime(&self) -> DaftResult<(RuntimeRef, Arc<IOClient>)> { - // Grab an IOClient and Runtime - // TODO: This should be cleaned up and hidden behind a better API from daft-io - match self { - Self::Native(cfg) => { - let multithreaded_io = cfg.multithreaded_io; - Ok(( - get_io_runtime(multithreaded_io), - get_io_client( - multithreaded_io, - Arc::new(cfg.io_config.clone().unwrap_or_default()), - )?, - )) - } - } - } - - #[must_use] - pub fn var_name(&self) -> &'static str { - match self { - Self::Native(_) => "Native", - } - } - - #[must_use] - pub fn multiline_display(&self) -> Vec<String> { - match self { - Self::Native(source) => source.multiline_display(), - } - } -} - -/// Storage configuration for the Rust-native I/O layer. -#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Hash)] #[cfg_attr(feature = "python", pyclass(module = "daft.daft"))] -pub struct NativeStorageConfig { +pub struct StorageConfig { pub io_config: Option<IOConfig>, pub multithreaded_io: bool, } -impl NativeStorageConfig { +impl StorageConfig { #[must_use] pub fn new_internal(multithreaded_io: bool, io_config: Option<IOConfig>) -> Self { Self { @@ -70,6 +31,19 @@ impl NativeStorageConfig { } } + pub fn get_io_client_and_runtime(&self) -> DaftResult<(RuntimeRef, Arc<IOClient>)> { + // Grab an IOClient and Runtime + // TODO: This should be cleaned up and hidden behind a better API from daft-io + let multithreaded_io = self.multithreaded_io; + Ok(( + get_io_runtime(multithreaded_io), + get_io_client( + multithreaded_io, + Arc::new(self.io_config.clone().unwrap_or_default()), + )?, + )) + } + #[must_use] pub fn multiline_display(&self) -> Vec<String> { let mut res = vec![]; @@ -84,7 +58,7 @@ impl NativeStorageConfig { } } -impl Default for NativeStorageConfig { +impl Default for StorageConfig { fn default() -> Self { Self::new_internal(true, None) } @@ -92,7 +66,7 @@ impl Default for NativeStorageConfig { #[cfg(feature = "python")] #[pymethods] -impl NativeStorageConfig { +impl StorageConfig { #[new] #[must_use] pub fn new(multithreaded_io: bool, io_config: Option<python::IOConfig>) -> Self { @@ -112,45 +86,4 @@ impl NativeStorageConfig { } } -/// A Python-exposed interface for storage configs. -#[derive(Clone, Debug, Serialize, Deserialize)] -#[serde(transparent)] -#[cfg_attr( - feature = "python", - pyclass(module = "daft.daft", name = "StorageConfig") -)] -pub struct PyStorageConfig(Arc<StorageConfig>); - -#[cfg(feature = "python")] -#[pymethods] -impl PyStorageConfig { - /// Create from a native storage config. - #[staticmethod] - fn native(config: NativeStorageConfig) -> Self { - Self(Arc::new(StorageConfig::Native(config.into()))) - } - - /// Get the underlying storage config. - #[getter] - fn get_config(&self, py: Python) -> PyObject { - use StorageConfig::Native; - - match self.0.as_ref() { - Native(config) => config.as_ref().clone().into_py(py), - } - } -} - -impl_bincode_py_state_serialization!(PyStorageConfig); - -impl From<PyStorageConfig> for Arc<StorageConfig> { - fn from(value: PyStorageConfig) -> Self { - value.0 - } -} - -impl From<Arc<StorageConfig>> for PyStorageConfig { - fn from(value: Arc<StorageConfig>) -> Self { - Self(value) - } -} +impl_bincode_py_state_serialization!(StorageConfig);
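
Migration note for Python callers: the two-level StorageConfig.native(NativeStorageConfig(...)) construction and the .config indirection are removed; StorageConfig is constructed directly and exposes its fields itself. A minimal usage sketch, assuming a Daft build that includes this change (the default IOConfig() here is illustrative only):

from daft.daft import IOConfig, StorageConfig

io_config = IOConfig()  # default credentials chain; configure S3/GCS/Azure settings as needed

# Old API (removed by this diff):
#   storage_config = StorageConfig.native(NativeStorageConfig(True, io_config))
#   io_config = storage_config.config.io_config

# New API: construct StorageConfig directly; True enables the multithreaded tokio I/O runtime.
storage_config = StorageConfig(True, io_config)

# Fields formerly on NativeStorageConfig are read straight off StorageConfig,
# matching the updated delta_lake_scan.py and hudi_scan.py call sites above.
assert storage_config.multithreaded_io
effective_io_config = storage_config.io_config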