-
Notifications
You must be signed in to change notification settings - Fork 5
Pubsub pattern subs #1114
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: dev
Are you sure you want to change the base?
Pubsub pattern subs #1114
Conversation
- Move implementations to impl/ subdirectory (lcmpubsub, memory, shmpubsub, etc.) - Extract encoder mixins to encoders.py (PubSubEncoderMixin, PickleEncoderMixin, LCMEncoderMixin, JpegEncoderMixin) - Add AllPubSub and DiscoveryPubSub mixins with complementary default implementations - Add GlobPubSub and RegexPubSub marker classes with docstring examples - Update imports across codebase - Add CLAUDE.md to .gitignore
Greptile OverviewGreptile SummaryThis PR implements pattern-based subscriptions and bridging infrastructure for pubsub systems. The changes enable subscribing to topics using glob/regex patterns and facilitate cross-protocol message translation. Major Changes:
Issues Found:
Confidence Score: 4/5
Important Files Changed
Sequence DiagramsequenceDiagram
participant Source as Source PubSub<br/>(AllPubSub)
participant Bridge as Bridge Service
participant Translator as Translator
participant Dest as Destination PubSub
Note over Bridge: start()
Bridge->>Source: subscribe_all(callback) or<br/>subscribe(topic_from, callback)
Source-->>Bridge: unsubscribe function
Note over Source: Message published
Source->>Bridge: callback(msg_from, topic_from)
Bridge->>Translator: topic(topic_from)
Translator-->>Bridge: topic_to
Bridge->>Translator: msg(msg_from)
Translator-->>Bridge: msg_to
Bridge->>Dest: publish(topic_to, msg_to)
Note over Bridge: stop()
Bridge->>Source: unsubscribe()
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
4 files reviewed, 4 comments
- Add Glob class for glob-style pattern matching (*, **, ?) - Topic now accepts str, re.Pattern, or Glob for flexible subscription patterns - Pattern subscriptions return the actual matched channel in callback - Remove duplicate LCMMsg and Topic from lcmservice.py (use DimosMsg) - Add PubSubProtocol for structural typing - Add tests for regex and glob pattern subscriptions
…subscriptions - Add subscribe_all() method to LCMPubSubBase for subscribing to all topics - Add Topic.from_channel_str() factory method to parse channel strings with embedded type info - Channel format: /topic#module.ClassName enables automatic type extraction - Add test for subscribe_all with typed message decoding
Move message type resolution logic to a dedicated helper module with lru_cache for performance. Supports fallback to dimos_lcm module path.
af9a79b to
d077c67
Compare
b86a7d0 to
3a0c089
Compare
|
@greptileai can you review? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
2 files reviewed, 2 comments
- Fix AllPubSub type parameter order (TopicFrom, MsgFrom not MsgFrom, TopicFrom) - Fix pass_msg callback signature to match spec (MsgFrom, TopicFrom) - Pass subscribe_topic config to bridge() function
- helpers.py: Add type: ignore for getattr Any return - lcmpubsub.py: Add type: ignore for callback type variance and mixin incompatibility - shmpubsub.py: Add type: ignore for mixin incompatibility - transport.py: Add arg-type to existing type: ignore
|
So this works, but I think there's a certain bit of complexity with mixins and generics which isn't needed. Just some brainstorming ideasThe way I view it there are 4 things:
Topics should contain every information about what is subscribed/published to and be just static data (do nothing). One of the things I don't like about the current system is how I'm doing this currently for setting transports: LCMTransport constains LCM() in it's I think we should have something like: We could have a singleton broker gateway which passes data to all real brokers. So you can have: It checks if LcmBroker is started, if it's not, it starts it. (For cleanup, we can have This way, modules don't have to store their own LCMServers, or any server. They just have to know what topics they publish and what messages they publish. Instead of the encoder being a mixin, it could just be a param. So we can have It would be nice if In/Out could have subscribe/publish which just uses the gateway But then we can implement bridges at the Gateway level. Subscribe all could be just: |
| def __init__(self, *args, **kwargs) -> None: # type: ignore[no-untyped-def] | ||
| super().__init__(*args, **kwargs) | ||
| self._encode_callback_map: dict = {} # type: ignore[type-arg] | ||
| def on_msg(msg: MsgT, topic: TopicT) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we need locks on modifying seen.
| import traceback | ||
| def subscribe_all(self, callback: Callable[[MsgT, TopicT], Any]) -> Callable[[], None]: | ||
| """Subscribe to all topics by subscribing to each discovered topic.""" | ||
| subscriptions: list[Callable[[], None]] = [] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need a lock
("detections", Detection3DModule): LCMTransport(
"/detector3d/detections", Detection2DArray
),
Yeah agreed, the reason I auto-started LCM was that it was early in dev and very cheap. Generally transports should be a
PubSubProtocol = Literal["LCM", "SHM", ...]
@dataclass
class Topic
protocol: PubSubProtocol
topic: str # maybe better name lol
type: RootMsgClassSo my stuff is more abstract then this because I didn't want to make assumptions pubsub being used for transport or what is required to configure a topic for a specific pubsub protocol I imagined # off site detections, local 3d projections
Detection3dModule.2d_detections.transport = DimosServerTransport("dimos.com")
# video specific transport
VideoParser.color_image.transport = CameraStream("rtsp://....")
# sending odom to remote server
robot.odom.transport = HTTPPost("http://bla.com/receiver.php")
Topic is just an init arg, and the concept of a Topic is not tied to Transport, and Topics if used in pubsub are not generalized since they not 1-to-1 across protocols, for example ROS Topic includes ROS specific QOS, buffer settings, zenoh topic might contain a broker IP address or node discovery mechanism etc. I can imagine a websocket connection to a dimos server being a Transport in the future, this requires a web server url. Right now transport spec should change to have more clear naming of methods class Transport():
send(msg: MsgT):
...
subscribe(callback):
...and that's all, so people don't feel tied to and the whole mariageThat being said, I think we can marry the two ideas, nothing stops you for having VideoParser.color_image.transport = mybroker.topic("/something")OR (what I did before) # mytransport in the background uses a process-central broker
VideoParser.color_image.transport = mytransport(topic("/something"))brief thoughts, what do you think? |
|
TL;DR: I think we should be able to specify how something is meant to be transported without reifying the transport. I view a topic as the equivalent of a connection string for a database. (Maybe I should use a different name, not topic.) You can call
So it's kinda like saying: But instead of using a string I break up the URI into its constituent parts.
The same is true for connection strings. A MySQL connection string can include In Postgres you'd use For us, we could have: |
Idk
judge me, not 100% sure
Intro
Point of this is to construct a generic bridging language, ability to define mappers from one protocol topic/msg to another.
This is easy - if you have a way to subscribe to all messages, this is what this PR lays the groundwork for
We need bridges mostly for spying/visualization - a rerun bridge atm
We want to do standard (and usually heaviest)
OUTvisualization related computation to be done outside of the modules themselves. Module OUTs can be tapped, then modules canrr.logindividually only for specific specific fancy vis (projected pointcloud, planner algo rendering)Rerun could be implemented as a funny (pub only) pubsub that can receive msgs that are automatically .to_rerun()-ed
.. or something like that
reorg
test_pattern_sub.pydocs/development/grid_testing.mdAllPubSub and DiscoveryPubSub
There are two ways in which a protocol can facilitate topic discovery, we implement base classes for both,
AllPubsubsupports subscribing to all topics (Redis, LCM, MQTT)DiscoveryPubSubsupports discovering new topics (ROS)These capabilities are orthogonal but they can implement one another and normalize the API
both implement
subscribe_allfunction we care about.Extending LCM
Topic class to support Regex or Glob patterns (as per the underlying proto)
TODO
other protocols need similar extensions (I will only do SHM),
we just want to get to something like subscribe_all (in potentially different ways) and once there we can bridge
once subscribe_all is reached, potentially post-reception (expensive) glob/regex implementations can also be implemented implicitly
Bridge
added preliminary bridge, mostly as a type spec to demonstrate the idea, shows how we could bridge different protocols once subscribe_all or glob/regex topics are implemented
Tests issue
Some tests are failing because some other test, lcm msgs are flying around while these tests are running. I'll investigate this, didn't want to sweep under rug, likely this is causing flakyness with other LCM tests also