From dd2dc23a8f6c38c4ac1caf79c0edae29252c7276 Mon Sep 17 00:00:00 2001
From: Anil Menon
Date: Wed, 11 Dec 2024 20:59:22 +0100
Subject: [PATCH] feat: Unity Catalog writes using `daft.DataFrame.write_deltalake()` (#3522)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This pull request covers the four workflows below, which were tested internally (on Databricks on Azure and AWS) after building the package in a local environment. A combined sketch of these calls appears after the notes below.

- Load an existing table in Unity Catalog and append to it without a schema change: `df.write_deltalake(uc_table, mode='append')` on a table retrieved via `unity.load_table(table_name)`.
- Load an existing table in Unity Catalog and overwrite it without a schema change: `df.write_deltalake(uc_table, mode='overwrite')` on a table retrieved via `unity.load_table(table_name)`.
- Load an existing table in Unity Catalog and overwrite it with a schema change: `df.write_deltalake(uc_table, mode='overwrite', schema_mode='overwrite')` on a table retrieved via `unity.load_table(table_name)`.
- Create a new table in Unity Catalog using the Daft engine and populate it with data: register a new table in UC without any schema via `unity.load_table(table_name, new_table_storage_path="...")`, then write with `df.write_deltalake(uc_table, mode='overwrite', schema_mode='overwrite')`.

A few notes:

- `deltalake` (0.22.3) does not support writes to tables with deletion vectors enabled. For appends to an existing table, ensure the target table does not have deletion vectors enabled; otherwise the write fails with `CommitFailedError: Unsupported reader features required: [DeletionVectors]`.
- The pinned `httpx==0.27.2` dependency works around a defect in unitycatalog-python that also affects Daft on all previous versions; this PR fixes it.
- If schema updates are performed by Daft, readers immediately see the new schema, since the Delta log is self-contained. However, for the schema to update in the Unity Catalog UI, run `REPAIR TABLE catalog.schema.table SYNC METADATA;` from Databricks compute to bring UC metadata in line with the Delta log.
- In this version, appending to an existing table after a schema change is not supported; only overwrites are.
- On AWS, the environment variable had to be set via `export AWS_S3_ALLOW_UNSAFE_RENAME=true`.
- There appears to be a defect with the `allow_unsafe_rename` parameter of `df.write_deltalake`, as it did not work during internal testing. This could be logged as a new issue once confirmed.
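Combined sketch of the four workflows above. This is illustrative only: the endpoint, token, table names, and storage path are placeholders, and the `UnityCatalog` constructor shown is assumed to be the existing one from `daft.unity_catalog`:

```python
import daft
from daft.unity_catalog import UnityCatalog

# Placeholder workspace endpoint and token.
unity = UnityCatalog(endpoint="https://<workspace-url>", token="<token>")
df = daft.from_pydict({"a": [1, 2, 3]})

# 1. Append to an existing table (no schema change; table must not have deletion vectors enabled).
uc_table = unity.load_table("catalog.schema.existing_table")
df.write_deltalake(uc_table, mode="append")

# 2. Overwrite an existing table without a schema change.
df.write_deltalake(uc_table, mode="overwrite")

# 3. Overwrite an existing table with a schema change.
df.write_deltalake(uc_table, mode="overwrite", schema_mode="overwrite")

# 4. Register a new external table in UC, then populate it with Daft.
new_table = unity.load_table(
    "catalog.schema.new_table",
    new_table_storage_path="s3://bucket/path/to/table",  # placeholder storage path
)
df.write_deltalake(new_table, mode="overwrite", schema_mode="overwrite")
```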
---------

Co-authored-by: Kev Wang
---
 daft/dataframe/dataframe.py         | 15 +++++++--
 daft/unity_catalog/unity_catalog.py | 50 ++++++++++++++++++++++++++---
 requirements-dev.txt                |  3 ++
 3 files changed, 61 insertions(+), 7 deletions(-)

diff --git a/daft/dataframe/dataframe.py b/daft/dataframe/dataframe.py
index eb07238ceb..220c7787d8 100644
--- a/daft/dataframe/dataframe.py
+++ b/daft/dataframe/dataframe.py
@@ -53,6 +53,7 @@
     import torch

     from daft.io import DataCatalogTable
+    from daft.unity_catalog import UnityCatalogTable
     from daft.logical.schema import Schema

@@ -826,7 +827,7 @@ def write_iceberg(self, table: "pyiceberg.table.Table", mode: str = "append") -> "DataFrame":
     @DataframePublicAPI
     def write_deltalake(
         self,
-        table: Union[str, pathlib.Path, "DataCatalogTable", "deltalake.DeltaTable"],
+        table: Union[str, pathlib.Path, "DataCatalogTable", "deltalake.DeltaTable", "UnityCatalogTable"],
         partition_cols: Optional[List[str]] = None,
         mode: Literal["append", "overwrite", "error", "ignore"] = "append",
         schema_mode: Optional[Literal["merge", "overwrite"]] = None,
@@ -844,7 +845,7 @@
         This call is **blocking** and will execute the DataFrame when called

         Args:
-            table (Union[str, pathlib.Path, DataCatalogTable, deltalake.DeltaTable]): Destination `Delta Lake Table `__ or table URI to write dataframe to.
+            table (Union[str, pathlib.Path, DataCatalogTable, deltalake.DeltaTable, UnityCatalogTable]): Destination `Delta Lake Table `__ or table URI to write dataframe to.
             partition_cols (List[str], optional): How to subpartition each partition further. If table exists, expected to match table's existing partitioning scheme, otherwise creates the table with specified partition columns. Defaults to None.
             mode (str, optional): Operation mode of the write. `append` will add new data, `overwrite` will replace table with new data, `error` will raise an error if table already exists, and `ignore` will not write anything if table already exists. Defaults to "append".
             schema_mode (str, optional): Schema mode of the write. If set to `overwrite`, allows replacing the schema of the table when doing `mode=overwrite`. Schema mode `merge` is currently not supported.
@@ -872,6 +873,7 @@
         from daft.io import DataCatalogTable
         from daft.io._deltalake import large_dtypes_kwargs
         from daft.io.object_store_options import io_config_to_storage_options
+        from daft.unity_catalog import UnityCatalogTable

         if schema_mode == "merge":
             raise ValueError("Schema mode' merge' is not currently supported for write_deltalake.")
@@ -881,14 +883,21 @@
         io_config = get_context().daft_planning_config.default_io_config if io_config is None else io_config

-        if isinstance(table, (str, pathlib.Path, DataCatalogTable)):
+        if isinstance(table, (str, pathlib.Path, DataCatalogTable, UnityCatalogTable)):
             if isinstance(table, str):
                 table_uri = table
             elif isinstance(table, pathlib.Path):
                 table_uri = str(table)
+            elif isinstance(table, UnityCatalogTable):
+                table_uri = table.table_uri
+                io_config = table.io_config
             else:
                 table_uri = table.table_uri(io_config)

+            if io_config is None:
+                raise ValueError(
+                    "io_config was not provided to write_deltalake and could not be retrieved from the default configuration."
+                )
             storage_options = io_config_to_storage_options(io_config, table_uri) or {}
             table = try_get_deltatable(table_uri, storage_options=storage_options)
         elif isinstance(table, deltalake.DeltaTable):
diff --git a/daft/unity_catalog/unity_catalog.py b/daft/unity_catalog/unity_catalog.py
index f22e310193..8dc426fd06 100644
--- a/daft/unity_catalog/unity_catalog.py
+++ b/daft/unity_catalog/unity_catalog.py
@@ -90,14 +90,56 @@ def _paginated_list_tables(client: unitycatalog.Unitycatalog, page_token: str | None):
         return self._paginate_to_completion(_paginated_list_tables)

-    def load_table(self, table_name: str) -> UnityCatalogTable:
+    def load_table(self, table_name: str, new_table_storage_path: str | None = None) -> UnityCatalogTable:
+        """Loads an existing Unity Catalog table. If the table is not found, and information is provided in the method to create a new table, a new table will be attempted to be registered.
+
+        Args:
+            table_name (str): Name of the table in Unity Catalog in the form of dot-separated, 3-level namespace
+            new_table_storage_path (str, optional): Cloud storage path URI to register a new external table using this path. Unity Catalog will validate if the path is valid and authorized for the principal, else will raise an exception.
+
+        Returns:
+            UnityCatalogTable
+        """
         # Load the table ID
-        table_info = self._client.tables.retrieve(table_name)
+        try:
+            table_info = self._client.tables.retrieve(table_name)
+            if new_table_storage_path:
+                warnings.warn(
+                    f"Table {table_name} is an existing storage table with a valid storage path. The 'new_table_storage_path' argument provided will be ignored."
+                )
+        except unitycatalog.NotFoundError:
+            if not new_table_storage_path:
+                raise ValueError(
+                    f"Table {table_name} is not an existing table. If a new table needs to be created, provide 'new_table_storage_path' value."
+                )
+            try:
+                three_part_namesplit = table_name.split(".")
+                if len(three_part_namesplit) != 3 or not all(three_part_namesplit):
+                    raise ValueError(
+                        f"Expected table name to be in the format of 'catalog.schema.table', received: {table_name}"
+                    )
+
+                params = {
+                    "catalog_name": three_part_namesplit[0],
+                    "schema_name": three_part_namesplit[1],
+                    "name": three_part_namesplit[2],
+                    "columns": None,
+                    "data_source_format": "DELTA",
+                    "table_type": "EXTERNAL",
+                    "storage_location": new_table_storage_path,
+                    "comment": None,
+                }
+
+                table_info = self._client.tables.create(**params)
+            except Exception as e:
+                raise Exception(f"An error occurred while registering the table in Unity Catalog: {e}")
+
         table_id = table_info.table_id
         storage_location = table_info.storage_location
-        # Grab credentials from Unity catalog and place it into the Table
-        temp_table_credentials = self._client.temporary_table_credentials.create(operation="READ", table_id=table_id)
+        temp_table_credentials = self._client.temporary_table_credentials.create(
+            operation="READ_WRITE", table_id=table_id
+        )

         scheme = urlparse(storage_location).scheme
         if scheme == "s3" or scheme == "s3a":
diff --git a/requirements-dev.txt b/requirements-dev.txt
index 0d47741772..0950a00cf6 100644
--- a/requirements-dev.txt
+++ b/requirements-dev.txt
@@ -7,6 +7,9 @@ docker
 # Pinned requests due to docker-py issue: https://github.com/docker/docker-py/issues/3256
 requests<2.32.0

+# Pinned httpx due to unitycatalog-python issue: https://github.com/unitycatalog/unitycatalog-python/issues/9
+httpx==0.27.2
+
 # Tracing
 orjson==3.10.12  # orjson recommended for viztracer
 py-spy==0.3.14
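For reference, a behavior sketch of the updated `load_table`, inferred from the diff above; the endpoint, token, table names, and storage paths are placeholders:

```python
from daft.unity_catalog import UnityCatalog

unity = UnityCatalog(endpoint="https://<workspace-url>", token="<token>")

# Existing table: 'new_table_storage_path' is ignored with a warning.
t = unity.load_table("catalog.schema.existing_table", new_table_storage_path="s3://bucket/ignored")

# Missing table without a storage path: raises ValueError.
try:
    unity.load_table("catalog.schema.missing_table")
except ValueError as err:
    print(err)

# Missing table with a storage path: registers a new external DELTA table at
# that location, then returns it with READ_WRITE temporary credentials.
t = unity.load_table("catalog.schema.new_table", new_table_storage_path="s3://bucket/new_table")
```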