Deep-dives on the four main implemented subsystems.
The behavior tree framework wraps existing planning/control/perception modules as re-composable leaf nodes.
from fullstack_manip.execution import Blackboard, BaseBehavior, Status
# Blackboard: thread-safe shared state
bb = Blackboard()
bb.set("target_pose", [0.5, 0.0, 0.3])
pose = bb.get("target_pose")
# Status enum
Status.SUCCESS | Status.FAILURE | Status.RUNNING | Status.INVALID
# BaseBehavior interface
class MyBehavior(BaseBehavior):
def setup(self) -> bool: ...
def update(self) -> Status: ...
def terminate(self, new_status: Status) -> None: ...Motion (4) — wrap MotionPlanner:
MoveToJointConfiguration, MoveToCartesianPose, ExecuteTrajectory, UpdateCollisionWorld
Gripper (4) — wrap Gripper:
OpenGripper, CloseGripper, VerifyGrasp, SetGripperPosition
Perception (5) — wrap vision:
DetectObject, EstimateObjectPose, CheckDetectionStatus, VisuallyAlign, UpdateSceneState
Safety (6) — monitor constraints:
MonitorJointLimits, MonitorWorkspaceBounds, CheckForceLimit, EmergencyStop, CheckCollision, ValidateTrajectory
Control (7) — wrap high-level controllers:
ImpedanceControlBehavior, AdmittanceControlBehavior, TrajectoryFollowingBehavior,
ForceControlBehavior, GravityCompensationBehavior, PickPlaceControlBehavior, VisualServoControlBehavior
Skills are composite behaviors (subtrees) in execution/skills/. Example:
from fullstack_manip.execution.skills.pick_skill import PickSkill
pick = PickSkill(blackboard, motion_planner, gripper)
status = pick.tick()Task Trees (complete workflows)
└── Skills (reusable composites)
└── Atomic Behaviors (leaf wrappers)
├── planning/ (motion)
├── control/ (controllers)
├── state_estim/ (perception)
└── core/ (gripper, safety)
Dedicated gripper control extracted from Robot. Supports force-based grasp verification.
from fullstack_manip.core import Gripper
gripper = Gripper(
model=mujoco_model,
data=mujoco_data,
joint_names=["left_jaw", "right_jaw"],
actuator_names=["jaw_actuator"],
)
gripper.set_positions(close_pos=0.0, open_pos=1.0)
gripper.set_force_thresholds(grasp_threshold=1.0, release_threshold=0.01)
gripper.set_object_geom("target_cube")
# Control
success = gripper.open() # open + verify release
success = gripper.close() # close + verify grasp
success = gripper.grasp() # alias for close(check_grasp=True)
success = gripper.release() # alias for open(check_release=True)
# State
pos = gripper.get_position()
gripper.is_open()
gripper.is_closed()
grasped = gripper.check_grasp_success() # contact forces + in-hand detection| Property | Type | Description |
|---|---|---|
grasp_success |
bool | Object currently in grasp |
joint_names |
list | Gripper joint names |
object_geom |
str | Target object geometry name |
close_position |
float | Position value for closed state |
open_position |
float | Position value for open state |
Generates Graphviz architecture diagrams from a live plant or config file.
pip install graphviz
brew install graphviz # macOS
# or: sudo apt install graphvizfrom fullstack_manip.core import PlantVisualizer, ConfigVisualizer, visualize_plant, visualize_config
# Visualize a live plant (4 diagram types)
diagrams = visualize_plant(plant, output_dir="diagrams")
# returns: {'components': '...png', 'dataflow': '...png', 'state': '...png'}
# Visualize a config before creating the plant
config = PlantConfig.from_yaml("configs/pickplace.yaml")
visualize_config(config, "diagrams/my_config", format="png")
# Fine-grained control
viz = PlantVisualizer(plant)
viz.generate_component_diagram("diagrams/components", format="svg")
viz.generate_dataflow_diagram("diagrams/dataflow", format="pdf")
viz.generate_state_diagram("diagrams/state")
diagrams = viz.generate_all_diagrams(output_dir="diagrams")| Type | Shows |
|---|---|
| Component | Plant structure, all attached components |
| Data Flow | Command/feedback paths through layers |
| State | StateManager hub, state types, observers |
| Config | Configuration structure preview |
Degrades gracefully if graphviz is not installed (returns DOT source string instead).
Define manipulation systems declaratively in YAML or JSON; no code changes needed.
name: pickplace_plant
robot:
type: generic
model_path: "hardware/urdf/robot.xml"
base_link: "base_link"
ee_link: "end_effector"
gripper:
joint_names: ["gripper_left", "gripper_right"]
open_position: 0.04
closed_position: 0.0
min_force: 0.1
max_force: 10.0
motion_planner:
type: rrt
max_planning_time: 5.0
controllers:
pid:
type: pid
kp: 100.0
ki: 0.0
kd: 10.0
objects:
- name: red_cube
type: box
position: [0.5, 0.0, 0.05]
properties:
size: [0.05, 0.05, 0.05]
graspable: truefrom fullstack_manip.core import PlantConfig, create_plant_from_config
# Load
config = PlantConfig.from_yaml("configs/pickplace.yaml")
config = PlantConfig.from_json("configs/assembly.json")
# Validate
config.validate()
# Create plant (requires MuJoCo model/data)
plant = create_plant_from_config(config, model, data)
# Save
config.to_yaml("output.yaml")
config.to_json("output.json")robot (required): type, model_path, base_link, ee_link
gripper (optional): joint_names, open_position, closed_position, min_force, max_force
motion_planner (optional): type (rrt/rrt_star/prm), max_planning_time, step_size
controllers (optional): keyed dict of {name: {type, ...params}}
sensors (optional): keyed dict of {name: {type, ...params}}
objects (optional): list of {name, type, position, orientation, properties}
See configs/ for working examples: minimal.yaml, pickplace.yaml, assembly.json, rates.yaml.
The fullstack_manip/analysis/ package provides opt-in telemetry with zero overhead when disabled.
Attach a logger anywhere along the stack; omit it and nothing is recorded.
Thread-safe, in-memory ring buffer. Drop-in storage backend for any instrumented component.
from fullstack_manip.analysis import TelemetryBuffer
buf = TelemetryBuffer(maxlen=5000) # oldest entries auto-discarded at capacity
# Record during a run
buf.log("joint_pos", q) # q is a (6,) NumPy array
buf.log("ee_pos", ee_xyz)
buf.log("cost", 0.042)
# Retrieve
q_history = buf.to_numpy("joint_pos") # shape (N, 6)
ts = buf.timestamps("joint_pos") # shape (N,)
# Persist
buf.save("run_001.npz")
buf2 = TelemetryBuffer.load("run_001.npz")
# Introspect
buf.keys() # {'joint_pos', 'ee_pos', 'cost'}
len(buf) # total entries (all keys combined)
buf.clear()When telemetry_logger=None (the default on every instrumented class) a NullLogger is
substituted internally, making every _emit() call a no-op:
from fullstack_manip.analysis import NullLogger
# Used internally — you rarely need to instantiate this directly
logger = NullLogger()
logger.log("anything", 99) # silentStreams telemetry live to the Rerun desktop viewer for real-time 3-D inspection of trajectories, joint signals, and contact forces.
from fullstack_manip.analysis import RerunLogger
logger = RerunLogger(
application_id="arm_debug",
spawn=True, # open Rerun viewer immediately
)Requires pip install rerun-sdk>=0.16. If the package is absent the logger degrades silently to a no-op.
All functions return a matplotlib.figure.Figure without calling plt.show().
from fullstack_manip.analysis import TelemetryBuffer
from fullstack_manip.analysis.plot_trajectory import (
plot_joint_positions,
plot_joint_velocities,
plot_ee_path,
plot_trajectory_overview,
)
from fullstack_manip.analysis.plot_controller import plot_controller_overview
from fullstack_manip.analysis.plot_planning import plot_planning_overview
buf = TelemetryBuffer.load("run_001.npz")
fig = plot_joint_positions(buf)
fig.savefig("joints.png", dpi=150)
fig = plot_controller_overview(buf)
fig.savefig("ctrl.png", dpi=150)
fig = plot_planning_overview(buf)
fig.savefig("planning.png", dpi=150)| Component | telemetry_logger param |
Keys emitted |
|---|---|---|
BaseController |
telemetry_logger=None |
via _emit() helper |
ImpedanceController |
inherited | joint_pos, ee_pos, controller_error |
TrajectoryFollowingController |
inherited | joint_pos, joint_vel, cmd |
MotionPlanner |
telemetry_logger=None |
trajectory, joint_pos |
RRTPlanner |
telemetry_logger=None |
rrt_node, cost |
scripts/dashboard.py provides a 7-tab interactive dashboard for live control,
planning, telemetry replay, config editing, and diagnostics:
python -m streamlit run scripts/dashboard.pyTabs: Robot Control · Planning Workshop · Live Telemetry · Config Editor · Replay · Diagnostics · State Estimation