From e705ca4a5c60be542e296372b3e557f44d191ea0 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E6=9B=BE=E6=96=B0=E4=BA=AE?=
Date: Wed, 12 Mar 2025 15:40:00 +0800
Subject: [PATCH 1/3] [KYUUBI #5376] Make GetSchemas and GetTables fetch
 metadata through DDL statements so that DBeaver can no longer bypass
 Ranger's permission restrictions

### Why are the changes needed?

With the Ranger plugin enabled for authorization, a user who connects to
Kyuubi through DBeaver can still see databases and tables they have no
permission on in the navigator on the left side, because the GetSchemas and
GetTables operations list metadata through catalog APIs that the plugin does
not intercept. This change makes both operations obtain metadata through DDL
statements (SHOW DATABASES, SHOW TABLES, DESC EXTENDED), which pass through
the SQL analyzer and are therefore subject to the Ranger plugin's
authorization rules. After this change, DBeaver users see only the databases
and tables they are authorized to access in the "Tables" panel.

### How was this patch tested?

Enabled the Ranger plugin of Kyuubi for authorization and connected to
Kyuubi with DBeaver to verify that unauthorized databases and tables are no
longer listed.

### Was this patch authored or co-authored using generative AI tooling?

No
---
 .../engine/spark/operation/GetTables.scala    |  11 +-
 .../engine/spark/util/SparkCatalogUtils.scala | 140 +++++++-----------
 2 files changed, 56 insertions(+), 95 deletions(-)

diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTables.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTables.scala
index 75ce9492176..3115470df46 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTables.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTables.scala
@@ -77,17 +77,8 @@ class GetTables(
         catalog,
         schemaPattern,
         tablePattern,
-        tableTypes,
         ignoreTableProperties)
-
-      val allTableAndViews =
-        if (tableTypes.exists("VIEW".equalsIgnoreCase)) {
-          catalogTablesAndViews ++
-            SparkCatalogUtils.getTempViews(spark, catalog, schemaPattern, tablePattern)
-        } else {
-          catalogTablesAndViews
-        }
-      iter = new IterableFetchIterator(allTableAndViews)
+      iter = new IterableFetchIterator(catalogTablesAndViews)
     } catch {
       onError()
     }
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/util/SparkCatalogUtils.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/util/SparkCatalogUtils.scala
index b9a5028acdc..349b8d819e5 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/util/SparkCatalogUtils.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/util/SparkCatalogUtils.scala
@@ -24,7 +24,6 @@ import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.connector.catalog.{CatalogExtension, CatalogPlugin, SupportsNamespaces, TableCatalog}
 import org.apache.spark.sql.types.StructField
-
 import org.apache.kyuubi.Logging
 import org.apache.kyuubi.engine.spark.schema.SchemaHelper
 import org.apache.kyuubi.util.reflect.ReflectUtils._
@@ -89,14 +88,14 @@ object SparkCatalogUtils extends Logging {
       spark: SparkSession,
       catalogName: String,
       schemaPattern: String): Seq[Row] = {
-    if (catalogName == SparkCatalogUtils.SESSION_CATALOG) {
-      (spark.sessionState.catalog.listDatabases(schemaPattern) ++
-        getGlobalTempViewManager(spark, schemaPattern))
-        .map(Row(_, SparkCatalogUtils.SESSION_CATALOG))
-    } else {
-      val catalog = getCatalog(spark, catalogName)
-      getSchemasWithPattern(catalog, schemaPattern).map(Row(_, catalog.name))
-    }
+    val catalog = getCatalog(spark, catalogName)
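+    // Issue SHOW DATABASES as a SQL statement so that the listing goes
+    // through the analyzer and can be filtered by the authorization plugin.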
+    val showSql = s"SHOW DATABASES IN ${catalog.name} LIKE '${schemaPattern}'"
+    val databaseResult = spark.sql(showSql)
+    val databases = databaseResult.collect().toSeq
+    val globalTempViewsAsRows = getGlobalTempViewManager(spark, schemaPattern).map(Row(_))
+    (databases ++ globalTempViewsAsRows).map(row => Row(row(0), catalog.name))
   }
 
   private def getGlobalTempViewManager(
@@ -157,71 +156,65 @@ object SparkCatalogUtils extends Logging {
       catalogName: String,
       schemaPattern: String,
       tablePattern: String,
-      tableTypes: Set[String],
       ignoreTableProperties: Boolean = false): Seq[Row] = {
     val catalog = getCatalog(spark, catalogName)
-    val namespaces = listNamespacesWithPattern(catalog, schemaPattern)
-    catalog match {
-      case builtin if builtin.name() == SESSION_CATALOG =>
-        val sessionCatalog = spark.sessionState.catalog
-        val databases = sessionCatalog.listDatabases(schemaPattern)
-
-        def isMatchedTableType(tableTypes: Set[String], tableType: String): Boolean = {
-          val typ = if (tableType.equalsIgnoreCase(VIEW)) VIEW else TABLE
-          tableTypes.exists(typ.equalsIgnoreCase)
-        }
-
-        databases.flatMap { db =>
-          val identifiers =
-            sessionCatalog.listTables(db, tablePattern, includeLocalTempViews = false)
-          if (ignoreTableProperties) {
-            identifiers.map { ti: TableIdentifier =>
+    val databases = getSchemas(spark, catalog.name, schemaPattern)
+    databases.flatMap {
+      row =>
+        try {
+          val database = row.getString(0)
+          val catalogRes = row.getString(1)
+          val showSql = s"SHOW TABLES IN ${catalogRes}.${database} LIKE '${tablePattern}'"
+          val tables = spark.sql(showSql).collect().toSeq
+          tables.map { row =>
+            val table = row.getString(1)
+            if (ignoreTableProperties) {
               Row(
-                catalogName,
-                ti.database.getOrElse("default"),
-                ti.table,
-                TABLE, // ignore tableTypes criteria and simply treat all table type as TABLE
+                catalogRes,
+                database,
+                table,
+                TABLE,
                 "",
                 null,
                 null,
                 null,
                 null,
                 null)
-            }
-          } else {
-            sessionCatalog.getTablesByName(identifiers)
-              .filter(t => isMatchedTableType(tableTypes, t.tableType.name)).map { t =>
-                val typ = if (t.tableType.name == VIEW) VIEW else TABLE
-                Row(
-                  catalogName,
-                  t.database,
-                  t.identifier.table,
-                  typ,
-                  t.comment.getOrElse(""),
-                  null,
-                  null,
-                  null,
-                  null,
-                  null)
+            } else {
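+              // DESC EXTENDED also goes through the analyzer, so it is subject
+              // to authorization; scan its rows for the type and comment fields.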
+              val descSql = s"DESC EXTENDED ${catalogRes}.${database}.${table}"
+              val tblInfo = spark.sql(descSql).collect()
+              var tblType = ""
+              var comment = ""
+              tblInfo.foreach { row =>
+                val elements = row.toSeq
+                if (elements.length >= 2 && elements(0).toString.equalsIgnoreCase("type")) {
+                  tblType = elements(1).toString
+                }
+                if (elements.length >= 2 && elements(0).toString.equalsIgnoreCase("comment")) {
+                  comment = elements(1).toString
+                }
               }
+              val typ = if (tblType.equalsIgnoreCase(VIEW)) VIEW else TABLE
+              Row(
+                catalogRes,
+                database,
+                table,
+                typ,
+                comment,
+                null,
+                null,
+                null,
+                null,
+                null)
+            }
           }
+        } catch {
+          case e: Exception =>
+            error(s"Failed to get tables from catalog $catalogName", e)
+            Seq.empty[Row]
         }
-      case tc: TableCatalog =>
-        val tp = tablePattern.r.pattern
-        val identifiers = namespaces.flatMap { ns =>
-          tc.listTables(ns).filter(i => tp.matcher(quoteIfNeeded(i.name())).matches())
-        }
-        identifiers.map { ident =>
-          // TODO: restore view type for session catalog
-          val comment = if (ignoreTableProperties) ""
-          else { // load table is a time consuming operation
-            tc.loadTable(ident).properties().getOrDefault(TableCatalog.PROP_COMMENT, "")
-          }
-          val schema = ident.namespace().map(quoteIfNeeded).mkString(".")
-          val tableName = quoteIfNeeded(ident.name())
-          Row(catalog.name(), schema, tableName, TABLE, comment, null, null, null, null, null)
-        }
-      case _ => Seq.empty[Row]
     }
   }
 
@@ -264,29 +257,6 @@ object SparkCatalogUtils extends Logging {
     }
   }
 
-  def getTempViews(
-      spark: SparkSession,
-      catalogName: String,
-      schemaPattern: String,
-      tablePattern: String): Seq[Row] = {
-    val views = getViews(spark, schemaPattern, tablePattern)
-    views.map { ident =>
-      Row(catalogName, ident.database.orNull, ident.table, VIEW, "", null, null, null, null, null)
-    }
-  }
-
-  private def getViews(
-      spark: SparkSession,
-      schemaPattern: String,
-      tablePattern: String): Seq[TableIdentifier] = {
-    val db = getGlobalTempViewManager(spark, schemaPattern)
-    if (db.nonEmpty) {
-      spark.sessionState.catalog.listTables(db.head, tablePattern)
-    } else {
-      spark.sessionState.catalog.listLocalTempViews(tablePattern)
-    }
-  }
-
   // ///////////////////////////////////////////////////////////////////////////////////////////////
   //                                          Columns
   // ///////////////////////////////////////////////////////////////////////////////////////////////

From 7b73e90f7e3361105742a55ffc7dd003fe5c15f1 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E6=9B=BE=E6=96=B0=E4=BA=AE?=
Date: Wed, 12 Mar 2025 18:04:09 +0800
Subject: [PATCH 2/3] Remove the now-unused TableIdentifier import

---
 .../org/apache/kyuubi/engine/spark/util/SparkCatalogUtils.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/util/SparkCatalogUtils.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/util/SparkCatalogUtils.scala
index 349b8d819e5..2e51236fa8f 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/util/SparkCatalogUtils.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/util/SparkCatalogUtils.scala
@@ -21,7 +21,6 @@ import java.util.regex.Pattern
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.spark.sql.{Row, SparkSession}
-import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.connector.catalog.{CatalogExtension, CatalogPlugin, SupportsNamespaces, TableCatalog}
 import org.apache.spark.sql.types.StructField
 import org.apache.kyuubi.Logging

From 2ffe6d64436b7159b84fb80fbbba7af3c69e0064 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E6=9B=BE=E6=96=B0=E4=BA=AE?=
Date: Thu, 13 Mar 2025 15:59:27 +0800
Subject: [PATCH 3/3] Restore the blank line between the Spark and Kyuubi
 import groups

---
 .../org/apache/kyuubi/engine/spark/util/SparkCatalogUtils.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/util/SparkCatalogUtils.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/util/SparkCatalogUtils.scala
index 2e51236fa8f..e2f501c6ae1 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/util/SparkCatalogUtils.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/util/SparkCatalogUtils.scala
@@ -23,6 +23,7 @@ import org.apache.commons.lang3.StringUtils
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.connector.catalog.{CatalogExtension, CatalogPlugin, SupportsNamespaces, TableCatalog}
 import org.apache.spark.sql.types.StructField
+
 import org.apache.kyuubi.Logging
 import org.apache.kyuubi.engine.spark.schema.SchemaHelper
 import org.apache.kyuubi.util.reflect.ReflectUtils._
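
A note on the approach: the DDL-based listing pattern adopted by this series
can be exercised directly against a plain SparkSession. The sketch below is
illustrative only and is not part of the patches; the object name
DdlListingSketch and the LIKE patterns are invented for the example, and it
assumes an authorization extension (such as the Kyuubi Spark AuthZ plugin
backed by Ranger) is already configured on the session, since that extension
is what actually filters the rows.

    import org.apache.spark.sql.SparkSession

    object DdlListingSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("ddl-listing-sketch")
          .getOrCreate()

        // SHOW DATABASES is analyzed as SQL, so an authorization extension
        // can filter the result; Spark 3.x returns a single namespace column.
        val databases = spark.sql("SHOW DATABASES LIKE '*'")
          .collect()
          .map(_.getString(0))

        databases.foreach { db =>
          // SHOW TABLES returns rows of (namespace, tableName, isTemporary).
          spark.sql(s"SHOW TABLES IN `$db`").collect().foreach { row =>
            println(s"$db.${row.getString(1)}")
          }
        }

        spark.stop()
      }
    }

Because every statement here is authorized like an ordinary user query,
whatever the extension filters out never reaches the JDBC client, which is
why the patched GetSchemas and GetTables keep unauthorized objects out of
DBeaver's navigator.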