From df4f0e3fd53f51bd69f58e81e95a3b3348c7a7d4 Mon Sep 17 00:00:00 2001 From: artem Date: Tue, 10 Mar 2026 11:51:09 -0700 Subject: [PATCH 1/4] Add Memq Metadata Client --- ...taClientToMemqConsumerConfigConverter.java | 26 ++ .../psc/consumer/memq/MemqTopicUri.java | 2 +- .../client/memq/PscMemqMetadataClient.java | 267 ++++++++++++++++++ .../PscMemqMetadataClientCreator.java | 37 +++ 4 files changed, 331 insertions(+), 1 deletion(-) create mode 100644 psc/src/main/java/com/pinterest/psc/config/PscMetadataClientToMemqConsumerConfigConverter.java create mode 100644 psc/src/main/java/com/pinterest/psc/metadata/client/memq/PscMemqMetadataClient.java create mode 100644 psc/src/main/java/com/pinterest/psc/metadata/creation/PscMemqMetadataClientCreator.java diff --git a/psc/src/main/java/com/pinterest/psc/config/PscMetadataClientToMemqConsumerConfigConverter.java b/psc/src/main/java/com/pinterest/psc/config/PscMetadataClientToMemqConsumerConfigConverter.java new file mode 100644 index 00000000..2c5ce98e --- /dev/null +++ b/psc/src/main/java/com/pinterest/psc/config/PscMetadataClientToMemqConsumerConfigConverter.java @@ -0,0 +1,26 @@ +package com.pinterest.psc.config; + +import com.pinterest.memq.client.commons.ConsumerConfigs; +import com.pinterest.psc.common.TopicUri; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +public class PscMetadataClientToMemqConsumerConfigConverter extends PscMetadataClientToBackendMetatadataClientConfigCoverter { + @Override + protected Map getConfigConverterMap() { + return new HashMap() { + private static final long serialVersionUID = 1L; + + { + put(PscConfiguration.PSC_METADATA_CLIENT_ID, ConsumerConfigs.CLIENT_ID); + } + }; + } + + @Override + public Properties convert(PscConfigurationInternal pscConfigurationInternal, TopicUri topicUri) { + return super.convert(pscConfigurationInternal, topicUri); + } +} diff --git a/psc/src/main/java/com/pinterest/psc/consumer/memq/MemqTopicUri.java b/psc/src/main/java/com/pinterest/psc/consumer/memq/MemqTopicUri.java index d25c1a65..3a1760a5 100644 --- a/psc/src/main/java/com/pinterest/psc/consumer/memq/MemqTopicUri.java +++ b/psc/src/main/java/com/pinterest/psc/consumer/memq/MemqTopicUri.java @@ -9,7 +9,7 @@ public class MemqTopicUri extends BaseTopicUri { public static final String PLAINTEXT_PROTOCOL = "plaintext"; public static final String SECURE_PROTOCOL = "secure"; - MemqTopicUri(TopicUri topicUri) { + public MemqTopicUri(TopicUri topicUri) { super(topicUri); } diff --git a/psc/src/main/java/com/pinterest/psc/metadata/client/memq/PscMemqMetadataClient.java b/psc/src/main/java/com/pinterest/psc/metadata/client/memq/PscMemqMetadataClient.java new file mode 100644 index 00000000..68be991f --- /dev/null +++ b/psc/src/main/java/com/pinterest/psc/metadata/client/memq/PscMemqMetadataClient.java @@ -0,0 +1,267 @@ +package com.pinterest.psc.metadata.client.memq; + +import com.pinterest.memq.client.commons.ConsumerConfigs; +import com.pinterest.memq.client.commons.serde.ByteArrayDeserializer; +import com.pinterest.memq.client.consumer.MemqConsumer; +import com.pinterest.psc.common.BaseTopicUri; +import com.pinterest.psc.common.TopicRn; +import com.pinterest.psc.common.TopicUri; +import com.pinterest.psc.common.TopicUriPartition; +import com.pinterest.psc.config.PscConfigurationInternal; +import com.pinterest.psc.config.PscMetadataClientToMemqConsumerConfigConverter; +import com.pinterest.psc.consumer.memq.MemqTopicUri; +import com.pinterest.psc.environment.Environment; +import com.pinterest.psc.exception.startup.ConfigurationException; +import com.pinterest.psc.logging.PscLogger; +import com.pinterest.psc.metadata.MetadataUtils; +import com.pinterest.psc.metadata.TopicUriMetadata; +import com.pinterest.psc.metadata.client.PscBackendMetadataClient; +import com.pinterest.psc.metadata.client.PscMetadataClient; + +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeoutException; + +/** + * A Memq-specific implementation of the {@link PscBackendMetadataClient}. + * Uses short-lived {@link MemqConsumer} instances to query metadata since Memq + * does not have a dedicated admin client. + */ +public class PscMemqMetadataClient extends PscBackendMetadataClient { + + private static final PscLogger logger = PscLogger.getLogger(PscMemqMetadataClient.class); + private Properties baseProperties; + + @Override + public void initialize( + TopicUri topicUri, + Environment env, + PscConfigurationInternal pscConfigurationInternal + ) throws ConfigurationException { + super.initialize(topicUri, env, pscConfigurationInternal); + baseProperties = new PscMetadataClientToMemqConsumerConfigConverter() + .convert(pscConfigurationInternal, topicUri); + baseProperties.setProperty(ConsumerConfigs.BOOTSTRAP_SERVERS, discoveryConfig.getConnect()); + baseProperties.setProperty(ConsumerConfigs.CLIENT_ID, pscConfigurationInternal.getMetadataClientId()); + baseProperties.setProperty(ConsumerConfigs.KEY_DESERIALIZER_CLASS_KEY, + ByteArrayDeserializer.class.getName()); + baseProperties.put(ConsumerConfigs.KEY_DESERIALIZER_CLASS_CONFIGS_KEY, new Properties()); + baseProperties.setProperty(ConsumerConfigs.VALUE_DESERIALIZER_CLASS_KEY, + ByteArrayDeserializer.class.getName()); + baseProperties.put(ConsumerConfigs.VALUE_DESERIALIZER_CLASS_CONFIGS_KEY, new Properties()); + baseProperties.setProperty(ConsumerConfigs.DIRECT_CONSUMER, "false"); + logger.info("Initialized PscMemqMetadataClient with base properties: " + baseProperties); + } + + @Override + public List listTopicRns(Duration duration) + throws ExecutionException, InterruptedException, TimeoutException { + throw new UnsupportedOperationException( + "[Memq] Listing all topics is not supported by the Memq backend."); + } + + @Override + public Map describeTopicUris( + Collection topicUris, + Duration duration + ) throws ExecutionException, InterruptedException, TimeoutException { + Map result = new HashMap<>(); + for (TopicUri tu : topicUris) { + try (MemqConsumer consumer = createConsumer(tu.getTopic())) { + List partitions = consumer.getPartition(); + List topicUriPartitions = new ArrayList<>(); + for (int partition : partitions) { + topicUriPartitions.add(createMemqTopicUriPartition(tu, partition)); + } + result.put(tu, new TopicUriMetadata(tu, topicUriPartitions)); + } catch (IOException e) { + throw new ExecutionException("Failed to close Memq metadata consumer for " + tu, e); + } catch (Exception e) { + throw new ExecutionException("Failed to describe topic " + tu, e); + } + } + return result; + } + + @Override + public Map listOffsets( + Map topicUriPartitionsAndOptions, + Duration duration + ) throws ExecutionException, InterruptedException, TimeoutException { + Map> earliestByTopic = new HashMap<>(); + Map> latestByTopic = new HashMap<>(); + Map topicToUri = new HashMap<>(); + + for (Map.Entry entry : + topicUriPartitionsAndOptions.entrySet()) { + TopicUriPartition tup = entry.getKey(); + String topic = tup.getTopicUri().getTopic(); + topicToUri.put(topic, tup.getTopicUri()); + + if (entry.getValue() == PscMetadataClient.MetadataClientOption.OFFSET_SPEC_EARLIEST) { + earliestByTopic.computeIfAbsent(topic, k -> new HashSet<>()).add(tup.getPartition()); + } else if (entry.getValue() == PscMetadataClient.MetadataClientOption.OFFSET_SPEC_LATEST) { + latestByTopic.computeIfAbsent(topic, k -> new HashSet<>()).add(tup.getPartition()); + } else { + throw new IllegalArgumentException( + "Unsupported MetadataClientOption for listOffsets(): " + entry.getValue()); + } + } + + Map result = new HashMap<>(); + Set allTopics = new HashSet<>(); + allTopics.addAll(earliestByTopic.keySet()); + allTopics.addAll(latestByTopic.keySet()); + + for (String topic : allTopics) { + try (MemqConsumer consumer = createConsumer(topic)) { + Set earliestPartitions = earliestByTopic.getOrDefault(topic, new HashSet<>()); + if (!earliestPartitions.isEmpty()) { + Map offsets = consumer.getEarliestOffsets(earliestPartitions); + for (Map.Entry e : offsets.entrySet()) { + TopicRn topicRn = MetadataUtils.createTopicRn(topicUri, topic); + result.put(createMemqTopicUriPartition(topicRn, e.getKey()), e.getValue()); + } + } + + Set latestPartitions = latestByTopic.getOrDefault(topic, new HashSet<>()); + if (!latestPartitions.isEmpty()) { + Map offsets = consumer.getLatestOffsets(latestPartitions); + for (Map.Entry e : offsets.entrySet()) { + TopicRn topicRn = MetadataUtils.createTopicRn(topicUri, topic); + result.put(createMemqTopicUriPartition(topicRn, e.getKey()), e.getValue()); + } + } + } catch (IOException e) { + throw new ExecutionException("Failed to close Memq metadata consumer for topic " + topic, e); + } catch (Exception e) { + throw new ExecutionException("Failed to list offsets for topic " + topic, e); + } + } + return result; + } + + @Override + public Map listOffsetsForTimestamps( + Map topicUriPartitionsAndTimes, + Duration duration + ) throws ExecutionException, InterruptedException, TimeoutException { + Map> timestampsByTopic = new HashMap<>(); + Map> partitionLookupByTopic = new HashMap<>(); + + for (Map.Entry entry : topicUriPartitionsAndTimes.entrySet()) { + TopicUriPartition tup = entry.getKey(); + String topic = tup.getTopicUri().getTopic(); + timestampsByTopic.computeIfAbsent(topic, k -> new HashMap<>()) + .put(tup.getPartition(), entry.getValue()); + partitionLookupByTopic.computeIfAbsent(topic, k -> new HashMap<>()) + .put(tup.getPartition(), tup); + } + + Map result = new HashMap<>(); + for (Map.Entry> entry : timestampsByTopic.entrySet()) { + String topic = entry.getKey(); + try (MemqConsumer consumer = createConsumer(topic)) { + Map offsets = consumer.offsetsOfTimestamps(entry.getValue()); + Map partitionLookup = partitionLookupByTopic.get(topic); + for (Map.Entry offsetEntry : offsets.entrySet()) { + TopicRn topicRn = MetadataUtils.createTopicRn(topicUri, topic); + result.put( + createMemqTopicUriPartition(topicRn, offsetEntry.getKey()), + offsetEntry.getValue() + ); + } + } catch (IOException e) { + throw new ExecutionException( + "Failed to close Memq metadata consumer for topic " + topic, e); + } catch (Exception e) { + throw new ExecutionException( + "Failed to list offsets for timestamps for topic " + topic, e); + } + } + return result; + } + + @Override + public Map listOffsetsForConsumerGroup( + String consumerGroupId, + Collection topicUriPartitions, + Duration duration + ) throws ExecutionException, InterruptedException, TimeoutException { + Map> partitionsByTopic = new HashMap<>(); + for (TopicUriPartition tup : topicUriPartitions) { + String topic = tup.getTopicUri().getTopic(); + partitionsByTopic.computeIfAbsent(topic, k -> new HashSet<>()).add(tup.getPartition()); + } + + Map result = new HashMap<>(); + for (Map.Entry> entry : partitionsByTopic.entrySet()) { + String topic = entry.getKey(); + try (MemqConsumer consumer = createConsumer(topic, consumerGroupId)) { + for (int partition : entry.getValue()) { + long committedOffset = consumer.committed(partition); + if (committedOffset == -1L) { + logger.warn( + "Consumer group {} has no committed offset for topic {} partition {}", + consumerGroupId, topic, partition + ); + continue; + } + TopicRn topicRn = MetadataUtils.createTopicRn(topicUri, topic); + result.put(createMemqTopicUriPartition(topicRn, partition), committedOffset); + } + } catch (IOException e) { + throw new ExecutionException( + "Failed to close Memq metadata consumer for topic " + topic, e); + } catch (Exception e) { + throw new ExecutionException( + "Failed to list consumer group offsets for topic " + topic, e); + } + } + return result; + } + + @Override + public void close() throws IOException { + logger.info("Closed PscMemqMetadataClient"); + } + + private MemqConsumer createConsumer(String topic) throws Exception { + return createConsumer(topic, null); + } + + private MemqConsumer createConsumer(String topic, String groupId) throws Exception { + Properties props = new Properties(); + props.putAll(baseProperties); + props.setProperty(ConsumerConfigs.CLIENT_ID, + baseProperties.getProperty(ConsumerConfigs.CLIENT_ID) + "_metadata"); + if (groupId != null) { + props.setProperty(ConsumerConfigs.GROUP_ID, groupId); + } else { + props.setProperty(ConsumerConfigs.GROUP_ID, + topic + "_metadata_cg_" + ThreadLocalRandom.current().nextInt()); + } + MemqConsumer consumer = new MemqConsumer<>(props); + consumer.subscribe(topic); + return consumer; + } + + private TopicUriPartition createMemqTopicUriPartition(TopicRn topicRn, int partition) { + return new TopicUriPartition( + new MemqTopicUri(new BaseTopicUri(topicUri.getProtocol(), topicRn)), partition); + } + + private TopicUriPartition createMemqTopicUriPartition(TopicUri topicUri, int partition) { + return new TopicUriPartition(new MemqTopicUri(topicUri), partition); + } +} diff --git a/psc/src/main/java/com/pinterest/psc/metadata/creation/PscMemqMetadataClientCreator.java b/psc/src/main/java/com/pinterest/psc/metadata/creation/PscMemqMetadataClientCreator.java new file mode 100644 index 00000000..d5645e1b --- /dev/null +++ b/psc/src/main/java/com/pinterest/psc/metadata/creation/PscMemqMetadataClientCreator.java @@ -0,0 +1,37 @@ +package com.pinterest.psc.metadata.creation; + +import com.pinterest.psc.common.PscUtils; +import com.pinterest.psc.common.TopicUri; +import com.pinterest.psc.config.PscConfigurationInternal; +import com.pinterest.psc.consumer.memq.MemqTopicUri; +import com.pinterest.psc.environment.Environment; +import com.pinterest.psc.exception.startup.ConfigurationException; +import com.pinterest.psc.exception.startup.TopicUriSyntaxException; +import com.pinterest.psc.logging.PscLogger; +import com.pinterest.psc.metadata.client.memq.PscMemqMetadataClient; + +/** + * A class that creates a {@link com.pinterest.psc.metadata.client.PscBackendMetadataClient} for Memq. + */ +@PscMetadataClientCreatorPlugin(backend = PscUtils.BACKEND_TYPE_MEMQ, priority = 1) +public class PscMemqMetadataClientCreator extends PscBackendMetadataClientCreator { + + private static final PscLogger logger = PscLogger.getLogger(PscMemqMetadataClientCreator.class); + + @Override + public PscMemqMetadataClient create(Environment env, PscConfigurationInternal pscConfigurationInternal, TopicUri clusterUri) throws ConfigurationException { + logger.info("Creating Memq metadata client for clusterUri: " + clusterUri); + PscMemqMetadataClient pscMemqMetadataClient = new PscMemqMetadataClient(); + pscMemqMetadataClient.initialize( + clusterUri, + env, + pscConfigurationInternal + ); + return pscMemqMetadataClient; + } + + @Override + public TopicUri validateBackendTopicUri(TopicUri topicUri) throws TopicUriSyntaxException { + return MemqTopicUri.validate(topicUri); + } +} From 1aeedcfbe7c398aa048f929de43d50089069454f Mon Sep 17 00:00:00 2001 From: artem Date: Tue, 10 Mar 2026 12:53:50 -0700 Subject: [PATCH 2/4] Use one memq consumer --- .../client/memq/PscMemqMetadataClient.java | 166 +++++++----------- 1 file changed, 67 insertions(+), 99 deletions(-) diff --git a/psc/src/main/java/com/pinterest/psc/metadata/client/memq/PscMemqMetadataClient.java b/psc/src/main/java/com/pinterest/psc/metadata/client/memq/PscMemqMetadataClient.java index 68be991f..3d6580c8 100644 --- a/psc/src/main/java/com/pinterest/psc/metadata/client/memq/PscMemqMetadataClient.java +++ b/psc/src/main/java/com/pinterest/psc/metadata/client/memq/PscMemqMetadataClient.java @@ -28,19 +28,18 @@ import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.UUID; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeoutException; /** * A Memq-specific implementation of the {@link PscBackendMetadataClient}. - * Uses short-lived {@link MemqConsumer} instances to query metadata since Memq - * does not have a dedicated admin client. + * Uses a {@link MemqConsumer} to query metadata since Memq does not have a dedicated admin client. */ public class PscMemqMetadataClient extends PscBackendMetadataClient { private static final PscLogger logger = PscLogger.getLogger(PscMemqMetadataClient.class); - private Properties baseProperties; + private MemqConsumer memqConsumer; @Override public void initialize( @@ -49,18 +48,26 @@ public void initialize( PscConfigurationInternal pscConfigurationInternal ) throws ConfigurationException { super.initialize(topicUri, env, pscConfigurationInternal); - baseProperties = new PscMetadataClientToMemqConsumerConfigConverter() + Properties properties = new PscMetadataClientToMemqConsumerConfigConverter() .convert(pscConfigurationInternal, topicUri); - baseProperties.setProperty(ConsumerConfigs.BOOTSTRAP_SERVERS, discoveryConfig.getConnect()); - baseProperties.setProperty(ConsumerConfigs.CLIENT_ID, pscConfigurationInternal.getMetadataClientId()); - baseProperties.setProperty(ConsumerConfigs.KEY_DESERIALIZER_CLASS_KEY, + properties.setProperty(ConsumerConfigs.BOOTSTRAP_SERVERS, discoveryConfig.getConnect()); + properties.setProperty(ConsumerConfigs.CLIENT_ID, + pscConfigurationInternal.getMetadataClientId() + "_metadata"); + properties.setProperty(ConsumerConfigs.GROUP_ID, + "psc-metadata-client-" + UUID.randomUUID()); + properties.setProperty(ConsumerConfigs.KEY_DESERIALIZER_CLASS_KEY, ByteArrayDeserializer.class.getName()); - baseProperties.put(ConsumerConfigs.KEY_DESERIALIZER_CLASS_CONFIGS_KEY, new Properties()); - baseProperties.setProperty(ConsumerConfigs.VALUE_DESERIALIZER_CLASS_KEY, + properties.put(ConsumerConfigs.KEY_DESERIALIZER_CLASS_CONFIGS_KEY, new Properties()); + properties.setProperty(ConsumerConfigs.VALUE_DESERIALIZER_CLASS_KEY, ByteArrayDeserializer.class.getName()); - baseProperties.put(ConsumerConfigs.VALUE_DESERIALIZER_CLASS_CONFIGS_KEY, new Properties()); - baseProperties.setProperty(ConsumerConfigs.DIRECT_CONSUMER, "false"); - logger.info("Initialized PscMemqMetadataClient with base properties: " + baseProperties); + properties.put(ConsumerConfigs.VALUE_DESERIALIZER_CLASS_CONFIGS_KEY, new Properties()); + properties.setProperty(ConsumerConfigs.DIRECT_CONSUMER, "false"); + try { + this.memqConsumer = new MemqConsumer<>(properties); + } catch (Exception e) { + throw new ConfigurationException("Failed to create Memq consumer for metadata client", e); + } + logger.info("Initialized PscMemqMetadataClient with properties: " + properties); } @Override @@ -77,18 +84,13 @@ public Map describeTopicUris( ) throws ExecutionException, InterruptedException, TimeoutException { Map result = new HashMap<>(); for (TopicUri tu : topicUris) { - try (MemqConsumer consumer = createConsumer(tu.getTopic())) { - List partitions = consumer.getPartition(); - List topicUriPartitions = new ArrayList<>(); - for (int partition : partitions) { - topicUriPartitions.add(createMemqTopicUriPartition(tu, partition)); - } - result.put(tu, new TopicUriMetadata(tu, topicUriPartitions)); - } catch (IOException e) { - throw new ExecutionException("Failed to close Memq metadata consumer for " + tu, e); - } catch (Exception e) { - throw new ExecutionException("Failed to describe topic " + tu, e); + subscribe(tu.getTopic()); + List partitions = memqConsumer.getPartition(); + List topicUriPartitions = new ArrayList<>(); + for (int partition : partitions) { + topicUriPartitions.add(createMemqTopicUriPartition(tu, partition)); } + result.put(tu, new TopicUriMetadata(tu, topicUriPartitions)); } return result; } @@ -100,13 +102,11 @@ public Map listOffsets( ) throws ExecutionException, InterruptedException, TimeoutException { Map> earliestByTopic = new HashMap<>(); Map> latestByTopic = new HashMap<>(); - Map topicToUri = new HashMap<>(); for (Map.Entry entry : topicUriPartitionsAndOptions.entrySet()) { TopicUriPartition tup = entry.getKey(); String topic = tup.getTopicUri().getTopic(); - topicToUri.put(topic, tup.getTopicUri()); if (entry.getValue() == PscMetadataClient.MetadataClientOption.OFFSET_SPEC_EARLIEST) { earliestByTopic.computeIfAbsent(topic, k -> new HashSet<>()).add(tup.getPartition()); @@ -124,28 +124,24 @@ public Map listOffsets( allTopics.addAll(latestByTopic.keySet()); for (String topic : allTopics) { - try (MemqConsumer consumer = createConsumer(topic)) { - Set earliestPartitions = earliestByTopic.getOrDefault(topic, new HashSet<>()); - if (!earliestPartitions.isEmpty()) { - Map offsets = consumer.getEarliestOffsets(earliestPartitions); - for (Map.Entry e : offsets.entrySet()) { - TopicRn topicRn = MetadataUtils.createTopicRn(topicUri, topic); - result.put(createMemqTopicUriPartition(topicRn, e.getKey()), e.getValue()); - } + subscribe(topic); + + Set earliestPartitions = earliestByTopic.getOrDefault(topic, new HashSet<>()); + if (!earliestPartitions.isEmpty()) { + Map offsets = memqConsumer.getEarliestOffsets(earliestPartitions); + for (Map.Entry e : offsets.entrySet()) { + TopicRn topicRn = MetadataUtils.createTopicRn(topicUri, topic); + result.put(createMemqTopicUriPartition(topicRn, e.getKey()), e.getValue()); } + } - Set latestPartitions = latestByTopic.getOrDefault(topic, new HashSet<>()); - if (!latestPartitions.isEmpty()) { - Map offsets = consumer.getLatestOffsets(latestPartitions); - for (Map.Entry e : offsets.entrySet()) { - TopicRn topicRn = MetadataUtils.createTopicRn(topicUri, topic); - result.put(createMemqTopicUriPartition(topicRn, e.getKey()), e.getValue()); - } + Set latestPartitions = latestByTopic.getOrDefault(topic, new HashSet<>()); + if (!latestPartitions.isEmpty()) { + Map offsets = memqConsumer.getLatestOffsets(latestPartitions); + for (Map.Entry e : offsets.entrySet()) { + TopicRn topicRn = MetadataUtils.createTopicRn(topicUri, topic); + result.put(createMemqTopicUriPartition(topicRn, e.getKey()), e.getValue()); } - } catch (IOException e) { - throw new ExecutionException("Failed to close Memq metadata consumer for topic " + topic, e); - } catch (Exception e) { - throw new ExecutionException("Failed to list offsets for topic " + topic, e); } } return result; @@ -157,36 +153,25 @@ public Map listOffsetsForTimestamps( Duration duration ) throws ExecutionException, InterruptedException, TimeoutException { Map> timestampsByTopic = new HashMap<>(); - Map> partitionLookupByTopic = new HashMap<>(); for (Map.Entry entry : topicUriPartitionsAndTimes.entrySet()) { TopicUriPartition tup = entry.getKey(); String topic = tup.getTopicUri().getTopic(); timestampsByTopic.computeIfAbsent(topic, k -> new HashMap<>()) .put(tup.getPartition(), entry.getValue()); - partitionLookupByTopic.computeIfAbsent(topic, k -> new HashMap<>()) - .put(tup.getPartition(), tup); } Map result = new HashMap<>(); for (Map.Entry> entry : timestampsByTopic.entrySet()) { String topic = entry.getKey(); - try (MemqConsumer consumer = createConsumer(topic)) { - Map offsets = consumer.offsetsOfTimestamps(entry.getValue()); - Map partitionLookup = partitionLookupByTopic.get(topic); - for (Map.Entry offsetEntry : offsets.entrySet()) { - TopicRn topicRn = MetadataUtils.createTopicRn(topicUri, topic); - result.put( - createMemqTopicUriPartition(topicRn, offsetEntry.getKey()), - offsetEntry.getValue() - ); - } - } catch (IOException e) { - throw new ExecutionException( - "Failed to close Memq metadata consumer for topic " + topic, e); - } catch (Exception e) { - throw new ExecutionException( - "Failed to list offsets for timestamps for topic " + topic, e); + subscribe(topic); + Map offsets = memqConsumer.offsetsOfTimestamps(entry.getValue()); + for (Map.Entry offsetEntry : offsets.entrySet()) { + TopicRn topicRn = MetadataUtils.createTopicRn(topicUri, topic); + result.put( + createMemqTopicUriPartition(topicRn, offsetEntry.getKey()), + offsetEntry.getValue() + ); } } return result; @@ -207,25 +192,18 @@ public Map listOffsetsForConsumerGroup( Map result = new HashMap<>(); for (Map.Entry> entry : partitionsByTopic.entrySet()) { String topic = entry.getKey(); - try (MemqConsumer consumer = createConsumer(topic, consumerGroupId)) { - for (int partition : entry.getValue()) { - long committedOffset = consumer.committed(partition); - if (committedOffset == -1L) { - logger.warn( - "Consumer group {} has no committed offset for topic {} partition {}", - consumerGroupId, topic, partition - ); - continue; - } - TopicRn topicRn = MetadataUtils.createTopicRn(topicUri, topic); - result.put(createMemqTopicUriPartition(topicRn, partition), committedOffset); + subscribe(topic); + for (int partition : entry.getValue()) { + long committedOffset = memqConsumer.committed(partition); + if (committedOffset == -1L) { + logger.warn( + "Consumer group {} has no committed offset for topic {} partition {}", + consumerGroupId, topic, partition + ); + continue; } - } catch (IOException e) { - throw new ExecutionException( - "Failed to close Memq metadata consumer for topic " + topic, e); - } catch (Exception e) { - throw new ExecutionException( - "Failed to list consumer group offsets for topic " + topic, e); + TopicRn topicRn = MetadataUtils.createTopicRn(topicUri, topic); + result.put(createMemqTopicUriPartition(topicRn, partition), committedOffset); } } return result; @@ -233,27 +211,17 @@ public Map listOffsetsForConsumerGroup( @Override public void close() throws IOException { + if (memqConsumer != null) + memqConsumer.close(); logger.info("Closed PscMemqMetadataClient"); } - private MemqConsumer createConsumer(String topic) throws Exception { - return createConsumer(topic, null); - } - - private MemqConsumer createConsumer(String topic, String groupId) throws Exception { - Properties props = new Properties(); - props.putAll(baseProperties); - props.setProperty(ConsumerConfigs.CLIENT_ID, - baseProperties.getProperty(ConsumerConfigs.CLIENT_ID) + "_metadata"); - if (groupId != null) { - props.setProperty(ConsumerConfigs.GROUP_ID, groupId); - } else { - props.setProperty(ConsumerConfigs.GROUP_ID, - topic + "_metadata_cg_" + ThreadLocalRandom.current().nextInt()); + private void subscribe(String topic) throws ExecutionException { + try { + memqConsumer.subscribe(topic); + } catch (Exception e) { + throw new ExecutionException("Failed to subscribe to Memq topic " + topic, e); } - MemqConsumer consumer = new MemqConsumer<>(props); - consumer.subscribe(topic); - return consumer; } private TopicUriPartition createMemqTopicUriPartition(TopicRn topicRn, int partition) { From 06e639dffa3fcc502339f5cd57cc0df48ce26f9d Mon Sep 17 00:00:00 2001 From: artem Date: Tue, 10 Mar 2026 13:29:19 -0700 Subject: [PATCH 3/4] Fix group_id --- .../psc/metadata/client/memq/PscMemqMetadataClient.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/psc/src/main/java/com/pinterest/psc/metadata/client/memq/PscMemqMetadataClient.java b/psc/src/main/java/com/pinterest/psc/metadata/client/memq/PscMemqMetadataClient.java index 3d6580c8..f1a3d5a0 100644 --- a/psc/src/main/java/com/pinterest/psc/metadata/client/memq/PscMemqMetadataClient.java +++ b/psc/src/main/java/com/pinterest/psc/metadata/client/memq/PscMemqMetadataClient.java @@ -28,7 +28,6 @@ import java.util.Map; import java.util.Properties; import java.util.Set; -import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; @@ -52,9 +51,9 @@ public void initialize( .convert(pscConfigurationInternal, topicUri); properties.setProperty(ConsumerConfigs.BOOTSTRAP_SERVERS, discoveryConfig.getConnect()); properties.setProperty(ConsumerConfigs.CLIENT_ID, - pscConfigurationInternal.getMetadataClientId() + "_metadata"); + pscConfigurationInternal.getMetadataClientId()); properties.setProperty(ConsumerConfigs.GROUP_ID, - "psc-metadata-client-" + UUID.randomUUID()); + pscConfigurationInternal.getMetadataClientId()); properties.setProperty(ConsumerConfigs.KEY_DESERIALIZER_CLASS_KEY, ByteArrayDeserializer.class.getName()); properties.put(ConsumerConfigs.KEY_DESERIALIZER_CLASS_CONFIGS_KEY, new Properties()); From adc03229d8f780d7ab74d02b9d14a4bec248c51e Mon Sep 17 00:00:00 2001 From: artem Date: Thu, 12 Mar 2026 12:47:09 -0700 Subject: [PATCH 4/4] make memqConsumer protected --- .../psc/metadata/client/memq/PscMemqMetadataClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/psc/src/main/java/com/pinterest/psc/metadata/client/memq/PscMemqMetadataClient.java b/psc/src/main/java/com/pinterest/psc/metadata/client/memq/PscMemqMetadataClient.java index f1a3d5a0..fd21c7b7 100644 --- a/psc/src/main/java/com/pinterest/psc/metadata/client/memq/PscMemqMetadataClient.java +++ b/psc/src/main/java/com/pinterest/psc/metadata/client/memq/PscMemqMetadataClient.java @@ -38,7 +38,7 @@ public class PscMemqMetadataClient extends PscBackendMetadataClient { private static final PscLogger logger = PscLogger.getLogger(PscMemqMetadataClient.class); - private MemqConsumer memqConsumer; + protected MemqConsumer memqConsumer; @Override public void initialize(