Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions presto-clp/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,6 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ public class ClpConfig
private long metadataRefreshInterval = 60;
private long metadataExpireInterval = 600;

private String splitFilterConfig;
private SplitFilterProviderType splitFilterProviderType = SplitFilterProviderType.MYSQL;
private String splitMetadataConfigPath;
private SplitProviderType splitProviderType = SplitProviderType.MYSQL;

public boolean isPolymorphicTypeEnabled()
Expand Down Expand Up @@ -151,27 +150,15 @@ public ClpConfig setMetadataExpireInterval(long metadataExpireInterval)
return this;
}

public String getSplitFilterConfig()
public String getSplitMetadataConfigPath()
{
return splitFilterConfig;
return splitMetadataConfigPath;
}

@Config("clp.split-filter-config")
public ClpConfig setSplitFilterConfig(String splitFilterConfig)
@Config("clp.split-metadata-config-path")
public ClpConfig setSplitMetadataConfigPath(String splitMetadataConfigPath)
{
this.splitFilterConfig = splitFilterConfig;
return this;
}

public SplitFilterProviderType getSplitFilterProviderType()
{
return splitFilterProviderType;
}

@Config("clp.split-filter-provider-type")
public ClpConfig setSplitFilterProviderType(SplitFilterProviderType splitFilterProviderType)
{
this.splitFilterProviderType = splitFilterProviderType;
this.splitMetadataConfigPath = splitMetadataConfigPath;
return this;
}

Expand All @@ -192,11 +179,6 @@ public enum MetadataProviderType
MYSQL
}

public enum SplitFilterProviderType
{
MYSQL
}

public enum SplitProviderType
{
MYSQL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import com.facebook.airlift.bootstrap.LifeCycleManager;
import com.facebook.airlift.log.Logger;
import com.facebook.presto.plugin.clp.optimization.ClpPlanOptimizerProvider;
import com.facebook.presto.plugin.clp.split.filter.ClpSplitFilterProvider;
import com.facebook.presto.plugin.clp.split.ClpSplitMetadataConfig;
import com.facebook.presto.spi.connector.Connector;
import com.facebook.presto.spi.connector.ConnectorMetadata;
import com.facebook.presto.spi.connector.ConnectorPlanOptimizerProvider;
Expand All @@ -42,7 +42,7 @@ public class ClpConnector
private final ClpSplitManager splitManager;
private final FunctionMetadataManager functionManager;
private final StandardFunctionResolution functionResolution;
private final ClpSplitFilterProvider splitFilterProvider;
private final ClpSplitMetadataConfig clpSplitMetadataConfig;

@Inject
public ClpConnector(
Expand All @@ -52,21 +52,21 @@ public ClpConnector(
ClpSplitManager splitManager,
FunctionMetadataManager functionManager,
StandardFunctionResolution functionResolution,
ClpSplitFilterProvider splitFilterProvider)
ClpSplitMetadataConfig clpSplitMetadataConfig)
{
this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
this.metadata = requireNonNull(metadata, "metadata is null");
this.recordSetProvider = requireNonNull(recordSetProvider, "recordSetProvider is null");
this.splitManager = requireNonNull(splitManager, "splitManager is null");
this.functionManager = requireNonNull(functionManager, "functionManager is null");
this.functionResolution = requireNonNull(functionResolution, "functionResolution is null");
this.splitFilterProvider = requireNonNull(splitFilterProvider, "splitFilterProvider is null");
this.clpSplitMetadataConfig = requireNonNull(clpSplitMetadataConfig);
}

@Override
public ConnectorPlanOptimizerProvider getConnectorPlanOptimizerProvider()
{
return new ClpPlanOptimizerProvider(functionManager, functionResolution, splitFilterProvider);
return new ClpPlanOptimizerProvider(functionManager, functionResolution, clpSplitMetadataConfig);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public Connector create(String catalogName, Map<String, String> config, Connecto
try {
Bootstrap app = new Bootstrap(new JsonModule(), new ClpModule(), binder -> {
binder.bind(FunctionMetadataManager.class).toInstance(context.getFunctionMetadataManager());
binder.bind(TypeManager.class).toInstance(context.getTypeManager());
binder.bind(NodeManager.class).toInstance(context.getNodeManager());
binder.bind(StandardFunctionResolution.class).toInstance(context.getStandardFunctionResolution());
binder.bind(TypeManager.class).toInstance(context.getTypeManager());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,8 @@ public enum ClpErrorCode
CLP_UNSUPPORTED_TYPE(3, EXTERNAL),
CLP_UNSUPPORTED_CONFIG_OPTION(4, EXTERNAL),

CLP_SPLIT_FILTER_CONFIG_NOT_FOUND(10, USER_ERROR),
CLP_MANDATORY_SPLIT_FILTER_NOT_VALID(11, USER_ERROR),
CLP_UNSUPPORTED_SPLIT_FILTER_SOURCE(12, EXTERNAL);
CLP_SPLIT_METADATA_CONFIG_NOT_FOUND(10, USER_ERROR),
CLP_MANDATORY_COLUMN_NOT_IN_FILTER(11, USER_ERROR);

private final ErrorCode errorCode;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,16 @@
import com.facebook.presto.plugin.clp.metadata.ClpMetadataProvider;
import com.facebook.presto.plugin.clp.metadata.ClpMySqlMetadataProvider;
import com.facebook.presto.plugin.clp.split.ClpMySqlSplitProvider;
import com.facebook.presto.plugin.clp.split.ClpSplitMetadataConfig;
import com.facebook.presto.plugin.clp.split.ClpSplitProvider;
import com.facebook.presto.plugin.clp.split.filter.ClpMySqlSplitFilterProvider;
import com.facebook.presto.plugin.clp.split.filter.ClpSplitFilterProvider;
import com.facebook.presto.spi.PrestoException;
import com.google.inject.Binder;
import com.google.inject.Scopes;

import static com.facebook.airlift.configuration.ConfigBinder.configBinder;
import static com.facebook.presto.plugin.clp.ClpConfig.MetadataProviderType;
import static com.facebook.presto.plugin.clp.ClpConfig.SplitFilterProviderType;
import static com.facebook.presto.plugin.clp.ClpConfig.SplitProviderType;
import static com.facebook.presto.plugin.clp.ClpErrorCode.CLP_UNSUPPORTED_METADATA_SOURCE;
import static com.facebook.presto.plugin.clp.ClpErrorCode.CLP_UNSUPPORTED_SPLIT_FILTER_SOURCE;
import static com.facebook.presto.plugin.clp.ClpErrorCode.CLP_UNSUPPORTED_SPLIT_SOURCE;

public class ClpModule
Expand All @@ -42,17 +39,11 @@ protected void setup(Binder binder)
binder.bind(ClpMetadata.class).in(Scopes.SINGLETON);
binder.bind(ClpRecordSetProvider.class).in(Scopes.SINGLETON);
binder.bind(ClpSplitManager.class).in(Scopes.SINGLETON);
binder.bind(ClpSplitMetadataConfig.class).in(Scopes.SINGLETON);
configBinder(binder).bindConfig(ClpConfig.class);

ClpConfig config = buildConfigObject(ClpConfig.class);

if (SplitFilterProviderType.MYSQL == config.getSplitFilterProviderType()) {
binder.bind(ClpSplitFilterProvider.class).to(ClpMySqlSplitFilterProvider.class).in(Scopes.SINGLETON);
}
else {
throw new PrestoException(CLP_UNSUPPORTED_SPLIT_FILTER_SOURCE, "Unsupported split filter provider type: " + config.getSplitFilterProviderType());
}

if (config.getMetadataProviderType() == MetadataProviderType.MYSQL) {
binder.bind(ClpMetadataProvider.class).to(ClpMySqlMetadataProvider.class).in(Scopes.SINGLETON);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.facebook.presto.plugin.clp;

import com.facebook.presto.spi.ConnectorTableLayoutHandle;
import com.facebook.presto.spi.relation.RowExpression;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

Expand All @@ -27,14 +28,17 @@ public class ClpTableLayoutHandle
{
private final ClpTableHandle table;
private final Optional<String> kqlQuery;
private final Optional<String> metadataSql;
private final Optional<RowExpression> metadataExpression;

@JsonCreator
public ClpTableLayoutHandle(@JsonProperty("table") ClpTableHandle table, @JsonProperty("kqlQuery") Optional<String> kqlQuery, @JsonProperty("metadataFilterQuery") Optional<String> metadataSql)
public ClpTableLayoutHandle(
@JsonProperty("table") ClpTableHandle table,
@JsonProperty("kqlQuery") Optional<String> kqlQuery,
@JsonProperty("metadataExpression") Optional<RowExpression> metadataSqlExpression)
{
this.table = table;
this.kqlQuery = kqlQuery;
this.metadataSql = metadataSql;
this.metadataExpression = metadataSqlExpression;
}

@JsonProperty
Expand All @@ -50,9 +54,9 @@ public Optional<String> getKqlQuery()
}

@JsonProperty
public Optional<String> getMetadataSql()
public Optional<RowExpression> getMetadataExpression()
{
return metadataSql;
return metadataExpression;
}

@Override
Expand All @@ -67,13 +71,13 @@ public boolean equals(Object o)
ClpTableLayoutHandle that = (ClpTableLayoutHandle) o;
return Objects.equals(table, that.table) &&
Objects.equals(kqlQuery, that.kqlQuery) &&
Objects.equals(metadataSql, that.metadataSql);
Objects.equals(metadataExpression, that.metadataExpression);
}

@Override
public int hashCode()
{
return Objects.hash(table, kqlQuery, metadataSql);
return Objects.hash(table, kqlQuery, metadataExpression);
}

@Override
Expand All @@ -82,7 +86,7 @@ public String toString()
return toStringHelper(this)
.add("table", table)
.add("kqlQuery", kqlQuery)
.add("metadataSql", metadataSql)
.add("metadataExpression", metadataExpression)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.facebook.presto.plugin.clp.ClpColumnHandle;
import com.facebook.presto.plugin.clp.ClpConfig;
import com.facebook.presto.plugin.clp.ClpTableHandle;
import com.facebook.presto.plugin.clp.split.ClpSplitMetadataConfig;
import com.facebook.presto.spi.SchemaTableName;
import com.google.common.collect.ImmutableList;

Expand All @@ -28,8 +29,10 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.lang.String.format;

public class ClpMySqlMetadataProvider
Expand All @@ -56,9 +59,10 @@ public class ClpMySqlMetadataProvider
private static final Logger log = Logger.get(ClpMySqlMetadataProvider.class);

private final ClpConfig config;
private final ClpSplitMetadataConfig splitMetadataConfig;

@Inject
public ClpMySqlMetadataProvider(ClpConfig config)
public ClpMySqlMetadataProvider(ClpConfig config, ClpSplitMetadataConfig splitMetadataConfig)
{
try {
Class.forName("com.mysql.cj.jdbc.Driver");
Expand All @@ -68,6 +72,7 @@ public ClpMySqlMetadataProvider(ClpConfig config)
throw new RuntimeException("MySQL JDBC driver not found", e);
}
this.config = config;
this.splitMetadataConfig = splitMetadataConfig;
}

@Override
Expand All @@ -87,7 +92,16 @@ public List<ClpColumnHandle> listColumnHandles(SchemaTableName schemaTableName)
catch (SQLException e) {
log.warn("Failed to load table schema for %s: %s", schemaTableName.getTableName(), e);
}
return schemaTree.collectColumnHandles();

List<ClpColumnHandle> metadataColumns =
splitMetadataConfig.getMetadataColumns(schemaTableName).entrySet().stream()
.map(entry -> new ClpColumnHandle(entry.getKey(), entry.getValue()))
.collect(toImmutableList());
List<ClpColumnHandle> dataColumns = schemaTree.collectColumnHandles();
List<ClpColumnHandle> allColumns = new ArrayList<>(dataColumns);
allColumns.addAll(metadataColumns);

return allColumns;
}

@Override
Expand Down
Loading
Loading