Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.linkedin.hoptimator;

import java.util.Map;

/**
* Interface for tables that carry connector configuration.
* This lets planner rules detect K8s-stored tables without
* depending on hoptimator-k8s.
*/
public interface ConnectorConfigurable {

/** Returns the connector options (WITH clause values). */
Map<String, String> connectorOptions();

/** Returns the internal database name for this table. */
String databaseName();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package com.linkedin.hoptimator;

import java.util.Collections;
import java.util.List;
import java.util.Map;

/**
* A Source that carries column definitions alongside options.
* Used for CREATE TABLE ... WITH (...) to store ad-hoc table definitions in K8s.
*/
public class TableSource extends Source {

private final List<ColumnDefinition> columns;

public TableSource(String database, List<String> path, Map<String, String> options,
List<ColumnDefinition> columns) {
super(database, path, options);
this.columns = columns != null ? Collections.unmodifiableList(columns) : Collections.emptyList();
}

public List<ColumnDefinition> columns() {
return columns;
}

/**
* Represents a column definition from a CREATE TABLE statement.
*/
public static class ColumnDefinition {
private final String name;
private final String typeName;
private final boolean nullable;

public ColumnDefinition(String name, String typeName, boolean nullable) {
this.name = name;
this.typeName = typeName;
this.nullable = nullable;
}

public String name() {
return name;
}

public String typeName() {
return typeName;
}

public boolean nullable() {
return nullable;
}

@Override
public String toString() {
return name + " " + typeName + (nullable ? "" : " NOT NULL");
}
}

@Override
public String toString() {
return "TableSource[" + pathString() + "]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,11 @@ public void execute(String line, DispatchCallback dispatchCallback) {
schemaSnapshot = HoptimatorDdlUtils.snapshotAndSetSinkSchema(conn.createPrepareContext(),
new HoptimatorDriver.Prepare(conn), plan, create, querySql);
}
sqlline.output(plan.sql(conn).apply(SqlDialect.ANSI));
if (create != null) {
sqlline.output(plan.sql(conn).apply(SqlDialect.ANSI));
} else {
sqlline.output(plan.query(conn).apply(SqlDialect.ANSI));
}
} catch (SQLException e) {
sqlline.error(e);
dispatchCallback.setToFailure();
Expand Down
1 change: 1 addition & 0 deletions hoptimator-jdbc/src/main/codegen/config.fmpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ data: {
"org.apache.calcite.sql.ddl.SqlDdlNodes"
"com.linkedin.hoptimator.jdbc.ddl.SqlCreateFunction"
"com.linkedin.hoptimator.jdbc.ddl.SqlCreateMaterializedView"
"com.linkedin.hoptimator.jdbc.ddl.SqlCreateTable"
"com.linkedin.hoptimator.jdbc.ddl.SqlCreateTrigger"
"com.linkedin.hoptimator.jdbc.ddl.SqlFire"
"com.linkedin.hoptimator.jdbc.ddl.SqlFireMaterializedView"
Expand Down
4 changes: 3 additions & 1 deletion hoptimator-jdbc/src/main/codegen/includes/parserImpls.ftl
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ SqlCreate SqlCreateTable(Span s, boolean replace) :
final boolean ifNotExists;
final SqlIdentifier id;
SqlNodeList tableElementList = null;
SqlNodeList optionList = null;
SqlNode query = null;

SqlCreate createTableLike = null;
Expand All @@ -204,9 +205,10 @@ SqlCreate SqlCreateTable(Span s, boolean replace) :
}
|
[ tableElementList = TableElementList() ]
[ optionList = Options() ]
[ <AS> query = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY) ]
{
return SqlDdlNodes.createTable(s.end(this), replace, ifNotExists, id, tableElementList, query);
return new SqlCreateTable(s.end(this), replace, ifNotExists, id, tableElementList, optionList, query);
}
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.linkedin.hoptimator.MaterializedView;
import com.linkedin.hoptimator.Pipeline;
import com.linkedin.hoptimator.Source;
import com.linkedin.hoptimator.TableSource;
import com.linkedin.hoptimator.Trigger;
import com.linkedin.hoptimator.UserJob;
import com.linkedin.hoptimator.View;
Expand Down Expand Up @@ -70,7 +71,7 @@
import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.ddl.SqlColumnDeclaration;
import org.apache.calcite.sql.ddl.SqlCreateTable;
import com.linkedin.hoptimator.jdbc.ddl.SqlCreateTable;
import org.apache.calcite.sql.ddl.SqlCreateView;
import org.apache.calcite.sql.ddl.SqlDropMaterializedView;
import org.apache.calcite.sql.ddl.SqlDropObject;
Expand Down Expand Up @@ -406,7 +407,7 @@ public void execute(SqlCreateTable create, CalcitePrepare.Context context) {
if (pair.left.schema instanceof Database) {
database = ((Database) pair.left.schema).databaseName();
} else {
database = connection.getSchema();
database = null;
}

final JavaTypeFactory typeFactory = context.getTypeFactory();
Expand Down Expand Up @@ -461,7 +462,20 @@ public void execute(SqlCreateTable create, CalcitePrepare.Context context) {
List<String> tablePath = new ArrayList<>(schemaPath);
tablePath.add(tableName);

Source source = new Source(database, tablePath, Collections.emptyMap());
Map<String, String> options = HoptimatorDdlUtils.options(create.options);
Source source;
if (!options.isEmpty()) {
List<TableSource.ColumnDefinition> columnDefs = new ArrayList<>();
for (org.apache.calcite.rel.type.RelDataTypeField field : rowType.getFieldList()) {
columnDefs.add(new TableSource.ColumnDefinition(
field.getName(),
field.getType().getSqlTypeName().getName(),
field.getType().isNullable()));
}
source = new TableSource(database, tablePath, options, columnDefs);
} else {
source = new Source(database, tablePath, Collections.emptyMap());
}
logger.info("Validating new table {}", source);
ValidationService.validateOrThrow(source);
deployers = DeploymentService.deployers(source, connection);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.apache.calcite.sql.ddl.SqlDdlNodes;
import com.linkedin.hoptimator.jdbc.ddl.SqlCreateFunction;
import com.linkedin.hoptimator.jdbc.ddl.SqlCreateMaterializedView;
import com.linkedin.hoptimator.jdbc.ddl.SqlCreateTable;
import com.linkedin.hoptimator.jdbc.ddl.SqlCreateTrigger;
import com.linkedin.hoptimator.jdbc.ddl.SqlFire;
import com.linkedin.hoptimator.jdbc.ddl.SqlFireMaterializedView;
Expand Down Expand Up @@ -6126,6 +6127,7 @@ final public SqlCreate SqlCreateTable(Span s, boolean replace) throws ParseExcep
final boolean ifNotExists;
final SqlIdentifier id;
SqlNodeList tableElementList = null;
SqlNodeList optionList = null;
SqlNode query = null;

SqlCreate createTableLike = null;
Expand All @@ -6149,6 +6151,13 @@ final public SqlCreate SqlCreateTable(Span s, boolean replace) throws ParseExcep
;
}
switch ((jj_ntk==-1)?jj_ntk():jj_ntk) {
case WITH:
optionList = Options();
break;
default:
;
}
switch ((jj_ntk==-1)?jj_ntk():jj_ntk) {
case AS:
jj_consume_token(AS);
query = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY);
Expand All @@ -6157,7 +6166,7 @@ final public SqlCreate SqlCreateTable(Span s, boolean replace) throws ParseExcep
jj_la1[45] = jj_gen;
;
}
{if (true) return SqlDdlNodes.createTable(s.end(this), replace, ifNotExists, id, tableElementList, query);}
{if (true) return new SqlCreateTable(s.end(this), replace, ifNotExists, id, tableElementList, optionList, query);}
}
throw new Error("Missing return statement in function");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.linkedin.hoptimator.jdbc.ddl;

import org.apache.calcite.sql.SqlCreate;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlSpecialOperator;
import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.util.ImmutableNullableList;

import org.checkerframework.checker.nullness.qual.Nullable;

import java.util.List;

import static java.util.Objects.requireNonNull;

/**
* Parse tree for {@code CREATE TABLE} statement with optional {@code WITH} options.
*/
public class SqlCreateTable extends SqlCreate {
public final SqlIdentifier name;
public final @Nullable SqlNodeList columnList;
public final @Nullable SqlNodeList options;
public final @Nullable SqlNode query;
public final boolean ifNotExists;

private static final SqlOperator OPERATOR =
new SqlSpecialOperator("CREATE TABLE", SqlKind.CREATE_TABLE);

public SqlCreateTable(SqlParserPos pos, boolean replace,
boolean ifNotExists, SqlIdentifier name, @Nullable SqlNodeList columnList,
@Nullable SqlNodeList options, @Nullable SqlNode query) {
super(OPERATOR, pos, replace, ifNotExists);
this.name = requireNonNull(name, "name");
this.columnList = columnList;
this.options = options;
this.query = query;
this.ifNotExists = ifNotExists;
}

@SuppressWarnings("nullness")
@Override public List<SqlNode> getOperandList() {
return ImmutableNullableList.of(name, columnList, options, query);
}

@Override public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
writer.keyword("CREATE");
writer.keyword("TABLE");
if (ifNotExists) {
writer.keyword("IF NOT EXISTS");
}
name.unparse(writer, leftPrec, rightPrec);
if (columnList != null) {
SqlWriter.Frame frame = writer.startList("(", ")");
for (SqlNode c : columnList) {
writer.sep(",");
c.unparse(writer, 0, 0);
}
writer.endList(frame);
}
if (options != null) {
writer.keyword("WITH");
SqlWriter.Frame frame = writer.startList("(", ")");
for (SqlNode c : options) {
writer.sep(",");
c.unparse(writer, 0, 0);
}
writer.endList(frame);
}
if (query != null) {
writer.keyword("AS");
query.unparse(writer, 0, 0);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ public void createView() throws Exception {
Assertions.assertEquals(expectedLogs, logs);
}

@Test
public void createTableWithOptions() throws Exception {
sql("CREATE TABLE T (X VARCHAR, Y VARCHAR) WITH (connector 'kafka', topic 'test')");
sql("DROP TABLE T");
}

@Test
public void dropNonExistentViewHandlesNullSchema() throws Exception {
// Should not throw when using IF EXISTS on a non-existent view
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import com.linkedin.hoptimator.k8s.models.V1alpha1JobTemplateList;
import com.linkedin.hoptimator.k8s.models.V1alpha1Pipeline;
import com.linkedin.hoptimator.k8s.models.V1alpha1PipelineList;
import com.linkedin.hoptimator.k8s.models.V1alpha1Table;
import com.linkedin.hoptimator.k8s.models.V1alpha1TableList;
import com.linkedin.hoptimator.k8s.models.V1alpha1TableTemplate;
import com.linkedin.hoptimator.k8s.models.V1alpha1TableTemplateList;
import com.linkedin.hoptimator.k8s.models.V1alpha1TableTrigger;
Expand Down Expand Up @@ -52,6 +54,9 @@ public final class K8sApiEndpoints {
public static final K8sApiEndpoint<V1alpha1View, V1alpha1ViewList> VIEWS =
new K8sApiEndpoint<>("View", "hoptimator.linkedin.com", "v1alpha1", "views", false, V1alpha1View.class,
V1alpha1ViewList.class);
public static final K8sApiEndpoint<V1alpha1Table, V1alpha1TableList> TABLES =
new K8sApiEndpoint<>("Table", "hoptimator.linkedin.com", "v1alpha1", "tables", false,
V1alpha1Table.class, V1alpha1TableList.class);
public static final K8sApiEndpoint<V1alpha1TableTemplate, V1alpha1TableTemplateList> TABLE_TEMPLATES =
new K8sApiEndpoint<>("TableTemplate", "hoptimator.linkedin.com", "v1alpha1", "tabletemplates", false,
V1alpha1TableTemplate.class, V1alpha1TableTemplateList.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public void register(Wrapper wrapper) throws SQLException {
schemaPlus.add("k8s", metadata);
metadata.databaseTable().addDatabases(schemaPlus, conn);
metadata.viewTable().addViews(schemaPlus);
metadata.tableTable().addTables(schemaPlus);

// TODO: Explore adding back materializations, but they should not cause backend calls, we may need to
// introduce schemas into the view object itself such that all tables that make up a view do not need to be
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ public Map<String, String> configure() throws SQLException {

Template.Environment env =
new Template.SimpleEnvironment()
.with("name", source.database() + "-" + source.table().toLowerCase(Locale.ROOT))
.with("name", K8sUtils.canonicalizeName(
source.database() != null ? source.database() : source.schema(),
source.table()))
.with("database", source.database())
.with("table", source.table())
.with(options);
Expand Down Expand Up @@ -91,6 +93,10 @@ public Map<String, String> configure() throws SQLException {
Map<String, String> map = new LinkedHashMap<>();
props.stringPropertyNames().stream().sorted().forEach(k ->
map.put(k, props.getProperty(k)));
// Source options override template-derived config (WITH wins)
if (!source.options().isEmpty()) {
map.putAll(source.options());
}
return map;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.linkedin.hoptimator.Job;
import com.linkedin.hoptimator.MaterializedView;
import com.linkedin.hoptimator.Source;
import com.linkedin.hoptimator.TableSource;
import com.linkedin.hoptimator.Trigger;
import com.linkedin.hoptimator.View;

Expand All @@ -29,6 +30,9 @@ public <T extends Deployable> Collection<Deployer> deployers(T obj, Connection c
list.add(new K8sJobDeployer((Job) obj, context));
} else if (obj instanceof Source) {
list.add(new K8sSourceDeployer((Source) obj, context));
if (obj instanceof TableSource) {
list.add(new K8sTableDeployer((TableSource) obj, context));
}
} else if (obj instanceof Trigger) {
list.add(new K8sTriggerDeployer((Trigger) obj, context));
}
Expand Down
Loading
Loading