From 48518e1a224139a7e1c6f6f3d02eac8d4c2642bd Mon Sep 17 00:00:00 2001 From: Ryanne Dolan Date: Mon, 23 Feb 2026 21:02:35 -0600 Subject: [PATCH 1/2] Add CREATE TABLE ... WITH (...) support for ad-hoc K8s-stored tables Enable users to define tables with explicit connector configuration that are stored as Table CRDs in Kubernetes, without requiring a pre-existing database or catalog entry. This is similar to Flink's CREATE TABLE. Syntax: CREATE TABLE foo.bar (id INT, name VARCHAR) WITH (connector 'kafka', topic 'my-topic') Key changes: - Custom SqlCreateTable replacing Calcite's built-in (adds options field) - Grammar/parser updated to accept WITH clause on CREATE TABLE - TableSource extends Source to carry column definitions + options - Table CRD (tables.crd.yaml) stores columns, options, and schema path - K8sStoredTable + StoredTableScanRule enables pure K8s tables in the planner pipeline without requiring a backing JDBC database - K8sConnector merges stored options with template-derived config (WITH options take precedence) - Fix !pipeline command to use query() for plain SELECTs (no sink) Safe to deploy without the Table CRD installed -- only CREATE TABLE WITH will fail; all other functionality is unaffected. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../hoptimator/ConnectorConfigurable.java | 17 ++ .../com/linkedin/hoptimator/TableSource.java | 61 +++++++ .../java/sqlline/HoptimatorAppConfig.java | 6 +- hoptimator-jdbc/src/main/codegen/config.fmpp | 1 + .../src/main/codegen/includes/parserImpls.ftl | 4 +- .../jdbc/HoptimatorDdlExecutor.java | 18 +- .../jdbc/ddl/HoptimatorDdlParserImpl.java | 11 +- .../hoptimator/jdbc/ddl/SqlCreateTable.java | 94 ++++++++++ .../hoptimator/jdbc/TestBasicSql.java | 6 + .../hoptimator/k8s/K8sApiEndpoints.java | 5 + .../linkedin/hoptimator/k8s/K8sCatalog.java | 1 + .../linkedin/hoptimator/k8s/K8sConnector.java | 4 + .../hoptimator/k8s/K8sDeployerProvider.java | 4 + .../linkedin/hoptimator/k8s/K8sMetadata.java | 7 + .../hoptimator/k8s/K8sStoredTable.java | 91 ++++++++++ .../hoptimator/k8s/K8sTableDeployer.java | 48 ++++++ .../hoptimator/k8s/K8sTableTable.java | 153 ++++++++++++++++ .../hoptimator/k8s/models/V1alpha1Table.java | 144 ++++++++++++++++ .../k8s/models/V1alpha1TableList.java | 130 ++++++++++++++ .../k8s/models/V1alpha1TableSpec.java | 163 ++++++++++++++++++ .../k8s/models/V1alpha1TableSpecColumn.java | 101 +++++++++++ .../k8s/models/V1alpha1TableStatus.java | 83 +++++++++ .../src/main/resources/tables.crd.yaml | 103 +++++++++++ .../util/planner/PipelineRules.java | 56 +++++- 24 files changed, 1302 insertions(+), 9 deletions(-) create mode 100644 hoptimator-api/src/main/java/com/linkedin/hoptimator/ConnectorConfigurable.java create mode 100644 hoptimator-api/src/main/java/com/linkedin/hoptimator/TableSource.java create mode 100644 hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/ddl/SqlCreateTable.java create mode 100644 hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sStoredTable.java create mode 100644 hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sTableDeployer.java create mode 100644 hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sTableTable.java create mode 100644 hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1Table.java create mode 100644 hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1TableList.java create mode 100644 hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1TableSpec.java create mode 100644 hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1TableSpecColumn.java create mode 100644 hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1TableStatus.java create mode 100644 hoptimator-k8s/src/main/resources/tables.crd.yaml diff --git a/hoptimator-api/src/main/java/com/linkedin/hoptimator/ConnectorConfigurable.java b/hoptimator-api/src/main/java/com/linkedin/hoptimator/ConnectorConfigurable.java new file mode 100644 index 00000000..57263345 --- /dev/null +++ b/hoptimator-api/src/main/java/com/linkedin/hoptimator/ConnectorConfigurable.java @@ -0,0 +1,17 @@ +package com.linkedin.hoptimator; + +import java.util.Map; + +/** + * Interface for tables that carry connector configuration. + * This lets planner rules detect K8s-stored tables without + * depending on hoptimator-k8s. + */ +public interface ConnectorConfigurable { + + /** Returns the connector options (WITH clause values). */ + Map connectorOptions(); + + /** Returns the internal database name for this table. */ + String databaseName(); +} diff --git a/hoptimator-api/src/main/java/com/linkedin/hoptimator/TableSource.java b/hoptimator-api/src/main/java/com/linkedin/hoptimator/TableSource.java new file mode 100644 index 00000000..482ab7a6 --- /dev/null +++ b/hoptimator-api/src/main/java/com/linkedin/hoptimator/TableSource.java @@ -0,0 +1,61 @@ +package com.linkedin.hoptimator; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** + * A Source that carries column definitions alongside options. + * Used for CREATE TABLE ... WITH (...) to store ad-hoc table definitions in K8s. + */ +public class TableSource extends Source { + + private final List columns; + + public TableSource(String database, List path, Map options, + List columns) { + super(database, path, options); + this.columns = columns != null ? Collections.unmodifiableList(columns) : Collections.emptyList(); + } + + public List columns() { + return columns; + } + + /** + * Represents a column definition from a CREATE TABLE statement. + */ + public static class ColumnDefinition { + private final String name; + private final String typeName; + private final boolean nullable; + + public ColumnDefinition(String name, String typeName, boolean nullable) { + this.name = name; + this.typeName = typeName; + this.nullable = nullable; + } + + public String name() { + return name; + } + + public String typeName() { + return typeName; + } + + public boolean nullable() { + return nullable; + } + + @Override + public String toString() { + return name + " " + typeName + (nullable ? "" : " NOT NULL"); + } + } + + @Override + public String toString() { + return "TableSource[" + pathString() + "]"; + } +} diff --git a/hoptimator-cli/src/main/java/sqlline/HoptimatorAppConfig.java b/hoptimator-cli/src/main/java/sqlline/HoptimatorAppConfig.java index 838e6744..444c9d0b 100644 --- a/hoptimator-cli/src/main/java/sqlline/HoptimatorAppConfig.java +++ b/hoptimator-cli/src/main/java/sqlline/HoptimatorAppConfig.java @@ -135,7 +135,11 @@ public void execute(String line, DispatchCallback dispatchCallback) { schemaSnapshot = HoptimatorDdlUtils.snapshotAndSetSinkSchema(conn.createPrepareContext(), new HoptimatorDriver.Prepare(conn), plan, create, querySql); } - sqlline.output(plan.sql(conn).apply(SqlDialect.ANSI)); + if (create != null) { + sqlline.output(plan.sql(conn).apply(SqlDialect.ANSI)); + } else { + sqlline.output(plan.query(conn).apply(SqlDialect.ANSI)); + } } catch (SQLException e) { sqlline.error(e); dispatchCallback.setToFailure(); diff --git a/hoptimator-jdbc/src/main/codegen/config.fmpp b/hoptimator-jdbc/src/main/codegen/config.fmpp index c136a8d6..8df8201d 100644 --- a/hoptimator-jdbc/src/main/codegen/config.fmpp +++ b/hoptimator-jdbc/src/main/codegen/config.fmpp @@ -34,6 +34,7 @@ data: { "org.apache.calcite.sql.ddl.SqlDdlNodes" "com.linkedin.hoptimator.jdbc.ddl.SqlCreateFunction" "com.linkedin.hoptimator.jdbc.ddl.SqlCreateMaterializedView" + "com.linkedin.hoptimator.jdbc.ddl.SqlCreateTable" "com.linkedin.hoptimator.jdbc.ddl.SqlCreateTrigger" "com.linkedin.hoptimator.jdbc.ddl.SqlFire" "com.linkedin.hoptimator.jdbc.ddl.SqlFireMaterializedView" diff --git a/hoptimator-jdbc/src/main/codegen/includes/parserImpls.ftl b/hoptimator-jdbc/src/main/codegen/includes/parserImpls.ftl index e2a3fad5..40c80ee3 100644 --- a/hoptimator-jdbc/src/main/codegen/includes/parserImpls.ftl +++ b/hoptimator-jdbc/src/main/codegen/includes/parserImpls.ftl @@ -192,6 +192,7 @@ SqlCreate SqlCreateTable(Span s, boolean replace) : final boolean ifNotExists; final SqlIdentifier id; SqlNodeList tableElementList = null; + SqlNodeList optionList = null; SqlNode query = null; SqlCreate createTableLike = null; @@ -204,9 +205,10 @@ SqlCreate SqlCreateTable(Span s, boolean replace) : } | [ tableElementList = TableElementList() ] + [ optionList = Options() ] [ query = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY) ] { - return SqlDdlNodes.createTable(s.end(this), replace, ifNotExists, id, tableElementList, query); + return new SqlCreateTable(s.end(this), replace, ifNotExists, id, tableElementList, optionList, query); } ) } diff --git a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java index d42f1217..0e3bc58f 100644 --- a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java +++ b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java @@ -25,6 +25,7 @@ import com.linkedin.hoptimator.MaterializedView; import com.linkedin.hoptimator.Pipeline; import com.linkedin.hoptimator.Source; +import com.linkedin.hoptimator.TableSource; import com.linkedin.hoptimator.Trigger; import com.linkedin.hoptimator.UserJob; import com.linkedin.hoptimator.View; @@ -70,7 +71,7 @@ import org.apache.calcite.sql.SqlLiteral; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.ddl.SqlColumnDeclaration; -import org.apache.calcite.sql.ddl.SqlCreateTable; +import com.linkedin.hoptimator.jdbc.ddl.SqlCreateTable; import org.apache.calcite.sql.ddl.SqlCreateView; import org.apache.calcite.sql.ddl.SqlDropMaterializedView; import org.apache.calcite.sql.ddl.SqlDropObject; @@ -461,7 +462,20 @@ public void execute(SqlCreateTable create, CalcitePrepare.Context context) { List tablePath = new ArrayList<>(schemaPath); tablePath.add(tableName); - Source source = new Source(database, tablePath, Collections.emptyMap()); + Map options = HoptimatorDdlUtils.options(create.options); + Source source; + if (!options.isEmpty()) { + List columnDefs = new ArrayList<>(); + for (org.apache.calcite.rel.type.RelDataTypeField field : rowType.getFieldList()) { + columnDefs.add(new TableSource.ColumnDefinition( + field.getName(), + field.getType().getSqlTypeName().getName(), + field.getType().isNullable())); + } + source = new TableSource(database, tablePath, options, columnDefs); + } else { + source = new Source(database, tablePath, Collections.emptyMap()); + } logger.info("Validating new table {}", source); ValidationService.validateOrThrow(source); deployers = DeploymentService.deployers(source, connection); diff --git a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/ddl/HoptimatorDdlParserImpl.java b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/ddl/HoptimatorDdlParserImpl.java index 60c682ca..f7bcc62d 100644 --- a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/ddl/HoptimatorDdlParserImpl.java +++ b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/ddl/HoptimatorDdlParserImpl.java @@ -9,6 +9,7 @@ import org.apache.calcite.sql.ddl.SqlDdlNodes; import com.linkedin.hoptimator.jdbc.ddl.SqlCreateFunction; import com.linkedin.hoptimator.jdbc.ddl.SqlCreateMaterializedView; +import com.linkedin.hoptimator.jdbc.ddl.SqlCreateTable; import com.linkedin.hoptimator.jdbc.ddl.SqlCreateTrigger; import com.linkedin.hoptimator.jdbc.ddl.SqlFire; import com.linkedin.hoptimator.jdbc.ddl.SqlFireMaterializedView; @@ -6126,6 +6127,7 @@ final public SqlCreate SqlCreateTable(Span s, boolean replace) throws ParseExcep final boolean ifNotExists; final SqlIdentifier id; SqlNodeList tableElementList = null; + SqlNodeList optionList = null; SqlNode query = null; SqlCreate createTableLike = null; @@ -6149,6 +6151,13 @@ final public SqlCreate SqlCreateTable(Span s, boolean replace) throws ParseExcep ; } switch ((jj_ntk==-1)?jj_ntk():jj_ntk) { + case WITH: + optionList = Options(); + break; + default: + ; + } + switch ((jj_ntk==-1)?jj_ntk():jj_ntk) { case AS: jj_consume_token(AS); query = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY); @@ -6157,7 +6166,7 @@ final public SqlCreate SqlCreateTable(Span s, boolean replace) throws ParseExcep jj_la1[45] = jj_gen; ; } - {if (true) return SqlDdlNodes.createTable(s.end(this), replace, ifNotExists, id, tableElementList, query);} + {if (true) return new SqlCreateTable(s.end(this), replace, ifNotExists, id, tableElementList, optionList, query);} } throw new Error("Missing return statement in function"); } diff --git a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/ddl/SqlCreateTable.java b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/ddl/SqlCreateTable.java new file mode 100644 index 00000000..4da16c58 --- /dev/null +++ b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/ddl/SqlCreateTable.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.linkedin.hoptimator.jdbc.ddl; + +import org.apache.calcite.sql.SqlCreate; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.SqlSpecialOperator; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.util.ImmutableNullableList; + +import org.checkerframework.checker.nullness.qual.Nullable; + +import java.util.List; + +import static java.util.Objects.requireNonNull; + +/** + * Parse tree for {@code CREATE TABLE} statement with optional {@code WITH} options. + */ +public class SqlCreateTable extends SqlCreate { + public final SqlIdentifier name; + public final @Nullable SqlNodeList columnList; + public final @Nullable SqlNodeList options; + public final @Nullable SqlNode query; + public final boolean ifNotExists; + + private static final SqlOperator OPERATOR = + new SqlSpecialOperator("CREATE TABLE", SqlKind.CREATE_TABLE); + + public SqlCreateTable(SqlParserPos pos, boolean replace, + boolean ifNotExists, SqlIdentifier name, @Nullable SqlNodeList columnList, + @Nullable SqlNodeList options, @Nullable SqlNode query) { + super(OPERATOR, pos, replace, ifNotExists); + this.name = requireNonNull(name, "name"); + this.columnList = columnList; + this.options = options; + this.query = query; + this.ifNotExists = ifNotExists; + } + + @SuppressWarnings("nullness") + @Override public List getOperandList() { + return ImmutableNullableList.of(name, columnList, options, query); + } + + @Override public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + writer.keyword("CREATE"); + writer.keyword("TABLE"); + if (ifNotExists) { + writer.keyword("IF NOT EXISTS"); + } + name.unparse(writer, leftPrec, rightPrec); + if (columnList != null) { + SqlWriter.Frame frame = writer.startList("(", ")"); + for (SqlNode c : columnList) { + writer.sep(","); + c.unparse(writer, 0, 0); + } + writer.endList(frame); + } + if (options != null) { + writer.keyword("WITH"); + SqlWriter.Frame frame = writer.startList("(", ")"); + for (SqlNode c : options) { + writer.sep(","); + c.unparse(writer, 0, 0); + } + writer.endList(frame); + } + if (query != null) { + writer.keyword("AS"); + query.unparse(writer, 0, 0); + } + } +} diff --git a/hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/TestBasicSql.java b/hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/TestBasicSql.java index 06d7d4e8..3f231110 100644 --- a/hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/TestBasicSql.java +++ b/hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/TestBasicSql.java @@ -62,6 +62,12 @@ public void createView() throws Exception { Assertions.assertEquals(expectedLogs, logs); } + @Test + public void createTableWithOptions() throws Exception { + sql("CREATE TABLE T (X VARCHAR, Y VARCHAR) WITH (connector 'kafka', topic 'test')"); + sql("DROP TABLE T"); + } + @Test public void dropNonExistentViewHandlesNullSchema() throws Exception { // Should not throw when using IF EXISTS on a non-existent view diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sApiEndpoints.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sApiEndpoints.java index ff5fcccb..6e4dba89 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sApiEndpoints.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sApiEndpoints.java @@ -17,6 +17,8 @@ import com.linkedin.hoptimator.k8s.models.V1alpha1JobTemplateList; import com.linkedin.hoptimator.k8s.models.V1alpha1Pipeline; import com.linkedin.hoptimator.k8s.models.V1alpha1PipelineList; +import com.linkedin.hoptimator.k8s.models.V1alpha1Table; +import com.linkedin.hoptimator.k8s.models.V1alpha1TableList; import com.linkedin.hoptimator.k8s.models.V1alpha1TableTemplate; import com.linkedin.hoptimator.k8s.models.V1alpha1TableTemplateList; import com.linkedin.hoptimator.k8s.models.V1alpha1TableTrigger; @@ -52,6 +54,9 @@ public final class K8sApiEndpoints { public static final K8sApiEndpoint VIEWS = new K8sApiEndpoint<>("View", "hoptimator.linkedin.com", "v1alpha1", "views", false, V1alpha1View.class, V1alpha1ViewList.class); + public static final K8sApiEndpoint TABLES = + new K8sApiEndpoint<>("Table", "hoptimator.linkedin.com", "v1alpha1", "tables", false, + V1alpha1Table.class, V1alpha1TableList.class); public static final K8sApiEndpoint TABLE_TEMPLATES = new K8sApiEndpoint<>("TableTemplate", "hoptimator.linkedin.com", "v1alpha1", "tabletemplates", false, V1alpha1TableTemplate.class, V1alpha1TableTemplateList.class); diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sCatalog.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sCatalog.java index 01d0ed1a..801fec90 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sCatalog.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sCatalog.java @@ -36,6 +36,7 @@ public void register(Wrapper wrapper) throws SQLException { schemaPlus.add("k8s", metadata); metadata.databaseTable().addDatabases(schemaPlus, conn); metadata.viewTable().addViews(schemaPlus); + metadata.tableTable().addTables(schemaPlus); // TODO: Explore adding back materializations, but they should not cause backend calls, we may need to // introduce schemas into the view object itself such that all tables that make up a view do not need to be diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sConnector.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sConnector.java index 8473a6c4..d63d9c6a 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sConnector.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sConnector.java @@ -91,6 +91,10 @@ public Map configure() throws SQLException { Map map = new LinkedHashMap<>(); props.stringPropertyNames().stream().sorted().forEach(k -> map.put(k, props.getProperty(k))); + // Source options override template-derived config (WITH wins) + if (!source.options().isEmpty()) { + map.putAll(source.options()); + } return map; } diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sDeployerProvider.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sDeployerProvider.java index bf66193a..f0b07c00 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sDeployerProvider.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sDeployerProvider.java @@ -11,6 +11,7 @@ import com.linkedin.hoptimator.Job; import com.linkedin.hoptimator.MaterializedView; import com.linkedin.hoptimator.Source; +import com.linkedin.hoptimator.TableSource; import com.linkedin.hoptimator.Trigger; import com.linkedin.hoptimator.View; @@ -29,6 +30,9 @@ public Collection deployers(T obj, Connection c list.add(new K8sJobDeployer((Job) obj, context)); } else if (obj instanceof Source) { list.add(new K8sSourceDeployer((Source) obj, context)); + if (obj instanceof TableSource) { + list.add(new K8sTableDeployer((TableSource) obj, context)); + } } else if (obj instanceof Trigger) { list.add(new K8sTriggerDeployer((Trigger) obj, context)); } diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sMetadata.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sMetadata.java index 56b5ca17..ef08efd0 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sMetadata.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sMetadata.java @@ -32,6 +32,10 @@ public K8sViewTable viewTable() { return (K8sViewTable) tables().get("VIEWS"); } + public K8sTableTable tableTable() { + return (K8sTableTable) tables().get("TABLES"); + } + @Override public Lookup tables() { return tables.getOrCompute(() -> new LazyTableLookup<>() { @@ -48,6 +52,7 @@ protected Map loadAllTables() { tableMap.put("PIPELINES", new K8sPipelineTable(context)); tableMap.put("PIPELINE_ELEMENTS", new K8sPipelineElementTable(pipelineElementApi)); tableMap.put("PIPELINE_ELEMENT_MAP", new K8sPipelineElementMapTable(pipelineElementMapApi)); + tableMap.put("TABLES", new K8sTableTable(context)); tableMap.put("TABLE_TRIGGERS", new K8sTableTriggerTable(context)); tableMap.put("VIEWS", new K8sViewTable(connection, context)); return tableMap; @@ -70,6 +75,8 @@ protected Map loadAllTables() { K8sPipelineElementApi api = new K8sPipelineElementApi(context); K8sPipelineElementMapApi mapApi = new K8sPipelineElementMapApi(api); return new K8sPipelineElementMapTable(mapApi); + case "TABLES": + return new K8sTableTable(context); case "TABLE_TRIGGERS": return new K8sTableTriggerTable(context); case "VIEWS": diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sStoredTable.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sStoredTable.java new file mode 100644 index 00000000..8ce9d8f2 --- /dev/null +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sStoredTable.java @@ -0,0 +1,91 @@ +package com.linkedin.hoptimator.k8s; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.schema.impl.AbstractTable; +import org.apache.calcite.sql.type.SqlTypeName; + +import com.linkedin.hoptimator.ConnectorConfigurable; +import com.linkedin.hoptimator.k8s.models.V1alpha1TableSpecColumn; + + +/** + * A Calcite table backed by a K8s Table CRD. + * Loaded from stored column definitions and connector options. + * Does NOT implement TranslatableTable; Calcite creates a LogicalTableScan, + * which StoredTableScanRule converts to PipelineTableScan. + */ +class K8sStoredTable extends AbstractTable implements ConnectorConfigurable { + + private final String databaseName; + private final List columns; + private final Map options; + + K8sStoredTable(String databaseName, List columns, + Map options) { + this.databaseName = databaseName; + this.columns = columns != null ? columns : Collections.emptyList(); + this.options = options != null ? options : Collections.emptyMap(); + } + + @Override + public RelDataType getRowType(RelDataTypeFactory typeFactory) { + RelDataTypeFactory.Builder builder = typeFactory.builder(); + for (V1alpha1TableSpecColumn col : columns) { + SqlTypeName typeName = mapType(col.getType()); + RelDataType type = typeFactory.createSqlType(typeName); + boolean nullable = col.getNullable() == null || col.getNullable(); + type = typeFactory.createTypeWithNullability(type, nullable); + builder.add(col.getName(), type); + } + return builder.build(); + } + + @Override + public Map connectorOptions() { + return options; + } + + @Override + public String databaseName() { + return databaseName; + } + + @Override + public C unwrap(Class aClass) { + if (aClass.isInstance(this)) { + return aClass.cast(this); + } + return super.unwrap(aClass); + } + + private static SqlTypeName mapType(String typeName) { + if (typeName == null) { + return SqlTypeName.VARCHAR; + } + try { + return SqlTypeName.valueOf(typeName.toUpperCase()); + } catch (IllegalArgumentException e) { + // Handle common aliases + switch (typeName.toUpperCase()) { + case "INT": + return SqlTypeName.INTEGER; + case "STRING": + case "TEXT": + return SqlTypeName.VARCHAR; + case "LONG": + return SqlTypeName.BIGINT; + case "BOOL": + return SqlTypeName.BOOLEAN; + case "BYTES": + return SqlTypeName.VARBINARY; + default: + return SqlTypeName.VARCHAR; + } + } + } +} diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sTableDeployer.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sTableDeployer.java new file mode 100644 index 00000000..4203a172 --- /dev/null +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sTableDeployer.java @@ -0,0 +1,48 @@ +package com.linkedin.hoptimator.k8s; + +import java.sql.SQLException; +import java.util.LinkedList; +import java.util.stream.Collectors; + +import io.kubernetes.client.openapi.models.V1ObjectMeta; + +import com.linkedin.hoptimator.TableSource; +import com.linkedin.hoptimator.k8s.models.V1alpha1Table; +import com.linkedin.hoptimator.k8s.models.V1alpha1TableList; +import com.linkedin.hoptimator.k8s.models.V1alpha1TableSpec; +import com.linkedin.hoptimator.k8s.models.V1alpha1TableSpecColumn; + + +/** Deploys a Table CRD object from a TableSource. */ +class K8sTableDeployer extends K8sDeployer { + + private final TableSource tableSource; + + K8sTableDeployer(TableSource tableSource, K8sContext context) { + super(context, K8sApiEndpoints.TABLES); + this.tableSource = tableSource; + } + + @Override + protected V1alpha1Table toK8sObject() throws SQLException { + String name = K8sUtils.canonicalizeName(tableSource.database(), tableSource.table()); + LinkedList q = new LinkedList<>(tableSource.path()); + return new V1alpha1Table() + .kind(K8sApiEndpoints.TABLES.kind()) + .apiVersion(K8sApiEndpoints.TABLES.apiVersion()) + .metadata(new V1ObjectMeta() + .name(name)) + .spec(new V1alpha1TableSpec() + .table(q.pollLast()) + .schema(q.pollLast()) + .catalog(q.pollLast()) + .database(tableSource.database()) + .columns(tableSource.columns().stream() + .map(col -> new V1alpha1TableSpecColumn() + .name(col.name()) + .type(col.typeName()) + .nullable(col.nullable())) + .collect(Collectors.toList())) + .options(tableSource.options())); + } +} diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sTableTable.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sTableTable.java new file mode 100644 index 00000000..355acd08 --- /dev/null +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sTableTable.java @@ -0,0 +1,153 @@ +package com.linkedin.hoptimator.k8s; + +import java.util.Objects; + +import io.kubernetes.client.openapi.models.V1ObjectMeta; +import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.impl.AbstractSchema; + +import com.linkedin.hoptimator.k8s.models.V1alpha1Table; +import com.linkedin.hoptimator.k8s.models.V1alpha1TableList; +import com.linkedin.hoptimator.k8s.models.V1alpha1TableSpec; +import com.linkedin.hoptimator.k8s.models.V1alpha1TableStatus; +import com.linkedin.hoptimator.Validated; +import com.linkedin.hoptimator.Validator; + + +/** System table that lists Table CRDs and registers them in the Calcite schema. */ +class K8sTableTable extends K8sTable + implements Validated { + + // CHECKSTYLE:OFF + public static class Row { + public final String NAME; + public final String CATALOG; + public final String SCHEMA; + public final String TABLE; + public final String DATABASE; + public final Boolean READY; + + public Row(String name, String catalog, String schema, String table, String database, Boolean ready) { + this.NAME = name; + this.CATALOG = catalog; + this.SCHEMA = schema; + this.TABLE = table; + this.DATABASE = database; + this.READY = ready; + } + + /** The schema path components (catalog, schema) leading up to the table name. */ + java.util.List schemaPath() { + java.util.List path = new java.util.ArrayList<>(); + if (CATALOG != null && !CATALOG.isEmpty()) { + path.add(CATALOG); + } + if (SCHEMA != null && !SCHEMA.isEmpty()) { + path.add(SCHEMA); + } + return path; + } + + String tableName() { + return TABLE; + } + + @Override + public String toString() { + return String.join("\t", NAME, CATALOG, SCHEMA, TABLE, DATABASE, + Boolean.toString(READY != null && READY)); + } + } + // CHECKSTYLE:ON + + private final K8sContext context; + + K8sTableTable(K8sContext context) { + super(context, K8sApiEndpoints.TABLES, Row.class); + this.context = context; + } + + /** Loads Table CRs and registers them as K8sStoredTable instances in the schema hierarchy. */ + public void addTables(SchemaPlus parentSchema) { + K8sApi api = new K8sApi<>(context, K8sApiEndpoints.TABLES); + try { + for (V1alpha1Table table : api.list()) { + V1alpha1TableSpec spec = table.getSpec(); + if (spec == null || spec.getTable() == null || spec.getDatabase() == null) { + continue; + } + + // Build schema path, filling in any missing schemas + SchemaPlus schema = parentSchema; + if (spec.getCatalog() != null && !spec.getCatalog().isEmpty()) { + SchemaPlus next = Objects.requireNonNull(schema).subSchemas().get(spec.getCatalog()); + if (next == null) { + schema.add(spec.getCatalog(), new AbstractSchema()); + next = schema.subSchemas().get(spec.getCatalog()); + } + schema = next; + } + if (spec.getSchema() != null && !spec.getSchema().isEmpty()) { + SchemaPlus next = Objects.requireNonNull(schema).subSchemas().get(spec.getSchema()); + if (next == null) { + schema.add(spec.getSchema(), new AbstractSchema()); + next = schema.subSchemas().get(spec.getSchema()); + } + schema = next; + } + + K8sStoredTable storedTable = new K8sStoredTable( + spec.getDatabase(), + spec.getColumns(), + spec.getOptions()); + Objects.requireNonNull(schema).add(spec.getTable(), storedTable); + } + } catch (RuntimeException e) { + // If Tables CRD is not installed, silently skip + } catch (java.sql.SQLException e) { + // If Tables CRD is not installed, silently skip + } + } + + @Override + public Row toRow(V1alpha1Table obj) { + V1alpha1TableSpec spec = Objects.requireNonNull(obj.getSpec()); + V1alpha1TableStatus status = obj.getStatus(); + return new Row( + Objects.requireNonNull(obj.getMetadata()).getName(), + spec.getCatalog(), + spec.getSchema(), + spec.getTable(), + spec.getDatabase(), + status != null ? status.getReady() : null); + } + + @Override + public V1alpha1Table fromRow(Row row) { + K8sUtils.checkK8sName(row.NAME); + return new V1alpha1Table() + .kind(K8sApiEndpoints.TABLES.kind()) + .apiVersion(K8sApiEndpoints.TABLES.apiVersion()) + .metadata(new V1ObjectMeta().name(row.NAME)) + .spec(new V1alpha1TableSpec() + .table(row.TABLE) + .database(row.DATABASE) + .catalog(row.CATALOG) + .schema(row.SCHEMA)); + } + + @Override + public Schema.TableType getJdbcTableType() { + return Schema.TableType.SYSTEM_TABLE; + } + + @Override + public void validate(Validator.Issues issues) { + for (Row row : rows()) { + Validator.Issues issues2 = issues.child(row.toString()); + Validator.validateSubdomainName(row.NAME, issues2.child("NAME")); + } + Validator.validateUnique(rows(), x -> x.NAME, issues); + } +} diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1Table.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1Table.java new file mode 100644 index 00000000..73ad3cff --- /dev/null +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1Table.java @@ -0,0 +1,144 @@ +package com.linkedin.hoptimator.k8s.models; + +import java.util.Objects; + +import com.google.gson.annotations.SerializedName; +import io.kubernetes.client.openapi.models.V1ObjectMeta; + + +/** + * A SQL table with connector configuration. + */ +public class V1alpha1Table implements io.kubernetes.client.common.KubernetesObject { + public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; + @SerializedName(SERIALIZED_NAME_API_VERSION) + private String apiVersion; + + public static final String SERIALIZED_NAME_KIND = "kind"; + @SerializedName(SERIALIZED_NAME_KIND) + private String kind; + + public static final String SERIALIZED_NAME_METADATA = "metadata"; + @SerializedName(SERIALIZED_NAME_METADATA) + private V1ObjectMeta metadata = null; + + public static final String SERIALIZED_NAME_SPEC = "spec"; + @SerializedName(SERIALIZED_NAME_SPEC) + private V1alpha1TableSpec spec; + + public static final String SERIALIZED_NAME_STATUS = "status"; + @SerializedName(SERIALIZED_NAME_STATUS) + private V1alpha1TableStatus status; + + + public V1alpha1Table apiVersion(String apiVersion) { + this.apiVersion = apiVersion; + return this; + } + + @javax.annotation.Nullable + public String getApiVersion() { + return apiVersion; + } + + public void setApiVersion(String apiVersion) { + this.apiVersion = apiVersion; + } + + public V1alpha1Table kind(String kind) { + this.kind = kind; + return this; + } + + @javax.annotation.Nullable + public String getKind() { + return kind; + } + + public void setKind(String kind) { + this.kind = kind; + } + + public V1alpha1Table metadata(V1ObjectMeta metadata) { + this.metadata = metadata; + return this; + } + + @javax.annotation.Nullable + public V1ObjectMeta getMetadata() { + return metadata; + } + + public void setMetadata(V1ObjectMeta metadata) { + this.metadata = metadata; + } + + public V1alpha1Table spec(V1alpha1TableSpec spec) { + this.spec = spec; + return this; + } + + @javax.annotation.Nullable + public V1alpha1TableSpec getSpec() { + return spec; + } + + public void setSpec(V1alpha1TableSpec spec) { + this.spec = spec; + } + + public V1alpha1Table status(V1alpha1TableStatus status) { + this.status = status; + return this; + } + + @javax.annotation.Nullable + public V1alpha1TableStatus getStatus() { + return status; + } + + public void setStatus(V1alpha1TableStatus status) { + this.status = status; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + V1alpha1Table v1alpha1Table = (V1alpha1Table) o; + return Objects.equals(this.apiVersion, v1alpha1Table.apiVersion) && + Objects.equals(this.kind, v1alpha1Table.kind) && + Objects.equals(this.metadata, v1alpha1Table.metadata) && + Objects.equals(this.spec, v1alpha1Table.spec) && + Objects.equals(this.status, v1alpha1Table.status); + } + + @Override + public int hashCode() { + return Objects.hash(apiVersion, kind, metadata, spec, status); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("class V1alpha1Table {\n"); + sb.append(" apiVersion: ").append(toIndentedString(apiVersion)).append("\n"); + sb.append(" kind: ").append(toIndentedString(kind)).append("\n"); + sb.append(" metadata: ").append(toIndentedString(metadata)).append("\n"); + sb.append(" spec: ").append(toIndentedString(spec)).append("\n"); + sb.append(" status: ").append(toIndentedString(status)).append("\n"); + sb.append("}"); + return sb.toString(); + } + + private String toIndentedString(Object o) { + if (o == null) { + return "null"; + } + return o.toString().replace("\n", "\n "); + } +} diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1TableList.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1TableList.java new file mode 100644 index 00000000..c8221646 --- /dev/null +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1TableList.java @@ -0,0 +1,130 @@ +package com.linkedin.hoptimator.k8s.models; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +import com.google.gson.annotations.SerializedName; +import io.kubernetes.client.openapi.models.V1ListMeta; + + +/** + * TableList is a list of Table + */ +public class V1alpha1TableList implements io.kubernetes.client.common.KubernetesListObject { + public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; + @SerializedName(SERIALIZED_NAME_API_VERSION) + private String apiVersion; + + public static final String SERIALIZED_NAME_ITEMS = "items"; + @SerializedName(SERIALIZED_NAME_ITEMS) + private List items = new ArrayList<>(); + + public static final String SERIALIZED_NAME_KIND = "kind"; + @SerializedName(SERIALIZED_NAME_KIND) + private String kind; + + public static final String SERIALIZED_NAME_METADATA = "metadata"; + @SerializedName(SERIALIZED_NAME_METADATA) + private V1ListMeta metadata = null; + + + public V1alpha1TableList apiVersion(String apiVersion) { + this.apiVersion = apiVersion; + return this; + } + + @javax.annotation.Nullable + public String getApiVersion() { + return apiVersion; + } + + public void setApiVersion(String apiVersion) { + this.apiVersion = apiVersion; + } + + public V1alpha1TableList items(List items) { + this.items = items; + return this; + } + + public V1alpha1TableList addItemsItem(V1alpha1Table itemsItem) { + this.items.add(itemsItem); + return this; + } + + public List getItems() { + return items; + } + + public void setItems(List items) { + this.items = items; + } + + public V1alpha1TableList kind(String kind) { + this.kind = kind; + return this; + } + + @javax.annotation.Nullable + public String getKind() { + return kind; + } + + public void setKind(String kind) { + this.kind = kind; + } + + public V1alpha1TableList metadata(V1ListMeta metadata) { + this.metadata = metadata; + return this; + } + + @javax.annotation.Nullable + public V1ListMeta getMetadata() { + return metadata; + } + + public void setMetadata(V1ListMeta metadata) { + this.metadata = metadata; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + V1alpha1TableList v1alpha1TableList = (V1alpha1TableList) o; + return Objects.equals(this.apiVersion, v1alpha1TableList.apiVersion) && + Objects.equals(this.items, v1alpha1TableList.items) && + Objects.equals(this.kind, v1alpha1TableList.kind) && + Objects.equals(this.metadata, v1alpha1TableList.metadata); + } + + @Override + public int hashCode() { + return Objects.hash(apiVersion, items, kind, metadata); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("class V1alpha1TableList {\n"); + sb.append(" apiVersion: ").append(toIndentedString(apiVersion)).append("\n"); + sb.append(" items: ").append(toIndentedString(items)).append("\n"); + sb.append(" kind: ").append(toIndentedString(kind)).append("\n"); + sb.append(" metadata: ").append(toIndentedString(metadata)).append("\n"); + sb.append("}"); + return sb.toString(); + } + + private String toIndentedString(Object o) { + if (o == null) { + return "null"; + } + return o.toString().replace("\n", "\n "); + } +} diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1TableSpec.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1TableSpec.java new file mode 100644 index 00000000..cb41ca13 --- /dev/null +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1TableSpec.java @@ -0,0 +1,163 @@ +package com.linkedin.hoptimator.k8s.models; + +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import com.google.gson.annotations.SerializedName; + + +/** + * Table spec. + */ +public class V1alpha1TableSpec { + public static final String SERIALIZED_NAME_CATALOG = "catalog"; + @SerializedName(SERIALIZED_NAME_CATALOG) + private String catalog; + + public static final String SERIALIZED_NAME_SCHEMA = "schema"; + @SerializedName(SERIALIZED_NAME_SCHEMA) + private String schema; + + public static final String SERIALIZED_NAME_TABLE = "table"; + @SerializedName(SERIALIZED_NAME_TABLE) + private String table; + + public static final String SERIALIZED_NAME_DATABASE = "database"; + @SerializedName(SERIALIZED_NAME_DATABASE) + private String database; + + public static final String SERIALIZED_NAME_COLUMNS = "columns"; + @SerializedName(SERIALIZED_NAME_COLUMNS) + private List columns; + + public static final String SERIALIZED_NAME_OPTIONS = "options"; + @SerializedName(SERIALIZED_NAME_OPTIONS) + private Map options; + + + public V1alpha1TableSpec catalog(String catalog) { + this.catalog = catalog; + return this; + } + + @javax.annotation.Nullable + public String getCatalog() { + return catalog; + } + + public void setCatalog(String catalog) { + this.catalog = catalog; + } + + public V1alpha1TableSpec schema(String schema) { + this.schema = schema; + return this; + } + + @javax.annotation.Nullable + public String getSchema() { + return schema; + } + + public void setSchema(String schema) { + this.schema = schema; + } + + public V1alpha1TableSpec table(String table) { + this.table = table; + return this; + } + + public String getTable() { + return table; + } + + public void setTable(String table) { + this.table = table; + } + + public V1alpha1TableSpec database(String database) { + this.database = database; + return this; + } + + public String getDatabase() { + return database; + } + + public void setDatabase(String database) { + this.database = database; + } + + public V1alpha1TableSpec columns(List columns) { + this.columns = columns; + return this; + } + + @javax.annotation.Nullable + public List getColumns() { + return columns; + } + + public void setColumns(List columns) { + this.columns = columns; + } + + public V1alpha1TableSpec options(Map options) { + this.options = options; + return this; + } + + @javax.annotation.Nullable + public Map getOptions() { + return options; + } + + public void setOptions(Map options) { + this.options = options; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + V1alpha1TableSpec v1alpha1TableSpec = (V1alpha1TableSpec) o; + return Objects.equals(this.catalog, v1alpha1TableSpec.catalog) && + Objects.equals(this.schema, v1alpha1TableSpec.schema) && + Objects.equals(this.table, v1alpha1TableSpec.table) && + Objects.equals(this.database, v1alpha1TableSpec.database) && + Objects.equals(this.columns, v1alpha1TableSpec.columns) && + Objects.equals(this.options, v1alpha1TableSpec.options); + } + + @Override + public int hashCode() { + return Objects.hash(catalog, schema, table, database, columns, options); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("class V1alpha1TableSpec {\n"); + sb.append(" catalog: ").append(toIndentedString(catalog)).append("\n"); + sb.append(" schema: ").append(toIndentedString(schema)).append("\n"); + sb.append(" table: ").append(toIndentedString(table)).append("\n"); + sb.append(" database: ").append(toIndentedString(database)).append("\n"); + sb.append(" columns: ").append(toIndentedString(columns)).append("\n"); + sb.append(" options: ").append(toIndentedString(options)).append("\n"); + sb.append("}"); + return sb.toString(); + } + + private String toIndentedString(Object o) { + if (o == null) { + return "null"; + } + return o.toString().replace("\n", "\n "); + } +} diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1TableSpecColumn.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1TableSpecColumn.java new file mode 100644 index 00000000..aa4ec770 --- /dev/null +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1TableSpecColumn.java @@ -0,0 +1,101 @@ +package com.linkedin.hoptimator.k8s.models; + +import java.util.Objects; + +import com.google.gson.annotations.SerializedName; + + +/** + * A column definition within a Table spec. + */ +public class V1alpha1TableSpecColumn { + public static final String SERIALIZED_NAME_NAME = "name"; + @SerializedName(SERIALIZED_NAME_NAME) + private String name; + + public static final String SERIALIZED_NAME_TYPE = "type"; + @SerializedName(SERIALIZED_NAME_TYPE) + private String type; + + public static final String SERIALIZED_NAME_NULLABLE = "nullable"; + @SerializedName(SERIALIZED_NAME_NULLABLE) + private Boolean nullable; + + + public V1alpha1TableSpecColumn name(String name) { + this.name = name; + return this; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public V1alpha1TableSpecColumn type(String type) { + this.type = type; + return this; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public V1alpha1TableSpecColumn nullable(Boolean nullable) { + this.nullable = nullable; + return this; + } + + @javax.annotation.Nullable + public Boolean getNullable() { + return nullable; + } + + public void setNullable(Boolean nullable) { + this.nullable = nullable; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + V1alpha1TableSpecColumn that = (V1alpha1TableSpecColumn) o; + return Objects.equals(this.name, that.name) && + Objects.equals(this.type, that.type) && + Objects.equals(this.nullable, that.nullable); + } + + @Override + public int hashCode() { + return Objects.hash(name, type, nullable); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("class V1alpha1TableSpecColumn {\n"); + sb.append(" name: ").append(toIndentedString(name)).append("\n"); + sb.append(" type: ").append(toIndentedString(type)).append("\n"); + sb.append(" nullable: ").append(toIndentedString(nullable)).append("\n"); + sb.append("}"); + return sb.toString(); + } + + private String toIndentedString(Object o) { + if (o == null) { + return "null"; + } + return o.toString().replace("\n", "\n "); + } +} diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1TableStatus.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1TableStatus.java new file mode 100644 index 00000000..0d240fee --- /dev/null +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1TableStatus.java @@ -0,0 +1,83 @@ +package com.linkedin.hoptimator.k8s.models; + +import java.util.Objects; + +import com.google.gson.annotations.SerializedName; + + +/** + * V1alpha1TableStatus + */ +public class V1alpha1TableStatus { + public static final String SERIALIZED_NAME_READY = "ready"; + @SerializedName(SERIALIZED_NAME_READY) + private Boolean ready; + + public static final String SERIALIZED_NAME_MESSAGE = "message"; + @SerializedName(SERIALIZED_NAME_MESSAGE) + private String message; + + + public V1alpha1TableStatus ready(Boolean ready) { + this.ready = ready; + return this; + } + + @javax.annotation.Nullable + public Boolean getReady() { + return ready; + } + + public void setReady(Boolean ready) { + this.ready = ready; + } + + public V1alpha1TableStatus message(String message) { + this.message = message; + return this; + } + + @javax.annotation.Nullable + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + V1alpha1TableStatus that = (V1alpha1TableStatus) o; + return Objects.equals(this.ready, that.ready) && + Objects.equals(this.message, that.message); + } + + @Override + public int hashCode() { + return Objects.hash(ready, message); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("class V1alpha1TableStatus {\n"); + sb.append(" ready: ").append(toIndentedString(ready)).append("\n"); + sb.append(" message: ").append(toIndentedString(message)).append("\n"); + sb.append("}"); + return sb.toString(); + } + + private String toIndentedString(Object o) { + if (o == null) { + return "null"; + } + return o.toString().replace("\n", "\n "); + } +} diff --git a/hoptimator-k8s/src/main/resources/tables.crd.yaml b/hoptimator-k8s/src/main/resources/tables.crd.yaml new file mode 100644 index 00000000..bf959b1a --- /dev/null +++ b/hoptimator-k8s/src/main/resources/tables.crd.yaml @@ -0,0 +1,103 @@ +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: tables.hoptimator.linkedin.com +spec: + group: hoptimator.linkedin.com + names: + kind: Table + listKind: TableList + plural: tables + singular: table + shortNames: + preserveUnknownFields: false + scope: Namespaced + versions: + - name: v1alpha1 + served: true + storage: true + schema: + openAPIV3Schema: + description: A SQL table with connector configuration. + type: object + properties: + apiVersion: + type: string + kind: + type: string + metadata: + type: object + spec: + description: Table spec. + type: object + properties: + catalog: + description: Catalog name. + type: string + schema: + description: Schema name. + type: string + table: + description: Table name. + type: string + database: + description: Internal database name. + type: string + columns: + description: Column definitions. + type: array + items: + type: object + properties: + name: + description: Column name. + type: string + type: + description: Column type. + type: string + nullable: + description: Whether the column is nullable. + type: boolean + required: + - name + - type + options: + description: Connector options. + type: object + additionalProperties: + type: string + required: + - table + - database + status: + type: object + properties: + ready: + description: Whether the table is ready. + type: boolean + message: + description: Status message. + type: string + subresources: + status: {} + additionalPrinterColumns: + - name: CATALOG + type: string + description: Catalog name. + jsonPath: .spec.catalog + - name: SCHEMA + type: string + description: Schema name. + jsonPath: .spec.schema + - name: TABLE + type: string + description: Table name. + jsonPath: .spec.table + - name: DATABASE + type: string + description: Database name. + jsonPath: .spec.database + - name: READY + type: boolean + description: Whether the table is ready. + jsonPath: .status.ready diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRules.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRules.java index f49cd33c..c5986f1a 100644 --- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRules.java +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRules.java @@ -5,6 +5,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; @@ -12,6 +13,7 @@ import org.apache.calcite.plan.Convention; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; import org.apache.calcite.plan.RelOptTable; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.prepare.Prepare; @@ -30,6 +32,7 @@ import org.apache.calcite.rel.core.Uncollect; import org.apache.calcite.rel.hint.RelHint; import org.apache.calcite.rel.logical.LogicalAggregate; + import org.apache.calcite.rel.logical.LogicalCalc; import org.apache.calcite.rel.logical.LogicalFilter; import org.apache.calcite.rel.logical.LogicalJoin; @@ -46,6 +49,7 @@ import com.google.common.collect.ImmutableSet; +import com.linkedin.hoptimator.ConnectorConfigurable; import com.linkedin.hoptimator.util.DataTypeUtils; @@ -56,7 +60,8 @@ private PipelineRules() { public static Collection rules() { return Arrays.asList(PipelineFilterRule.INSTANCE, PipelineProjectRule.INSTANCE, PipelineJoinRule.INSTANCE, - PipelineCalcRule.INSTANCE, PipelineAggregateRule.INSTANCE, PipelineUncollectRule.INSTANCE); + PipelineCalcRule.INSTANCE, PipelineAggregateRule.INSTANCE, PipelineUncollectRule.INSTANCE, + StoredTableScanRule.INSTANCE); } public static class PipelineTableScanRule extends ConverterRule { @@ -85,17 +90,60 @@ public RelNode convert(RelNode rel) { static class PipelineTableScan extends TableScan implements PipelineRel { private final String database; + private final Map options; + + PipelineTableScan(RelOptCluster cluster, RelTraitSet traitSet, List hints, String database, + RelOptTable table) { + this(cluster, traitSet, hints, database, table, Collections.emptyMap()); + } - PipelineTableScan(RelOptCluster cluster, RelTraitSet traitSet, List hints, String database, RelOptTable table) { + PipelineTableScan(RelOptCluster cluster, RelTraitSet traitSet, List hints, String database, + RelOptTable table, Map options) { super(cluster, traitSet, hints, table); assert getConvention() == PipelineRel.CONVENTION; this.database = database; + this.options = options; } @Override public void implement(Implementor implementor) throws SQLException { - implementor.addSource(database, table.getQualifiedName(), table.getRowType(), - Collections.emptyMap()); // TODO pass in table scan hints + implementor.addSource(database, table.getQualifiedName(), table.getRowType(), options); + } + } + + /** Converts a TableScan on a ConnectorConfigurable table to PipelineTableScan. */ + public static final class StoredTableScanRule extends RelOptRule { + public static final StoredTableScanRule INSTANCE = new StoredTableScanRule(); + + private StoredTableScanRule() { + super(operand(TableScan.class, any()), "StoredTableScanRule"); + } + + @Override + public boolean matches(RelOptRuleCall call) { + TableScan scan = call.rel(0); + RelOptTable table = scan.getTable(); + if (table == null) { + return false; + } + return table.unwrap(ConnectorConfigurable.class) != null; + } + + @Override + public void onMatch(RelOptRuleCall call) { + TableScan scan = call.rel(0); + RelOptTable table = scan.getTable(); + if (table == null) { + return; + } + ConnectorConfigurable cc = table.unwrap(ConnectorConfigurable.class); + if (cc == null) { + return; + } + RelTraitSet traits = scan.getTraitSet().replace(PipelineRel.CONVENTION); + call.transformTo(new PipelineTableScan( + scan.getCluster(), traits, scan.getHints(), + cc.databaseName(), table, cc.connectorOptions())); } } From 5cb1d704b34d3d84601e01f2f23e03e36cfe0a45 Mon Sep 17 00:00:00 2001 From: Ryanne Dolan Date: Wed, 25 Feb 2026 08:27:52 -0600 Subject: [PATCH 2/2] Make database field optional for ad-hoc tables Tables created without a physical database backing (e.g. `CREATE TABLE foo (...)`) now have a null database rather than incorrectly using the schema name. This better represents that the table is a pure K8s table. - DDL executor sets database=null when schema is not a Database - Table CRD no longer requires the database field - K8sTableTable loads tables without a database - K8sConnector falls back to schema name for template name variable - VeniceDeployerProvider is now null-safe on source.database() Co-Authored-By: Claude Opus 4.6 (1M context) --- .../com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java | 2 +- .../main/java/com/linkedin/hoptimator/k8s/K8sConnector.java | 4 +++- .../main/java/com/linkedin/hoptimator/k8s/K8sTableTable.java | 2 +- hoptimator-k8s/src/main/resources/tables.crd.yaml | 1 - .../linkedin/hoptimator/venice/VeniceDeployerProvider.java | 2 +- 5 files changed, 6 insertions(+), 5 deletions(-) diff --git a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java index 0e3bc58f..fcaf5ff0 100644 --- a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java +++ b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java @@ -407,7 +407,7 @@ public void execute(SqlCreateTable create, CalcitePrepare.Context context) { if (pair.left.schema instanceof Database) { database = ((Database) pair.left.schema).databaseName(); } else { - database = connection.getSchema(); + database = null; } final JavaTypeFactory typeFactory = context.getTypeFactory(); diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sConnector.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sConnector.java index d63d9c6a..cc2cd6d3 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sConnector.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sConnector.java @@ -50,7 +50,9 @@ public Map configure() throws SQLException { Template.Environment env = new Template.SimpleEnvironment() - .with("name", source.database() + "-" + source.table().toLowerCase(Locale.ROOT)) + .with("name", K8sUtils.canonicalizeName( + source.database() != null ? source.database() : source.schema(), + source.table())) .with("database", source.database()) .with("table", source.table()) .with(options); diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sTableTable.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sTableTable.java index 355acd08..af0586d3 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sTableTable.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sTableTable.java @@ -74,7 +74,7 @@ public void addTables(SchemaPlus parentSchema) { try { for (V1alpha1Table table : api.list()) { V1alpha1TableSpec spec = table.getSpec(); - if (spec == null || spec.getTable() == null || spec.getDatabase() == null) { + if (spec == null || spec.getTable() == null) { continue; } diff --git a/hoptimator-k8s/src/main/resources/tables.crd.yaml b/hoptimator-k8s/src/main/resources/tables.crd.yaml index bf959b1a..df911a20 100644 --- a/hoptimator-k8s/src/main/resources/tables.crd.yaml +++ b/hoptimator-k8s/src/main/resources/tables.crd.yaml @@ -68,7 +68,6 @@ spec: type: string required: - table - - database status: type: object properties: diff --git a/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/VeniceDeployerProvider.java b/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/VeniceDeployerProvider.java index ae2be36c..4246ddc2 100644 --- a/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/VeniceDeployerProvider.java +++ b/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/VeniceDeployerProvider.java @@ -18,7 +18,7 @@ public Collection deployers(T obj, Connection c List list = new ArrayList<>(); if (obj instanceof Source) { Source source = (Source) obj; - if (source.database().equalsIgnoreCase(VeniceDriver.CATALOG_NAME)) { + if (VeniceDriver.CATALOG_NAME.equalsIgnoreCase(source.database())) { list.add(new VeniceDeployer(source, (HoptimatorConnection) connection)); } }