load storage credentials from catalog response #152

Merged · 5 commits · Apr 2, 2025
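
This PR makes the REST catalog derive object stores from the catalog's own responses: load_table, create_table, and register_table results are passed through a new object_store_from_response helper, which assembles an AmazonS3Builder from the client.region, s3.access-key-id, s3.secret-access-key, and s3.session-token entries found in storage_credentials (or in config), and falls back to an optional default_object_store_builder when no credentials are present. The load path caches the resulting store per table identifier so that update_table can reuse it. A minimal sketch of the new constructor call site follows; the crate/module paths and the Configuration setup are assumptions based on the repository layout, not verbatim API from this diff.

// Sketch only: the crate path and the OpenAPI-generated Configuration fields
// are assumed here, not confirmed by this diff.
use std::sync::Arc;

use iceberg_rest_catalog::{apis::configuration::Configuration, catalog::RestCatalog};

fn build_catalog(rest_uri: &str) -> Arc<RestCatalog> {
    // Point the generated REST client at the catalog endpoint.
    let mut configuration = Configuration::new();
    configuration.base_path = rest_uri.to_string();

    // The third argument is now Option<ObjectStoreBuilder>. Passing None means
    // object stores come exclusively from the credentials returned by the
    // catalog; passing Some(builder) keeps a fallback for responses that carry
    // no storage credentials, as the updated tests below do with
    // Some(object_store). "warehouse" is a placeholder catalog prefix.
    Arc::new(RestCatalog::new(Some("warehouse"), configuration, None))
}
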
Changes from all commits
176 changes: 126 additions & 50 deletions catalogs/iceberg-rest-catalog/src/catalog.rs
@@ -25,35 +25,41 @@ use iceberg_rust::{
table::Table,
view::View,
};
use object_store::ObjectStore;
use std::{collections::HashMap, path::Path, sync::Arc};
use object_store::{aws::AmazonS3Builder, ObjectStore};
use std::{
collections::HashMap,
path::Path,
sync::{Arc, RwLock},
};

use crate::{
apis::{
self,
catalog_api_api::{self, NamespaceExistsError},
configuration::Configuration,
},
models,
models::{self, StorageCredential},
};

#[derive(Debug)]
pub struct RestCatalog {
name: Option<String>,
configuration: Configuration,
object_store_builder: ObjectStoreBuilder,
default_object_store_builder: Option<ObjectStoreBuilder>,
cache: Arc<RwLock<HashMap<Identifier, Arc<dyn ObjectStore>>>>,
}

impl RestCatalog {
pub fn new(
name: Option<&str>,
configuration: Configuration,
object_store_builder: ObjectStoreBuilder,
default_object_store_builder: Option<ObjectStoreBuilder>,
) -> Self {
RestCatalog {
name: name.map(ToString::to_string),
configuration,
object_store_builder,
default_object_store_builder,
cache: Arc::new(RwLock::new(HashMap::new())),
}
}
}
@@ -276,7 +282,7 @@ impl Catalog for RestCatalog {
)),
Err(apis::Error::ResponseError(content)) => {
if content.status == 404 {
let table_metadata = catalog_api_api::load_table(
let response = catalog_api_api::load_table(
&self.configuration,
self.name.as_deref(),
&identifier.namespace().to_string(),
@@ -285,12 +291,27 @@
None,
)
.await
.map(|x| x.metadata)
.map_err(|_| Error::CatalogNotFound)?;

let object_store = self
.object_store_builder
.build(Bucket::from_path(&table_metadata.location)?)?;
let object_store = object_store_from_response(&response)?
.ok_or(Error::NotFound("Object store credentials".to_string()))
.or_else(|_| {
self.default_object_store_builder
.as_ref()
.ok_or(Error::NotFound("Default object store".to_string()))
.and_then(|x| {
let bucket = Bucket::from_path(&response.metadata.location)?;
x.build(bucket)
})
})?;

self.cache
.write()
.unwrap()
.insert(identifier.clone(), object_store.clone());

let table_metadata = response.metadata;

Ok(Tabular::Table(
Table::new(
identifier.clone(),
@@ -315,50 +336,54 @@
identifier: Identifier,
create_table: CreateTable,
) -> Result<Table, Error> {
catalog_api_api::create_table(
let response = catalog_api_api::create_table(
&self.configuration,
self.name.as_deref(),
&identifier.namespace().to_string(),
create_table,
None,
)
.map_err(Into::<Error>::into)
.and_then(|response| {
let clone = self.clone();
async move {
let object_store = clone
.object_store_builder
.build(Bucket::from_path(&response.metadata.location)?)?;
Table::new(identifier.clone(), clone, object_store, response.metadata).await
}
})
.await
.await?;

let object_store = object_store_from_response(&response)?
.ok_or(Error::NotFound("Object store credentials".to_string()))
.or_else(|_| {
self.default_object_store_builder
.as_ref()
.ok_or(Error::NotFound("Default object store".to_string()))
.and_then(|x| {
let bucket = Bucket::from_path(&response.metadata.location)?;
x.build(bucket)
})
})?;

Table::new(identifier.clone(), self, object_store, response.metadata).await
}
/// Update a table by atomically changing the pointer to the metadata file
async fn update_table(
self: Arc<Self>,
commit: iceberg_rust::catalog::commit::CommitTable,
) -> Result<Table, Error> {
let identifier = commit.identifier.clone();
catalog_api_api::update_table(
let response = catalog_api_api::update_table(
&self.configuration,
self.name.as_deref(),
&identifier.namespace().to_string(),
identifier.name(),
commit,
)
.map_err(Into::<Error>::into)
.and_then(|response| {
let clone = self.clone();
let identifier = identifier.clone();
async move {
let object_store = clone
.object_store_builder
.build(Bucket::from_path(&response.metadata.location)?)?;
Table::new(identifier, clone, object_store, response.metadata).await
}
})
.await
.map_err(Into::<Error>::into)?;

let Some(object_store) = self.cache.read().unwrap().get(&identifier).cloned() else {
return Err(Error::NotFound(format!(
"Object store for table {}",
&identifier
)));
};

Table::new(identifier, self, object_store, response.metadata).await
}
async fn create_view(
self: Arc<Self>,
@@ -489,34 +514,41 @@ impl Catalog for RestCatalog {
metadata_location.to_owned(),
);

catalog_api_api::register_table(
let response = catalog_api_api::register_table(
&self.configuration,
self.name.as_deref(),
&identifier.namespace().to_string(),
request,
)
.map_err(Into::<Error>::into)
.and_then(|response| {
let clone = self.clone();
async move {
let object_store = clone
.object_store_builder
.build(Bucket::from_path(&response.metadata.location)?)?;
Table::new(identifier.clone(), clone, object_store, response.metadata).await
}
})
.await
.await?;
let object_store = object_store_from_response(&response)?
.ok_or(Error::NotFound("Object store credentials".to_string()))
.or_else(|_| {
self.default_object_store_builder
.as_ref()
.ok_or(Error::NotFound("Default object store".to_string()))
.and_then(|x| {
let bucket = Bucket::from_path(&response.metadata.location)?;
x.build(bucket)
})
})?;

Table::new(identifier.clone(), self, object_store, response.metadata).await
}
}

#[derive(Debug, Clone)]
pub struct RestCatalogList {
configuration: Configuration,
object_store_builder: ObjectStoreBuilder,
object_store_builder: Option<ObjectStoreBuilder>,
}

impl RestCatalogList {
pub fn new(configuration: Configuration, object_store_builder: ObjectStoreBuilder) -> Self {
pub fn new(
configuration: Configuration,
object_store_builder: Option<ObjectStoreBuilder>,
) -> Self {
Self {
configuration,
object_store_builder,
@@ -542,14 +574,14 @@ impl CatalogList for RestCatalogList {
pub struct RestNoPrefixCatalogList {
name: String,
configuration: Configuration,
object_store_builder: ObjectStoreBuilder,
object_store_builder: Option<ObjectStoreBuilder>,
}

impl RestNoPrefixCatalogList {
pub fn new(
name: &str,
configuration: Configuration,
object_store_builder: ObjectStoreBuilder,
object_store_builder: Option<ObjectStoreBuilder>,
) -> Self {
Self {
name: name.to_owned(),
Expand Down Expand Up @@ -577,6 +609,50 @@ impl CatalogList for RestNoPrefixCatalogList {
}
}

const CLIENT_REGION: &str = "client.region";
const AWS_ACCESS_KEY_ID: &str = "s3.access-key-id";
const AWS_SECRET_ACCESS_KEY: &str = "s3.secret-access-key";
const AWS_SESSION_TOKEN: &str = "s3.session-token";

fn object_store_from_response(
response: &models::LoadTableResult,
) -> Result<Option<Arc<dyn ObjectStore>>, Error> {
let config = match (&response.storage_credentials, &response.config) {
(Some(credentials), _) => Some(&credentials[0].config),
(None, Some(config)) => Some(config),
(None, None) => None,
};

let Some(config) = config else {
return Ok(None);
};

let region = config.get(CLIENT_REGION);
if config.contains_key(AWS_ACCESS_KEY_ID) {
let access_key_id = config.get(AWS_ACCESS_KEY_ID);
let secret_access_key = config.get(AWS_SECRET_ACCESS_KEY);
let session_token = config.get(AWS_SESSION_TOKEN);
let mut builder = AmazonS3Builder::new();

if let Some(region) = region {
builder = builder.with_region(region)
}
if let Some(access_key_id) = access_key_id {
builder = builder.with_access_key_id(access_key_id)
}
if let Some(secret_access_key) = secret_access_key {
builder = builder.with_secret_access_key(secret_access_key)
}
if let Some(session_token) = session_token {
builder = builder.with_token(session_token)
}

Ok(Some(Arc::new(builder.build()?)))
} else {
Ok(None)
}
}

#[cfg(test)]
pub mod tests {
use datafusion::{
Expand Down Expand Up @@ -685,7 +761,7 @@ pub mod tests {
let iceberg_catalog = Arc::new(RestCatalog::new(
None,
configuration(&format!("http://{rest_host}:{rest_port}")),
object_store,
Some(object_store),
));

iceberg_catalog
2 changes: 1 addition & 1 deletion datafusion_iceberg/tests/integration_trino.rs
@@ -199,7 +199,7 @@ async fn integration_trino_rest() {
let catalog = Arc::new(RestCatalog::new(
None,
configuration(&rest_host.to_string(), rest_port),
object_store,
Some(object_store),
));

let tables = catalog