add table function #20092

Draft · wants to merge 8 commits into base: main
2 changes: 2 additions & 0 deletions proto/expr.proto
@@ -370,6 +370,8 @@ message TableFunction {
POSTGRES_QUERY = 20;
// mysql query
MYSQL_QUERY = 21;
// Internal backfill table function
INTERNAL_BACKFILL_PROGRESS = 30;
// User defined table function
USER_DEFINED = 100;
}
15 changes: 15 additions & 0 deletions src/common/src/catalog/internal_table.rs
@@ -35,6 +35,21 @@ pub fn generate_internal_table_name_with_type(
)
}

/// Parses an internal table name of the form
/// `__internal_{job_name}_{fragment_id}_{table_type}_{table_id}`.
/// TODO(kwannoel): Test the case where the job name itself contains underscores.
pub fn internal_table_name_to_parts(table_name: &str) -> Option<(&str, u32, &str, u32)> {
    let parts: Vec<&str> = table_name.split('_').collect();
if parts.len() == 7 && parts[2] == "internal" {
let job_name = parts[3];
let fragment_id = parts[4].parse().ok()?;
let table_type = parts[5];
let table_id = parts[6].parse().ok()?;
Some((job_name, fragment_id, table_type, table_id))
} else {
None
}
}

pub fn get_dist_key_in_pk_indices<I: Eq + Copy + Debug, O: TryFrom<usize>>(
dist_key_indices: &[I],
pk_indices: &[I],
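A quick way to sanity-check the new parser (a hypothetical standalone `main`, with the function body copied from the hunk above): a well-formed name such as `__internal_mv1_3_streamscan_1004` splits into exactly seven `_`-separated parts (the leading `__` contributes two empty ones), while a job name that itself contains an underscore yields more parts and falls through to `None` — precisely the case the TODO flags.

```rust
// Copied from the diff for illustration; parses names of the form
// `__internal_{job_name}_{fragment_id}_{table_type}_{table_id}`.
fn internal_table_name_to_parts(table_name: &str) -> Option<(&str, u32, &str, u32)> {
    let parts: Vec<&str> = table_name.split('_').collect();
    // The leading `__` produces two empty parts, so a well-formed name
    // has exactly 7 parts with "internal" at index 2.
    if parts.len() == 7 && parts[2] == "internal" {
        let job_name = parts[3];
        let fragment_id = parts[4].parse().ok()?;
        let table_type = parts[5];
        let table_id = parts[6].parse().ok()?;
        Some((job_name, fragment_id, table_type, table_id))
    } else {
        None
    }
}

fn main() {
    // Hypothetical table names, assuming the naming convention above.
    assert_eq!(
        internal_table_name_to_parts("__internal_mv1_3_streamscan_1004"),
        Some(("mv1", 3, "streamscan", 1004))
    );
    // A job name containing `_` gives 8 parts, so parsing fails -- the
    // limitation the TODO(kwannoel) comment calls out.
    assert_eq!(
        internal_table_name_to_parts("__internal_my_mv_3_streamscan_1004"),
        None
    );
}
```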
9 changes: 9 additions & 0 deletions src/frontend/src/binder/expr/function/mod.rs
@@ -332,6 +332,15 @@ impl Binder {
.context("mysql_query error")?
.into());
}
// `internal_backfill_progress` table function
if func_name.eq("internal_backfill_progress") {
reject_syntax!(
arg_list.variadic,
"`VARIADIC` is not allowed in table function call"
);
self.ensure_table_function_allowed()?;
return Ok(TableFunction::new_internal_backfill_progress().into());
}
// UDTF
if let Some(ref udf) = udf
&& udf.kind.is_table()
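With this binder arm in place, the function should be callable from SQL like the other table functions handled above — e.g. `SELECT * FROM internal_backfill_progress();` (a usage sketch inferred from the binder code; the PR itself does not include an example query or tests for it).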
4 changes: 4 additions & 0 deletions src/frontend/src/catalog/root_catalog.rs
@@ -690,6 +690,10 @@ impl Catalog {
Err(CatalogError::NotFound("table id", table_id.to_string()))
}

pub fn iter_tables(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
self.table_by_id.values()
}

pub fn get_schema_by_table_id(
&self,
db_name: &str,
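This new accessor exposes the catalog's `table_by_id` index so that the optimizer rule added later in this PR can enumerate every table in the catalog when looking for in-progress backfills.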
14 changes: 14 additions & 0 deletions src/frontend/src/expr/table_function.rs
@@ -537,6 +537,20 @@ impl TableFunction {
}
}

/// This is a highly specific _internal_ table function meant to scan the backfill state
/// tables and report the `job_id` and aggregated `row_count` of every MV still being created.
pub fn new_internal_backfill_progress() -> Self {
TableFunction {
args: vec![],
return_type: DataType::Struct(StructType::new(vec![
("job_id".to_owned(), DataType::Int32),
("row_count".to_owned(), DataType::Int64),
])),
function_type: TableFunctionType::InternalBackfillProgress,
user_defined: None,
}
}

pub fn to_protobuf(&self) -> PbTableFunction {
PbTableFunction {
function_type: self.function_type as i32,
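Each output row is therefore a struct of (`job_id: Int32`, `row_count: Int64`); for example, a materialized view whose creation job has id 1001 and has back-filled 42 rows so far would surface as `(1001, 42)` (illustrative values, not from the PR).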
14 changes: 12 additions & 2 deletions src/frontend/src/optimizer/logical_optimization.rs
@@ -134,7 +134,7 @@ static TABLE_FUNCTION_CONVERT: LazyLock<OptimizationStage> = LazyLock::new(|| {
"Table Function Convert",
vec![
// Apply file scan rule first
-            TableFunctionToFileScanRule::create(),
+            TableFunctionToInternalBackfillProgressRule::create(),
// Apply postgres query rule next
TableFunctionToPostgresQueryRule::create(),
// Apply mysql query rule next
@@ -149,7 +149,7 @@ static TABLE_FUNCTION_CONVERT: LazyLock<OptimizationStage> = LazyLock::new(|| {
static TABLE_FUNCTION_TO_FILE_SCAN: LazyLock<OptimizationStage> = LazyLock::new(|| {
OptimizationStage::new(
"Table Function To FileScan",
-        vec![TableFunctionToFileScanRule::create()],
+        vec![TableFunctionToInternalBackfillProgressRule::create()],
ApplyOrder::TopDown,
)
});
@@ -170,6 +170,15 @@ static TABLE_FUNCTION_TO_MYSQL_QUERY: LazyLock<OptimizationStage> = LazyLock::new(|| {
)
});

static TABLE_FUNCTION_TO_INTERNAL_BACKFILL_PROGRESS: LazyLock<OptimizationStage> =
LazyLock::new(|| {
OptimizationStage::new(
"Table Function To Internal Backfill Progress",
vec![TableFunctionToInternalBackfillProgressRule::create()],
ApplyOrder::TopDown,
)
});

static VALUES_EXTRACT_PROJECT: LazyLock<OptimizationStage> = LazyLock::new(|| {
OptimizationStage::new(
"Values Extract Project",
@@ -734,6 +743,7 @@ impl LogicalOptimizer {
plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_FILE_SCAN)?;
plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_POSTGRES_QUERY)?;
plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_MYSQL_QUERY)?;
plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_INTERNAL_BACKFILL_PROGRESS)?;
// In order to unnest a table function, we need to convert it into a `project_set` first.
plan = plan.optimize_by_rules(&TABLE_FUNCTION_CONVERT)?;

1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/logical_project.rs
@@ -43,6 +43,7 @@ impl LogicalProject {
Self::new(input, exprs).into()
}

// TODO(kwannoel): We only need one of `create` / `new`; don't keep both.
pub fn new(input: PlanRef, exprs: Vec<ExprImpl>) -> Self {
let core = generic::Project::new(exprs, input);
Self::with_core(core)
3 changes: 3 additions & 0 deletions src/frontend/src/optimizer/rule/mod.rs
@@ -244,6 +244,7 @@ mod pull_up_correlated_predicate_agg_rule;
mod source_to_iceberg_scan_rule;
mod source_to_kafka_scan_rule;
mod table_function_to_file_scan_rule;
mod table_function_to_internal_backfill_progress;
mod table_function_to_mysql_query_rule;
mod table_function_to_postgres_query_rule;
mod values_extract_project_rule;
@@ -254,6 +255,7 @@ pub use pull_up_correlated_predicate_agg_rule::*;
pub use source_to_iceberg_scan_rule::*;
pub use source_to_kafka_scan_rule::*;
pub use table_function_to_file_scan_rule::*;
pub use table_function_to_internal_backfill_progress::*;
pub use table_function_to_mysql_query_rule::*;
pub use table_function_to_postgres_query_rule::*;
pub use values_extract_project_rule::*;
@@ -321,6 +323,7 @@ macro_rules! for_all_rules {
, { TableFunctionToFileScanRule }
, { TableFunctionToPostgresQueryRule }
, { TableFunctionToMySqlQueryRule }
, { TableFunctionToInternalBackfillProgressRule }
, { ApplyLimitTransposeRule }
, { CommonSubExprExtractRule }
, { BatchProjectMergeRule }
src/frontend/src/optimizer/rule/table_function_to_file_scan_rule.rs
@@ -18,7 +18,7 @@
use risingwave_common::types::{DataType, ScalarImpl};
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_connector::source::iceberg::{extract_bucket_and_file_name, FileScanBackend};

-use super::{BoxedRule, Rule};
+use super::Rule;
use crate::expr::{Expr, TableFunctionType};
use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{LogicalFileScan, LogicalTableFunction};
@@ -113,10 +113,4 @@ impl Rule for TableFunctionToFileScanRule {
            unreachable!("TableFunction return type should be struct")
        }
    }
}
-
-impl TableFunctionToFileScanRule {
-    pub fn create() -> BoxedRule {
-        Box::new(TableFunctionToFileScanRule {})
-    }
-}
src/frontend/src/optimizer/rule/table_function_to_internal_backfill_progress.rs (new file)
@@ -0,0 +1,134 @@
// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use anyhow::anyhow;
use itertools::Itertools;
use risingwave_common::catalog::{internal_table_name_to_parts, Field, Schema, StreamJobStatus};
use risingwave_common::types::{DataType, ScalarImpl};

use super::{ApplyResult, BoxedRule, FallibleRule};
use crate::catalog::catalog_service::CatalogReadGuard;
use crate::catalog::table_catalog::TableType;
use crate::expr::{ExprImpl, InputRef, Literal, TableFunctionType};
use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{
LogicalAgg, LogicalProject, LogicalScan, LogicalTableFunction, LogicalUnion, LogicalValues,
};
use crate::optimizer::PlanRef;
use crate::utils::GroupBy;
use crate::TableCatalog;

/// Transforms the special `INTERNAL_BACKFILL_PROGRESS` table function into a plan that scans
/// the backfill state tables of all streaming jobs which are still being created.
pub struct TableFunctionToInternalBackfillProgressRule {}
impl FallibleRule for TableFunctionToInternalBackfillProgressRule {
fn apply(&self, plan: PlanRef) -> ApplyResult {
let logical_table_function: &LogicalTableFunction = plan.as_logical_table_function()?;
if logical_table_function.table_function.function_type
!= TableFunctionType::InternalBackfillProgress
{
return ApplyResult::NotApplicable;
}

let fields = vec![
Field::new("job_id", DataType::Int32),
Field::new("row_count", DataType::Int64),
];

let reader = plan.ctx().session_ctx().env().catalog_reader().read_guard();
// TODO(kwannoel): Make sure it reads from source tables as well.
let backfilling_tables = get_backfilling_tables(reader);

// No backfill in progress, just return empty values.
if backfilling_tables.is_empty() {
let plan = LogicalValues::new(vec![], Schema::new(fields), plan.ctx().clone());
return ApplyResult::Ok(plan.into());
}

let mut counts = Vec::with_capacity(backfilling_tables.len());
for table in backfilling_tables {
let Some(job_id) = table.job_id else {
return ApplyResult::Err(
anyhow!("`job_id` column not found in backfill table").into(),
);
};
let Some(row_count_column_index) =
table.columns.iter().position(|c| c.name() == "row_count")
else {
return ApplyResult::Err(
anyhow!("`row_count` column not found in backfill table").into(),
);
};
let scan = LogicalScan::create(
table.name.clone(),
table,
vec![],
plan.ctx(),
None,
Default::default(),
);
let project = LogicalProject::new(
scan.into(),
vec![ExprImpl::InputRef(Box::new(InputRef {
index: row_count_column_index,
data_type: DataType::Int64,
}))],
);
let select_exprs = vec![ExprImpl::Literal(Box::new(Literal::new(
Some(ScalarImpl::Int32(job_id.table_id as i32)),
DataType::Int32,
)))];
let group_key = GroupBy::GroupKey(vec![ExprImpl::InputRef(Box::new(InputRef {
index: 0,
data_type: DataType::Int32,
}))]);
let (count, _rewritten_select_exprs, _) =
LogicalAgg::create(select_exprs, group_key, None, project.into())?;
counts.push(count);
}
println!("counts: {:?}", counts);
ApplyResult::Ok(LogicalUnion::new(true, counts).into())
}
}

fn get_backfilling_tables(reader: CatalogReadGuard) -> Vec<Arc<TableCatalog>> {
reader
.iter_tables()
        .filter(|table| {
            match internal_table_name_to_parts(&table.name) {
                None => false,
                Some((_job_name, _fragment_id, executor_type, _table_id)) => {
                    // Backfill executors persist their progress in `streamscan`
                    // state tables; only jobs still creating are in progress.
                    let is_backfill = executor_type == "streamscan";
                    let is_creating = table.stream_job_status == StreamJobStatus::Creating;
                    let is_internal = table.table_type == TableType::Internal;
                    is_backfill && is_creating && is_internal
                }
            }
        })
.cloned()
.collect_vec()
}

impl TableFunctionToInternalBackfillProgressRule {
pub fn create() -> BoxedRule {
Box::new(TableFunctionToInternalBackfillProgressRule {})
}
}
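For intuition, here is a minimal, self-contained sketch of what the rule computes end to end, using plain mock structs instead of RisingWave's catalog and plan types. All names and values below are hypothetical, and each state table is treated as a single pre-aggregated row rather than the per-vnode rows the real `Scan -> Project -> Agg -> Union` plan would aggregate.

```rust
// Mock of the catalog state the rule inspects (hypothetical names/values).
#[derive(Debug, PartialEq)]
enum StreamJobStatus {
    Creating,
    Created,
}

struct MockTable {
    name: &'static str, // e.g. "__internal_mv1_3_streamscan_1004"
    job_id: i32,
    row_count: i64, // pre-aggregated value of the "row_count" column
    status: StreamJobStatus,
    is_internal: bool,
}

/// Extract the executor type from an internal table name, mirroring
/// `internal_table_name_to_parts` from the diff.
fn executor_type(name: &str) -> Option<&str> {
    let parts: Vec<&str> = name.split('_').collect();
    if parts.len() == 7 && parts[2] == "internal" {
        Some(parts[5])
    } else {
        None
    }
}

/// One (job_id, row_count) row per backfilling state table, mirroring the
/// rule's filter plus its Scan -> Project -> Agg -> Union construction.
fn backfill_progress(tables: &[MockTable]) -> Vec<(i32, i64)> {
    tables
        .iter()
        .filter(|t| {
            executor_type(t.name) == Some("streamscan")
                && t.status == StreamJobStatus::Creating
                && t.is_internal
        })
        .map(|t| (t.job_id, t.row_count))
        .collect()
}

fn main() {
    let tables = [
        // A backfill state table for a job that is still creating.
        MockTable { name: "__internal_mv1_3_streamscan_1004", job_id: 1001, row_count: 42, status: StreamJobStatus::Creating, is_internal: true },
        // Not a backfill executor's state table: filtered out.
        MockTable { name: "__internal_mv2_5_hashagg_1010", job_id: 1002, row_count: 7, status: StreamJobStatus::Creating, is_internal: true },
        // Backfill already finished: filtered out.
        MockTable { name: "__internal_mv3_2_streamscan_1020", job_id: 1003, row_count: 99, status: StreamJobStatus::Created, is_internal: true },
    ];
    assert_eq!(backfill_progress(&tables), vec![(1001, 42)]);
}
```

In the real rule, the per-job row counts come from scanning each state table's `row_count` column and aggregating across its rows, and the result is a logical plan handed back to the optimizer rather than a `Vec`.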