From 7340bbad30dc67142c74f0cd5e20589252ee4132 Mon Sep 17 00:00:00 2001 From: Abhishek Balaji Radhakrishnan Date: Tue, 12 May 2026 20:09:12 -0700 Subject: [PATCH 1/8] fix: make system server properties table filterable & add some robustness --- docs/querying/sql-metadata-tables.md | 3 +- .../SystemServerPropertiesTableTest.java | 18 ++- .../schema/SystemServerPropertiesTable.java | 149 ++++++++++++++---- .../sql/calcite/schema/SystemSchemaTest.java | 124 ++++++++++++++- 4 files changed, 247 insertions(+), 47 deletions(-) diff --git a/docs/querying/sql-metadata-tables.md b/docs/querying/sql-metadata-tables.md index ebf5da39eba6..c39c942c340a 100644 --- a/docs/querying/sql-metadata-tables.md +++ b/docs/querying/sql-metadata-tables.md @@ -322,7 +322,7 @@ SELECT * FROM sys.supervisors WHERE healthy=0; ### SERVER_PROPERTIES table -The `server_properties` table exposes the runtime properties configured on for each Druid server. Each row represents a single property key-value pair associated with a specific server. +The `server_properties` table exposes the runtime properties configured on each Druid server. Each row represents a single property key-value pair associated with a specific server. This table supports filter and projection pushdown for efficient querying. If a server is unreachable, the table still returns a row for that server with the `error` column populated instead of failing the entire query. |Column|Type|Notes| |------|-----|-----| @@ -331,6 +331,7 @@ The `server_properties` table exposes the runtime properties configured on for e |node_roles|VARCHAR|Comma-separated list of roles that the server performs. For example, `[coordinator,overlord]` if the server functions as both a Coordinator and an Overlord.| |property|VARCHAR|Name of the property| |value|VARCHAR|Value of the property| +|error_message|VARCHAR|Describes why properties could not be retrieved from the server (e.g., connection refused, HTTP error). 
Null when properties were fetched successfully.| For example, to retrieve properties for a specific server, use the query diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/schema/SystemServerPropertiesTableTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/schema/SystemServerPropertiesTableTest.java index d5f062e0d47d..e3a331681c23 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/schema/SystemServerPropertiesTableTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/schema/SystemServerPropertiesTableTest.java @@ -119,14 +119,14 @@ public void test_serverPropertiesTable_specificProperty() ); Assertions.assertEquals( - StringUtils.format("localhost:%s,%s,[%s],test.onlyBroker,brokerValue", BROKER_PORT, BROKER_SERVICE, NodeRole.BROKER_JSON_NAME), + StringUtils.format("localhost:%s,%s,[%s],test.onlyBroker,brokerValue,", BROKER_PORT, BROKER_SERVICE, NodeRole.BROKER_JSON_NAME), cluster.runSql("SELECT * FROM sys.server_properties WHERE server = 'localhost:%s' AND property = 'test.onlyBroker'", BROKER_PORT) ); String[] expectedRows = new String[] { - StringUtils.format("localhost:%s,%s,[%s],test.nonUniqueProperty,brokerNonUniqueValue", BROKER_PORT, BROKER_SERVICE, NodeRole.BROKER_JSON_NAME), - StringUtils.format("localhost:%s,%s,[%s],test.nonUniqueProperty,overlordNonUniqueValue", OVERLORD_PORT, OVERLORD_SERVICE, NodeRole.OVERLORD_JSON_NAME), - StringUtils.format("localhost:%s,%s,[%s],test.nonUniqueProperty,coordinatorNonUniqueValue", COORDINATOR_PORT, COORDINATOR_SERVICE, NodeRole.COORDINATOR_JSON_NAME), + StringUtils.format("localhost:%s,%s,[%s],test.nonUniqueProperty,brokerNonUniqueValue,", BROKER_PORT, BROKER_SERVICE, NodeRole.BROKER_JSON_NAME), + StringUtils.format("localhost:%s,%s,[%s],test.nonUniqueProperty,overlordNonUniqueValue,", OVERLORD_PORT, OVERLORD_SERVICE, NodeRole.OVERLORD_JSON_NAME), + 
StringUtils.format("localhost:%s,%s,[%s],test.nonUniqueProperty,coordinatorNonUniqueValue,", COORDINATOR_PORT, COORDINATOR_SERVICE, NodeRole.COORDINATOR_JSON_NAME), }; Arrays.sort(expectedRows, String::compareTo); final String result = cluster.runSql("SELECT * FROM sys.server_properties WHERE property='test.nonUniqueProperty'"); @@ -146,6 +146,13 @@ public void test_serverPropertiesTable_hiddenProperties() Assertions.assertFalse(brokerProps.containsKey("password")); } + @Test + public void test_serverPropertiesTable_errorMessageColumnExists() + { + final String result = cluster.runSql("SELECT COUNT(*) FROM sys.server_properties WHERE error_message IS NULL"); + Assertions.assertFalse(result.isEmpty(), "Should return count of servers with null error_message"); + } + private void verifyPropertiesForServer(Map properties, String serivceName, String hostAndPort, String nodeRole) { String[] expectedRows = properties.entrySet().stream().map(entry -> String.join( @@ -154,7 +161,8 @@ private void verifyPropertiesForServer(Map properties, String se escapeCsvField(serivceName), escapeCsvField(ImmutableList.of(nodeRole).toString()), escapeCsvField(entry.getKey()), - escapeCsvField(entry.getValue()) + escapeCsvField(entry.getValue()), + escapeCsvField(null) )).toArray(String[]::new); Arrays.sort(expectedRows, String::compareTo); final String result = cluster.runSql("SELECT * FROM sys.server_properties WHERE server='%s'", hostAndPort); diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemServerPropertiesTable.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemServerPropertiesTable.java index df8b313c8e1c..3751babbfc7d 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemServerPropertiesTable.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemServerPropertiesTable.java @@ -27,13 +27,14 @@ import org.apache.calcite.linq4j.Linq4j; import org.apache.calcite.rel.type.RelDataType; import 
org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.calcite.schema.ScannableTable; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.schema.ProjectableFilterableTable; import org.apache.calcite.schema.Schema; import org.apache.calcite.schema.impl.AbstractTable; import org.apache.druid.discovery.DiscoveryDruidNode; import org.apache.druid.discovery.DruidNodeDiscoveryProvider; -import org.apache.druid.error.InternalServerError; -import org.apache.druid.java.util.common.RE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.java.util.http.client.Request; import org.apache.druid.java.util.http.client.response.StringFullResponseHandler; @@ -47,11 +48,13 @@ import org.apache.druid.sql.calcite.table.RowSignatures; import org.jboss.netty.handler.codec.http.HttpMethod; +import javax.annotation.Nullable; import javax.servlet.http.HttpServletResponse; import java.net.URL; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -64,8 +67,10 @@ * that server would have multiple values in the column {@code node_roles} rather than duplicating all the * rows. 
*/ -public class SystemServerPropertiesTable extends AbstractTable implements ScannableTable +public class SystemServerPropertiesTable extends AbstractTable implements ProjectableFilterableTable { + private static final Logger log = new Logger(SystemServerPropertiesTable.class); + public static final String TABLE_NAME = "server_properties"; static final RowSignature ROW_SIGNATURE = RowSignature @@ -75,8 +80,16 @@ public class SystemServerPropertiesTable extends AbstractTable implements Scanna .add("node_roles", ColumnType.STRING) .add("property", ColumnType.STRING) .add("value", ColumnType.STRING) + .add("error_message", ColumnType.STRING) .build(); + private static final int SERVER_INDEX = ROW_SIGNATURE.indexOf("server"); + private static final int SERVICE_NAME_INDEX = ROW_SIGNATURE.indexOf("service_name"); + private static final int NODE_ROLES_INDEX = ROW_SIGNATURE.indexOf("node_roles"); + private static final int PROPERTY_INDEX = ROW_SIGNATURE.indexOf("property"); + private static final int VALUE_INDEX = ROW_SIGNATURE.indexOf("value"); + private static final int ERROR_MESSAGE_INDEX = ROW_SIGNATURE.indexOf("error_message"); + private final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider; private final AuthorizerMapper authorizerMapper; private final HttpClient httpClient; @@ -108,42 +121,62 @@ public Schema.TableType getJdbcTableType() } @Override - public Enumerable scan(DataContext root) + public Enumerable scan( + final DataContext root, + final List filters, + @Nullable final int[] projects + ) { final AuthenticationResult authenticationResult = (AuthenticationResult) Preconditions.checkNotNull( root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT), "authenticationResult in dataContext" ); SystemSchema.checkStateReadAccessForServers(authenticationResult, authorizerMapper); + final Iterator druidServers = SystemSchema.getDruidServers(druidNodeDiscoveryProvider); final Map serverToPropertiesMap = new HashMap<>(); 
druidServers.forEachRemaining(discoveryDruidNode -> { final DruidNode druidNode = discoveryDruidNode.getDruidNode(); - final Map propertiesMap = getProperties(druidNode); - if (serverToPropertiesMap.containsKey(druidNode.getHostAndPortToUse())) { - ServerProperties serverProperties = serverToPropertiesMap.get(druidNode.getHostAndPortToUse()); - serverProperties.addNodeRole(discoveryDruidNode.getNodeRole().getJsonName()); + final String nodeRole = discoveryDruidNode.getNodeRole().getJsonName(); + + final String serverKey = druidNode.getHostAndPortToUse(); + final ServerProperties serverProperties = serverToPropertiesMap.get(serverKey); + if (serverProperties != null) { + serverProperties.addNodeRole(nodeRole); } else { serverToPropertiesMap.put( - druidNode.getHostAndPortToUse(), + serverKey, new ServerProperties( - druidNode.getServiceName(), - druidNode.getHostAndPortToUse(), - new ArrayList<>(Arrays.asList(discoveryDruidNode.getNodeRole().getJsonName())), - propertiesMap - ) + druidNode.getServiceName(), + serverKey, + new ArrayList<>(Arrays.asList(nodeRole)), + druidNode + ) ); } }); - ArrayList rows = new ArrayList<>(); + + final List rows = new ArrayList<>(); for (ServerProperties serverProperties : serverToPropertiesMap.values()) { - rows.addAll(serverProperties.toRows()); + rows.addAll(serverProperties.buildRows(this, projects)); } return Linq4j.asEnumerable(rows); } - private Map getProperties(DruidNode druidNode) + private static Object[] projectRow(final Object[] row, @Nullable final int[] projects) + { + if (projects == null) { + return row; + } + final Object[] projectedRow = new Object[projects.length]; + for (int i = 0; i < projects.length; i++) { + projectedRow[i] = row[projects[i]]; + } + return projectedRow; + } + + private PropertiesResult getProperties(DruidNode druidNode) { final String url = druidNode.getUriToUse().resolve("/status/properties").toString(); try { @@ -154,20 +187,34 @@ private Map getProperties(DruidNode druidNode) .get(); if 
(response.getStatus().getCode() != HttpServletResponse.SC_OK) { - throw new RE( - "Failed to get properties from node[%s]. Error code[%d], description[%s].", - url, - response.getStatus().getCode(), - response.getStatus().getReasonPhrase() - ); + final String errorMsg = StringUtils.format("HTTP %d: %s", + response.getStatus().getCode(), + response.getStatus().getReasonPhrase()); + log.warn("Failed to get properties from node[%s]: error[%s]", url, errorMsg); + return new PropertiesResult(new HashMap<>(), errorMsg); } - return jsonMapper.readValue( - response.getContent(), - new TypeReference<>(){} + return new PropertiesResult( + jsonMapper.readValue(response.getContent(), new TypeReference<>(){}), + null ); } catch (Exception e) { - throw InternalServerError.exception(e, "HTTP request to[%s] failed", url); + final String errorMsg = e.getMessage() != null ? e.getMessage() : e.getClass().getSimpleName(); + log.warn(e, "Failed to get properties from node[%s]", url); + return new PropertiesResult(new HashMap<>(), errorMsg); + } + } + + private static class PropertiesResult + { + final Map properties; + @Nullable + final String error; + + PropertiesResult(Map properties, @Nullable String error) + { + this.properties = properties; + this.error = error; } } @@ -176,14 +223,19 @@ private static class ServerProperties final String serviceName; final String server; final List nodeRoles; - final Map properties; + final DruidNode druidNode; - public ServerProperties(String serviceName, String server, List nodeRoles, Map properties) + public ServerProperties( + String serviceName, + String server, + List nodeRoles, + DruidNode druidNode + ) { this.serviceName = serviceName; this.server = server; this.nodeRoles = nodeRoles; - this.properties = properties; + this.druidNode = druidNode; } public void addNodeRole(String nodeRole) @@ -191,10 +243,39 @@ public void addNodeRole(String nodeRole) nodeRoles.add(nodeRole); } - public List toRows() + private List buildRows( + final 
SystemServerPropertiesTable table, + @Nullable final int[] projects + ) { - String nodeRolesString = nodeRoles.toString(); - return properties.entrySet().stream().map(entry -> new Object[]{server, serviceName, nodeRolesString, entry.getKey(), entry.getValue()}).collect(Collectors.toList()); + final String nodeRolesString = nodeRoles.toString(); + final PropertiesResult result = table.getProperties(druidNode); + final Map properties = result.properties; + final String error = result.error; + + if (properties.isEmpty()) { + final Object[] row = new Object[ROW_SIGNATURE.size()]; + row[SERVER_INDEX] = server; + row[SERVICE_NAME_INDEX] = serviceName; + row[NODE_ROLES_INDEX] = nodeRolesString; + row[PROPERTY_INDEX] = null; + row[VALUE_INDEX] = null; + row[ERROR_MESSAGE_INDEX] = error; + return Collections.singletonList(projectRow(row, projects)); + } + + return properties.entrySet().stream() + .map(entry -> { + final Object[] row = new Object[ROW_SIGNATURE.size()]; + row[SERVER_INDEX] = server; + row[SERVICE_NAME_INDEX] = serviceName; + row[NODE_ROLES_INDEX] = nodeRolesString; + row[PROPERTY_INDEX] = entry.getKey(); + row[VALUE_INDEX] = entry.getValue(); + row[ERROR_MESSAGE_INDEX] = error; + return projectRow(row, projects); + }) + .collect(Collectors.toList()); } } } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java index bb381b18718f..ff8272f3dfa1 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java @@ -596,7 +596,7 @@ public void testGetTableMap() .get("server_properties"); final RelDataType propertiesRowType = propertiesTable.getRowType(new JavaTypeFactoryImpl()); final List propertiesFields = propertiesRowType.getFieldList(); - Assert.assertEquals(5, propertiesFields.size()); + Assert.assertEquals(6, propertiesFields.size()); } @Test 
@@ -1586,7 +1586,9 @@ public void testPropertiesTable() coordinator.getDruidNode().getHostAndPortToUse(), coordinator.getDruidNode().getServiceName(), ImmutableList.of(coordinator.getNodeRole().getJsonName()).toString(), - "druid.test-key", "test-value" + "druid.test-key", + "test-value", + null }); HttpResponse coordinator2HttpResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); @@ -1598,7 +1600,9 @@ public void testPropertiesTable() coordinator2.getDruidNode().getHostAndPortToUse(), coordinator2.getDruidNode().getServiceName(), ImmutableList.of(coordinator2.getNodeRole().getJsonName()).toString(), - "druid.test-key3", "test-value3" + "druid.test-key3", + "test-value3", + null }); mockNodeDiscovery(NodeRole.MIDDLE_MANAGER, middleManager); @@ -1614,14 +1618,18 @@ public void testPropertiesTable() middleManager.getDruidNode().getHostAndPortToUse(), middleManager.getDruidNode().getServiceName(), ImmutableList.of(middleManager.getNodeRole().getJsonName()).toString(), - "druid.test-key", "test-value" + "druid.test-key", + "test-value", + null }); expectedRows .add(new Object[]{ middleManager.getDruidNode().getHostAndPortToUse(), - middleManager.getDruidNode().getServiceName(), + middleManager.getDruidNode().getServiceName(), ImmutableList.of(middleManager.getNodeRole().getJsonName()).toString(), - "druid.test-key2", "test-value2" + "druid.test-key2", + "test-value2", + null }); Map> urlToResponse = ImmutableMap.of( @@ -1649,7 +1657,7 @@ public void testPropertiesTable() EasyMock.replay(druidNodeDiscoveryProvider, responseHandler, httpClient); DataContext dataContext = createDataContext(Users.SUPER); - final List rows = propertiesTable.scan(dataContext).toList(); + final List rows = propertiesTable.scan(dataContext, Collections.emptyList(), null).toList(); expectedRows.sort((Object[] row1, Object[] row2) -> ((Comparable) row1[0]).compareTo(row2[0])); rows.sort((Object[] row1, Object[] row2) -> ((Comparable) row1[0]).compareTo(row2[0])); 
Assert.assertEquals(expectedRows.size(), rows.size()); @@ -1659,6 +1667,108 @@ public void testPropertiesTable() } + @Test + public void testPropertiesTable_withUnreachableServer() + { + SystemServerPropertiesTable propertiesTable = new SystemServerPropertiesTable( + druidNodeDiscoveryProvider, + authMapper, + httpClient, + MAPPER + ); + + // Mock all node roles (getDruidServers iterates through all of them) + mockNodeDiscovery(NodeRole.BROKER); + mockNodeDiscovery(NodeRole.ROUTER); + mockNodeDiscovery(NodeRole.HISTORICAL); + mockNodeDiscovery(NodeRole.OVERLORD); + mockNodeDiscovery(NodeRole.PEON); + mockNodeDiscovery(NodeRole.INDEXER); + mockNodeDiscovery(NodeRole.MIDDLE_MANAGER); + + // Mock a single coordinator that will fail to respond + mockNodeDiscovery(NodeRole.COORDINATOR, coordinator); + + // Mock HTTP client to throw exception (connection refused) + EasyMock.expect( + httpClient.go( + EasyMock.isA(Request.class), + EasyMock.isA(StringFullResponseHandler.class) + ) + ).andThrow(new RuntimeException("Connection refused")).once(); + + EasyMock.replay(druidNodeDiscoveryProvider, httpClient); + + DataContext dataContext = createDataContext(Users.SUPER); + final List rows = propertiesTable.scan(dataContext, Collections.emptyList(), null).toList(); + + // Should return 1 row even though properties fetch failed + Assert.assertEquals(1, rows.size()); + + // Verify server info is present + Assert.assertEquals(coordinator.getDruidNode().getHostAndPortToUse(), rows.get(0)[0]); + Assert.assertEquals(coordinator.getDruidNode().getServiceName(), rows.get(0)[1]); + + // Property and value should be null + Assert.assertNull(rows.get(0)[3]); + Assert.assertNull(rows.get(0)[4]); + + // Error column (index 5) should contain error message + Assert.assertNotNull(rows.get(0)[5]); + String error = (String) rows.get(0)[5]; + Assert.assertTrue("Error should mention connection refused", error.contains("Connection refused")); + + EasyMock.verify(druidNodeDiscoveryProvider, 
httpClient); + } + + @Test + public void testPropertiesTable_withHttpError() + { + SystemServerPropertiesTable propertiesTable = new SystemServerPropertiesTable( + druidNodeDiscoveryProvider, + authMapper, + httpClient, + MAPPER + ); + + // Mock all node roles + mockNodeDiscovery(NodeRole.BROKER); + mockNodeDiscovery(NodeRole.ROUTER); + mockNodeDiscovery(NodeRole.HISTORICAL); + mockNodeDiscovery(NodeRole.OVERLORD); + mockNodeDiscovery(NodeRole.PEON); + mockNodeDiscovery(NodeRole.INDEXER); + mockNodeDiscovery(NodeRole.MIDDLE_MANAGER); + + mockNodeDiscovery(NodeRole.COORDINATOR, coordinator); + + // Mock HTTP client to return 503 error + HttpResponse errorHttpResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.SERVICE_UNAVAILABLE); + StringFullResponseHolder errorResponseHolder = new StringFullResponseHolder(errorHttpResponse, StandardCharsets.UTF_8); + errorResponseHolder.addChunk("Service temporarily unavailable"); + + EasyMock.expect( + httpClient.go( + EasyMock.isA(Request.class), + EasyMock.isA(StringFullResponseHandler.class) + ) + ).andReturn(Futures.immediateFuture(errorResponseHolder)).once(); + + EasyMock.replay(druidNodeDiscoveryProvider, httpClient); + + DataContext dataContext = createDataContext(Users.SUPER); + final List rows = propertiesTable.scan(dataContext, Collections.emptyList(), null).toList(); + + Assert.assertEquals(1, rows.size()); + + // Error column should contain HTTP status + Assert.assertNotNull(rows.get(0)[5]); + String error = (String) rows.get(0)[5]; + Assert.assertTrue("Error should mention HTTP 503", error.contains("503")); + + EasyMock.verify(druidNodeDiscoveryProvider, httpClient); + } + @Test public void testQueriesTable() { From e48554326f9d2730c5d7f7bde5fe5f353d9ad137 Mon Sep 17 00:00:00 2001 From: Abhishek Balaji Radhakrishnan Date: Tue, 12 May 2026 20:55:18 -0700 Subject: [PATCH 2/8] spell check fix --- website/.spelling | 1 + 1 file changed, 1 insertion(+) diff --git a/website/.spelling 
b/website/.spelling index 28701817f362..9703e03a22df 100644 --- a/website/.spelling +++ b/website/.spelling @@ -715,6 +715,7 @@ druid.sql.planner.useApproximateTopN druid.sql.planner.useLexicographicTopN druid.sql.planner.useGroupingSetForExactDistinct druid.sql.planner.useNativeQueryExplain +error_message error_msg exprs group_id From ec68fd336b9d985a8ec6ffdf48e3131d0cefb1d9 Mon Sep 17 00:00:00 2001 From: Abhishek Radhakrishnan Date: Wed, 13 May 2026 07:53:12 -0700 Subject: [PATCH 3/8] Update docs/querying/sql-metadata-tables.md Co-authored-by: Virushade --- docs/querying/sql-metadata-tables.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/querying/sql-metadata-tables.md b/docs/querying/sql-metadata-tables.md index c39c942c340a..53184a034a90 100644 --- a/docs/querying/sql-metadata-tables.md +++ b/docs/querying/sql-metadata-tables.md @@ -322,7 +322,7 @@ SELECT * FROM sys.supervisors WHERE healthy=0; ### SERVER_PROPERTIES table -The `server_properties` table exposes the runtime properties configured on each Druid server. Each row represents a single property key-value pair associated with a specific server. This table supports filter and projection pushdown for efficient querying. If a server is unreachable, the table still returns a row for that server with the `error` column populated instead of failing the entire query. +The `server_properties` table exposes the runtime properties configured on each Druid server. Each row represents a single property key-value pair associated with a specific server. This table supports filter and projection pushdown for efficient querying. If a server is unreachable, the table still returns a row for that server with the `error_message` column populated instead of failing the entire query. 
|Column|Type|Notes| |------|-----|-----| From e7d609ac0c191ff1b314d9bf9d13fee65466ecba Mon Sep 17 00:00:00 2001 From: Abhishek Balaji Radhakrishnan Date: Wed, 13 May 2026 08:00:11 -0700 Subject: [PATCH 4/8] filter pushdown for server & service_name (only equality filters for now) --- .../SystemServerPropertiesTableTest.java | 99 +++++++++++++++++++ .../schema/SystemServerPropertiesTable.java | 66 +++++++++++++ 2 files changed, 165 insertions(+) diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/schema/SystemServerPropertiesTableTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/schema/SystemServerPropertiesTableTest.java index e3a331681c23..1a5ed0df8d0d 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/schema/SystemServerPropertiesTableTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/schema/SystemServerPropertiesTableTest.java @@ -146,6 +146,105 @@ public void test_serverPropertiesTable_hiddenProperties() Assertions.assertFalse(brokerProps.containsKey("password")); } + @Test + public void test_serverPropertiesTable_serverFilterPushdown() + { + final String brokerHost = StringUtils.format("localhost:%s", BROKER_PORT); + + // Equality filter returns only matching server rows + final String result = cluster.runSql( + "SELECT server, service_name, property FROM sys.server_properties WHERE server = '%s'", + brokerHost + ); + Assertions.assertFalse(result.isEmpty(), "Should return properties for the broker"); + for (String row : result.split("\n")) { + Assertions.assertTrue( + row.startsWith(brokerHost + ","), + "Row should belong to filtered server: " + row + ); + } + + // Non-existent server returns no rows + final String emptyResult = cluster.runSql( + "SELECT * FROM sys.server_properties WHERE server = 'nonexistent:9999'" + ); + Assertions.assertTrue(emptyResult.isEmpty(), "Non-existent server filter should return no rows"); + + // != is not consumed — falls back 
to Calcite post-filter, still correct + final String neResult = cluster.runSql( + "SELECT DISTINCT server FROM sys.server_properties WHERE server != '%s'", + brokerHost + ); + Assertions.assertFalse(neResult.isEmpty(), "!= filter should still return other servers"); + for (String row : neResult.split("\n")) { + Assertions.assertFalse( + row.trim().equals(brokerHost), + "!= filter should exclude the broker: " + row + ); + } + + // AND with a non-pushdown predicate — server filter consumed, rest handled by Calcite + final String andResult = cluster.runSql( + "SELECT server, property FROM sys.server_properties WHERE server = '%s' AND node_roles LIKE '%%broker%%'", + brokerHost + ); + Assertions.assertFalse(andResult.isEmpty(), "AND with node_roles filter should return rows"); + for (String row : andResult.split("\n")) { + Assertions.assertTrue( + row.startsWith(brokerHost + ","), + "Row should belong to filtered server: " + row + ); + } + } + + @Test + public void test_serverPropertiesTable_serviceNameFilterPushdown() + { + final String brokerHost = StringUtils.format("localhost:%s", BROKER_PORT); + + // Equality filter on service_name returns only matching rows + final String result = cluster.runSql( + "SELECT server, service_name, property FROM sys.server_properties WHERE service_name = '%s'", + BROKER_SERVICE + ); + Assertions.assertFalse(result.isEmpty(), "Should return properties for the broker service"); + for (String row : result.split("\n")) { + String[] cols = row.split(",", -1); + Assertions.assertEquals(BROKER_SERVICE, cols[1], "Row should belong to filtered service_name: " + row); + } + + // Non-existent service_name returns no rows + final String emptyResult = cluster.runSql( + "SELECT * FROM sys.server_properties WHERE service_name = 'nonexistent/service'" + ); + Assertions.assertTrue(emptyResult.isEmpty(), "Non-existent service_name filter should return no rows"); + + // != falls back to Calcite post-filter + final String neResult = cluster.runSql( + 
"SELECT DISTINCT service_name FROM sys.server_properties WHERE service_name != '%s'", + BROKER_SERVICE + ); + Assertions.assertFalse(neResult.isEmpty(), "!= filter should still return other services"); + for (String row : neResult.split("\n")) { + Assertions.assertFalse( + row.trim().equals(BROKER_SERVICE), + "!= filter should exclude the broker service: " + row + ); + } + + // Both server and service_name filters consumed together + final String andResult = cluster.runSql( + "SELECT server, service_name, property FROM sys.server_properties WHERE service_name = '%s' AND server = '%s'", + BROKER_SERVICE, brokerHost + ); + Assertions.assertFalse(andResult.isEmpty(), "AND with server and service_name should return rows"); + for (String row : andResult.split("\n")) { + String[] cols = row.split(",", -1); + Assertions.assertEquals(brokerHost, cols[0], "Row server should match: " + row); + Assertions.assertEquals(BROKER_SERVICE, cols[1], "Row service_name should match: " + row); + } + } + @Test public void test_serverPropertiesTable_errorMessageColumnExists() { diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemServerPropertiesTable.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemServerPropertiesTable.java index 3751babbfc7d..6c11ea9ca5a4 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemServerPropertiesTable.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemServerPropertiesTable.java @@ -27,10 +27,14 @@ import org.apache.calcite.linq4j.Linq4j; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexLiteral; import org.apache.calcite.rex.RexNode; import org.apache.calcite.schema.ProjectableFilterableTable; import org.apache.calcite.schema.Schema; import org.apache.calcite.schema.impl.AbstractTable; +import 
org.apache.calcite.sql.SqlKind; import org.apache.druid.discovery.DiscoveryDruidNode; import org.apache.druid.discovery.DruidNodeDiscoveryProvider; import org.apache.druid.java.util.common.StringUtils; @@ -56,9 +60,11 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; /** @@ -133,6 +139,11 @@ public Enumerable scan( ); SystemSchema.checkStateReadAccessForServers(authenticationResult, authorizerMapper); + // Extract equality filters to skip fetching properties from non-matching servers. + final Map> columnFilters = extractColumnFilters(filters, SERVER_INDEX, SERVICE_NAME_INDEX); + final Set serverFilter = columnFilters.get(SERVER_INDEX); + final Set serviceNameFilter = columnFilters.get(SERVICE_NAME_INDEX); + final Iterator druidServers = SystemSchema.getDruidServers(druidNodeDiscoveryProvider); final Map serverToPropertiesMap = new HashMap<>(); @@ -141,6 +152,14 @@ public Enumerable scan( final String nodeRole = discoveryDruidNode.getNodeRole().getJsonName(); final String serverKey = druidNode.getHostAndPortToUse(); + + if (serverFilter != null && !serverFilter.contains(serverKey)) { + return; + } + if (serviceNameFilter != null && !serviceNameFilter.contains(druidNode.getServiceName())) { + return; + } + final ServerProperties serverProperties = serverToPropertiesMap.get(serverKey); if (serverProperties != null) { serverProperties.addNodeRole(nodeRole); @@ -164,6 +183,53 @@ public Enumerable scan( return Linq4j.asEnumerable(rows); } + /** + * Single-pass extraction of equality filters for multiple columns. + * Consumed filters are removed from the list so Calcite won't re-apply them. + */ + private static Map> extractColumnFilters(final List filters, final int... 
columnIndices) + { + final Map> result = new HashMap<>(); + final Iterator iterator = filters.iterator(); + while (iterator.hasNext()) { + final RexNode filter = iterator.next(); + for (final int columnIndex : columnIndices) { + final String value = extractEqualityOnColumn(filter, columnIndex); + if (value != null) { + result.computeIfAbsent(columnIndex, k -> new HashSet<>()).add(value); + iterator.remove(); + break; + } + } + } + return result; + } + + @Nullable + private static String extractEqualityOnColumn(final RexNode node, final int columnIndex) + { + if (!(node instanceof RexCall)) { + return null; + } + final RexCall call = (RexCall) node; + if (call.getKind() != SqlKind.EQUALS) { + return null; + } + final RexNode left = call.getOperands().get(0); + final RexNode right = call.getOperands().get(1); + + if (left instanceof RexInputRef && right instanceof RexLiteral) { + if (((RexInputRef) left).getIndex() == columnIndex) { + return RexLiteral.stringValue(right); + } + } else if (right instanceof RexInputRef && left instanceof RexLiteral) { + if (((RexInputRef) right).getIndex() == columnIndex) { + return RexLiteral.stringValue(left); + } + } + return null; + } + private static Object[] projectRow(final Object[] row, @Nullable final int[] projects) { if (projects == null) { From c204ae6d28a92ad532d4daa4085dc3d8117a7d4a Mon Sep 17 00:00:00 2001 From: Abhishek Balaji Radhakrishnan Date: Wed, 13 May 2026 08:07:09 -0700 Subject: [PATCH 5/8] other changes from review --- .../schema/SystemServerPropertiesTableTest.java | 12 +++++++++--- .../calcite/schema/SystemServerPropertiesTable.java | 13 +++++-------- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/schema/SystemServerPropertiesTableTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/schema/SystemServerPropertiesTableTest.java index 1a5ed0df8d0d..4a4fee4eb1a8 100644 --- 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/schema/SystemServerPropertiesTableTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/schema/SystemServerPropertiesTableTest.java @@ -246,10 +246,16 @@ public void test_serverPropertiesTable_serviceNameFilterPushdown() } @Test - public void test_serverPropertiesTable_errorMessageColumnExists() + public void test_serverPropertiesTable_errorMessageIsNullForHealthyServers() { - final String result = cluster.runSql("SELECT COUNT(*) FROM sys.server_properties WHERE error_message IS NULL"); - Assertions.assertFalse(result.isEmpty(), "Should return count of servers with null error_message"); + // All 3 servers in the embedded cluster are healthy, so no rows should have a non-null error_message + final String errorRows = cluster.runSql("SELECT server FROM sys.server_properties WHERE error_message IS NOT NULL"); + Assertions.assertTrue(errorRows.isEmpty(), "Healthy servers should have null error_message"); + + // Every row should have a null error_message + final String totalCount = cluster.runSql("SELECT COUNT(*) FROM sys.server_properties"); + final String nullErrorCount = cluster.runSql("SELECT COUNT(*) FROM sys.server_properties WHERE error_message IS NULL"); + Assertions.assertEquals(totalCount, nullErrorCount, "All rows should have null error_message in a healthy cluster"); } private void verifyPropertiesForServer(Map properties, String serivceName, String hostAndPort, String nodeRole) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemServerPropertiesTable.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemServerPropertiesTable.java index 6c11ea9ca5a4..8c8ddbb55dc0 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemServerPropertiesTable.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemServerPropertiesTable.java @@ -183,21 +183,14 @@ public Enumerable scan( return Linq4j.asEnumerable(rows); } - 
/** - * Single-pass extraction of equality filters for multiple columns. - * Consumed filters are removed from the list so Calcite won't re-apply them. - */ private static Map> extractColumnFilters(final List filters, final int... columnIndices) { final Map> result = new HashMap<>(); - final Iterator iterator = filters.iterator(); - while (iterator.hasNext()) { - final RexNode filter = iterator.next(); + for (final RexNode filter : filters) { for (final int columnIndex : columnIndices) { final String value = extractEqualityOnColumn(filter, columnIndex); if (value != null) { result.computeIfAbsent(columnIndex, k -> new HashSet<>()).add(value); - iterator.remove(); break; } } @@ -264,6 +257,10 @@ private PropertiesResult getProperties(DruidNode druidNode) null ); } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(StringUtils.format("Interrupted while fetching properties from node[%s]", url), e); + } catch (Exception e) { final String errorMsg = e.getMessage() != null ? 
e.getMessage() : e.getClass().getSimpleName(); log.warn(e, "Failed to get properties from node[%s]", url); From 58c496d2bc3811e315cc4f5876cbe360f5e50790 Mon Sep 17 00:00:00 2001 From: Abhishek Balaji Radhakrishnan Date: Wed, 13 May 2026 08:42:39 -0700 Subject: [PATCH 6/8] javadocs --- .../schema/SystemServerPropertiesTable.java | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemServerPropertiesTable.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemServerPropertiesTable.java index 8c8ddbb55dc0..4b25c3cefa12 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemServerPropertiesTable.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemServerPropertiesTable.java @@ -140,7 +140,7 @@ public Enumerable scan( SystemSchema.checkStateReadAccessForServers(authenticationResult, authorizerMapper); // Extract equality filters to skip fetching properties from non-matching servers. - final Map> columnFilters = extractColumnFilters(filters, SERVER_INDEX, SERVICE_NAME_INDEX); + final Map> columnFilters = extractColumnEqualityFilters(filters, SERVER_INDEX, SERVICE_NAME_INDEX); final Set serverFilter = columnFilters.get(SERVER_INDEX); final Set serviceNameFilter = columnFilters.get(SERVICE_NAME_INDEX); @@ -183,7 +183,14 @@ public Enumerable scan( return Linq4j.asEnumerable(rows); } - private static Map> extractColumnFilters(final List filters, final int... columnIndices) + /** + * Extracts simple equality filters ({@code column = 'literal'}) for the specified columns. + * Only handles top-level AND equalities; any other predicate (!=, LIKE, OR, functions) is + * ignored and left for Calcite to apply as a post-filter. + * + * @return map from column index to the set of literal values; absent key means no filter for that column + */ + private static Map> extractColumnEqualityFilters(final List filters, final int... 
columnIndices) { final Map> result = new HashMap<>(); for (final RexNode filter : filters) { @@ -198,6 +205,10 @@ private static Map> extractColumnFilters(final List Date: Wed, 13 May 2026 12:00:47 -0700 Subject: [PATCH 7/8] unit tests for coverage --- .../sql/calcite/schema/SystemSchemaTest.java | 374 +++++++++++++++++- 1 file changed, 353 insertions(+), 21 deletions(-) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java index ff8272f3dfa1..4006c043eb45 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java @@ -34,8 +34,11 @@ import org.apache.calcite.linq4j.QueryProvider; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexNode; import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.schema.Table; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.druid.client.DruidServer; import org.apache.druid.client.FilteredServerInventoryView; @@ -151,6 +154,10 @@ public class SystemSchemaTest extends CalciteTestBase { private static final ObjectMapper MAPPER = CalciteTests.getJsonMapper(); + private static final int SERVER_INDEX = SystemServerPropertiesTable.ROW_SIGNATURE.indexOf("server"); + private static final int SERVICE_NAME_INDEX = SystemServerPropertiesTable.ROW_SIGNATURE.indexOf("service_name"); + private static final int PROPERTY_INDEX = SystemServerPropertiesTable.ROW_SIGNATURE.indexOf("property"); + private static final String DATASOURCE_ALL_ACCESS = "allAccess"; private static final BrokerSegmentMetadataCacheConfig SEGMENT_CACHE_CONFIG_DEFAULT = BrokerSegmentMetadataCacheConfig.create(); @@ -1677,17 +1684,7 @@ public void 
testPropertiesTable_withUnreachableServer() MAPPER ); - // Mock all node roles (getDruidServers iterates through all of them) - mockNodeDiscovery(NodeRole.BROKER); - mockNodeDiscovery(NodeRole.ROUTER); - mockNodeDiscovery(NodeRole.HISTORICAL); - mockNodeDiscovery(NodeRole.OVERLORD); - mockNodeDiscovery(NodeRole.PEON); - mockNodeDiscovery(NodeRole.INDEXER); - mockNodeDiscovery(NodeRole.MIDDLE_MANAGER); - - // Mock a single coordinator that will fail to respond - mockNodeDiscovery(NodeRole.COORDINATOR, coordinator); + mockAllNodeRolesWithCoordinator(coordinator); // Mock HTTP client to throw exception (connection refused) EasyMock.expect( @@ -1731,16 +1728,7 @@ public void testPropertiesTable_withHttpError() MAPPER ); - // Mock all node roles - mockNodeDiscovery(NodeRole.BROKER); - mockNodeDiscovery(NodeRole.ROUTER); - mockNodeDiscovery(NodeRole.HISTORICAL); - mockNodeDiscovery(NodeRole.OVERLORD); - mockNodeDiscovery(NodeRole.PEON); - mockNodeDiscovery(NodeRole.INDEXER); - mockNodeDiscovery(NodeRole.MIDDLE_MANAGER); - - mockNodeDiscovery(NodeRole.COORDINATOR, coordinator); + mockAllNodeRolesWithCoordinator(coordinator); // Mock HTTP client to return 503 error HttpResponse errorHttpResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.SERVICE_UNAVAILABLE); @@ -1769,6 +1757,338 @@ public void testPropertiesTable_withHttpError() EasyMock.verify(druidNodeDiscoveryProvider, httpClient); } + @Test + public void testPropertiesTable_filterPushdown() + { + SystemServerPropertiesTable propertiesTable = new SystemServerPropertiesTable( + druidNodeDiscoveryProvider, + authMapper, + httpClient, + MAPPER + ); + + mockAllNodeRolesWithCoordinator(coordinator, coordinator2); + + // coordinator (localhost:8081, service "s1") will be fetched; coordinator2 (localhost:8181, service "s1") will not + HttpResponse resp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + StringFullResponseHolder holder = new StringFullResponseHolder(resp, 
StandardCharsets.UTF_8); + holder.addChunk("{\"druid.key\": \"val\"}"); + + EasyMock.expect( + httpClient.go(EasyMock.isA(Request.class), EasyMock.isA(StringFullResponseHandler.class)) + ).andReturn(Futures.immediateFuture(holder)).once(); + + EasyMock.replay(druidNodeDiscoveryProvider, httpClient); + + final RexBuilder rexBuilder = new RexBuilder(new JavaTypeFactoryImpl()); + final RelDataType rowType = propertiesTable.getRowType(new JavaTypeFactoryImpl()); + + // server = 'localhost:8081' — only coordinator matches, coordinator2 skipped (1 HTTP call) + final RexNode serverEquality = rexBuilder.makeCall( + SqlStdOperatorTable.EQUALS, + rexBuilder.makeInputRef(rowType.getFieldList().get(SERVER_INDEX).getType(), SERVER_INDEX), + rexBuilder.makeLiteral("localhost:8081") + ); + + DataContext dataContext = createDataContext(Users.SUPER); + final List rows = propertiesTable.scan(dataContext, ImmutableList.of(serverEquality), null).toList(); + + Assert.assertEquals(1, rows.size()); + Assert.assertEquals("localhost:8081", rows.get(0)[0]); + Assert.assertEquals("druid.key", rows.get(0)[3]); + Assert.assertEquals("val", rows.get(0)[4]); + + EasyMock.verify(druidNodeDiscoveryProvider, httpClient); + } + + @Test + public void testPropertiesTable_filterPushdownServiceNameAndNonMatching() + { + SystemServerPropertiesTable propertiesTable = new SystemServerPropertiesTable( + druidNodeDiscoveryProvider, + authMapper, + httpClient, + MAPPER + ); + + mockAllNodeRolesWithCoordinator(coordinator, coordinator2); + + // Both coordinators have service "s1", so both match service_name filter + HttpResponse resp1 = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + StringFullResponseHolder holder1 = new StringFullResponseHolder(resp1, StandardCharsets.UTF_8); + holder1.addChunk("{\"k1\": \"v1\"}"); + + HttpResponse resp2 = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + StringFullResponseHolder holder2 = new StringFullResponseHolder(resp2, 
StandardCharsets.UTF_8); + holder2.addChunk("{\"k2\": \"v2\"}"); + + EasyMock.expect( + httpClient.go(EasyMock.isA(Request.class), EasyMock.isA(StringFullResponseHandler.class)) + ).andAnswer(() -> { + Request req = (Request) EasyMock.getCurrentArguments()[0]; + String url = req.getUrl().toString(); + if (url.contains("8081")) { + return Futures.immediateFuture(holder1); + } else { + return Futures.immediateFuture(holder2); + } + }).times(2); + + EasyMock.replay(druidNodeDiscoveryProvider, httpClient); + + final RexBuilder rexBuilder = new RexBuilder(new JavaTypeFactoryImpl()); + final RelDataType rowType = propertiesTable.getRowType(new JavaTypeFactoryImpl()); + + // service_name = 's1' — both coordinators match + final RexNode serviceNameEquality = rexBuilder.makeCall( + SqlStdOperatorTable.EQUALS, + rexBuilder.makeInputRef(rowType.getFieldList().get(SERVICE_NAME_INDEX).getType(), SERVICE_NAME_INDEX), + rexBuilder.makeLiteral("s1") + ); + + DataContext dataContext = createDataContext(Users.SUPER); + List rows = propertiesTable.scan(dataContext, ImmutableList.of(serviceNameEquality), null).toList(); + Assert.assertEquals(2, rows.size()); + + // Non-matching server filter returns 0 rows with no HTTP calls + EasyMock.verify(druidNodeDiscoveryProvider, httpClient); + EasyMock.reset(druidNodeDiscoveryProvider, httpClient); + mockAllNodeRolesWithCoordinator(coordinator); + EasyMock.replay(druidNodeDiscoveryProvider, httpClient); + + final RexNode nonMatchingFilter = rexBuilder.makeCall( + SqlStdOperatorTable.EQUALS, + rexBuilder.makeInputRef(rowType.getFieldList().get(SERVER_INDEX).getType(), SERVER_INDEX), + rexBuilder.makeLiteral("nonexistent:9999") + ); + + dataContext = createDataContext(Users.SUPER); + rows = propertiesTable.scan(dataContext, ImmutableList.of(nonMatchingFilter), null).toList(); + Assert.assertEquals(0, rows.size()); + + EasyMock.verify(druidNodeDiscoveryProvider, httpClient); + } + + @Test + public void testPropertiesTable_filterFallback() + { + 
SystemServerPropertiesTable propertiesTable = new SystemServerPropertiesTable( + druidNodeDiscoveryProvider, + authMapper, + httpClient, + MAPPER + ); + + final RexBuilder rexBuilder = new RexBuilder(new JavaTypeFactoryImpl()); + final RelDataType rowType = propertiesTable.getRowType(new JavaTypeFactoryImpl()); + + // 1) NOT_EQUALS is not pushed down — all rows returned + mockAllNodeRolesWithCoordinator(coordinator); + HttpResponse resp1 = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + StringFullResponseHolder holder1 = new StringFullResponseHolder(resp1, StandardCharsets.UTF_8); + holder1.addChunk("{\"druid.key\": \"val\", \"druid.other\": \"other\"}"); + EasyMock.expect(httpClient.go(EasyMock.isA(Request.class), EasyMock.isA(StringFullResponseHandler.class))) + .andReturn(Futures.immediateFuture(holder1)).once(); + EasyMock.replay(druidNodeDiscoveryProvider, httpClient); + + final RexNode notEquals = rexBuilder.makeCall( + SqlStdOperatorTable.NOT_EQUALS, + rexBuilder.makeInputRef(rowType.getFieldList().get(SERVER_INDEX).getType(), SERVER_INDEX), + rexBuilder.makeLiteral("some-server:1234") + ); + Assert.assertEquals(2, propertiesTable.scan(createDataContext(Users.SUPER), ImmutableList.of(notEquals), null).toList().size()); + EasyMock.verify(druidNodeDiscoveryProvider, httpClient); + + // 2) Non-RexCall filter (bare RexInputRef) is ignored + EasyMock.reset(druidNodeDiscoveryProvider, httpClient); + mockAllNodeRolesWithCoordinator(coordinator); + HttpResponse resp2 = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + StringFullResponseHolder holder2 = new StringFullResponseHolder(resp2, StandardCharsets.UTF_8); + holder2.addChunk("{\"druid.key\": \"val\", \"druid.other\": \"other\"}"); + EasyMock.expect(httpClient.go(EasyMock.isA(Request.class), EasyMock.isA(StringFullResponseHandler.class))) + .andReturn(Futures.immediateFuture(holder2)).once(); + EasyMock.replay(druidNodeDiscoveryProvider, httpClient); + + final 
RexNode inputRef = rexBuilder.makeInputRef(rowType.getFieldList().get(SERVER_INDEX).getType(), SERVER_INDEX); + Assert.assertEquals(2, propertiesTable.scan(createDataContext(Users.SUPER), ImmutableList.of(inputRef), null).toList().size()); + EasyMock.verify(druidNodeDiscoveryProvider, httpClient); + + // 3) Equality on non-pushed column (property) is ignored + EasyMock.reset(druidNodeDiscoveryProvider, httpClient); + mockAllNodeRolesWithCoordinator(coordinator); + HttpResponse resp3 = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + StringFullResponseHolder holder3 = new StringFullResponseHolder(resp3, StandardCharsets.UTF_8); + holder3.addChunk("{\"druid.key\": \"val\", \"druid.other\": \"other\"}"); + EasyMock.expect(httpClient.go(EasyMock.isA(Request.class), EasyMock.isA(StringFullResponseHandler.class))) + .andReturn(Futures.immediateFuture(holder3)).once(); + EasyMock.replay(druidNodeDiscoveryProvider, httpClient); + + final RexNode propertyEquality = rexBuilder.makeCall( + SqlStdOperatorTable.EQUALS, + rexBuilder.makeInputRef(rowType.getFieldList().get(PROPERTY_INDEX).getType(), PROPERTY_INDEX), + rexBuilder.makeLiteral("druid.key") + ); + Assert.assertEquals(2, propertiesTable.scan(createDataContext(Users.SUPER), ImmutableList.of(propertyEquality), null).toList().size()); + EasyMock.verify(druidNodeDiscoveryProvider, httpClient); + + // 4) Reversed equality ('localhost:8081' = server) is correctly extracted + EasyMock.reset(druidNodeDiscoveryProvider, httpClient); + mockAllNodeRolesWithCoordinator(coordinator); + HttpResponse resp4 = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + StringFullResponseHolder holder4 = new StringFullResponseHolder(resp4, StandardCharsets.UTF_8); + holder4.addChunk("{\"druid.key\": \"val\", \"druid.other\": \"other\"}"); + EasyMock.expect(httpClient.go(EasyMock.isA(Request.class), EasyMock.isA(StringFullResponseHandler.class))) + .andReturn(Futures.immediateFuture(holder4)).once(); + 
EasyMock.replay(druidNodeDiscoveryProvider, httpClient); + + final RexNode reversedEquality = rexBuilder.makeCall( + SqlStdOperatorTable.EQUALS, + rexBuilder.makeLiteral("localhost:8081"), + rexBuilder.makeInputRef(rowType.getFieldList().get(SERVER_INDEX).getType(), SERVER_INDEX) + ); + List rows = propertiesTable.scan(createDataContext(Users.SUPER), ImmutableList.of(reversedEquality), null).toList(); + Assert.assertEquals(2, rows.size()); + Assert.assertEquals("localhost:8081", rows.get(0)[0]); + EasyMock.verify(druidNodeDiscoveryProvider, httpClient); + } + + @Test + public void testPropertiesTable_projectionAndMultiRole() + { + SystemServerPropertiesTable propertiesTable = new SystemServerPropertiesTable( + druidNodeDiscoveryProvider, + authMapper, + httpClient, + MAPPER + ); + + // Same host:port under two roles + DiscoveryDruidNode coordinatorRole = new DiscoveryDruidNode( + new DruidNode("s1", "localhost", false, 8081, null, true, false), + NodeRole.COORDINATOR, + ImmutableMap.of(), + startTime + ); + DiscoveryDruidNode overlordRole = new DiscoveryDruidNode( + new DruidNode("s1", "localhost", false, 8081, null, true, false), + NodeRole.OVERLORD, + ImmutableMap.of(), + startTime + ); + + mockNodeDiscovery(NodeRole.BROKER); + mockNodeDiscovery(NodeRole.ROUTER); + mockNodeDiscovery(NodeRole.HISTORICAL); + mockNodeDiscovery(NodeRole.OVERLORD, overlordRole); + mockNodeDiscovery(NodeRole.PEON); + mockNodeDiscovery(NodeRole.INDEXER); + mockNodeDiscovery(NodeRole.MIDDLE_MANAGER); + mockNodeDiscovery(NodeRole.COORDINATOR, coordinatorRole); + + HttpResponse resp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + StringFullResponseHolder holder = new StringFullResponseHolder(resp, StandardCharsets.UTF_8); + holder.addChunk("{\"druid.port\": \"8081\"}"); + + EasyMock.expect( + httpClient.go(EasyMock.isA(Request.class), EasyMock.isA(StringFullResponseHandler.class)) + ).andReturn(Futures.immediateFuture(holder)).once(); + + 
EasyMock.replay(druidNodeDiscoveryProvider, httpClient); + + DataContext dataContext = createDataContext(Users.SUPER); + + // Multi-role: only 1 HTTP call, node_roles contains both + final List fullRows = propertiesTable.scan(dataContext, Collections.emptyList(), null).toList(); + Assert.assertEquals(1, fullRows.size()); + String nodeRoles = (String) fullRows.get(0)[2]; + Assert.assertTrue(nodeRoles.contains("coordinator")); + Assert.assertTrue(nodeRoles.contains("overlord")); + + // Projection: project only server (0) and property (3) + EasyMock.reset(druidNodeDiscoveryProvider, httpClient); + mockAllNodeRolesWithCoordinator(coordinator); + + HttpResponse resp2 = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + StringFullResponseHolder holder2 = new StringFullResponseHolder(resp2, StandardCharsets.UTF_8); + holder2.addChunk("{\"druid.port\": \"8081\"}"); + EasyMock.expect( + httpClient.go(EasyMock.isA(Request.class), EasyMock.isA(StringFullResponseHandler.class)) + ).andReturn(Futures.immediateFuture(holder2)).once(); + EasyMock.replay(druidNodeDiscoveryProvider, httpClient); + + final int[] projects = new int[]{0, 3}; + final List projectedRows = propertiesTable.scan(dataContext, Collections.emptyList(), projects).toList(); + Assert.assertEquals(1, projectedRows.size()); + Assert.assertEquals(2, projectedRows.get(0).length); + Assert.assertEquals("localhost:8081", projectedRows.get(0)[0]); + Assert.assertEquals("druid.port", projectedRows.get(0)[1]); + + EasyMock.verify(druidNodeDiscoveryProvider, httpClient); + } + + @Test + public void testPropertiesTable_withInterruptedException() throws Exception + { + SystemServerPropertiesTable propertiesTable = new SystemServerPropertiesTable( + druidNodeDiscoveryProvider, + authMapper, + httpClient, + MAPPER + ); + + mockAllNodeRolesWithCoordinator(coordinator); + + @SuppressWarnings("unchecked") + ListenableFuture interruptingFuture = EasyMock.createMock(ListenableFuture.class); + 
EasyMock.expect(interruptingFuture.get()).andThrow(new InterruptedException("test interrupt")); + EasyMock.replay(interruptingFuture); + + EasyMock.expect( + httpClient.go(EasyMock.isA(Request.class), EasyMock.isA(StringFullResponseHandler.class)) + ).andReturn(interruptingFuture).once(); + + EasyMock.replay(druidNodeDiscoveryProvider, httpClient); + + DataContext dataContext = createDataContext(Users.SUPER); + RuntimeException ex = Assert.assertThrows( + RuntimeException.class, + () -> propertiesTable.scan(dataContext, Collections.emptyList(), null).toList() + ); + Assert.assertTrue(ex.getMessage().contains("Interrupted")); + Assert.assertTrue(Thread.currentThread().isInterrupted()); + Thread.interrupted(); + + EasyMock.verify(druidNodeDiscoveryProvider, httpClient); + } + + @Test + public void testPropertiesTable_exceptionWithNullMessage() + { + SystemServerPropertiesTable propertiesTable = new SystemServerPropertiesTable( + druidNodeDiscoveryProvider, + authMapper, + httpClient, + MAPPER + ); + + mockAllNodeRolesWithCoordinator(coordinator); + + // Exception with no message — error_message should fall back to class simple name + EasyMock.expect( + httpClient.go(EasyMock.isA(Request.class), EasyMock.isA(StringFullResponseHandler.class)) + ).andThrow(new RuntimeException((String) null)).once(); + + EasyMock.replay(druidNodeDiscoveryProvider, httpClient); + + DataContext dataContext = createDataContext(Users.SUPER); + final List rows = propertiesTable.scan(dataContext, Collections.emptyList(), null).toList(); + + Assert.assertEquals(1, rows.size()); + Assert.assertEquals("RuntimeException", rows.get(0)[5]); + + EasyMock.verify(druidNodeDiscoveryProvider, httpClient); + } + @Test public void testQueriesTable() { @@ -2085,6 +2405,18 @@ private DruidNodeDiscovery mockNodeDiscovery(NodeRole nodeRole, DiscoveryDruidNo return druidNodeDiscovery; } + private void mockAllNodeRolesWithCoordinator(DiscoveryDruidNode... 
coordinators) + { + mockNodeDiscovery(NodeRole.BROKER); + mockNodeDiscovery(NodeRole.ROUTER); + mockNodeDiscovery(NodeRole.HISTORICAL); + mockNodeDiscovery(NodeRole.OVERLORD); + mockNodeDiscovery(NodeRole.PEON); + mockNodeDiscovery(NodeRole.INDEXER); + mockNodeDiscovery(NodeRole.MIDDLE_MANAGER); + mockNodeDiscovery(NodeRole.COORDINATOR, coordinators); + } + /** * Usernames to be used in tests. */ From f6c1bcb587197be2e9ad37b028d4eba6429f3fb3 Mon Sep 17 00:00:00 2001 From: Abhishek Balaji Radhakrishnan Date: Wed, 13 May 2026 14:02:16 -0700 Subject: [PATCH 8/8] Strict compilation fix --- .../apache/druid/sql/calcite/schema/SystemSchemaTest.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java index 4006c043eb45..498d02c3929a 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java @@ -27,6 +27,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import junitparams.converters.Nullable; import org.apache.calcite.DataContext; import org.apache.calcite.adapter.java.JavaTypeFactory; @@ -2038,10 +2039,7 @@ public void testPropertiesTable_withInterruptedException() throws Exception mockAllNodeRolesWithCoordinator(coordinator); - @SuppressWarnings("unchecked") - ListenableFuture interruptingFuture = EasyMock.createMock(ListenableFuture.class); - EasyMock.expect(interruptingFuture.get()).andThrow(new InterruptedException("test interrupt")); - EasyMock.replay(interruptingFuture); + SettableFuture interruptingFuture = SettableFuture.create(); EasyMock.expect( httpClient.go(EasyMock.isA(Request.class),
EasyMock.isA(StringFullResponseHandler.class)) @@ -2050,6 +2048,7 @@ public void testPropertiesTable_withInterruptedException() throws Exception EasyMock.replay(druidNodeDiscoveryProvider, httpClient); DataContext dataContext = createDataContext(Users.SUPER); + Thread.currentThread().interrupt(); RuntimeException ex = Assert.assertThrows( RuntimeException.class, () -> propertiesTable.scan(dataContext, Collections.emptyList(), null).toList()