diff --git a/daft/dataframe/dataframe.py b/daft/dataframe/dataframe.py
index eb07238ceb..220c7787d8 100644
--- a/daft/dataframe/dataframe.py
+++ b/daft/dataframe/dataframe.py
@@ -53,6 +53,7 @@
import torch
from daft.io import DataCatalogTable
+ from daft.unity_catalog import UnityCatalogTable
from daft.logical.schema import Schema
@@ -826,7 +827,7 @@ def write_iceberg(self, table: "pyiceberg.table.Table", mode: str = "append") ->
@DataframePublicAPI
def write_deltalake(
self,
- table: Union[str, pathlib.Path, "DataCatalogTable", "deltalake.DeltaTable"],
+ table: Union[str, pathlib.Path, "DataCatalogTable", "deltalake.DeltaTable", "UnityCatalogTable"],
partition_cols: Optional[List[str]] = None,
mode: Literal["append", "overwrite", "error", "ignore"] = "append",
schema_mode: Optional[Literal["merge", "overwrite"]] = None,
@@ -844,7 +845,7 @@ def write_deltalake(
This call is **blocking** and will execute the DataFrame when called
Args:
- table (Union[str, pathlib.Path, DataCatalogTable, deltalake.DeltaTable]): Destination `Delta Lake Table `__ or table URI to write dataframe to.
+ table (Union[str, pathlib.Path, DataCatalogTable, deltalake.DeltaTable, UnityCatalogTable]): Destination `Delta Lake Table `__ or table URI to write dataframe to.
partition_cols (List[str], optional): How to subpartition each partition further. If table exists, expected to match table's existing partitioning scheme, otherwise creates the table with specified partition columns. Defaults to None.
mode (str, optional): Operation mode of the write. `append` will add new data, `overwrite` will replace table with new data, `error` will raise an error if table already exists, and `ignore` will not write anything if table already exists. Defaults to "append".
schema_mode (str, optional): Schema mode of the write. If set to `overwrite`, allows replacing the schema of the table when doing `mode=overwrite`. Schema mode `merge` is currently not supported.
@@ -872,6 +873,7 @@ def write_deltalake(
from daft.io import DataCatalogTable
from daft.io._deltalake import large_dtypes_kwargs
from daft.io.object_store_options import io_config_to_storage_options
+ from daft.unity_catalog import UnityCatalogTable
if schema_mode == "merge":
raise ValueError("Schema mode' merge' is not currently supported for write_deltalake.")
@@ -881,14 +883,21 @@ def write_deltalake(
io_config = get_context().daft_planning_config.default_io_config if io_config is None else io_config
- if isinstance(table, (str, pathlib.Path, DataCatalogTable)):
+ if isinstance(table, (str, pathlib.Path, DataCatalogTable, UnityCatalogTable)):
if isinstance(table, str):
table_uri = table
elif isinstance(table, pathlib.Path):
table_uri = str(table)
+ elif isinstance(table, UnityCatalogTable):
+ table_uri = table.table_uri
+ io_config = table.io_config
else:
table_uri = table.table_uri(io_config)
+ if io_config is None:
+ raise ValueError(
+ "io_config was not provided to write_deltalake and could not be retrieved from the default configuration."
+ )
storage_options = io_config_to_storage_options(io_config, table_uri) or {}
table = try_get_deltatable(table_uri, storage_options=storage_options)
elif isinstance(table, deltalake.DeltaTable):
diff --git a/daft/unity_catalog/unity_catalog.py b/daft/unity_catalog/unity_catalog.py
index f22e310193..8dc426fd06 100644
--- a/daft/unity_catalog/unity_catalog.py
+++ b/daft/unity_catalog/unity_catalog.py
@@ -90,14 +90,56 @@ def _paginated_list_tables(client: unitycatalog.Unitycatalog, page_token: str |
return self._paginate_to_completion(_paginated_list_tables)
- def load_table(self, table_name: str) -> UnityCatalogTable:
+ def load_table(self, table_name: str, new_table_storage_path: str | None = None) -> UnityCatalogTable:
+ """Loads an existing Unity Catalog table. If the table is not found and `new_table_storage_path` is provided, an attempt is made to register a new external table at that path.
+
+ Args:
+ table_name (str): Name of the table in Unity Catalog, given as a dot-separated, 3-level namespace (`catalog.schema.table`)
+ new_table_storage_path (str, optional): Cloud storage path URI at which to register a new external table when `table_name` does not exist. Unity Catalog validates that the path is valid and authorized for the principal, and raises an exception otherwise. If the table already exists, this argument is ignored and a warning is emitted.
+
+ Returns:
+ UnityCatalogTable
+ """
# Load the table ID
- table_info = self._client.tables.retrieve(table_name)
+ try:
+ table_info = self._client.tables.retrieve(table_name)
+ if new_table_storage_path:
+ warnings.warn(
+ f"Table {table_name} is an existing storage table with a valid storage path. The 'new_table_storage_path' argument provided will be ignored."
+ )
+ except unitycatalog.NotFoundError:
+ if not new_table_storage_path:
+ raise ValueError(
+ f"Table {table_name} is not an existing table. If a new table needs to be created, provide 'new_table_storage_path' value."
+ )
+ try:
+ three_part_namesplit = table_name.split(".")
+ if len(three_part_namesplit) != 3 or not all(three_part_namesplit):
+ raise ValueError(
+ f"Expected table name to be in the format of 'catalog.schema.table', received: {table_name}"
+ )
+
+ params = {
+ "catalog_name": three_part_namesplit[0],
+ "schema_name": three_part_namesplit[1],
+ "name": three_part_namesplit[2],
+ "columns": None,
+ "data_source_format": "DELTA",
+ "table_type": "EXTERNAL",
+ "storage_location": new_table_storage_path,
+ "comment": None,
+ }
+
+ table_info = self._client.tables.create(**params)
+ except Exception as e:
+ raise Exception(f"An error occurred while registering the table in Unity Catalog: {e}")
+
table_id = table_info.table_id
storage_location = table_info.storage_location
-
# Grab credentials from Unity catalog and place it into the Table
- temp_table_credentials = self._client.temporary_table_credentials.create(operation="READ", table_id=table_id)
+ temp_table_credentials = self._client.temporary_table_credentials.create(
+ operation="READ_WRITE", table_id=table_id
+ )
scheme = urlparse(storage_location).scheme
if scheme == "s3" or scheme == "s3a":
diff --git a/requirements-dev.txt b/requirements-dev.txt
index 0d47741772..0950a00cf6 100644
--- a/requirements-dev.txt
+++ b/requirements-dev.txt
@@ -7,6 +7,9 @@ docker
# Pinned requests due to docker-py issue: https://github.com/docker/docker-py/issues/3256
requests<2.32.0
+# Pinned httpx due to unitycatalog-python issue: https://github.com/unitycatalog/unitycatalog-python/issues/9
+httpx==0.27.2
+
# Tracing
orjson==3.10.12 # orjson recommended for viztracer
py-spy==0.3.14