Skip to content

Commit

Permalink
unify entry point for reader
Browse files Browse the repository at this point in the history
Signed-off-by: xxchan <xxchan22f@gmail.com>
  • Loading branch information
xxchan committed Jan 10, 2025
1 parent 5bde2a9 commit 0ea38c8
Show file tree
Hide file tree
Showing 25 changed files with 150 additions and 167 deletions.
25 changes: 25 additions & 0 deletions src/connector/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,31 @@ macro_rules! impl_connector_properties {
};
Ok(enumerator)
}


pub async fn create_split_reader(
self,
splits: Vec<SplitImpl>,
parser_config: ParserConfig,
source_ctx: SourceContextRef,
columns: Option<Vec<Column>>,
mut opt: $crate::source::CreateSplitReaderOpt,
) -> Result<(BoxSourceChunkStream, $crate::source::CreateSplitReaderResult)> {
opt.support_multiple_splits = self.support_multiple_splits();
tracing::debug!(
?splits,
support_multiple_splits = opt.support_multiple_splits,
"spawning connector split reader",
);

match self {
$(
ConnectorProperties::$variant_name(prop) => {
$crate::source::create_split_readers(*prop, splits, parser_config, source_ctx, columns, opt).await
}
)*
}
}
}
}

Expand Down
66 changes: 62 additions & 4 deletions src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ use async_trait::async_trait;
use aws_sdk_s3::types::Object;
use bytes::Bytes;
use enum_as_inner::EnumAsInner;
use futures::future::try_join_all;
use futures::stream::BoxStream;
use futures::Stream;
use futures::{Stream, StreamExt};
use itertools::Itertools;
use risingwave_common::array::StreamChunk;
use risingwave_common::bail;
Expand All @@ -31,6 +32,7 @@ use risingwave_common::types::{JsonbVal, Scalar};
use risingwave_pb::catalog::{PbSource, PbStreamSourceInfo};
use risingwave_pb::plan_common::ExternalTableDesc;
use risingwave_pb::source::ConnectorSplit;
use rw_futures_util::select_all;
use serde::de::DeserializeOwned;
use serde_json::json;
use tokio::sync::mpsc;
Expand Down Expand Up @@ -118,15 +120,71 @@ impl<P: DeserializeOwned + UnknownFields> TryFromBTreeMap for P {
}
}

pub async fn create_split_reader<P: SourceProperties>(
#[derive(Default)]
pub struct CreateSplitReaderOpt {
pub support_multiple_splits: bool,
pub seek_to_latest: bool,
}

#[derive(Default)]
pub struct CreateSplitReaderResult {
pub latest_splits: Option<Vec<SplitImpl>>,
pub backfill_info: HashMap<SplitId, BackfillInfo>,
}

pub async fn create_split_readers<P: SourceProperties>(
prop: P,
splits: Vec<SplitImpl>,
parser_config: ParserConfig,
source_ctx: SourceContextRef,
columns: Option<Vec<Column>>,
) -> Result<P::SplitReader> {
opt: CreateSplitReaderOpt,
) -> Result<(BoxSourceChunkStream, CreateSplitReaderResult)> {
let splits = splits.into_iter().map(P::Split::try_from).try_collect()?;
P::SplitReader::new(prop, splits, parser_config, source_ctx, columns).await
let mut res = CreateSplitReaderResult {
backfill_info: HashMap::new(),
latest_splits: None,
};
if opt.support_multiple_splits {
let mut reader = P::SplitReader::new(
prop.clone(),
splits,
parser_config.clone(),
source_ctx.clone(),
columns.clone(),
)
.await?;
if opt.seek_to_latest {
res.latest_splits = Some(reader.seek_to_latest().await?);
}
res.backfill_info = reader.backfill_info();
Ok((reader.into_stream().boxed(), res))
} else {
let mut readers = try_join_all(splits.into_iter().map(|split| {
// TODO: is this reader split across multiple threads...? Realistically, we want
// source_ctx to live in a single actor.
P::SplitReader::new(
prop.clone(),
vec![split],
parser_config.clone(),
source_ctx.clone(),
columns.clone(),
)
}))
.await?;
if opt.seek_to_latest {
let mut latest_splits = vec![];
for reader in &mut readers {
latest_splits.extend(reader.seek_to_latest().await?);
}
res.latest_splits = Some(latest_splits);
}
res.backfill_info = readers.iter().flat_map(|r| r.backfill_info()).collect();
Ok((
select_all(readers.into_iter().map(|r| r.into_stream())).boxed(),
res,
))
}
}

/// [`SplitEnumerator`] fetches the split metadata from the external source service.
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/source/cdc/enumerator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::source::cdc::{
table_schema_exclude_additional_columns, CdcProperties, CdcSourceTypeTrait, Citus,
DebeziumCdcSplit, Mongodb, Mysql, Postgres, SqlServer,
};
use crate::source::{SourceEnumeratorContextRef, SplitEnumerator};
use crate::source::SplitEnumerator;

pub const DATABASE_SERVERS_KEY: &str = "database.servers";

Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/source/cdc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ where
self,
context: crate::source::SourceEnumeratorContextRef,
) -> crate::error::ConnectorResult<Self::SplitEnumerator> {
Ok(DebeziumSplitEnumerator::new(self, context).await?)
DebeziumSplitEnumerator::new(self, context).await
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/connector/src/source/datagen/enumerator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@ use anyhow::Context;
use async_trait::async_trait;

use crate::source::datagen::{DatagenProperties, DatagenSplit};
use crate::source::{SourceEnumeratorContextRef, SplitEnumerator};
use crate::source::SplitEnumerator;

#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub struct DatagenSplitEnumerator {
split_num: i32,
}

impl DatagenSplitEnumerator {
pub async fn new(
pub fn new(
properties: DatagenProperties,
_context: crate::source::SourceEnumeratorContextRef,
) -> crate::error::ConnectorResult<DatagenSplitEnumerator> {
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/source/datagen/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl SourceProperties for DatagenProperties {
self,
context: crate::source::SourceEnumeratorContextRef,
) -> crate::error::ConnectorResult<Self::SplitEnumerator> {
DatagenSplitEnumerator::new(self, context).await
DatagenSplitEnumerator::new(self, context)
}
}

Expand Down
8 changes: 4 additions & 4 deletions src/connector/src/source/filesystem/opendal_source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ impl SourceProperties for GcsProperties {
self,
context: crate::source::SourceEnumeratorContextRef,
) -> crate::error::ConnectorResult<Self::SplitEnumerator> {
OpendalEnumerator::new(self, context).await
OpendalEnumerator::new(self, context)
}
}

Expand Down Expand Up @@ -167,7 +167,7 @@ impl SourceProperties for OpendalS3Properties {
self,
context: crate::source::SourceEnumeratorContextRef,
) -> crate::error::ConnectorResult<Self::SplitEnumerator> {
OpendalEnumerator::new(self, context).await
OpendalEnumerator::new(self, context)
}
}

Expand Down Expand Up @@ -207,7 +207,7 @@ impl SourceProperties for PosixFsProperties {
self,
context: crate::source::SourceEnumeratorContextRef,
) -> crate::error::ConnectorResult<Self::SplitEnumerator> {
OpendalEnumerator::new(self, context).await
OpendalEnumerator::new(self, context)
}
}

Expand Down Expand Up @@ -253,7 +253,7 @@ impl SourceProperties for AzblobProperties {
self,
context: crate::source::SourceEnumeratorContextRef,
) -> crate::error::ConnectorResult<Self::SplitEnumerator> {
OpendalEnumerator::new(self, context).await
OpendalEnumerator::new(self, context)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use super::OpendalSource;
use crate::error::ConnectorResult;
use crate::source::filesystem::file_common::CompressionFormat;
use crate::source::filesystem::{FsPageItem, OpendalFsSplit};
use crate::source::{SourceEnumeratorContextRef, SplitEnumerator};
use crate::source::SplitEnumerator;

#[derive(Debug, Clone)]
pub struct OpendalEnumerator<Src: OpendalSource> {
Expand All @@ -39,7 +39,7 @@ pub struct OpendalEnumerator<Src: OpendalSource> {
}

impl<Src: OpendalSource> OpendalEnumerator<Src> {
pub async fn new(
pub fn new(
properties: Src::Properties,
_context: crate::source::SourceEnumeratorContextRef,
) -> ConnectorResult<Self> {
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/source/filesystem/s3/enumerator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::aws_utils::{default_conn_config, s3_client};
use crate::connector_common::AwsAuthProps;
use crate::source::filesystem::file_common::FsSplit;
use crate::source::filesystem::s3::S3Properties;
use crate::source::{FsListInner, SourceEnumeratorContextRef, SplitEnumerator};
use crate::source::{FsListInner, SplitEnumerator};

/// Get the prefix from a glob
pub fn get_prefix(glob: &str) -> String {
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/source/filesystem/s3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl SourceProperties for S3Properties {
self,
context: crate::source::SourceEnumeratorContextRef,
) -> crate::error::ConnectorResult<Self::SplitEnumerator> {
Ok(S3SplitEnumerator::new(self, context).await?)
S3SplitEnumerator::new(self, context).await
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use crate::error::ConnectorResult;
use crate::source::base::SplitEnumerator;
use crate::source::google_pubsub::split::PubsubSplit;
use crate::source::google_pubsub::PubsubProperties;
use crate::source::SourceEnumeratorContextRef;

pub struct PubsubSplitEnumerator {
subscription: String,
Expand Down
8 changes: 4 additions & 4 deletions src/connector/src/source/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ use crate::connector_common::IcebergCommon;
use crate::error::{ConnectorError, ConnectorResult};
use crate::parser::ParserConfig;
use crate::source::{
BoxSourceChunkStream, Column, SourceContextRef, SourceEnumeratorContextRef, SourceProperties,
SplitEnumerator, SplitId, SplitMetaData, SplitReader, UnknownFields,
BoxSourceChunkStream, Column, SourceContextRef, SourceProperties, SplitEnumerator, SplitId,
SplitMetaData, SplitReader, UnknownFields,
};
pub const ICEBERG_CONNECTOR: &str = "iceberg";

Expand Down Expand Up @@ -115,7 +115,7 @@ impl SourceProperties for IcebergProperties {
self,
context: crate::source::SourceEnumeratorContextRef,
) -> crate::error::ConnectorResult<Self::SplitEnumerator> {
IcebergSplitEnumerator::new(self, context).await
IcebergSplitEnumerator::new(self, context)
}
}

Expand Down Expand Up @@ -256,7 +256,7 @@ pub struct IcebergSplitEnumerator {
}

impl IcebergSplitEnumerator {
pub async fn new(
pub fn new(
properties: IcebergProperties,
context: crate::source::SourceEnumeratorContextRef,
) -> ConnectorResult<Self> {
Expand Down
1 change: 0 additions & 1 deletion src/connector/src/source/kafka/enumerator/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ use crate::source::kafka::{
KafkaConnectionProps, KafkaContextCommon, KafkaProperties, RwConsumerContext,
KAFKA_ISOLATION_LEVEL,
};
use crate::source::SourceEnumeratorContextRef;

type KafkaClientType = BaseConsumer<RwConsumerContext>;

Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/source/kinesis/enumerator/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use risingwave_common::bail;
use crate::error::ConnectorResult as Result;
use crate::source::kinesis::split::KinesisOffset;
use crate::source::kinesis::*;
use crate::source::{SourceEnumeratorContextRef, SplitEnumerator};
use crate::source::SplitEnumerator;

pub struct KinesisSplitEnumerator {
stream_name: String,
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/source/mqtt/enumerator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use tokio::sync::RwLock;
use super::source::MqttSplit;
use super::MqttProperties;
use crate::error::ConnectorResult;
use crate::source::{SourceEnumeratorContextRef, SplitEnumerator};
use crate::source::SplitEnumerator;

pub struct MqttSplitEnumerator {
#[expect(dead_code)]
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/source/nats/enumerator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use risingwave_common::bail;
use super::source::{NatsOffset, NatsSplit};
use super::NatsProperties;
use crate::error::ConnectorResult;
use crate::source::{SourceEnumeratorContextRef, SplitEnumerator, SplitId};
use crate::source::{SplitEnumerator, SplitId};

#[derive(Debug, Clone)]
pub struct NatsSplitEnumerator {
Expand Down
6 changes: 2 additions & 4 deletions src/connector/src/source/nexmark/enumerator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,14 @@ use async_trait::async_trait;
use crate::error::ConnectorResult;
use crate::source::nexmark::split::NexmarkSplit;
use crate::source::nexmark::NexmarkProperties;
use crate::source::{SourceEnumeratorContextRef, SplitEnumerator};
use crate::source::SplitEnumerator;

pub struct NexmarkSplitEnumerator {
split_num: i32,
}

impl NexmarkSplitEnumerator {}

impl NexmarkSplitEnumerator {
pub async fn new(
pub fn new(
properties: NexmarkProperties,
_context: crate::source::SourceEnumeratorContextRef,
) -> ConnectorResult<NexmarkSplitEnumerator> {
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/source/nexmark/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ impl SourceProperties for NexmarkProperties {
self,
context: crate::source::SourceEnumeratorContextRef,
) -> crate::error::ConnectorResult<Self::SplitEnumerator> {
NexmarkSplitEnumerator::new(self, context).await
NexmarkSplitEnumerator::new(self, context)
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/source/pulsar/enumerator/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::error::ConnectorResult;
use crate::source::pulsar::split::PulsarSplit;
use crate::source::pulsar::topic::{parse_topic, Topic};
use crate::source::pulsar::PulsarProperties;
use crate::source::{SourceEnumeratorContextRef, SplitEnumerator};
use crate::source::SplitEnumerator;

pub struct PulsarSplitEnumerator {
client: Pulsar<TokioExecutor>,
Expand Down
20 changes: 12 additions & 8 deletions src/connector/src/source/reader/fs_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,9 @@ use risingwave_common::catalog::ColumnId;
use crate::error::ConnectorResult;
use crate::parser::{CommonParserConfig, ParserConfig, SpecificParserConfig};
use crate::source::{
create_split_reader, BoxSourceChunkStream, ConnectorProperties, ConnectorState,
SourceColumnDesc, SourceContext, SplitReader,
BoxSourceChunkStream, ConnectorProperties, ConnectorState, SourceColumnDesc, SourceContext,
};
use crate::{dispatch_source_prop, WithOptionsSecResolved};
use crate::WithOptionsSecResolved;

#[derive(Clone, Debug)]
pub struct FsSourceReader {
Expand Down Expand Up @@ -92,11 +91,16 @@ impl FsSourceReader {
let stream = match state {
None => pending().boxed(),
Some(splits) => {
dispatch_source_prop!(config, prop, {
create_split_reader(*prop, splits, parser_config, source_ctx, None)
.await?
.into_stream()
})
config
.create_split_reader(
splits,
parser_config,
source_ctx,
None,
Default::default(),
)
.await?
.0
}
};
Ok(stream)
Expand Down
Loading

0 comments on commit 0ea38c8

Please sign in to comment.