Skip to content

Commit

Permalink
Use our object store in delta lake to properly handle rotating creden…
Browse files Browse the repository at this point in the history
…tials
  • Loading branch information
mwylde committed Jan 8, 2025
1 parent 5443b5c commit 3b50980
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 207 deletions.
19 changes: 19 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/arroyo-connectors/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ uuid = { version = "1.7.0", features = ["v4"] }
# Filesystem
parquet = { workspace = true, features = ["async"]}
object_store = { workspace = true }
deltalake = { workspace = true, features = ["s3"] }
deltalake = { workspace = true, features = ["s3", "gcs"] }
async-compression = { version = "0.4.3", features = ["tokio", "zstd", "gzip"] }

# MQTT
Expand Down
150 changes: 54 additions & 96 deletions crates/arroyo-connectors/src/filesystem/sink/delta.rs
Original file line number Diff line number Diff line change
@@ -1,52 +1,40 @@
use super::FinishedFile;
use anyhow::{Context, Result};
use arrow::datatypes::{Schema, SchemaRef};
use arroyo_storage::{get_current_credentials, StorageProvider};
use arrow::datatypes::Schema;
use arroyo_storage::{BackendConfig, StorageProvider};
use arroyo_types::to_millis;
use deltalake::aws::storage::S3StorageBackend;
use deltalake::TableProperty::{MinReaderVersion, MinWriterVersion};
use deltalake::{
aws::storage::s3_constants::AWS_S3_ALLOW_UNSAFE_RENAME,
kernel::{Action, Add},
operations::create::CreateBuilder,
protocol::SaveMode,
table::PeekCommit,
DeltaTableBuilder,
DeltaTable, DeltaTableBuilder,
};
use object_store::{aws::AmazonS3ConfigKey, path::Path};
use once_cell::sync::Lazy;
use object_store::{path::Path, ObjectStore};
use std::sync::Arc;
use std::{
collections::{HashMap, HashSet},
time::SystemTime,
};
use tracing::debug;

static INIT: Lazy<()> = Lazy::new(|| {
deltalake::aws::register_handlers(None);
});
use url::Url;

pub(crate) async fn commit_files_to_delta(
finished_files: &[FinishedFile],
relative_table_path: &Path,
storage_provider: &StorageProvider,
table: &mut DeltaTable,
last_version: i64,
schema: SchemaRef,
) -> Result<Option<i64>> {
if finished_files.is_empty() {
return Ok(None);
}

let add_actions = create_add_actions(finished_files, relative_table_path)?;
let table_path = build_table_path(storage_provider, relative_table_path);
let storage_options = configure_storage_options(&table_path, storage_provider).await?;
let mut table = load_or_create_table(&table_path, storage_options, &schema).await?;

if let Some(new_version) = check_existing_files(
&mut table,
last_version,
finished_files,
relative_table_path,
)
.await?

if let Some(new_version) =
check_existing_files(table, last_version, finished_files, relative_table_path).await?
{
return Ok(Some(new_version));
}
Expand All @@ -55,72 +43,50 @@ pub(crate) async fn commit_files_to_delta(
Ok(Some(new_version))
}

async fn load_or_create_table(
table_path: &str,
storage_options: HashMap<String, String>,
pub(crate) async fn load_or_create_table(
table_path: &Path,
storage_provider: &StorageProvider,
schema: &Schema,
) -> Result<deltalake::DeltaTable> {
Lazy::force(&INIT);
) -> Result<DeltaTable> {
deltalake::aws::register_handlers(None);
match DeltaTableBuilder::from_uri(table_path)
.with_storage_options(storage_options.clone())
.load()
.await
{
Ok(table) => Ok(table),
Err(deltalake::DeltaTableError::NotATable(_)) => {
create_new_table(table_path, storage_options, schema).await
}
Err(err) => Err(err.into()),
}
}

async fn create_new_table(
table_path: &str,
storage_options: HashMap<String, String>,
schema: &Schema,
) -> Result<deltalake::DeltaTable> {
let delta_object_store = DeltaTableBuilder::from_uri(table_path)
.with_storage_options(storage_options)
.build_storage()?;
let delta_schema: deltalake::kernel::Schema = (schema).try_into()?;
CreateBuilder::new()
.with_log_store(delta_object_store)
.with_columns(delta_schema.fields().cloned())
.with_configuration_property(MinReaderVersion, Some("3"))
.with_configuration_property(MinWriterVersion, Some("7"))
.await
.map_err(Into::into)
}

async fn configure_storage_options(
table_path: &str,
storage_provider: &StorageProvider,
) -> Result<HashMap<String, String>> {
let mut options = storage_provider.storage_options().clone();
if table_path.starts_with("s3://") {
update_s3_credentials(&mut options).await?;
}
Ok(options)
}

async fn update_s3_credentials(options: &mut HashMap<String, String>) -> Result<()> {
if !options.contains_key(AmazonS3ConfigKey::SecretAccessKey.as_ref()) {
let tmp_credentials = get_current_credentials().await?;
options.insert(
AmazonS3ConfigKey::AccessKeyId.as_ref().to_string(),
tmp_credentials.key_id.clone(),
);
options.insert(
AmazonS3ConfigKey::SecretAccessKey.as_ref().to_string(),
tmp_credentials.secret_key.clone(),
);
if let Some(token) = tmp_credentials.token.as_ref() {
options.insert(AmazonS3ConfigKey::Token.as_ref().to_string(), token.clone());
}
deltalake::gcp::register_handlers(None);

let (backing_store, url): (Arc<dyn ObjectStore>, _) = match storage_provider.config() {
BackendConfig::S3(_) => (
Arc::new(S3StorageBackend::try_new(
storage_provider.get_backing_store(),
true,
)?),
format!("s3://{}", storage_provider.qualify_path(table_path)),
),
BackendConfig::GCS(_) => (
storage_provider.get_backing_store(),
format!("gs://{}", storage_provider.qualify_path(table_path)),
),
BackendConfig::Local(_) => (storage_provider.get_backing_store(), table_path.to_string()),
};

let mut delta = DeltaTableBuilder::from_uri(&url)
.with_storage_backend(
backing_store,
Url::parse(&storage_provider.canonical_url())?,
)
.build()?;

println!("Table uri = {}", delta.table_uri());

if delta.verify_deltatable_existence().await? {
delta.load().await?;
Ok(delta)
} else {
let delta_schema: deltalake::kernel::Schema = schema.try_into()?;
Ok(CreateBuilder::new()
.with_log_store(delta.log_store())
.with_columns(delta_schema.fields().cloned())
.with_configuration_property(MinReaderVersion, Some("3"))
.with_configuration_property(MinWriterVersion, Some("7"))
.await?)
}
options.insert(AWS_S3_ALLOW_UNSAFE_RENAME.to_string(), "true".to_string());
Ok(())
}

fn create_add_actions(
Expand Down Expand Up @@ -158,7 +124,7 @@ fn create_add_action(file: &FinishedFile, relative_table_path: &Path) -> Result<
}

async fn check_existing_files(
table: &mut deltalake::DeltaTable,
table: &mut DeltaTable,
last_version: i64,
finished_files: &[FinishedFile],
relative_table_path: &Path,
Expand Down Expand Up @@ -191,7 +157,7 @@ async fn check_existing_files(
Ok(None)
}

async fn commit_to_delta(table: deltalake::DeltaTable, add_actions: Vec<Action>) -> Result<i64> {
async fn commit_to_delta(table: &mut DeltaTable, add_actions: Vec<Action>) -> Result<i64> {
Ok(deltalake::operations::transaction::CommitBuilder::default()
.with_actions(add_actions)
.build(
Expand All @@ -206,11 +172,3 @@ async fn commit_to_delta(table: deltalake::DeltaTable, add_actions: Vec<Action>)
.await?
.version)
}

fn build_table_path(storage_provider: &StorageProvider, relative_table_path: &Path) -> String {
format!(
"{}/{}",
storage_provider.object_store_base_url(),
relative_table_path
)
}
Loading

0 comments on commit 3b50980

Please sign in to comment.