refactor(connector): make SplitEnumerator/Reader dyn #20098

Open · wants to merge 8 commits into base: main
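In short, the refactor wraps the associated-type-based `SplitEnumerator` behind an object-safe trait so it can be boxed and dispatched dynamically. The block below is a minimal, self-contained sketch of that pattern, not code from this PR: `Enumerator`, `AnyEnumerator`, `SplitKind`, and `KafkaEnumerator` are placeholder names, and the real traits in `src/connector/src/source/base.rs` are async.

```rust
// A trait with an associated type is not dyn-compatible, so a second trait
// erases `Self::Split` into a concrete enum via a blanket impl.
trait Enumerator {
    type Split: Into<SplitKind>;
    fn list_splits(&mut self) -> Vec<Self::Split>;
}

enum SplitKind {
    Kafka(u32), // e.g. a partition id
}

struct KafkaEnumerator;

impl Enumerator for KafkaEnumerator {
    type Split = u32;
    fn list_splits(&mut self) -> Vec<u32> {
        vec![0, 1, 2]
    }
}

impl From<u32> for SplitKind {
    fn from(partition: u32) -> Self {
        SplitKind::Kafka(partition)
    }
}

// Dyn-compatible wrapper: no associated type, so `Box<dyn AnyEnumerator>` is legal.
trait AnyEnumerator {
    fn list_splits(&mut self) -> Vec<SplitKind>;
}

impl<T: Enumerator> AnyEnumerator for T {
    fn list_splits(&mut self) -> Vec<SplitKind> {
        Enumerator::list_splits(self)
            .into_iter()
            .map(Into::into)
            .collect()
    }
}

fn main() {
    let mut enumerator: Box<dyn AnyEnumerator> = Box::new(KafkaEnumerator);
    for split in enumerator.list_splits() {
        let SplitKind::Kafka(partition) = split;
        println!("kafka partition {partition}");
    }
}
```

The macro-generated `ConnectorProperties::create_split_enumerator` in the diff below returns exactly such a boxed trait object (`Box<dyn AnySplitEnumerator>`).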
2 changes: 1 addition & 1 deletion src/bench/sink_bench/main.rs
@@ -56,7 +56,7 @@ use risingwave_connector::sink::{
SINK_TYPE_APPEND_ONLY, SINK_TYPE_UPSERT,
};
use risingwave_connector::source::datagen::{
DatagenProperties, DatagenSplitEnumerator, DatagenSplitReader,
DatagenProperties, DatagenSplit, DatagenSplitEnumerator, DatagenSplitReader,
};
use risingwave_connector::source::{
Column, DataType, SourceContext, SourceEnumeratorContext, SplitEnumerator, SplitReader,
51 changes: 51 additions & 0 deletions src/connector/src/macros.rs
@@ -315,6 +315,57 @@ macro_rules! impl_connector_properties {
}
}
)*


$crate::paste! {
impl ConnectorProperties {
pub async fn create_split_enumerator(self, context: $crate::source::base::SourceEnumeratorContextRef) -> $crate::error::ConnectorResult<Box<dyn AnySplitEnumerator>> {
use $crate::source::prelude::*;
let enumerator: Box<dyn AnySplitEnumerator> = match self {
$(
ConnectorProperties::$variant_name(prop) =>
Box::new( [<$variant_name SplitEnumerator>]::new(*prop, context).await?),
)*
};
Ok(enumerator)
}


pub async fn create_split_reader(
self,
splits: Vec<SplitImpl>,
parser_config: ParserConfig,
source_ctx: SourceContextRef,
columns: Option<Vec<Column>>,
mut opt: $crate::source::CreateSplitReaderOpt,
) -> Result<(BoxSourceChunkStream, $crate::source::CreateSplitReaderResult)> {
opt.support_multiple_splits = self.support_multiple_splits();
tracing::debug!(
?splits,
support_multiple_splits = opt.support_multiple_splits,
"spawning connector split reader",
);

match self {
$(
ConnectorProperties::$variant_name(prop) => {
$crate::source::create_split_readers(*prop, splits, parser_config, source_ctx, columns, opt).await
}
)*
}
}
}
}

// TODO: make SourceProperties dyn-compatible
// pub fn into_dyn(self) -> Box<dyn crate::source::SourceProperties> {
// match self {
// $(
// ConnectorProperties::$variant_name(prop) => prop,
// )*
// }
// }

}
}

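For orientation, here is a hypothetical caller-side sketch of the two entry points the macro now generates. The surrounding bindings, the omitted imports from `risingwave_connector`, and the `Clone` bound on `ConnectorProperties` are assumptions; note that `support_multiple_splits` is overwritten inside `create_split_reader`, so a caller only needs to set `seek_to_latest`.

```rust
// Sketch only: imports from risingwave_connector::{error, parser, source} omitted.
async fn build_source(
    props: ConnectorProperties,
    enumerator_ctx: SourceEnumeratorContextRef,
    parser_config: ParserConfig,
    source_ctx: SourceContextRef,
) -> ConnectorResult<()> {
    // Enumerate splits through the dyn-compatible wrapper.
    let mut enumerator = props.clone().create_split_enumerator(enumerator_ctx).await?;
    let splits: Vec<SplitImpl> = enumerator.list_splits().await?;

    // Build the chunk stream; the second tuple element carries reader metadata.
    let (_chunk_stream, reader_result) = props
        .create_split_reader(
            splits,
            parser_config,
            source_ctx,
            None, // columns
            CreateSplitReaderOpt {
                seek_to_latest: true,
                ..Default::default()
            },
        )
        .await?;

    let _latest = reader_result.latest_splits;
    let _backfill = reader_result.backfill_info;
    Ok(())
}
```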
88 changes: 83 additions & 5 deletions src/connector/src/source/base.rs
@@ -20,8 +20,9 @@ use async_trait::async_trait;
use aws_sdk_s3::types::Object;
use bytes::Bytes;
use enum_as_inner::EnumAsInner;
use futures::future::try_join_all;
use futures::stream::BoxStream;
use futures::Stream;
use futures::{Stream, StreamExt};
use itertools::Itertools;
use risingwave_common::array::StreamChunk;
use risingwave_common::bail;
@@ -31,6 +32,7 @@ use risingwave_common::types::{JsonbVal, Scalar};
use risingwave_pb::catalog::{PbSource, PbStreamSourceInfo};
use risingwave_pb::plan_common::ExternalTableDesc;
use risingwave_pb::source::ConnectorSplit;
use rw_futures_util::select_all;
use serde::de::DeserializeOwned;
use serde_json::json;
use tokio::sync::mpsc;
@@ -87,6 +89,11 @@ pub trait SourceProperties: TryFromBTreeMap + Clone + WithOptions + std::fmt::De

/// Load additional info from `ExternalTableDesc`. Currently only used by CDC.
fn init_from_pb_cdc_table_desc(&mut self, _table_desc: &ExternalTableDesc) {}

// async fn create_split_enumerator(
// self,
// context: crate::source::SourceEnumeratorContextRef,
// ) -> Result<Self::SplitEnumerator>;
}

pub trait UnknownFields {
@@ -113,21 +120,77 @@ impl<P: DeserializeOwned + UnknownFields> TryFromBTreeMap for P {
}
}

pub async fn create_split_reader<P: SourceProperties>(
#[derive(Default)]
pub struct CreateSplitReaderOpt {
pub support_multiple_splits: bool,
pub seek_to_latest: bool,
}

#[derive(Default)]
pub struct CreateSplitReaderResult {
pub latest_splits: Option<Vec<SplitImpl>>,
pub backfill_info: HashMap<SplitId, BackfillInfo>,
}

pub async fn create_split_readers<P: SourceProperties>(
prop: P,
splits: Vec<SplitImpl>,
parser_config: ParserConfig,
source_ctx: SourceContextRef,
columns: Option<Vec<Column>>,
) -> Result<P::SplitReader> {
opt: CreateSplitReaderOpt,
) -> Result<(BoxSourceChunkStream, CreateSplitReaderResult)> {
let splits = splits.into_iter().map(P::Split::try_from).try_collect()?;
P::SplitReader::new(prop, splits, parser_config, source_ctx, columns).await
let mut res = CreateSplitReaderResult {
backfill_info: HashMap::new(),
latest_splits: None,
};
if opt.support_multiple_splits {
let mut reader = P::SplitReader::new(
prop.clone(),
splits,
parser_config.clone(),
source_ctx.clone(),
columns.clone(),
)
.await?;
if opt.seek_to_latest {
res.latest_splits = Some(reader.seek_to_latest().await?);
}
res.backfill_info = reader.backfill_info();
Ok((reader.into_stream().boxed(), res))
} else {
let mut readers = try_join_all(splits.into_iter().map(|split| {
// TODO: is this reader split across multiple threads...? Realistically, we want
// source_ctx to live in a single actor.
P::SplitReader::new(
prop.clone(),
vec![split],
parser_config.clone(),
source_ctx.clone(),
columns.clone(),
)
}))
.await?;
if opt.seek_to_latest {
let mut latest_splits = vec![];
for reader in &mut readers {
latest_splits.extend(reader.seek_to_latest().await?);
}
res.latest_splits = Some(latest_splits);
}
res.backfill_info = readers.iter().flat_map(|r| r.backfill_info()).collect();
Ok((
select_all(readers.into_iter().map(|r| r.into_stream())).boxed(),
res,
))
}
}

/// [`SplitEnumerator`] fetches the split metadata from the external source service.
/// NOTE: It runs in the meta server, so probably it should be moved to the `meta` crate.
#[async_trait]
pub trait SplitEnumerator: Sized {
pub trait SplitEnumerator: Sized + Send {
type Split: SplitMetaData + Send;
type Properties;

@@ -139,6 +202,21 @@ pub trait SplitEnumerator: Sized {
pub type SourceContextRef = Arc<SourceContext>;
pub type SourceEnumeratorContextRef = Arc<SourceEnumeratorContext>;

/// Dyn-compatible [`SplitEnumerator`].
#[async_trait]
pub trait AnySplitEnumerator: Send {
async fn list_splits(&mut self) -> Result<Vec<SplitImpl>>;
}

#[async_trait]
impl<T: SplitEnumerator<Split: Into<SplitImpl>>> AnySplitEnumerator for T {
async fn list_splits(&mut self) -> Result<Vec<SplitImpl>> {
SplitEnumerator::list_splits(self)
.await
.map(|s| s.into_iter().map(|s| s.into()).collect())
}
}

/// The max size of a chunk yielded by source stream.
pub const MAX_CHUNK_SIZE: usize = 1024;

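The `else` branch of `create_split_readers` above builds one reader per split and merges their streams. Below is a generic, self-contained sketch of that fan-out; it uses `futures::stream::select_all` as a stand-in for `rw_futures_util::select_all`, and simple tuples stand in for source chunks.

```rust
use futures::stream::{self, StreamExt};

#[tokio::main]
async fn main() {
    // Pretend each inner stream is one split reader's chunk stream.
    let per_split_streams = (0..3u32)
        .map(|split_id| stream::iter(vec![(split_id, "chunk-0"), (split_id, "chunk-1")]));

    // Merge them into a single stream, polling all per-split readers concurrently.
    let mut merged = stream::select_all(per_split_streams);
    while let Some((split_id, chunk)) = merged.next().await {
        println!("split {split_id}: {chunk}");
    }
}
```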
3 changes: 2 additions & 1 deletion src/connector/src/source/filesystem/opendal_source/mod.rs
@@ -14,6 +14,8 @@

use std::collections::HashMap;

pub use opendal_enumerator::OpendalEnumerator;

pub mod azblob_source;
pub mod gcs_source;
pub mod posix_fs_source;
@@ -25,7 +27,6 @@ use with_options::WithOptions;
pub mod opendal_enumerator;
pub mod opendal_reader;

use self::opendal_enumerator::OpendalEnumerator;
use self::opendal_reader::OpendalReader;
use super::file_common::CompressionFormat;
pub use super::s3::S3PropertiesCommon;
2 changes: 1 addition & 1 deletion src/connector/src/source/kinesis/mod.rs
@@ -13,6 +13,7 @@
// limitations under the License.

pub mod enumerator;
pub use enumerator::client::KinesisSplitEnumerator;
pub mod source;
pub mod split;

@@ -24,7 +25,6 @@ pub use source::KinesisMeta;
use with_options::WithOptions;

use crate::connector_common::KinesisCommon;
use crate::source::kinesis::enumerator::client::KinesisSplitEnumerator;
use crate::source::kinesis::source::reader::KinesisSplitReader;
use crate::source::kinesis::split::KinesisSplit;
use crate::source::SourceProperties;
30 changes: 30 additions & 0 deletions src/connector/src/source/mod.rs
@@ -12,6 +12,36 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod prelude {
// import all split enumerators
pub use crate::source::datagen::DatagenSplitEnumerator;
pub use crate::source::filesystem::opendal_source::OpendalEnumerator;
pub use crate::source::filesystem::S3SplitEnumerator;
pub use crate::source::google_pubsub::PubsubSplitEnumerator as GooglePubsubSplitEnumerator;
pub use crate::source::iceberg::IcebergSplitEnumerator;
pub use crate::source::kafka::KafkaSplitEnumerator;
pub use crate::source::kinesis::KinesisSplitEnumerator;
pub use crate::source::mqtt::MqttSplitEnumerator;
pub use crate::source::nats::NatsSplitEnumerator;
pub use crate::source::nexmark::NexmarkSplitEnumerator;
pub use crate::source::pulsar::PulsarSplitEnumerator;
pub use crate::source::test_source::TestSourceSplitEnumerator as TestSplitEnumerator;
pub type AzblobSplitEnumerator =
OpendalEnumerator<crate::source::filesystem::opendal_source::OpendalAzblob>;
pub type GcsSplitEnumerator =
OpendalEnumerator<crate::source::filesystem::opendal_source::OpendalGcs>;
pub type OpendalS3SplitEnumerator =
OpendalEnumerator<crate::source::filesystem::opendal_source::OpendalS3>;
pub type PosixFsSplitEnumerator =
OpendalEnumerator<crate::source::filesystem::opendal_source::OpendalPosixFs>;
pub use crate::source::cdc::enumerator::DebeziumSplitEnumerator;
pub type CitusCdcSplitEnumerator = DebeziumSplitEnumerator<crate::source::cdc::Citus>;
pub type MongodbCdcSplitEnumerator = DebeziumSplitEnumerator<crate::source::cdc::Mongodb>;
pub type PostgresCdcSplitEnumerator = DebeziumSplitEnumerator<crate::source::cdc::Postgres>;
pub type MysqlCdcSplitEnumerator = DebeziumSplitEnumerator<crate::source::cdc::Mysql>;
pub type SqlServerCdcSplitEnumerator = DebeziumSplitEnumerator<crate::source::cdc::SqlServer>;
}

pub mod base;
pub mod cdc;
pub mod data_gen_util;
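The re-exports and type aliases in this prelude exist so that the `[<$variant_name SplitEnumerator>]` concatenation in `impl_connector_properties` (see `macros.rs` above) resolves to a real type for every `ConnectorProperties` variant, including the OpenDAL- and CDC-backed ones that share a generic enumerator. A tiny self-contained demo of that `paste!` trick, with illustrative names only:

```rust
use paste::paste;

struct GcsSplitEnumerator;

impl GcsSplitEnumerator {
    fn new() -> Self {
        GcsSplitEnumerator
    }
}

// Glue the variant identifier to the `SplitEnumerator` suffix, as the connector
// macro does with `[<$variant_name SplitEnumerator>]`.
macro_rules! make_enumerator {
    ($variant:ident) => {
        paste! { [<$variant SplitEnumerator>]::new() }
    };
}

fn main() {
    let _enumerator = make_enumerator!(Gcs); // expands to `GcsSplitEnumerator::new()`
}
```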
2 changes: 1 addition & 1 deletion src/connector/src/source/mqtt/mod.rs
@@ -14,6 +14,7 @@

pub mod enumerator;
pub mod source;
pub use enumerator::MqttSplitEnumerator;
pub mod split;

use std::collections::HashMap;
@@ -25,7 +26,6 @@ use thiserror::Error;
use with_options::WithOptions;

use crate::connector_common::{MqttCommon, MqttQualityOfService};
use crate::source::mqtt::enumerator::MqttSplitEnumerator;
use crate::source::mqtt::source::{MqttSplit, MqttSplitReader};
use crate::source::SourceProperties;

2 changes: 1 addition & 1 deletion src/connector/src/source/nats/mod.rs
@@ -13,6 +13,7 @@
// limitations under the License.

pub mod enumerator;
pub use enumerator::NatsSplitEnumerator;
pub mod source;
pub mod split;

@@ -29,7 +30,6 @@ use with_options::WithOptions;

use crate::connector_common::NatsCommon;
use crate::error::{ConnectorError, ConnectorResult};
use crate::source::nats::enumerator::NatsSplitEnumerator;
use crate::source::nats::source::{NatsSplit, NatsSplitReader};
use crate::source::SourceProperties;
use crate::{
20 changes: 12 additions & 8 deletions src/connector/src/source/reader/fs_reader.rs
@@ -24,10 +24,9 @@ use risingwave_common::catalog::ColumnId;
use crate::error::ConnectorResult;
use crate::parser::{CommonParserConfig, ParserConfig, SpecificParserConfig};
use crate::source::{
create_split_reader, BoxSourceChunkStream, ConnectorProperties, ConnectorState,
SourceColumnDesc, SourceContext, SplitReader,
BoxSourceChunkStream, ConnectorProperties, ConnectorState, SourceColumnDesc, SourceContext,
};
use crate::{dispatch_source_prop, WithOptionsSecResolved};
use crate::WithOptionsSecResolved;

#[derive(Clone, Debug)]
pub struct FsSourceReader {
@@ -92,11 +91,16 @@ impl FsSourceReader {
let stream = match state {
None => pending().boxed(),
Some(splits) => {
dispatch_source_prop!(config, prop, {
create_split_reader(*prop, splits, parser_config, source_ctx, None)
.await?
.into_stream()
})
config
.create_split_reader(
splits,
parser_config,
source_ctx,
None,
Default::default(),
)
.await?
.0
}
};
Ok(stream)
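The rewritten call site keeps only the stream (`.0`) and drops the `CreateSplitReaderResult`. A sketch of the same call, assuming the extra metadata were wanted instead:

```rust
// Fragment only; `config`, `splits`, `parser_config`, and `source_ctx` as above.
let (stream, reader_result) = config
    .create_split_reader(splits, parser_config, source_ctx, None, Default::default())
    .await?;
// reader_result.backfill_info: HashMap<SplitId, BackfillInfo>
// reader_result.latest_splits: Option<Vec<SplitImpl>>
```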