diff --git a/presto-clp/pom.xml b/presto-clp/pom.xml
index d13b59968aff5..7a654371416e9 100644
--- a/presto-clp/pom.xml
+++ b/presto-clp/pom.xml
@@ -105,6 +105,12 @@
test
+
+ com.facebook.presto
+ presto-main
+ test
+
+
com.facebook.presto
presto-main-base
diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/optimization/ClpFilterToKqlConverter.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/optimization/ClpFilterToKqlConverter.java
index f99369b4a48bd..f6247e8e46725 100644
--- a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/optimization/ClpFilterToKqlConverter.java
+++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/optimization/ClpFilterToKqlConverter.java
@@ -16,6 +16,7 @@
import com.facebook.presto.common.function.OperatorType;
import com.facebook.presto.common.type.DecimalType;
import com.facebook.presto.common.type.RowType;
+import com.facebook.presto.common.type.TimestampType;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.VarcharType;
import com.facebook.presto.plugin.clp.ClpColumnHandle;
@@ -57,12 +58,15 @@
import static com.facebook.presto.common.type.IntegerType.INTEGER;
import static com.facebook.presto.common.type.RealType.REAL;
import static com.facebook.presto.common.type.SmallintType.SMALLINT;
+import static com.facebook.presto.common.type.TimestampType.TIMESTAMP;
+import static com.facebook.presto.common.type.TimestampType.TIMESTAMP_MICROSECONDS;
import static com.facebook.presto.common.type.TinyintType.TINYINT;
import static com.facebook.presto.plugin.clp.ClpErrorCode.CLP_PUSHDOWN_UNSUPPORTED_EXPRESSION;
import static com.facebook.presto.spi.relation.SpecialFormExpression.Form.AND;
import static java.lang.Integer.parseInt;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
+import static java.util.concurrent.TimeUnit.SECONDS;
/**
* A translator to translate Presto {@link RowExpression}s into:
@@ -256,8 +260,10 @@ private ClpExpression handleBetween(CallExpression node)
}
String variable = variableOpt.get();
- String lowerBound = getLiteralString((ConstantExpression) second);
- String upperBound = getLiteralString((ConstantExpression) third);
+ Type lowerBoundType = second.getType();
+ String lowerBound = tryEnsureNanosecondTimestamp(lowerBoundType, getLiteralString((ConstantExpression) second));
+ Type upperBoundType = third.getType();
+ String upperBound = tryEnsureNanosecondTimestamp(upperBoundType, getLiteralString((ConstantExpression) third));
String kql = String.format("%s >= %s AND %s <= %s", variable, lowerBound, variable, upperBound);
String metadataSqlQuery = metadataFilterColumns.contains(variable)
? String.format("\"%s\" >= %s AND \"%s\" <= %s", variable, lowerBound, variable, upperBound)
@@ -440,6 +446,7 @@ private ClpExpression buildClpExpression(
RowExpression originalNode)
{
String metadataSqlQuery = null;
+ literalString = tryEnsureNanosecondTimestamp(literalType, literalString);
if (operator.equals(EQUAL)) {
if (literalType instanceof VarcharType) {
return new ClpExpression(format("%s: \"%s\"", variableName, escapeKqlSpecialCharsForStringValue(literalString)));
@@ -914,9 +921,31 @@ public static boolean isClpCompatibleNumericType(Type type)
|| type.equals(TINYINT)
|| type.equals(DOUBLE)
|| type.equals(REAL)
+ || type.equals(TIMESTAMP)
+ || type.equals(TIMESTAMP_MICROSECONDS)
|| type instanceof DecimalType;
}
+ private static String tryEnsureNanosecondTimestamp(Type type, String literalString)
+ {
+ if (type == TIMESTAMP) {
+ return ensureNanosecondTimestamp(TIMESTAMP, literalString);
+ }
+ else if (type == TIMESTAMP_MICROSECONDS) {
+ return ensureNanosecondTimestamp(TIMESTAMP_MICROSECONDS, literalString);
+ }
+ return literalString;
+ }
+
+ private static String ensureNanosecondTimestamp(TimestampType type, String literalString)
+ {
+ long literalNumber = Long.parseLong(literalString);
+ long seconds = type.getEpochSecond(literalNumber);
+ long nanosecondFraction = type.getNanos(literalNumber);
+ long nanoseconds = SECONDS.toNanos(seconds) + nanosecondFraction;
+ return Long.toString(nanoseconds);
+ }
+
private static class SubstrInfo
{
String variableName;
diff --git a/presto-clp/src/test/java/com/facebook/presto/plugin/clp/TestClpPushDown.java b/presto-clp/src/test/java/com/facebook/presto/plugin/clp/TestClpPushDown.java
new file mode 100644
index 0000000000000..bfc529cd62750
--- /dev/null
+++ b/presto-clp/src/test/java/com/facebook/presto/plugin/clp/TestClpPushDown.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.plugin.clp;
+
+import com.facebook.presto.dispatcher.DispatchManager;
+import com.facebook.presto.execution.QueryManager;
+import com.facebook.presto.plugin.clp.metadata.ClpMetadataProvider;
+import com.facebook.presto.plugin.clp.metadata.ClpMySqlMetadataProvider;
+import com.facebook.presto.plugin.clp.mockdb.ClpMockMetadataDatabase;
+import com.facebook.presto.plugin.clp.mockdb.table.ColumnMetadataTableRows;
+import com.facebook.presto.spi.QueryId;
+import com.facebook.presto.spi.plan.PlanNode;
+import com.facebook.presto.spi.plan.PlanNodeId;
+import com.facebook.presto.spi.plan.TableScanNode;
+import com.facebook.presto.tests.DistributedQueryRunner;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.intellij.lang.annotations.Language;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.Map;
+import java.util.Optional;
+
+import static com.facebook.presto.execution.QueryState.RUNNING;
+import static com.facebook.presto.plugin.clp.ClpQueryRunner.createQueryRunner;
+import static com.facebook.presto.plugin.clp.metadata.ClpSchemaTreeNodeType.Boolean;
+import static com.facebook.presto.plugin.clp.metadata.ClpSchemaTreeNodeType.DateString;
+import static com.facebook.presto.plugin.clp.metadata.ClpSchemaTreeNodeType.Float;
+import static com.facebook.presto.plugin.clp.metadata.ClpSchemaTreeNodeType.Integer;
+import static com.facebook.presto.plugin.clp.metadata.ClpSchemaTreeNodeType.VarString;
+import static java.lang.String.format;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+@Test(singleThreaded = true)
+public class TestClpPushDown
+{
+ private static final String TABLE_NAME = "test_pushdown";
+ private static final Long TEST_TS_SECONDS_LOWER_BOUND = 1746003005L;
+ private static final Long TEST_TS_NANOSECONDS_LOWER_BOUND = 1746003005000000000L;
+ private static final Long TEST_TS_SECONDS_UPPER_BOUND = 1746003015L;
+ private static final Long TEST_TS_NANOSECONDS_UPPER_BOUND = 1746003015000000000L;
+
+ private ClpMockMetadataDatabase mockMetadataDatabase;
+ private DistributedQueryRunner queryRunner;
+ private QueryManager queryManager;
+ private DispatchManager dispatchManager;
+
+ @BeforeMethod
+ public void setUp()
+ throws Exception
+ {
+ mockMetadataDatabase = ClpMockMetadataDatabase
+ .builder()
+ .build();
+ mockMetadataDatabase.addTableToDatasetsTableIfNotExist(ImmutableList.of(TABLE_NAME));
+ mockMetadataDatabase.addColumnMetadata(ImmutableMap.of(TABLE_NAME, new ColumnMetadataTableRows(
+ ImmutableList.of(
+ "city.Name",
+ "city.Region.id",
+ "city.Region.Name",
+ "fare",
+ "isHoliday",
+ "ts"),
+ ImmutableList.of(
+ VarString,
+ Integer,
+ VarString,
+ Float,
+ Boolean,
+ DateString))));
+ ClpConfig config = new ClpConfig()
+ .setPolymorphicTypeEnabled(true)
+ .setMetadataDbUrl(mockMetadataDatabase.getUrl())
+ .setMetadataDbUser(mockMetadataDatabase.getUsername())
+ .setMetadataDbPassword(mockMetadataDatabase.getPassword())
+ .setMetadataTablePrefix(mockMetadataDatabase.getTablePrefix());
+ ClpMetadataProvider metadataProvider = new ClpMySqlMetadataProvider(config);
+ queryRunner = createQueryRunner(
+ mockMetadataDatabase.getUrl(),
+ mockMetadataDatabase.getUsername(),
+ mockMetadataDatabase.getPassword(),
+ mockMetadataDatabase.getTablePrefix(),
+ Optional.of(0),
+ Optional.empty());
+ queryManager = queryRunner.getCoordinator().getQueryManager();
+ dispatchManager = queryRunner.getCoordinator().getDispatchManager();
+ }
+
+ @AfterMethod
+ public void tearDown()
+ throws InterruptedException
+ {
+ long maxCleanUpTime = 5 * 1000L; // 5 seconds
+ long currentCleanUpTime = 0L;
+ while (!queryManager.getQueries().isEmpty() && currentCleanUpTime < maxCleanUpTime) {
+ Thread.sleep(1000L);
+ currentCleanUpTime += 1000L;
+ }
+ if (null != mockMetadataDatabase) {
+ mockMetadataDatabase.teardown();
+ }
+ }
+
+ public void testTimestampComparisons()
+ {
+ // Test logical binary
+ testPushDown(format("ts > from_unixtime(%s)", TEST_TS_SECONDS_LOWER_BOUND), format("ts > %s", TEST_TS_NANOSECONDS_LOWER_BOUND));
+ testPushDown(format("ts >= from_unixtime(%s)", TEST_TS_SECONDS_LOWER_BOUND), format("ts >= %s", TEST_TS_NANOSECONDS_LOWER_BOUND));
+ testPushDown(format("ts < from_unixtime(%s)", TEST_TS_SECONDS_LOWER_BOUND), format("ts < %s", TEST_TS_NANOSECONDS_LOWER_BOUND));
+ testPushDown(format("ts <= from_unixtime(%s)", TEST_TS_SECONDS_LOWER_BOUND), format("ts <= %s", TEST_TS_NANOSECONDS_LOWER_BOUND));
+ testPushDown(format("ts = from_unixtime(%s)", TEST_TS_SECONDS_LOWER_BOUND), format("ts: %s", TEST_TS_NANOSECONDS_LOWER_BOUND));
+ testPushDown(format("ts != from_unixtime(%s)", TEST_TS_SECONDS_LOWER_BOUND), format("NOT ts: %s", TEST_TS_NANOSECONDS_LOWER_BOUND));
+ testPushDown(format("ts <> from_unixtime(%s)", TEST_TS_SECONDS_LOWER_BOUND), format("NOT ts: %s", TEST_TS_NANOSECONDS_LOWER_BOUND));
+ testPushDown(format("from_unixtime(%s) < ts", TEST_TS_SECONDS_LOWER_BOUND), format("ts > %s", TEST_TS_NANOSECONDS_LOWER_BOUND));
+ testPushDown(format("from_unixtime(%s) <= ts", TEST_TS_SECONDS_LOWER_BOUND), format("ts >= %s", TEST_TS_NANOSECONDS_LOWER_BOUND));
+ testPushDown(format("from_unixtime(%s) > ts", TEST_TS_SECONDS_LOWER_BOUND), format("ts < %s", TEST_TS_NANOSECONDS_LOWER_BOUND));
+ testPushDown(format("from_unixtime(%s) >= ts", TEST_TS_SECONDS_LOWER_BOUND), format("ts <= %s", TEST_TS_NANOSECONDS_LOWER_BOUND));
+ testPushDown(format("from_unixtime(%s) = ts", TEST_TS_SECONDS_LOWER_BOUND), format("ts: %s", TEST_TS_NANOSECONDS_LOWER_BOUND));
+ testPushDown(format("from_unixtime(%s) != ts", TEST_TS_SECONDS_LOWER_BOUND), format("NOT ts: %s", TEST_TS_NANOSECONDS_LOWER_BOUND));
+ testPushDown(format("from_unixtime(%s) <> ts", TEST_TS_SECONDS_LOWER_BOUND), format("NOT ts: %s", TEST_TS_NANOSECONDS_LOWER_BOUND));
+
+ // Test BETWEEN
+ testPushDown(format("ts >= from_unixtime(%s) AND ts <= from_unixtime(%s)", TEST_TS_SECONDS_LOWER_BOUND, TEST_TS_SECONDS_UPPER_BOUND),
+ format("ts >= %s AND ts <= %s", TEST_TS_NANOSECONDS_LOWER_BOUND, TEST_TS_NANOSECONDS_UPPER_BOUND));
+ testPushDown(format("ts BETWEEN from_unixtime(%s) AND from_unixtime(%s)", TEST_TS_SECONDS_LOWER_BOUND, TEST_TS_SECONDS_UPPER_BOUND),
+ format("ts >= %s AND ts <= %s", TEST_TS_NANOSECONDS_LOWER_BOUND, TEST_TS_NANOSECONDS_UPPER_BOUND));
+ }
+
+ private void testPushDown(String filter, String expectedPushDown)
+ {
+ try {
+ QueryId id = queryRunner.getCoordinator().getDispatchManager().createQueryId();
+ @Language("SQL") String sql = format("SELECT * FROM clp.default.test_pushdown WHERE %s LIMIT 1", filter);
+ dispatchManager.createQuery(
+ id,
+ "slug",
+ 0,
+ new TestingClpSessionContext(queryRunner.getDefaultSession()),
+ sql).get();
+ long maxDispatchingAndPlanningTime = 60 * 1000L; // 1 minute
+ long currentWaitingTime = 0L;
+ while (dispatchManager.getQueryInfo(id).getState().ordinal() != RUNNING.ordinal() && currentWaitingTime < maxDispatchingAndPlanningTime) {
+ Thread.sleep(1000L);
+ currentWaitingTime += 1000L;
+ }
+ assertTrue(currentWaitingTime < maxDispatchingAndPlanningTime);
+ boolean isPushDownGenerated = false;
+ for (Map.Entry entry : queryManager.getFullQueryInfo(id).getPlanIdNodeMap().entrySet()) {
+ if (!(entry.getValue() instanceof TableScanNode)
+ || (!(((TableScanNode) entry.getValue()).getTable().getLayout().orElse(null) instanceof ClpTableLayoutHandle))) {
+ continue;
+ }
+ ClpTableLayoutHandle clpTableLayoutHandle = (ClpTableLayoutHandle) ((TableScanNode) entry.getValue()).getTable().getLayout().orElseThrow(AssertionError::new);
+ String actualPushDown = clpTableLayoutHandle.getKqlQuery().orElse(null);
+ assertEquals(actualPushDown, expectedPushDown);
+ isPushDownGenerated = true;
+ break;
+ }
+ assertTrue(isPushDownGenerated);
+ queryManager.cancelQuery(id);
+ }
+ catch (Exception e) {
+ fail(e.getMessage());
+ }
+ }
+}
diff --git a/presto-clp/src/test/java/com/facebook/presto/plugin/clp/TestingClpSessionContext.java b/presto-clp/src/test/java/com/facebook/presto/plugin/clp/TestingClpSessionContext.java
new file mode 100644
index 0000000000000..883cc8475c6c1
--- /dev/null
+++ b/presto-clp/src/test/java/com/facebook/presto/plugin/clp/TestingClpSessionContext.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.plugin.clp;
+
+import com.facebook.presto.Session;
+import com.facebook.presto.common.RuntimeStats;
+import com.facebook.presto.common.transaction.TransactionId;
+import com.facebook.presto.server.SessionContext;
+import com.facebook.presto.spi.ConnectorId;
+import com.facebook.presto.spi.function.SqlFunctionId;
+import com.facebook.presto.spi.function.SqlInvokedFunction;
+import com.facebook.presto.spi.security.Identity;
+import com.facebook.presto.spi.session.ResourceEstimates;
+import com.facebook.presto.spi.tracing.Tracer;
+import com.google.common.collect.ImmutableMap;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static java.util.Map.Entry;
+import static java.util.Objects.requireNonNull;
+
+public class TestingClpSessionContext
+ implements SessionContext
+{
+ private final Session session;
+
+ public TestingClpSessionContext(Session session)
+ {
+ this.session = requireNonNull(session, "session is null");
+ }
+
+ @Override
+ public Identity getIdentity()
+ {
+ return session.getIdentity();
+ }
+
+ @Override
+ public String getCatalog()
+ {
+ return session.getCatalog().orElse("clp");
+ }
+
+ @Override
+ public String getSchema()
+ {
+ return session.getSchema().orElse("default");
+ }
+
+ @Override
+ public String getSource()
+ {
+ return session.getSource().orElse(null);
+ }
+
+ @Override
+ public Optional getTraceToken()
+ {
+ return session.getTraceToken();
+ }
+
+ @Override
+ public String getRemoteUserAddress()
+ {
+ return session.getRemoteUserAddress().orElse(null);
+ }
+
+ @Override
+ public String getUserAgent()
+ {
+ return session.getUserAgent().orElse(null);
+ }
+
+ @Override
+ public String getClientInfo()
+ {
+ return session.getClientInfo().orElse(null);
+ }
+
+ @Override
+ public Set getClientTags()
+ {
+ return session.getClientTags();
+ }
+
+ @Override
+ public ResourceEstimates getResourceEstimates()
+ {
+ return session.getResourceEstimates();
+ }
+
+ @Override
+ public String getTimeZoneId()
+ {
+ return session.getTimeZoneKey().getId();
+ }
+
+ @Override
+ public String getLanguage()
+ {
+ return session.getLocale().getLanguage();
+ }
+
+ @Override
+ public Optional getTracer()
+ {
+ return session.getTracer();
+ }
+
+ @Override
+ public Map getSystemProperties()
+ {
+ return session.getSystemProperties();
+ }
+
+ @Override
+ public Map> getCatalogSessionProperties()
+ {
+ ImmutableMap.Builder> catalogSessionProperties = ImmutableMap.builder();
+ for (Entry> entry : session.getConnectorProperties().entrySet()) {
+ catalogSessionProperties.put(entry.getKey().getCatalogName(), entry.getValue());
+ }
+ return catalogSessionProperties.build();
+ }
+
+ @Override
+ public Map getPreparedStatements()
+ {
+ return session.getPreparedStatements();
+ }
+
+ @Override
+ public Optional getTransactionId()
+ {
+ return session.getTransactionId();
+ }
+
+ @Override
+ public boolean supportClientTransaction()
+ {
+ return session.isClientTransactionSupport();
+ }
+
+ @Override
+ public Map getSessionFunctions()
+ {
+ return session.getSessionFunctions();
+ }
+
+ @Override
+ public RuntimeStats getRuntimeStats()
+ {
+ return session.getRuntimeStats();
+ }
+}