Skip to content

Conversation

@Kaweees
Copy link
Member

@Kaweees Kaweees commented Jan 28, 2026

This merge request implements DDS (Data Distribution Service) transport layer using CycloneDDS, providing a new high-performance pub/sub transport option.

Quick Start

Note

We currently use Eclipse Cyclone DDS as our DDS implementation. Its IdlStruct feature lets you define DDS topic types in pure Python, eliminating the need for separate IDL files, with automatic serialization support.

from dataclasses import dataclass
from cyclonedds.idl import IdlStruct
from dimos.protocol.pubsub.ddspubsub import DDS, Topic

@dataclass
class SensorReading(IdlStruct):
    value: float

dds = DDS()
dds.start()

received = []
sensor_topic = Topic(name="sensors/temperature", typename=SensorReading)

dds.subscribe(sensor_topic, lambda msg, t: received.append(msg))
dds.publish(sensor_topic, SensorReading(value=22.5))

import time
time.sleep(0.1)

print(f"Received: {received[0]}")

dds.stop()

Unit Tests/Benchmarks

uv run pytest -svm tool dimos/protocol/pubsub/benchmark/test_benchmark.py --override-ini="addopts=" -k "DDS"
uv run pytest -svm tool dimos/protocol/pubsub/benchmark/test_benchmark.py
image

This builds off of #1036, which was closed because the branch was renamed to miguel/dds_transport

Kaweees and others added 22 commits January 15, 2026 18:06
* raw rospubsub and benchmarks

* typefixes, shm added to the benchmark

* SHM is not so important to tell us every time when it starts

* greptile comments

* Add co-authorship line to commit message filter patterns

* Remove unused contextmanager import

---------

Co-authored-by: Ivan Nikolic <lesh@sysphere.org>
Replace base64 string encoding with native IDL bytearray type to eliminate
buffer overflow issues. The original base64 encoding exceeded CycloneDDS's
default string size limit (~256 bytes) and caused crashes on messages >= 1KB.

Key changes:
- Use make_idl_struct with bytearray field instead of string
- Convert bytes to bytearray when publishing to DDS
- Convert bytearray back to bytes when receiving from DDS
- Add _DDSMessageListener for async message dispatch
- Implement thread-safe DataWriter/DataReader management
- Add pickle support via __getstate__/__setstate__

Result: All 12 DDS benchmark tests pass (64B to 10MB messages).
The double-checked locking pattern avoids lock contention on every
call after initial object creation. Initial benchmarking shows this
pattern performs better than simple locking for repeated accesses
to the same topics.
@greptile-apps
Copy link

greptile-apps bot commented Jan 28, 2026

Greptile Overview

Greptile Summary

This PR implements DDS (Data Distribution Service) transport protocol using CycloneDDS, providing a new high-performance pub/sub transport option alongside existing LCM and shared memory transports.

Key Changes:

  • Adds DDSPubSubBase and DDS classes implementing thread-safe pub/sub with proper locking for callbacks, readers, and writers
  • Implements DDSTransport in dimos/core/transport.py with lazy initialization protected by _start_lock
  • Creates DDSService with singleton DomainParticipant using double-checked locking pattern
  • Adds comprehensive benchmarking infrastructure for comparing transport performance
  • Includes ROS2 PubSub implementation (RawROS) for performance comparisons
  • Updates PubSubEncoderMixin to support generic encoding types beyond bytes (enables DDS IdlStruct messages)
  • Adds CycloneDDS dependency (>=0.10.5) and Nix environment configuration

Thread Safety:
The implementation properly addresses race conditions identified in previous review threads:

  • Callbacks are registered before reader creation (within _callback_lock)
  • Transport start checks use _start_lock to prevent concurrent initialization
  • Participant creation uses double-checked locking in DDSService

Testing:
Benchmark suite measures throughput, latency, and message loss across multiple message sizes (64B to 10MB) for all transport implementations.

Confidence Score: 5/5

  • This PR is safe to merge with proper thread-safe implementation
  • All threading race conditions from previous reviews have been properly addressed with appropriate locks. The code follows established patterns from existing transports, includes comprehensive error handling, and has benchmarking tests
  • No files require special attention - threading issues have been resolved

Important Files Changed

Filename Overview
dimos/protocol/pubsub/ddspubsub.py New DDS PubSub implementation with proper thread-safe callback and reader management
dimos/core/transport.py Added DDSTransport class with thread-safe lazy initialization using lock
dimos/protocol/service/ddsservice.py DDS service with thread-safe singleton participant initialization using double-checked locking
dimos/protocol/pubsub/rospubsub.py ROS2 PubSub implementation with executor thread and thread-safe publisher/subscriber management
dimos/protocol/pubsub/benchmark/test_benchmark.py Benchmark tests for DDS transport with throughput and latency measurements

Sequence Diagram

sequenceDiagram
    participant App
    participant DDSTransport
    participant DDS
    participant DDSService
    participant DomainParticipant
    participant DataWriter
    participant DataReader
    participant Listener

    Note over App,Listener: Initialization
    App->>DDSTransport: __init__(topic, type)
    DDSTransport->>DDS: __init__()
    DDS->>DDSService: __init__()
    
    Note over App,Listener: Publishing Flow
    App->>DDSTransport: broadcast(msg)
    DDSTransport->>DDSTransport: acquire _start_lock
    DDSTransport->>DDSTransport: check _started
    alt not started
        DDSTransport->>DDS: start()
        DDS->>DDSService: get_participant()
        DDSService->>DDSService: acquire _participant_lock
        DDSService->>DomainParticipant: create(domain_id)
        DDSService->>DDSService: release _participant_lock
    end
    DDSTransport->>DDSTransport: release _start_lock
    DDSTransport->>DDS: publish(topic, msg)
    DDS->>DDS: acquire _writer_lock
    DDS->>DataWriter: write(msg)
    DDS->>DDS: release _writer_lock
    
    Note over App,Listener: Subscription Flow
    App->>DDSTransport: subscribe(callback)
    DDSTransport->>DDSTransport: acquire _start_lock
    DDSTransport->>DDSTransport: check _started
    alt not started
        DDSTransport->>DDS: start()
    end
    DDSTransport->>DDSTransport: release _start_lock
    DDSTransport->>DDS: subscribe(topic, callback)
    DDS->>DDS: acquire _callback_lock
    DDS->>DDS: add callback to list
    DDS->>Listener: create with callback reference
    Listener->>Listener: store callbacks[topic] reference
    DDS->>DataReader: create with listener
    DDS->>DDS: release _callback_lock
    
    Note over App,Listener: Message Reception
    DataReader->>Listener: on_data_available(reader)
    Listener->>DataReader: take()
    DataReader-->>Listener: samples
    loop for each sample
        loop for each callback
            Listener->>App: callback(sample, topic)
        end
    end
Loading

Copy link

@greptile-apps greptile-apps bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

5 files reviewed, 3 comments

Edit Code Review Agent Settings | Greptile

@Kaweees
Copy link
Member Author

Kaweees commented Jan 28, 2026

@greptile

Copy link

@greptile-apps greptile-apps bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

5 files reviewed, no comments

Edit Code Review Agent Settings | Greptile

@Kaweees Kaweees self-assigned this Jan 28, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants