Open
Conversation
jeffxiang
requested changes
Mar 10, 2026
psc/src/main/java/com/pinterest/psc/metadata/client/memq/PscMemqMetadataClient.java
Outdated
Show resolved
Hide resolved
psc/src/main/java/com/pinterest/psc/metadata/client/memq/PscMemqMetadataClient.java
Outdated
Show resolved
Hide resolved
psc/src/main/java/com/pinterest/psc/metadata/client/memq/PscMemqMetadataClient.java
Outdated
Show resolved
Hide resolved
| public List<TopicRn> listTopicRns(Duration duration) | ||
| throws ExecutionException, InterruptedException, TimeoutException { | ||
| throw new UnsupportedOperationException( | ||
| "[Memq] Listing all topics is not supported by the Memq backend."); |
Contributor
There was a problem hiding this comment.
this is needed - we'll update MemQ consumer
jeffxiang
requested changes
Mar 10, 2026
psc/src/main/java/com/pinterest/psc/metadata/client/memq/PscMemqMetadataClient.java
Show resolved
Hide resolved
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Add Memq Metadata Client
Here's a summary of all the changes:
Files created
psc/src/main/java/com/pinterest/psc/metadata/client/memq/PscMemqMetadataClient.java - The main Memq metadata client implementation extending PscBackendMetadataClient. Since Memq doesn't have a dedicated admin client (unlike Kafka's AdminClient), it uses short-lived MemqConsumer instances for all metadata queries. The implementation:
listTopicRns() - throws UnsupportedOperationException since Memq doesn't support listing all topics
describeTopicUris() - creates a consumer per topic, subscribes, and calls getPartition() to discover partitions
listOffsets() - groups partitions by topic, uses getEarliestOffsets() / getLatestOffsets() based on the requested option
listOffsetsForTimestamps() - groups by topic and uses offsetsOfTimestamps()
listOffsetsForConsumerGroup() - creates a consumer with the target consumer group ID and uses committed() per partition
psc/src/main/java/com/pinterest/psc/metadata/creation/PscMemqMetadataClientCreator.java - The plugin creator class annotated with @PscMetadataClientCreatorPlugin(backend = "memq", priority = 1), following the exact same pattern as PscKafkaMetadataClientCreator. It creates and initializes PscMemqMetadataClient instances and validates Memq topic URIs via MemqTopicUri.validate().
psc/src/main/java/com/pinterest/psc/config/PscMetadataClientToMemqConsumerConfigConverter.java - Config converter that extends PscMetadataClientToBackendMetatadataClientConfigCoverter, mapping PSC metadata client configuration to Memq consumer configs (e.g., PSC_METADATA_CLIENT_ID to ConsumerConfigs.CLIENT_ID).
File modified
psc/src/main/java/com/pinterest/psc/consumer/memq/MemqTopicUri.java - Changed the constructor from package-private to public so it can be used from the com.pinterest.psc.metadata.client.memq package (consistent with how KafkaTopicUri has a public constructor).