Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: iso8601 support in Time-travel & Incremental API #302

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 61 additions & 4 deletions crates/core/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ use crate::Result;
use crate::config::read::HudiReadConfig;
use arrow::record_batch::RecordBatch;
use arrow_schema::{Field, Schema};
use chrono::DateTime;
use std::collections::{HashMap, HashSet};
use std::str::FromStr;
use std::sync::Arc;
Expand Down Expand Up @@ -240,8 +241,9 @@ impl Table {
timestamp: &str,
filters: &[(&str, &str, &str)],
) -> Result<Vec<Vec<FileSlice>>> {
let timestamp = format_timestamp(timestamp);
let filters = from_str_tuples(filters)?;
self.get_file_slices_splits_internal(n, timestamp, &filters)
self.get_file_slices_splits_internal(n, &timestamp, &filters)
.await
}

Expand Down Expand Up @@ -294,8 +296,9 @@ impl Table {
timestamp: &str,
filters: &[(&str, &str, &str)],
) -> Result<Vec<FileSlice>> {
let timestamp = format_timestamp(timestamp);
let filters = from_str_tuples(filters)?;
self.get_file_slices_internal(timestamp, &filters).await
self.get_file_slices_internal(&timestamp, &filters).await
}

async fn get_file_slices_internal(
Expand Down Expand Up @@ -405,8 +408,9 @@ impl Table {
timestamp: &str,
filters: &[(&str, &str, &str)],
) -> Result<Vec<RecordBatch>> {
let timestamp = format_timestamp(timestamp);
let filters = from_str_tuples(filters)?;
self.read_snapshot_internal(timestamp, &filters).await
self.read_snapshot_internal(&timestamp, &filters).await
}

async fn read_snapshot_internal(
Expand Down Expand Up @@ -445,8 +449,11 @@ impl Table {
return Ok(Vec::new());
};

let start_timestamp = format_timestamp(start_timestamp);
let end_timestamp = format_timestamp(end_timestamp);

let file_slices = self
.get_file_slices_between_internal(start_timestamp, end_timestamp)
.get_file_slices_between_internal(&start_timestamp, &end_timestamp)
.await?;

let fg_reader = self.create_file_group_reader_with_options([
Expand Down Expand Up @@ -474,6 +481,36 @@ impl Table {
}
}

/// Formats a timestamp string to `yyyyMMddHHmmSSSSS` format.
///
/// The function attempts to parse the input timestamp string (`timestamp`) into a `DateTime` object.
/// It first tries parsing the ISO 8601 format, and if that fails, it tries two other formats: `yyyyMMddHHmmSSSSS`
/// and `yyyyMMddHHmmSS`. If none of the formats match, it returns the original input string.
///
/// # Arguments
/// - `timestamp`: The input timestamp string to be formatted.
///
/// # Returns
/// A string formatted as `yyyyMMddHHmmSSSSS`. If the input cannot be parsed, the original string is returned.
fn format_timestamp(timestamp: &str) -> String {
if let Ok(datetime) = DateTime::parse_from_rfc3339(timestamp) {
return datetime.format("%Y%m%d%H%M%S%3f").to_string();
}

let formats = ["yyyyMMddHHmmSSSSS", "yyyyMMddHHmmSS"];
for format in formats.iter() {
if let Ok(datetime) = DateTime::parse_from_str(timestamp, format) {
return datetime.format("%Y%m%d%H%M%S%3f").to_string();
}
}

if timestamp.len() == 10 && timestamp.chars().all(|c| c.is_digit(10) || c == '-') {
return format!("{}000000000", timestamp.replace("-", ""));
}

timestamp.to_string()
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -1357,4 +1394,24 @@ mod tests {
Ok(())
}
}

#[test]
fn test_format_timestamp() {
let timestamps = vec![
("2019-01-23T12:34:56.123456789+00:00", "20190123123456123"),
("2019-01-23T12:34:56.123456+00:00", "20190123123456123"),
("2019-01-23T12:34:56.123+00:00", "20190123123456123"),
("2019-01-23T12:34:56+00:00", "20190123123456000"),
("2019-01-23T12:34:56Z", "20190123123456000"),
("2019-01-23", "20190123000000000"),
("20190123123456", "20190123123456"),
("20190123000000000", "20190123000000000"),
("20190123123456000", "20190123123456000"),
("20190123123456123", "20190123123456123"),
];

for (timestamp, result) in timestamps {
assert_eq!(crate::table::format_timestamp(timestamp), result)
}
}
}
Loading