Skip to content

Commit

Permalink
Merge branch 'main' of github.com:apache/datafusion into type-class
Browse files Browse the repository at this point in the history
  • Loading branch information
jayzhan211 committed Nov 13, 2024
2 parents 6b1e08a + 2a2de82 commit afa23df
Show file tree
Hide file tree
Showing 66 changed files with 803 additions and 310 deletions.
44 changes: 22 additions & 22 deletions datafusion/core/src/catalog_common/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ impl SchemaProvider for InformationSchemaProvider {
};

Ok(Some(Arc::new(
StreamingTable::try_new(table.schema().clone(), vec![table]).unwrap(),
StreamingTable::try_new(Arc::clone(table.schema()), vec![table]).unwrap(),
)))
}

Expand Down Expand Up @@ -526,7 +526,7 @@ impl InformationSchemaTables {
schema_names: StringBuilder::new(),
table_names: StringBuilder::new(),
table_types: StringBuilder::new(),
schema: self.schema.clone(),
schema: Arc::clone(&self.schema),
}
}
}
Expand All @@ -540,7 +540,7 @@ impl PartitionStream for InformationSchemaTables {
let mut builder = self.builder();
let config = self.config.clone();
Box::pin(RecordBatchStreamAdapter::new(
self.schema.clone(),
Arc::clone(&self.schema),
// TODO: Stream this
futures::stream::once(async move {
config.make_tables(&mut builder).await?;
Expand Down Expand Up @@ -582,7 +582,7 @@ impl InformationSchemaTablesBuilder {

fn finish(&mut self) -> RecordBatch {
RecordBatch::try_new(
self.schema.clone(),
Arc::clone(&self.schema),
vec![
Arc::new(self.catalog_names.finish()),
Arc::new(self.schema_names.finish()),
Expand Down Expand Up @@ -618,7 +618,7 @@ impl InformationSchemaViews {
schema_names: StringBuilder::new(),
table_names: StringBuilder::new(),
definitions: StringBuilder::new(),
schema: self.schema.clone(),
schema: Arc::clone(&self.schema),
}
}
}
Expand All @@ -632,7 +632,7 @@ impl PartitionStream for InformationSchemaViews {
let mut builder = self.builder();
let config = self.config.clone();
Box::pin(RecordBatchStreamAdapter::new(
self.schema.clone(),
Arc::clone(&self.schema),
// TODO: Stream this
futures::stream::once(async move {
config.make_views(&mut builder).await?;
Expand Down Expand Up @@ -670,7 +670,7 @@ impl InformationSchemaViewBuilder {

fn finish(&mut self) -> RecordBatch {
RecordBatch::try_new(
self.schema.clone(),
Arc::clone(&self.schema),
vec![
Arc::new(self.catalog_names.finish()),
Arc::new(self.schema_names.finish()),
Expand Down Expand Up @@ -733,7 +733,7 @@ impl InformationSchemaColumns {
numeric_scales: UInt64Builder::with_capacity(default_capacity),
datetime_precisions: UInt64Builder::with_capacity(default_capacity),
interval_types: StringBuilder::new(),
schema: self.schema.clone(),
schema: Arc::clone(&self.schema),
}
}
}
Expand All @@ -747,7 +747,7 @@ impl PartitionStream for InformationSchemaColumns {
let mut builder = self.builder();
let config = self.config.clone();
Box::pin(RecordBatchStreamAdapter::new(
self.schema.clone(),
Arc::clone(&self.schema),
// TODO: Stream this
futures::stream::once(async move {
config.make_columns(&mut builder).await?;
Expand Down Expand Up @@ -876,7 +876,7 @@ impl InformationSchemaColumnsBuilder {

fn finish(&mut self) -> RecordBatch {
RecordBatch::try_new(
self.schema.clone(),
Arc::clone(&self.schema),
vec![
Arc::new(self.catalog_names.finish()),
Arc::new(self.schema_names.finish()),
Expand Down Expand Up @@ -921,7 +921,7 @@ impl InformationSchemata {

fn builder(&self) -> InformationSchemataBuilder {
InformationSchemataBuilder {
schema: self.schema.clone(),
schema: Arc::clone(&self.schema),
catalog_name: StringBuilder::new(),
schema_name: StringBuilder::new(),
schema_owner: StringBuilder::new(),
Expand Down Expand Up @@ -967,7 +967,7 @@ impl InformationSchemataBuilder {

fn finish(&mut self) -> RecordBatch {
RecordBatch::try_new(
self.schema.clone(),
Arc::clone(&self.schema),
vec![
Arc::new(self.catalog_name.finish()),
Arc::new(self.schema_name.finish()),
Expand All @@ -991,7 +991,7 @@ impl PartitionStream for InformationSchemata {
let mut builder = self.builder();
let config = self.config.clone();
Box::pin(RecordBatchStreamAdapter::new(
self.schema.clone(),
Arc::clone(&self.schema),
// TODO: Stream this
futures::stream::once(async move {
config.make_schemata(&mut builder).await;
Expand Down Expand Up @@ -1023,7 +1023,7 @@ impl InformationSchemaDfSettings {
names: StringBuilder::new(),
values: StringBuilder::new(),
descriptions: StringBuilder::new(),
schema: self.schema.clone(),
schema: Arc::clone(&self.schema),
}
}
}
Expand All @@ -1037,7 +1037,7 @@ impl PartitionStream for InformationSchemaDfSettings {
let config = self.config.clone();
let mut builder = self.builder();
Box::pin(RecordBatchStreamAdapter::new(
self.schema.clone(),
Arc::clone(&self.schema),
// TODO: Stream this
futures::stream::once(async move {
// create a mem table with the names of tables
Expand All @@ -1064,7 +1064,7 @@ impl InformationSchemaDfSettingsBuilder {

fn finish(&mut self) -> RecordBatch {
RecordBatch::try_new(
self.schema.clone(),
Arc::clone(&self.schema),
vec![
Arc::new(self.names.finish()),
Arc::new(self.values.finish()),
Expand Down Expand Up @@ -1102,7 +1102,7 @@ impl InformationSchemaRoutines {

fn builder(&self) -> InformationSchemaRoutinesBuilder {
InformationSchemaRoutinesBuilder {
schema: self.schema.clone(),
schema: Arc::clone(&self.schema),
specific_catalog: StringBuilder::new(),
specific_schema: StringBuilder::new(),
specific_name: StringBuilder::new(),
Expand Down Expand Up @@ -1161,7 +1161,7 @@ impl InformationSchemaRoutinesBuilder {

fn finish(&mut self) -> RecordBatch {
RecordBatch::try_new(
self.schema.clone(),
Arc::clone(&self.schema),
vec![
Arc::new(self.specific_catalog.finish()),
Arc::new(self.specific_schema.finish()),
Expand Down Expand Up @@ -1189,7 +1189,7 @@ impl PartitionStream for InformationSchemaRoutines {
let config = self.config.clone();
let mut builder = self.builder();
Box::pin(RecordBatchStreamAdapter::new(
self.schema.clone(),
Arc::clone(&self.schema),
futures::stream::once(async move {
config.make_routines(
ctx.scalar_functions(),
Expand Down Expand Up @@ -1229,7 +1229,7 @@ impl InformationSchemaParameters {

fn builder(&self) -> InformationSchemaParametersBuilder {
InformationSchemaParametersBuilder {
schema: self.schema.clone(),
schema: Arc::clone(&self.schema),
specific_catalog: StringBuilder::new(),
specific_schema: StringBuilder::new(),
specific_name: StringBuilder::new(),
Expand Down Expand Up @@ -1295,7 +1295,7 @@ impl InformationSchemaParametersBuilder {

fn finish(&mut self) -> RecordBatch {
RecordBatch::try_new(
self.schema.clone(),
Arc::clone(&self.schema),
vec![
Arc::new(self.specific_catalog.finish()),
Arc::new(self.specific_schema.finish()),
Expand All @@ -1321,7 +1321,7 @@ impl PartitionStream for InformationSchemaParameters {
let config = self.config.clone();
let mut builder = self.builder();
Box::pin(RecordBatchStreamAdapter::new(
self.schema.clone(),
Arc::clone(&self.schema),
futures::stream::once(async move {
config.make_parameters(
ctx.scalar_functions(),
Expand Down
5 changes: 3 additions & 2 deletions datafusion/core/src/catalog_common/listing_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ impl ListingSchemaProvider {
},
)
.await?;
let _ = self.register_table(table_name.to_string(), provider.clone())?;
let _ =
self.register_table(table_name.to_string(), Arc::clone(&provider))?;
}
}
Ok(())
Expand Down Expand Up @@ -190,7 +191,7 @@ impl SchemaProvider for ListingSchemaProvider {
self.tables
.lock()
.expect("Can't lock tables")
.insert(name, table.clone());
.insert(name, Arc::clone(&table));
Ok(Some(table))
}

Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/src/catalog_common/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ impl CatalogProviderList for MemoryCatalogProviderList {
}

fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
self.catalogs.get(name).map(|c| c.value().clone())
self.catalogs.get(name).map(|c| Arc::clone(c.value()))
}
}

Expand Down Expand Up @@ -102,7 +102,7 @@ impl CatalogProvider for MemoryCatalogProvider {
}

fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
self.schemas.get(name).map(|s| s.value().clone())
self.schemas.get(name).map(|s| Arc::clone(s.value()))
}

fn register_schema(
Expand Down Expand Up @@ -175,7 +175,7 @@ impl SchemaProvider for MemorySchemaProvider {
&self,
name: &str,
) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
Ok(self.tables.get(name).map(|table| table.value().clone()))
Ok(self.tables.get(name).map(|table| Arc::clone(table.value())))
}

fn register_table(
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/avro_to_arrow/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ impl<'a, R: Read> Reader<'a, R> {
Ok(Self {
array_reader: AvroArrowArrayReader::try_new(
reader,
schema.clone(),
Arc::clone(&schema),
projection,
)?,
schema,
Expand All @@ -153,7 +153,7 @@ impl<'a, R: Read> Reader<'a, R> {
/// Returns the schema of the reader, useful for getting the schema without reading
/// record batches
pub fn schema(&self) -> SchemaRef {
self.schema.clone()
Arc::clone(&self.schema)
}
}

Expand Down
16 changes: 12 additions & 4 deletions datafusion/core/src/datasource/cte_worktable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ use crate::datasource::{TableProvider, TableType};
#[derive(Debug)]
pub struct CteWorkTable {
/// The name of the CTE work table
// WIP, see https://github.com/apache/datafusion/issues/462
#[allow(dead_code)]
name: String,
/// This schema must be shared across both the static and recursive terms of a recursive query
table_schema: SchemaRef,
Expand All @@ -56,6 +54,16 @@ impl CteWorkTable {
table_schema,
}
}

/// The user-provided name of the CTE
pub fn name(&self) -> &str {
&self.name
}

/// The schema of the recursive term of the query
pub fn schema(&self) -> SchemaRef {
Arc::clone(&self.table_schema)
}
}

#[async_trait]
Expand All @@ -69,7 +77,7 @@ impl TableProvider for CteWorkTable {
}

fn schema(&self) -> SchemaRef {
self.table_schema.clone()
Arc::clone(&self.table_schema)
}

fn table_type(&self) -> TableType {
Expand All @@ -86,7 +94,7 @@ impl TableProvider for CteWorkTable {
// TODO: pushdown filters and limits
Ok(Arc::new(WorkTableExec::new(
self.name.clone(),
self.table_schema.clone(),
Arc::clone(&self.table_schema),
)))
}

Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/default_table_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ pub fn source_as_provider(
.as_any()
.downcast_ref::<DefaultTableSource>()
{
Some(source) => Ok(source.table_provider.clone()),
Some(source) => Ok(Arc::clone(&source.table_provider)),
_ => internal_err!("TableSource was not DefaultTableSource"),
}
}
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/empty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl TableProvider for EmptyTable {
}

fn schema(&self) -> SchemaRef {
self.schema.clone()
Arc::clone(&self.schema)
}

fn table_type(&self) -> TableType {
Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/src/datasource/file_format/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ impl FileFormat for ArrowFormat {
return not_impl_err!("Overwrites are not implemented yet for Arrow format");
}

let sink_schema = conf.output_schema().clone();
let sink_schema = Arc::clone(conf.output_schema());
let sink = Arc::new(ArrowFileSink::new(conf));

Ok(Arc::new(DataSinkExec::new(
Expand Down Expand Up @@ -229,7 +229,7 @@ impl ArrowFileSink {
.collect::<Vec<_>>(),
))
} else {
self.config.output_schema().clone()
Arc::clone(self.config.output_schema())
}
}
}
Expand Down Expand Up @@ -302,7 +302,7 @@ impl DataSink for ArrowFileSink {
let mut object_store_writer = create_writer(
FileCompressionType::UNCOMPRESSED,
&path,
object_store.clone(),
Arc::clone(&object_store),
)
.await?;
file_write_tasks.spawn(async move {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ impl FileFormat for CsvFormat {

let writer_options = CsvWriterOptions::try_from(&options)?;

let sink_schema = conf.output_schema().clone();
let sink_schema = Arc::clone(conf.output_schema());
let sink = Arc::new(CsvSink::new(conf, writer_options));

Ok(Arc::new(DataSinkExec::new(
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/file_format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ impl FileFormat for JsonFormat {

let writer_options = JsonWriterOptions::try_from(&self.options)?;

let sink_schema = conf.output_schema().clone();
let sink_schema = Arc::clone(conf.output_schema());
let sink = Arc::new(JsonSink::new(conf, writer_options));

Ok(Arc::new(DataSinkExec::new(
Expand Down
Loading

0 comments on commit afa23df

Please sign in to comment.