Technical reference for every algorithm used in fullstack-manip.
Each section documents the theoretical background, the mathematical
formulation, and the code implementation.
- Part I — Kinematics & Planning
- Part II — Control
- Part III — Collision Detection & Avoidance
- Part IV — Trajectory Generation & Optimization (Roadmap)
- Part V — Analytics & Telemetry
This part covers algorithms that determine where the robot should be (inverse kinematics), how it gets there (trajectory planning), and where it can reach (workspace sampling).
Differential IK solves the first-order linearised kinematics:
where
The position/orientation task error is:
The QP is solved by the DAQP active-set solver.
Each joint-velocity solution is integrated forward with Euler:
Convergence thresholds and iteration cap are configurable constructor kwargs (defaults: pos_threshold=1e-4, ori_threshold=1e-4, max_iters=100). Custom IK backends can be injected via ik_fn — all Mink/MuJoCo logic is bypassed when provided.
| Name | Value | Description |
|---|---|---|
SOLVER |
"daqp" |
QP backend |
pos_threshold |
1e-4 m |
Position convergence (configurable) |
ori_threshold |
1e-4 rad |
Orientation convergence (configurable) |
max_iters |
100 |
Max integration steps (configurable) |
dt |
0.01 s |
Integration timestep |
position_cost |
1.0 |
Task weight |
lm_damping |
1e-6 |
Levenberg-Marquardt regularisation |
fullstack_manip/kinematics/ik_solver.py
IKSolver
solve_ik_for_qpos(frame_name, frame_type, target_pos, target_orient) → q[6]
Key steps:
# 1. Build SE(3) target
target = mink.SE3.from_rotation_and_translation(SO3(target_orient), target_pos)
# 2. Create frame task
task = mink.FrameTask(frame_name, frame_type,
position_cost=1.0, orientation_cost=0.0, lm_damping=1e-6)
task.set_target(target)
# 3. Integrate until convergence
for i in range(self._max_iters):
vel = mink.solve_ik(config, [task], dt, solver="daqp", limits=limits)
config.integrate_inplace(vel, dt)
e = task.compute_error(config)
if norm(e[:3]) <= self._pos_threshold and norm(e[3:]) <= self._ori_threshold:
break
return config.q[:6]Given start and goal Cartesian poses
- IK → maps each Cartesian pose to a joint configuration:
- Duration heuristic — minimum time to respect maximum joint velocity:
-
Cubic spline —
$C^2$ -continuous trajectory interpolating boundary conditions:
sampled at uniform timesteps
The cubic polynomial per segment satisfies:
where
| Name | Default | Description |
|---|---|---|
max_velocity |
0.5 rad/s |
Per-joint velocity bound |
max_acceleration |
1.0 rad/s² |
Per-joint acceleration bound |
dt |
0.01 s |
Trajectory sample rate |
fullstack_manip/planning/motion_planner.py
MotionPlanner
plan_trajectory(start_pos, end_pos, ...) → (trajectory[T,nq], t[T])
solve_ik_for_qpos(frame_name, ...) → q[6]
The planner delegates IK to IKSolver, trajectory generation to
scipy.interpolate.CubicSpline, and collision checking to the injected
CollisionChecker instance.
The reachable workspace
Algorithm:
- Grid:
axis = linspace(-R, R, G)along each axis, full meshgrid →$G^3$ candidates. - Random subsample to
$M$ usingnp.random.default_rng(seed)for reproducibility. - For each candidate
$\mathbf{p}^* = \mathbf{c} + \mathbf{offset}$ :- Solve
$q = \text{IK}(\mathbf{p}^*)$ from current seed configuration$q_{seed}$ - Apply
$q$ , run FK, get$\mathbf{p}_{fk}$ - Accept if $|\mathbf{p}{fk} - \mathbf{p}^*| \leq \delta{tol}$
- Update
$q_{seed} \leftarrow q$ (warm-start for nearby points)
- Solve
- Deduplicate by rounding to
$\delta_{tol}$ andnp.unique.
| Name | Default | Description |
|---|---|---|
grid_size |
20 |
Points per axis ( |
grid_range |
0.4 m |
Half-width of the sampling volume |
max_points |
4000 |
Subsample cap for IK budget |
tolerance |
5e-3 m |
FK acceptance threshold |
seed |
0 |
RNG seed for reproducibility |
fullstack_manip/core/robot.py
Robot
compute_workspace(ik_solver, grid_size=20, grid_range=0.4,
max_points=4000, tolerance=5e-3, center, seed=0)
→ np.ndarray[(N,3)]
visualize_workspace(points) # renders mjGEOM_SPHERE via user_scn
This part covers algorithms that execute motion: low-level joint regulation, Cartesian-space interaction controllers (impedance, admittance, force), model-based gravity/friction cancellation, and the pick-and-place task sequencer.
Each joint is regulated by an independent discrete-time PID controller:
with an additional passive damping term (
Optional clamping: integral is clipped to
The MotionExecutor wraps the PID loop with a contact-stop condition: trajectory
execution halts when CollisionChecker.detect_contact(ee, object_geom) returns
True.
| Gain | Default |
|---|---|
2.0 (per joint, uniform) |
|
0.1 |
|
0.5 |
fullstack_manip/control/motion_executor.py
MotionExecutor
move_to_pose(target_pos, target_orient, duration, ...)
fullstack_manip/control/low_level/pid_controller.py
PIDController
compute(target, current, timestamp=None) → u[n_q]
reset()
set_gains(kp, ki, kd)
The robot is modelled as a virtual mass-spring-damper in Cartesian space. The impedance relationship between end-effector displacement and applied force is:
where
The current implementation uses the proportional (stiffness) term in the motion loop:
Design rationale: all three matrices
Critical damping condition:
| Matrix | Default | Unit |
|---|---|---|
| N/m, Nm/rad | ||
| Ns/m, Nms/rad | ||
| kg, kg·m² |
fullstack_manip/control/high_level/impedance_controller.py
ImpedanceController(BaseController)
execute(desired_pos, desired_orient, duration=5.0)
set_stiffness(value, axis=None) # axis ∈ {x,y,z,rx,ry,rz} → index 0–5
set_damping(value, axis=None)
compliant_approach(target_pos, low_stiffness=100.0)
absorb_impact(duration=2.0, damping_ratio=2.0)
get_impedance_parameters() → {'stiffness': K, 'damping': B, 'inertia': M}
Admittance control is the dual of impedance: external forces are mapped to commanded Cartesian velocities. The admittance model is:
Rearranging for acceleration and integrating with forward Euler:
The linear system np.linalg.solve (LU factorisation of
Stability condition:
Velocity decay: when $|\mathbf{F}{ext}[:3]| < F{threshold}$,
velocity is multiplied by 0.95 per timestep (exponential decay to rest).
External force data.contact[i].frame[:3] and
accumulated into a 6-vector (3 force + 3 torque).
| Matrix | Default | Description |
|---|---|---|
| Virtual inertia | ||
| Virtual damping | ||
| Stiffness (zero = pure admittance) |
fullstack_manip/control/high_level/admittance_controller.py
AdmittanceController(BaseController)
execute(duration=10.0, force_threshold=1.0)
follow_force(force_source, duration, velocity_limit=0.1)
backdrive_motion(duration=5.0, sensitivity=0.5) → [(pos, orient)]
kinesthetic_teaching(duration=10.0) → [q]
_compute_acceleration(F_ext) → ẍ[6]
_measure_external_force() → F[6]
The controller maintains a target contact force
Force error:
Compliant approach — binary gate:
- If
$F_{contact} > F_{max}$ : stop immediately, returnFalse - Otherwise: step EE by
$v_{approach} \cdot \Delta t$ along$\hat{n}$
Contact-force maintenance loop (maintain_contact):
The position correction magnitude is proportional to
Jacobian transpose (conceptual, apply_constant_force):
approximated in the current implementation by direct Cartesian EE displacement.
| Name | Default |
|---|---|
target_force |
5.0 N |
force_tolerance |
1.0 N |
contact_threshold |
0.1 N |
approach_speed |
0.01 m/s |
fullstack_manip/control/high_level/force_control_controller.py
ForceControlController(BaseController)
execute(target_pos, target_orient, force_direction)
apply_constant_force(force_vector, duration)
maintain_contact(contact_body, duration) → bool
compliant_motion(target_pos, max_force, approach_speed=0.01) → bool
Gravity compensation — MuJoCo computes data.qfrc_bias. Applying the
negative of this term to the actuators makes the robot feel weightless:
with
Friction compensation — viscous + Coulomb model:
where
Friction calibration (calibrate_friction) — collects
Position-holding superimposes a joint-space spring:
| Name | Default | Range |
|---|---|---|
compensation_gain |
1.0 |
[0.0, 1.5] |
enable_friction_compensation |
False |
bool |
friction_coefficients |
0.1 (uniform) |
per-joint, calibrated |
fullstack_manip/control/high_level/gravity_compensation_controller.py
GravityCompensationController(BaseController)
execute(duration, record_trajectory=False) → [q]
hold_position(target_joints, duration, stiffness=0.1)
compensate_and_teach(duration) → [(qpos, ee_pose)]
calibrate_friction(duration, velocity_amplitude)
set_compensation_gain(gain)
_apply_compensation_torques(torques) # writes data.ctrl, calls mj_step
A finite state machine with four states driven by Cartesian waypoints and gripper commands. The gripper grasp condition is evaluated by contact force thresholds.
State sequence (pick_object):
| # | State | Action |
|---|---|---|
| 1 | Pre-grasp | Move EE to object_pos + [0.04, 0, 0.05] (above) |
| 2 | Open | Open gripper, verify release |
| 3 | Grasp | Move EE to object_pos + [0.02, 0, -0.005] (insert) |
| 4 | Close | Close gripper; if grasp_success == False → retry from step 1 |
Gripper success is determined by Gripper.check_grasp_success():
contact force between jaw geoms and object geom exceeds grasp_threshold (1.0 N).
| Offset | Value |
|---|---|
| Pre-grasp height | [0.04, 0, 0.05] m |
| Grasp insert | [0.02, 0, -0.005] m |
| Grasp force threshold | 1.0 N |
| Release force threshold | 0.01 N |
fullstack_manip/control/high_level/pick_place_controller.py
PickPlaceController(BaseController)
execute(pick_position, place_position)
pick_object(object_position) → bool
place_object(target_position)
This part covers all algorithms that detect, measure, and avoid collisions:
the shared CollisionCheckerInterface constraint layer used inside the IK QP,
three interchangeable geometry backends, and the trajectory-repair gradient-repulsion planner.
Mink stacks multiple constraint layers in the QP:
Three constraint types are used:
-
Configuration limit — joint position bounds from URDF:
$$q_{lo} \leq q + \dot{q},\Delta t \leq q_{hi}$$ -
Collision avoidance limit — minimum-distance repulsion between gripper geometry and obstacle geoms. Each pair
$(g_{robot}, g_{obs})$ generates an inequality keeping signed distance$d > d_{margin}$ :$$\nabla_q d \cdot \dot{q} \geq -\alpha,(d - d_{margin})$$ -
Velocity limit — per-joint speed cap: $$|\dot{q}j| \leq v{j,max}$$
Default velocity limits (all joints):
fullstack_manip/core/limit.py
LimitManager
set_limits(collision_pairs, max_velocities) → List[mink.Limit]
get_limits() → List[mink.Limit]
Collision pairs are constructed as:
for body in gripper_bodies:
for geom_id in mink.get_body_geom_ids(model, body_id):
for obs_geom_id in obstacle_geom_ids:
geom_pairs.append(([geom_id], [obs_geom_id]))MuJoCo runs a narrow-phase contact solver during mj_step / mj_forward.
It fills data.contact[0:ncon] with contact structs containing:
geom1, geom2— colliding geom IDsdist— signed penetration distance (negative = interpenetrating)frame[:3]— contact normal force resultant (3-vector)
Queries are
Contact force magnitude:
Trajectory check — sets data.qpos and calls mj_forward at each waypoint,
then scans the contact list for robot–obstacle pairs.
fullstack_manip/core/collision_backends/mujoco_collision.py
MuJoCoCollisionChecker(model, data)
detect_contact(body_a, body_b) → bool
check_contact_pair(body_a, body_b) → bool # dist < 0
compute_contact_force(body_a, body_b) → float
minimum_distance(body_a, body_b) → float # +inf if no contact
check_trajectory(trajectory[T,nq], obstacle_bodies, robot_bodies=None) → bool
check_collision(trajectory, obstacles) → bool # deprecated alias
Internal: _body_geom_ids(name) uses model.body_geomadr[id] + body_geomnum[id].
Coal (fork of hpp-fcl) implements the Gilbert-Johnson-Keerthi (GJK) algorithm for convex objects and the Expanding Polytope Algorithm (EPA) for penetration depth.
GJK iteratively finds the minimum-norm point of the Minkowski difference
Distance query:
BVH mesh representation: BVHModelOBBRSS decomposes a triangle mesh into
a hierarchy of oriented bounding boxes (OBB) for
FK is provided by yourdfpy:
update_cfg(joint_dict) → get_transform(link, collision_geometry=True) → Transform3s.setRotation(R) + setTranslation(t).
fullstack_manip/core/collision_backends/coal_collision.py
CoalCollisionChecker(urdf_path, joint_names=None)
detect_contact(body_a, body_b) → bool
check_contact_pair(body_a, body_b) → bool
minimum_distance(body_a, body_b) → float
compute_contact_force(...) → NotImplementedError
check_trajectory(trajectory, obstacle_bodies, robot_bodies=None) → bool
Install: pip install "fullstack-manip[coal]" (pulls coal>=2.0.0, yourdfpy>=0.0.53).
Trimesh wraps python-fcl
in a high-level CollisionManager. The manager maintains an internal
BVH (Bounding Volume Hierarchy) of triangle meshes and supports:
in_collision_single(mesh)— broadphase AABB then narrowphase triangle-triangle testmin_distance_single(mesh)— minimum unsigned distance between mesh surfaces
Signed distance is reconstructed by negating the unsigned distance when
in_collision_single returns True.
FK (same as Coal backend): yourdfpy.update_cfg → get_transform → mesh.apply_transform(T).
fullstack_manip/core/collision_backends/trimesh_collision.py
TrimeshCollisionChecker(urdf_path, joint_names=None)
detect_contact(body_a, body_b) → bool
check_contact_pair(body_a, body_b) → bool
minimum_distance(body_a, body_b) → float # negative if penetrating
compute_contact_force(...) → NotImplementedError
check_trajectory(trajectory, obstacle_bodies, robot_bodies=None) → bool
Install: pip install "fullstack-manip[trimesh]" (pulls trimesh>=4.0.0, yourdfpy>=0.0.53).
The planner repairs a naive joint trajectory
Bounding-sphere collision test — fast broadphase per link
Link
Numerical Jacobian column for joint
Repulsion gradient (accumulated over all colliding links):
Normalised gradient step with joint-limit clamp:
Repeat until collision-free or max_iter steps.
Post-processing:
-
Gaussian smoothing — removes per-waypoint kinks introduced by local gradient steps. Per-joint convolution with kernel:
$$k_i = \frac{\exp(-i^2 / 2\sigma^2)}{\sum_j \exp(-j^2/2\sigma^2)}, \qquad \sigma = 3\ \text{waypoints}$$ -
Upsample — linear interpolation from planning resolution (
$N_{plan}$ waypoints) to playback resolution ($N_{play} = fps \times T$ ) vianp.interp. -
Pre-bake — all FK transforms and per-link collision statuses are computed offline. The 60 fps render loop does only
$O(n_{links})$ handle-property writes.
| Parameter | Default | Description |
|---|---|---|
max_iter |
25 |
Max gradient steps per waypoint |
step |
0.05 |
Gradient step size |
eps |
0.003 rad |
Finite-difference perturbation |
sigma |
3.0 waypoints |
Gaussian smoothing width |
obs_eff_r |
|
Obstacle sphere radius + clearance |
plan_steps |
150 |
Planning waypoints |
fps |
60 |
Playback frame-rate |
Per waypoint per iteration: plan_steps=150, max_iter=25, n_links=7, n_joints=6, this is at most
examples/collision_vis_viser.py
_plan_avoiding_trajectory(raw_traj, robot_urdf, jnames,
local_centroids, link_radii,
obs_center, obs_eff_r,
j_lower, j_upper,
max_iter=25, step=0.05, eps=0.003)
_gauss_smooth(traj, sigma=3.0) → smoothed_traj
_resample(traj, n_out) → resampled_traj
_hitting_links(local_centroids, link_radii, fk, obs_center, obs_eff_r) → set
_bake(traj, ...) → List[{q, fk, hits}]
| Algorithm | Backend | Geometry | Force data | Speed | Dependency |
|---|---|---|---|---|---|
| MuJoCo narrow-phase | MuJoCoCollisionChecker |
Implicit (sim) | ✅ | Fastest (in-step) | mujoco (core) |
| GJK/EPA | CoalCollisionChecker |
BVH mesh + primitives | ❌ | Fast | coal, yourdfpy |
| BVH trimesh | TrimeshCollisionChecker |
BVH mesh | ❌ | Medium | trimesh, yourdfpy |
| Gradient repulsion | _plan_avoiding_trajectory |
Spheres (broadphase) | ❌ | Planning-time | none extra |
Select with create_collision_checker(backend, **kwargs) from
fullstack_manip.core.collision.
This part covers algorithms in fullstack_manip/planning/trajectory_generators/ and planning/task/.
Sections marked [🔲 not yet implemented] are still design specifications; see
sota-planning.md for library recommendations.
A degree-5 polynomial minimises the integral of squared jerk:
Velocity and acceleration are guaranteed zero at both endpoints. Generalises to joint-space by applying the formula per-DOF.
Flash & Hogan (1985) showed this polynomial matches biological reaching trajectories.
| Name | Value | Description |
|---|---|---|
T |
caller-specified | Duration in seconds |
n_joints |
6 | Applied per-DOF |
fullstack_manip/planning/trajectory_generators/min_jerk.py
min_jerk_trajectory(q_start, q_end, duration, dt=0.01) → (traj[T,n], t[T])
min_jerk_path(waypoints, durations?, dt=0.01, default_speed=0.5) → (traj, t)
MinJerkGenerator(dt=0.01, default_speed=0.5)
.generate(q_start, q_end, duration?) → (traj, t)
.generate_path(waypoints, durations?) → (traj, t)
from fullstack_manip.planning import min_jerk_trajectory, min_jerk_path, MinJerkGenerator
# Single segment
traj, t = min_jerk_trajectory(q0, q1, duration=1.0)
# Piecewise multi-waypoint (duration auto-estimated from joint-space distance)
traj, t = min_jerk_path([q0, q_via, q1], default_speed=0.5)
# Stateful wrapper
gen = MinJerkGenerator(dt=0.01, default_speed=0.5)
traj, t = gen.generate(q0, q1) # duration = ‖q1-q0‖₂ / default_speed
traj, t = gen.generate_path([q0, q1]) # piecewiseGiven a geometric path
fastest time parametrisation
TOPP-RA (Pham & Pham, 2018) solves this via a 1-D reachability analysis scan —
pip install topprafullstack_manip/planning/trajectory_generators/topp_ra.py
reparametrize(trajectory, t_in, velocity_limits?, acceleration_limits?, gridpoints?)
→ (new_traj, new_t)
ToppRaGenerator(velocity_limits?, acceleration_limits?)
.reparametrize(trajectory, t_in) → (new_traj, new_t)
.available → bool # False when toppra not installed → returns (traj, t) unchanged
from fullstack_manip.planning import ToppRaGenerator, reparametrize
new_traj, new_t = reparametrize(traj, t, velocity_limits=np.full(6, np.pi))
gen = ToppRaGenerator(velocity_limits=np.full(6, np.pi),
acceleration_limits=np.full(6, 2*np.pi))
new_traj, new_t = gen.reparametrize(traj, t)Graceful degradation: emits UserWarning at import when toppra is missing; call-site code unchanged.
Integrates with: any trajectory as a post-processing step.
STOMP (Stochastic Trajectory Optimization for Motion Planning, Kalakrishnan et al. 2011) generates noisy trajectory samples, evaluates cost (smoothness + obstacle penalty), and updates the trajectory via importance-weighted average:
where
No gradient computation required — purely sample-based.
| Name | Default | Description |
|---|---|---|
n_samples |
20 | Number of noisy trajectories per iteration |
n_iter |
50 | Optimization iterations |
noise_stddev |
0.05 rad | Per-DOF Gaussian noise |
lambda |
10.0 | Temperature controlling exploitation |
obstacle_weight |
1.0 | Weight of obstacle penetration cost |
[🔲 not yet implemented] — target file: fullstack_manip/planning/trajectory_optimizers/stomp.py
# Planned API
def stomp_optimize(trajectory, is_state_valid_fn, n_samples=20, n_iter=50) -> np.ndarray:
"""Returns optimized trajectory [T, n_joints]."""
...iLQR (iterative Linear Quadratic Regulator) optimises a trajectory under nonlinear dynamics
by alternating backward-pass (compute linear-quadratic gains) and forward-pass (improve trajectory):
The backward pass solves a Riccati recursion; the forward pass does a line search along the optimal perturbation direction. DDP adds second-order dynamics terms for faster convergence.
pip install crocoddyl # or implement from scratch with JAX/NumPy[🔲 not yet implemented] — suitable for torque-controlled arms in MuJoCo simulation;
not directly applicable to position-mode soarm100 servos.
Target integration: OperationalSpaceController torque-mode (already scaffolded) →
iLQR provides the reference trajectory; OSC tracks it.
A ground-atom STRIPS planner represents task state as a set of Boolean atoms
(strings). Each TaskAction specifies:
- preconditions — atoms that must all hold before the action can fire
- effects_add — atoms that become true after the action
- effects_del — atoms that become false after the action
Search uses breadth-first search (BFS) over the reachable state space, guaranteeing
the shortest plan (fewest actions) when one exists within max_depth.
Complexity:
| Name | Default | Description |
|---|---|---|
actions |
MANIPULATION_OPERATORS |
List of TaskAction objects |
max_depth |
50 |
BFS depth cap — prevents unbounded search |
| Action | Preconditions | Effects |
|---|---|---|
open_gripper |
gripper_closed |
+gripper_open, -gripper_closed |
close_gripper |
gripper_open, arm_at_grasp_pose |
+gripper_closed, +object_grasped, -gripper_open |
move_to_grasp_pose |
gripper_open |
+arm_at_grasp_pose |
move_to_place_pose |
object_grasped |
+arm_at_place_pose |
move_to_home |
— | +arm_at_home |
release_object |
gripper_closed, arm_at_place_pose |
+gripper_open, +object_placed, -object_grasped |
fullstack_manip/planning/task/task_planner.py
TaskAction(name, preconditions, effects_add, effects_del, metadata)
.is_applicable(state) → bool
.apply(state) → new_state
TaskPlanner(actions?, max_depth=50)
.plan(initial_state, goal_state) → TaskPlan
.add_action(action) # replaces by name
MANIPULATION_OPERATORS — predefined tabletop pick-place ground actions
fullstack_manip/core/interfaces.py
TaskPlan(success, actions, info) — result dataclass
TaskPlannerInterface — runtime-checkable Protocol
from fullstack_manip.planning import TaskPlanner, MANIPULATION_OPERATORS
planner = TaskPlanner(MANIPULATION_OPERATORS, max_depth=10)
plan = planner.plan(
initial_state={'gripper_open'},
goal_state={'object_placed'},
)
print(plan.success) # True
print(plan.actions) # ['move_to_grasp_pose', 'close_gripper',
# 'move_to_place_pose', 'release_object']fullstack_manip/analysis/ provides opt-in, zero-overhead telemetry instrumentation
for controllers and planners. Every instrumented component accepts a telemetry_logger=None
argument; omitting it disables all recording with no runtime cost.
A thread-safe, in-memory ring buffer that stores (timestamp, key, value) triples.
The buffer is backed by collections.deque(maxlen=N) so older entries are automatically
discarded when the buffer is full, making it suitable for long-running experiments.
TelemetryEntry = (timestamp: float, key: str, value: Any)
TelemetryBuffer.data: deque[TelemetryEntry]
from fullstack_manip.analysis import TelemetryBuffer
buf = TelemetryBuffer(maxlen=5000) # ring buffer, oldest entries dropped when full
# Log a scalar / array / dict
buf.log("joint_pos", q) # (6,) array
buf.log("ee_force", wrench)
buf.log("step", {"q": q, "dq": dq})
# Retrieve as NumPy array
q_history = buf.to_numpy("joint_pos") # shape (N, 6)
timestamps = buf.timestamps("joint_pos") # shape (N,)
# Persist
buf.save("run_001.npz")
buf2 = TelemetryBuffer.load("run_001.npz")
# Introspection
len(buf) # total entries across all keys
buf.keys() # set of logged key names
buf.clear()log()acquires athreading.Lock— safe to call from control threadto_numpy(key)stacks all values matchingkey; raisesKeyErrorif key absentsave()persists to.npzwith three arrays per key:timestamps_<key>,values_<key>, and a_keysmetadata arrayload()reconstructs the deque in chronological order
# fullstack_manip/core/interfaces.py
class TelemetryLoggerInterface(Protocol):
def log(self, key: str, value: Any, timestamp: float | None = None) -> None: ...Any class implementing log(key, value, timestamp) satisfies the protocol.
No base class required; duck typing applies.
from fullstack_manip.analysis import NullLogger
logger = NullLogger()
logger.log("anything", 42) # silent no-opNullLogger is the drop-in when telemetry is disabled.
It carries zero overhead because Python's method dispatch is not guarded by
an if logger is not None check in hot paths — the _emit() helper on
BaseController handles this transparently (see §5.5).
RerunLogger wraps the Rerun SDK to stream telemetry in real time
to the Rerun desktop viewer. Useful for visualising 3-D trajectories, joint time-series,
and contact forces during live experiments.
| Key logged | Rerun entity path | Type logged |
|---|---|---|
joint_pos |
robot/joints/position |
rr.Scalar per joint |
joint_vel |
robot/joints/velocity |
rr.Scalar per joint |
ee_pos |
robot/ee/position |
rr.Points3D |
ee_force |
robot/ee/force |
rr.Scalar |
rrt_node |
planning/rrt/nodes |
rr.Points3D |
trajectory |
planning/trajectory |
rr.LineStrips3D |
controller_error |
control/error |
rr.Scalar |
Custom keys are forwarded to <key> entity path as rr.AnyValues.
from fullstack_manip.analysis import RerunLogger
logger = RerunLogger(
application_id="arm_debug", # Rerun viewer window title
recording_id=None, # auto-generated UUID if None
spawn=True, # spawn Rerun viewer process inline
blueprint=None, # optional rr.Blueprint for layout
)
# Attach to a component
planner = RRTPlanner(..., telemetry_logger=logger)If rerun-sdk is not installed the module-level _HAS_RERUN guard disables
the logger silently:
try:
import rerun as rr
_HAS_RERUN = True
except ImportError:
_HAS_RERUN = FalseRerunLogger.log() becomes a no-op when _HAS_RERUN is False.
All plot functions live in fullstack_manip/analysis/ and return a matplotlib.figure.Figure.
They do not call plt.show(), so figures can be saved, embedded in notebooks, or forwarded
to the Streamlit dashboard without side-effects.
| Function | Input keys | Description |
|---|---|---|
plot_joint_positions(buf_or_array) |
joint_pos (N,6) |
Joint angles vs time, one axis per joint |
plot_joint_velocities(buf_or_array) |
joint_vel (N,6) |
Joint velocities vs time |
plot_ee_path(buf_or_array) |
ee_pos (N,3) |
3-D end-effector path (Axes3D) |
plot_trajectory_overview(buf) |
joint_pos, ee_pos |
Combined 2×2 summary figure |
| Function | Input keys | Description |
|---|---|---|
plot_command_vs_actual(buf) |
cmd, actual |
Commanded vs measured positions |
plot_force_signals(buf) |
ee_force |
Wrench components vs time |
plot_controller_error(buf) |
controller_error |
Tracking error norm vs time |
plot_controller_overview(buf) |
all above | Combined summary figure |
| Function | Input keys | Description |
|---|---|---|
plot_rrt_tree(buf) |
rrt_node |
Scatter of explored C-space nodes |
plot_cost_convergence(buf) |
cost |
Cost / path length vs iteration |
plot_planning_overview(buf) |
all above | Combined planning summary figure |
from fullstack_manip.analysis import TelemetryBuffer
from fullstack_manip.analysis.plot_trajectory import plot_joint_positions, plot_ee_path
from fullstack_manip.analysis.plot_controller import plot_controller_overview
from fullstack_manip.analysis.plot_planning import plot_planning_overview
buf = TelemetryBuffer.load("run_001.npz")
fig1 = plot_joint_positions(buf)
fig1.savefig("joints.png", dpi=150)
fig2 = plot_ee_path(buf)
fig2.savefig("ee_path.png", dpi=150)BaseController adds the _emit() helper that routes through the injected logger:
# fullstack_manip/control/high_level/base_controller.py (simplified)
class BaseController:
def __init__(self, ..., telemetry_logger=None):
self._tel = telemetry_logger # None → no-op
def _emit(self, key: str, value) -> None:
if self._tel is not None:
self._tel.log(key, value)Every controller subclass calls self._emit() inside execute():
# ImpedanceController
def execute(self, target_pos, target_rot=None, **kwargs):
...
self._emit("joint_pos", q)
self._emit("ee_pos", ee_pos)
self._emit("controller_error", error_norm)
...| Component | Keys emitted |
|---|---|
ImpedanceController |
joint_pos, ee_pos, controller_error |
TrajectoryFollowingController |
joint_pos, joint_vel, cmd |
MotionPlanner.plan_trajectory |
trajectory, joint_pos |
RRTPlanner.plan |
rrt_node, cost |
from fullstack_manip.analysis import TelemetryBuffer
from fullstack_manip.control.high_level.impedance_controller import ImpedanceController
buf = TelemetryBuffer(maxlen=10_000)
ctrl = ImpedanceController(robot=robot, dt=0.01, telemetry_logger=buf)
for _ in range(1000):
ctrl.execute(target_pos=goal)
# Post-process
from fullstack_manip.analysis.plot_controller import plot_controller_overview
fig = plot_controller_overview(buf)
fig.savefig("ctrl_analysis.png")