diff --git a/psc/src/main/java/com/pinterest/psc/config/PscMetadataClientToMemqConsumerConfigConverter.java b/psc/src/main/java/com/pinterest/psc/config/PscMetadataClientToMemqConsumerConfigConverter.java new file mode 100644 index 00000000..2c5ce98e --- /dev/null +++ b/psc/src/main/java/com/pinterest/psc/config/PscMetadataClientToMemqConsumerConfigConverter.java @@ -0,0 +1,26 @@ +package com.pinterest.psc.config; + +import com.pinterest.memq.client.commons.ConsumerConfigs; +import com.pinterest.psc.common.TopicUri; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +public class PscMetadataClientToMemqConsumerConfigConverter extends PscMetadataClientToBackendMetatadataClientConfigCoverter { + @Override + protected Map getConfigConverterMap() { + return new HashMap() { + private static final long serialVersionUID = 1L; + + { + put(PscConfiguration.PSC_METADATA_CLIENT_ID, ConsumerConfigs.CLIENT_ID); + } + }; + } + + @Override + public Properties convert(PscConfigurationInternal pscConfigurationInternal, TopicUri topicUri) { + return super.convert(pscConfigurationInternal, topicUri); + } +} diff --git a/psc/src/main/java/com/pinterest/psc/consumer/memq/MemqTopicUri.java b/psc/src/main/java/com/pinterest/psc/consumer/memq/MemqTopicUri.java index d25c1a65..3a1760a5 100644 --- a/psc/src/main/java/com/pinterest/psc/consumer/memq/MemqTopicUri.java +++ b/psc/src/main/java/com/pinterest/psc/consumer/memq/MemqTopicUri.java @@ -9,7 +9,7 @@ public class MemqTopicUri extends BaseTopicUri { public static final String PLAINTEXT_PROTOCOL = "plaintext"; public static final String SECURE_PROTOCOL = "secure"; - MemqTopicUri(TopicUri topicUri) { + public MemqTopicUri(TopicUri topicUri) { super(topicUri); } diff --git a/psc/src/main/java/com/pinterest/psc/metadata/client/memq/PscMemqMetadataClient.java b/psc/src/main/java/com/pinterest/psc/metadata/client/memq/PscMemqMetadataClient.java new file mode 100644 index 00000000..fd21c7b7 --- /dev/null +++ b/psc/src/main/java/com/pinterest/psc/metadata/client/memq/PscMemqMetadataClient.java @@ -0,0 +1,234 @@ +package com.pinterest.psc.metadata.client.memq; + +import com.pinterest.memq.client.commons.ConsumerConfigs; +import com.pinterest.memq.client.commons.serde.ByteArrayDeserializer; +import com.pinterest.memq.client.consumer.MemqConsumer; +import com.pinterest.psc.common.BaseTopicUri; +import com.pinterest.psc.common.TopicRn; +import com.pinterest.psc.common.TopicUri; +import com.pinterest.psc.common.TopicUriPartition; +import com.pinterest.psc.config.PscConfigurationInternal; +import com.pinterest.psc.config.PscMetadataClientToMemqConsumerConfigConverter; +import com.pinterest.psc.consumer.memq.MemqTopicUri; +import com.pinterest.psc.environment.Environment; +import com.pinterest.psc.exception.startup.ConfigurationException; +import com.pinterest.psc.logging.PscLogger; +import com.pinterest.psc.metadata.MetadataUtils; +import com.pinterest.psc.metadata.TopicUriMetadata; +import com.pinterest.psc.metadata.client.PscBackendMetadataClient; +import com.pinterest.psc.metadata.client.PscMetadataClient; + +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; + +/** + * A Memq-specific implementation of the {@link PscBackendMetadataClient}. + * Uses a {@link MemqConsumer} to query metadata since Memq does not have a dedicated admin client. + */ +public class PscMemqMetadataClient extends PscBackendMetadataClient { + + private static final PscLogger logger = PscLogger.getLogger(PscMemqMetadataClient.class); + protected MemqConsumer memqConsumer; + + @Override + public void initialize( + TopicUri topicUri, + Environment env, + PscConfigurationInternal pscConfigurationInternal + ) throws ConfigurationException { + super.initialize(topicUri, env, pscConfigurationInternal); + Properties properties = new PscMetadataClientToMemqConsumerConfigConverter() + .convert(pscConfigurationInternal, topicUri); + properties.setProperty(ConsumerConfigs.BOOTSTRAP_SERVERS, discoveryConfig.getConnect()); + properties.setProperty(ConsumerConfigs.CLIENT_ID, + pscConfigurationInternal.getMetadataClientId()); + properties.setProperty(ConsumerConfigs.GROUP_ID, + pscConfigurationInternal.getMetadataClientId()); + properties.setProperty(ConsumerConfigs.KEY_DESERIALIZER_CLASS_KEY, + ByteArrayDeserializer.class.getName()); + properties.put(ConsumerConfigs.KEY_DESERIALIZER_CLASS_CONFIGS_KEY, new Properties()); + properties.setProperty(ConsumerConfigs.VALUE_DESERIALIZER_CLASS_KEY, + ByteArrayDeserializer.class.getName()); + properties.put(ConsumerConfigs.VALUE_DESERIALIZER_CLASS_CONFIGS_KEY, new Properties()); + properties.setProperty(ConsumerConfigs.DIRECT_CONSUMER, "false"); + try { + this.memqConsumer = new MemqConsumer<>(properties); + } catch (Exception e) { + throw new ConfigurationException("Failed to create Memq consumer for metadata client", e); + } + logger.info("Initialized PscMemqMetadataClient with properties: " + properties); + } + + @Override + public List listTopicRns(Duration duration) + throws ExecutionException, InterruptedException, TimeoutException { + throw new UnsupportedOperationException( + "[Memq] Listing all topics is not supported by the Memq backend."); + } + + @Override + public Map describeTopicUris( + Collection topicUris, + Duration duration + ) throws ExecutionException, InterruptedException, TimeoutException { + Map result = new HashMap<>(); + for (TopicUri tu : topicUris) { + subscribe(tu.getTopic()); + List partitions = memqConsumer.getPartition(); + List topicUriPartitions = new ArrayList<>(); + for (int partition : partitions) { + topicUriPartitions.add(createMemqTopicUriPartition(tu, partition)); + } + result.put(tu, new TopicUriMetadata(tu, topicUriPartitions)); + } + return result; + } + + @Override + public Map listOffsets( + Map topicUriPartitionsAndOptions, + Duration duration + ) throws ExecutionException, InterruptedException, TimeoutException { + Map> earliestByTopic = new HashMap<>(); + Map> latestByTopic = new HashMap<>(); + + for (Map.Entry entry : + topicUriPartitionsAndOptions.entrySet()) { + TopicUriPartition tup = entry.getKey(); + String topic = tup.getTopicUri().getTopic(); + + if (entry.getValue() == PscMetadataClient.MetadataClientOption.OFFSET_SPEC_EARLIEST) { + earliestByTopic.computeIfAbsent(topic, k -> new HashSet<>()).add(tup.getPartition()); + } else if (entry.getValue() == PscMetadataClient.MetadataClientOption.OFFSET_SPEC_LATEST) { + latestByTopic.computeIfAbsent(topic, k -> new HashSet<>()).add(tup.getPartition()); + } else { + throw new IllegalArgumentException( + "Unsupported MetadataClientOption for listOffsets(): " + entry.getValue()); + } + } + + Map result = new HashMap<>(); + Set allTopics = new HashSet<>(); + allTopics.addAll(earliestByTopic.keySet()); + allTopics.addAll(latestByTopic.keySet()); + + for (String topic : allTopics) { + subscribe(topic); + + Set earliestPartitions = earliestByTopic.getOrDefault(topic, new HashSet<>()); + if (!earliestPartitions.isEmpty()) { + Map offsets = memqConsumer.getEarliestOffsets(earliestPartitions); + for (Map.Entry e : offsets.entrySet()) { + TopicRn topicRn = MetadataUtils.createTopicRn(topicUri, topic); + result.put(createMemqTopicUriPartition(topicRn, e.getKey()), e.getValue()); + } + } + + Set latestPartitions = latestByTopic.getOrDefault(topic, new HashSet<>()); + if (!latestPartitions.isEmpty()) { + Map offsets = memqConsumer.getLatestOffsets(latestPartitions); + for (Map.Entry e : offsets.entrySet()) { + TopicRn topicRn = MetadataUtils.createTopicRn(topicUri, topic); + result.put(createMemqTopicUriPartition(topicRn, e.getKey()), e.getValue()); + } + } + } + return result; + } + + @Override + public Map listOffsetsForTimestamps( + Map topicUriPartitionsAndTimes, + Duration duration + ) throws ExecutionException, InterruptedException, TimeoutException { + Map> timestampsByTopic = new HashMap<>(); + + for (Map.Entry entry : topicUriPartitionsAndTimes.entrySet()) { + TopicUriPartition tup = entry.getKey(); + String topic = tup.getTopicUri().getTopic(); + timestampsByTopic.computeIfAbsent(topic, k -> new HashMap<>()) + .put(tup.getPartition(), entry.getValue()); + } + + Map result = new HashMap<>(); + for (Map.Entry> entry : timestampsByTopic.entrySet()) { + String topic = entry.getKey(); + subscribe(topic); + Map offsets = memqConsumer.offsetsOfTimestamps(entry.getValue()); + for (Map.Entry offsetEntry : offsets.entrySet()) { + TopicRn topicRn = MetadataUtils.createTopicRn(topicUri, topic); + result.put( + createMemqTopicUriPartition(topicRn, offsetEntry.getKey()), + offsetEntry.getValue() + ); + } + } + return result; + } + + @Override + public Map listOffsetsForConsumerGroup( + String consumerGroupId, + Collection topicUriPartitions, + Duration duration + ) throws ExecutionException, InterruptedException, TimeoutException { + Map> partitionsByTopic = new HashMap<>(); + for (TopicUriPartition tup : topicUriPartitions) { + String topic = tup.getTopicUri().getTopic(); + partitionsByTopic.computeIfAbsent(topic, k -> new HashSet<>()).add(tup.getPartition()); + } + + Map result = new HashMap<>(); + for (Map.Entry> entry : partitionsByTopic.entrySet()) { + String topic = entry.getKey(); + subscribe(topic); + for (int partition : entry.getValue()) { + long committedOffset = memqConsumer.committed(partition); + if (committedOffset == -1L) { + logger.warn( + "Consumer group {} has no committed offset for topic {} partition {}", + consumerGroupId, topic, partition + ); + continue; + } + TopicRn topicRn = MetadataUtils.createTopicRn(topicUri, topic); + result.put(createMemqTopicUriPartition(topicRn, partition), committedOffset); + } + } + return result; + } + + @Override + public void close() throws IOException { + if (memqConsumer != null) + memqConsumer.close(); + logger.info("Closed PscMemqMetadataClient"); + } + + private void subscribe(String topic) throws ExecutionException { + try { + memqConsumer.subscribe(topic); + } catch (Exception e) { + throw new ExecutionException("Failed to subscribe to Memq topic " + topic, e); + } + } + + private TopicUriPartition createMemqTopicUriPartition(TopicRn topicRn, int partition) { + return new TopicUriPartition( + new MemqTopicUri(new BaseTopicUri(topicUri.getProtocol(), topicRn)), partition); + } + + private TopicUriPartition createMemqTopicUriPartition(TopicUri topicUri, int partition) { + return new TopicUriPartition(new MemqTopicUri(topicUri), partition); + } +} diff --git a/psc/src/main/java/com/pinterest/psc/metadata/creation/PscMemqMetadataClientCreator.java b/psc/src/main/java/com/pinterest/psc/metadata/creation/PscMemqMetadataClientCreator.java new file mode 100644 index 00000000..d5645e1b --- /dev/null +++ b/psc/src/main/java/com/pinterest/psc/metadata/creation/PscMemqMetadataClientCreator.java @@ -0,0 +1,37 @@ +package com.pinterest.psc.metadata.creation; + +import com.pinterest.psc.common.PscUtils; +import com.pinterest.psc.common.TopicUri; +import com.pinterest.psc.config.PscConfigurationInternal; +import com.pinterest.psc.consumer.memq.MemqTopicUri; +import com.pinterest.psc.environment.Environment; +import com.pinterest.psc.exception.startup.ConfigurationException; +import com.pinterest.psc.exception.startup.TopicUriSyntaxException; +import com.pinterest.psc.logging.PscLogger; +import com.pinterest.psc.metadata.client.memq.PscMemqMetadataClient; + +/** + * A class that creates a {@link com.pinterest.psc.metadata.client.PscBackendMetadataClient} for Memq. + */ +@PscMetadataClientCreatorPlugin(backend = PscUtils.BACKEND_TYPE_MEMQ, priority = 1) +public class PscMemqMetadataClientCreator extends PscBackendMetadataClientCreator { + + private static final PscLogger logger = PscLogger.getLogger(PscMemqMetadataClientCreator.class); + + @Override + public PscMemqMetadataClient create(Environment env, PscConfigurationInternal pscConfigurationInternal, TopicUri clusterUri) throws ConfigurationException { + logger.info("Creating Memq metadata client for clusterUri: " + clusterUri); + PscMemqMetadataClient pscMemqMetadataClient = new PscMemqMetadataClient(); + pscMemqMetadataClient.initialize( + clusterUri, + env, + pscConfigurationInternal + ); + return pscMemqMetadataClient; + } + + @Override + public TopicUri validateBackendTopicUri(TopicUri topicUri) throws TopicUriSyntaxException { + return MemqTopicUri.validate(topicUri); + } +}