Optuna PID Tuning — Implementation Plan

For Claude: REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

Goal: Automatically optimize AR4 PID gains using Optuna, communicating with the Gazebo sim via Zenoh (no ROS dependency in the optimizer).

Architecture: A standalone Optuna optimizer container publishes gain vectors and trial commands to Zenoh keys. A tuning bridge node inside the sim container subscribes to those keys, reconfigures the JointTrajectoryController, executes trajectories, and reports status back via Zenoh. The optimizer subscribes to joint_states and computes the cost from the recorded positions: a weighted sum of steady-state error and settling time.

Tech Stack: Python 3.12, Optuna (TPE sampler), eclipse-zenoh, pycdr2 (CDR deserialization), rclpy (bridge node only), Docker.

Design doc: docs/plans/2026-03-10-optuna-pid-tuning-design.md
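
To make the message flow in the architecture above concrete, here is a minimal sketch of the JSON payloads exchanged on the three tuning keys. The payload shapes follow the bridge and optimizer code later in this plan; the helper names (gains_message, command_message, status_message) are hypothetical and exist only for illustration.

```python
import json
import time

# Zenoh key expressions used throughout this plan.
KEY_GAINS = "ar4/tuning/gains"
KEY_COMMAND = "ar4/tuning/command"
KEY_STATUS = "ar4/tuning/status"

# Actions the bridge node understands.
VALID_ACTIONS = {"reconfigure", "move_to_reach", "move_to_home"}


def gains_message(gains: dict) -> bytes:
    """Optimizer -> bridge: per-joint PID gains."""
    return json.dumps(gains).encode()


def command_message(action: str) -> bytes:
    """Optimizer -> bridge: a single trial command."""
    if action not in VALID_ACTIONS:
        raise ValueError(f"unknown action: {action}")
    return json.dumps({"action": action}).encode()


def status_message(status: str, detail: str = "") -> bytes:
    """Bridge -> optimizer: progress report with a wall-clock timestamp."""
    return json.dumps(
        {"status": status, "detail": detail, "ts": time.time()}
    ).encode()
```

One trial then amounts to: publish gains_message(...) on KEY_GAINS, publish command_message("reconfigure") on KEY_COMMAND, and wait for a status_message("ready") on KEY_STATUS before commanding a move.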


Task 1: Zenoh Router Storage Configuration

Files:

  • Create: zenoh/zenoh-storage.json5

Step 1: Create the zenoh directory and storage config

// zenoh/zenoh-storage.json5
{
  mode: "router",
  listen: { endpoints: ["tcp/0.0.0.0:7447"] },
  plugins: {
    rest: { http_port: 8000 },
    storage_manager: {
      volumes: { memory: {} },
      storages: {
        joint_states: {
          key_expr: "ar4/joint_states",
          volume: { id: "memory" }
        },
        tuning_gains: {
          key_expr: "ar4/tuning/gains",
          volume: { id: "memory" }
        },
        tuning_command: {
          key_expr: "ar4/tuning/command",
          volume: { id: "memory" }
        },
        tuning_status: {
          key_expr: "ar4/tuning/status",
          volume: { id: "memory" }
        }
      }
    }
  },
  adminspace: { enabled: true }
}

Step 2: Verify syntax

Run: python3 -c "import json; json.load(open('zenoh/zenoh-storage.json5'))" 2>&1 || echo "JSON5 not valid JSON — that's OK, Zenoh parses JSON5 natively"

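JSON5 allows comments, unquoted keys, and trailing commas, which strict JSON does not. This file uses a comment and unquoted keys, so Python's json module is expected to reject it and the fallback message prints. The real validation happens when the Zenoh router loads the file in Task 2, Step 4.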

Step 3: Commit

git add zenoh/zenoh-storage.json5
git commit -m "feat: add Zenoh router storage config for PID tuning"

Task 2: Docker Compose Zenoh Infrastructure Services

Files:

  • Modify: docker-compose.yaml
  • Modify: docker/Dockerfile.gpu (add zenoh-bridge-ros2dds to overlay)

Step 1: Add zenoh-bridge-ros2dds to the overlay Dockerfile

In docker/Dockerfile.gpu, in the overlay stage, after the colcon build, add:

# Install zenoh-bridge-ros2dds for DDS-to-Zenoh bridging
RUN apt-get update && apt-get install -y --no-install-recommends \
    ros-${ROS_DISTRO}-zenoh-bridge-ros2dds \
    && rm -rf /var/lib/apt/lists/*

If the apt package doesn't exist for Jazzy, install from the Eclipse Zenoh GitHub releases:

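# Install zenoh-bridge-ros2dds (standalone binary).
# The -unknown-linux-gnu suffix is a Rust target triple, which uses
# x86_64 / aarch64, so use `uname -m` rather than dpkg's amd64 / arm64.
# Check the asset name against the releases page; -f makes a 404 fail the build.
RUN curl -fsSL https://github.com/eclipse-zenoh/zenoh-plugin-ros2dds/releases/latest/download/zenoh-bridge-ros2dds-$(uname -m)-unknown-linux-gnu.zip -o /tmp/zbridge.zip \
    && unzip /tmp/zbridge.zip -d /usr/local/bin/ \
    && chmod +x /usr/local/bin/zenoh-bridge-ros2dds \
    && rm /tmp/zbridge.zip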

Step 2: Add Zenoh services to docker-compose.yaml

Append these services after the foxglove-bridge service:

  # Zenoh router — central pub/sub broker with in-memory storage
  zenoh-router:
    image: eclipse/zenoh:latest
    network_mode: host
    command: --config /config/zenoh-storage.json5
    volumes:
      - ./zenoh:/config:ro
 
  # Zenoh-to-ROS 2 DDS bridge
  zenoh-bridge:
    extends: overlay
    command: zenoh-bridge-ros2dds -e tcp/localhost:7447 --mode client
    depends_on:
      - zenoh-router

Step 3: Rebuild overlay to verify Dockerfile changes

Run: docker compose build overlay 2>&1 | tail -5

Expected: Successful build with zenoh-bridge-ros2dds installed.

Step 4: Test Zenoh router starts

Run: docker compose up -d zenoh-router && sleep 3 && curl -s http://localhost:8000/@/router/local | head -5 && docker compose down zenoh-router

Expected: JSON response from the Zenoh admin API.
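
The fixed sleep 3 in the command above can race a slow router start. A polling helper is one alternative; the sketch below uses only the standard library, and the function names (http_ok, wait_until) are hypothetical. The URL is the same admin endpoint queried with curl above.

```python
import http.client
import time
import urllib.error
import urllib.request


def http_ok(url: str, timeout: float = 1.0) -> bool:
    """Return True if a GET on url answers with HTTP 200."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            return resp.status == 200
    except (urllib.error.URLError, http.client.HTTPException, OSError):
        return False


def wait_until(probe, timeout: float = 30.0, interval: float = 0.5) -> bool:
    """Call probe() until it returns True or timeout seconds have elapsed."""
    deadline = time.monotonic() + timeout
    while True:
        if probe():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
```

Usage: wait_until(lambda: http_ok("http://localhost:8000/@/router/local"), timeout=30.0) returns True as soon as the router's REST plugin answers, and False if it never does within 30 seconds.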

Step 5: Commit

git add docker-compose.yaml docker/Dockerfile.gpu
git commit -m "feat: add Zenoh router and DDS bridge to Docker Compose"

Task 3: Tuning Bridge Node (ROS + Zenoh)

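This node runs inside the sim container. It receives PID gains and commands from Zenoh, reconfigures the JointTrajectoryController, executes trajectories, and reports status. Because it imports the zenoh Python package, eclipse-zenoh must be installed in the sim image; the Dockerfile changes in Task 2 only add the bridge binary.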

Files:

  • Create: ar4_skills/tuning/__init__.py
  • Create: ar4_skills/tuning/tuning_bridge_node.py
  • Create: tests/test_tuning_bridge.py

Step 1: Write the failing test

Create tests/test_tuning_bridge.py:

"""Unit tests for the tuning bridge's gain parsing and YAML generation."""
import json
import pytest
 
 
def test_parse_gains_json():
    """Gains JSON should be parsed into a per-joint dict."""
    from ar4_skills.tuning.tuning_bridge_node import parse_gains_json
 
    gains = {
        "joint_1": {"p": 500.0, "i": 100.0, "d": 25.0, "i_clamp": 50.0},
        "joint_2": {"p": 1000.0, "i": 200.0, "d": 50.0, "i_clamp": 100.0},
    }
    result = parse_gains_json(json.dumps(gains))
    assert result["joint_1"]["p"] == 500.0
    assert result["joint_2"]["i_clamp"] == 100.0
 
 
def test_parse_gains_json_rejects_missing_keys():
    """Gains with missing PID keys should raise ValueError."""
    from ar4_skills.tuning.tuning_bridge_node import parse_gains_json
 
    bad = {"joint_1": {"p": 500.0}}  # missing i, d, i_clamp
    with pytest.raises(ValueError, match="missing"):
        parse_gains_json(json.dumps(bad))
 
 
def test_build_controller_params():
    """Build the parameter dict needed for controller reconfiguration."""
    from ar4_skills.tuning.tuning_bridge_node import build_controller_params
 
    gains = {
        "joint_1": {"p": 500.0, "i": 100.0, "d": 25.0, "i_clamp": 50.0},
    }
    params = build_controller_params(gains)
    assert params["gains.joint_1.p"] == 500.0
    assert params["gains.joint_1.i_clamp"] == 50.0

Step 2: Run test to verify it fails

Run: cd /mnt/nvme_internal_drive/robotics/arms/ar4-physical-ai && python -m pytest tests/test_tuning_bridge.py -v 2>&1 | tail -10

Expected: FAIL — module not found.

Step 3: Create the tuning bridge module

Create ar4_skills/tuning/__init__.py (empty).

Create ar4_skills/tuning/tuning_bridge_node.py:

"""
Tuning Bridge Node — sits inside the sim container.
 
Subscribes to Zenoh keys for PID gains and trial commands.
Reconfigures the JointTrajectoryController and executes trajectories.
Publishes trial status back to Zenoh.
 
Usage (inside sim container):
    python -m ar4_skills.tuning.tuning_bridge_node --connect tcp/localhost:7447
"""
 
import argparse
import json
import threading
import time
 
JOINT_NAMES = [f"joint_{i}" for i in range(1, 7)]
REQUIRED_GAIN_KEYS = {"p", "i", "d", "i_clamp"}
 
# Reach pose for trajectory test (radians, within AR4 MK3 limits)
REACH_POSE = [0.5, 0.3, -0.4, 0.8, -0.5, 0.3]
HOME_POSE = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
 
 
def parse_gains_json(gains_json: str) -> dict:
    """Parse a JSON string of PID gains into a per-joint dict.
 
    Expected format:
        {"joint_1": {"p": 500, "i": 100, "d": 25, "i_clamp": 50}, ...}
 
    Raises ValueError if any joint is missing required keys.
    """
    gains = json.loads(gains_json)
    for jname, jgains in gains.items():
        missing = REQUIRED_GAIN_KEYS - set(jgains.keys())
        if missing:
            raise ValueError(
                f"Joint {jname} missing PID keys: {missing}"
            )
    return gains
 
 
def build_controller_params(gains: dict) -> dict:
    """Convert per-joint gains dict to flat ros2 parameter dict.
 
    Returns: {"gains.joint_1.p": 500.0, "gains.joint_1.i": 100.0, ...}
    """
    params = {}
    for jname, jgains in gains.items():
        for key, value in jgains.items():
            params[f"gains.{jname}.{key}"] = float(value)
    return params
 
 
class TuningBridge:
    """Bridge between Zenoh (optimizer) and ROS 2 (controller_manager)."""
 
    def __init__(self, zenoh_connect: str):
        import zenoh
 
        self._lock = threading.Lock()
        self._status = "idle"
 
        conf = zenoh.Config()
        if zenoh_connect:
            conf.insert_json5("connect/endpoints", json.dumps([zenoh_connect]))
        self._session = zenoh.open(conf)
 
        self._pending_gains = None

        self._status_pub = self._session.declare_publisher("ar4/tuning/status")
        # Hold references so the subscribers live as long as the bridge does.
        self._gains_sub = self._session.declare_subscriber(
            "ar4/tuning/gains", self._on_gains
        )
        self._command_sub = self._session.declare_subscriber(
            "ar4/tuning/command", self._on_command
        )

        # ROS 2 imports (only available inside sim container)
        import rclpy
 
        rclpy.init()
        self._ros_node = rclpy.create_node("tuning_bridge")
        self._ros_executor_thread = threading.Thread(
            target=rclpy.spin, args=(self._ros_node,), daemon=True
        )
        self._ros_executor_thread.start()
 
    def _publish_status(self, status: str, detail: str = ""):
        msg = json.dumps({"status": status, "detail": detail, "ts": time.time()})
        self._status_pub.put(msg.encode())
 
    def _on_gains(self, sample):
        """Receive new PID gains from the optimizer."""
        try:
            gains = parse_gains_json(bytes(sample.payload).decode())
            with self._lock:
                self._pending_gains = gains
            self._publish_status("gains_received")
        except (json.JSONDecodeError, ValueError) as e:
            self._publish_status("error", str(e))
 
    def _on_command(self, sample):
        """Receive trial commands from the optimizer."""
        try:
            cmd = json.loads(bytes(sample.payload).decode())
            action = cmd.get("action", "")
 
            if action == "reconfigure":
                self._do_reconfigure()
            elif action == "move_to_reach":
                self._do_move(REACH_POSE, "reach")
            elif action == "move_to_home":
                self._do_move(HOME_POSE, "home")
            else:
                self._publish_status("error", f"unknown action: {action}")
        except Exception as e:
            self._publish_status("error", str(e))
 
    def _do_reconfigure(self):
        """Deactivate JTC, set new gains, reactivate."""
        from controller_manager_msgs.srv import SwitchController
 
        with self._lock:
            gains = self._pending_gains
        if gains is None:
            self._publish_status("error", "no gains received")
            return
 
        self._publish_status("reconfiguring")
 
        # Deactivate
        switch_client = self._ros_node.create_client(
            SwitchController, "/controller_manager/switch_controller"
        )
        switch_client.wait_for_service(timeout_sec=10.0)
 
        req = SwitchController.Request()
        req.deactivate_controllers = ["joint_trajectory_controller"]
        req.strictness = SwitchController.Request.BEST_EFFORT
        future = switch_client.call_async(req)
        while not future.done():
            time.sleep(0.05)
 
        # Set parameters through the ros2 CLI, one call per gain
        import subprocess

        params = build_controller_params(gains)
        failed = []
        for param_name, param_value in params.items():
            result = subprocess.run(
                [
                    "ros2", "param", "set",
                    "/joint_trajectory_controller",
                    param_name,
                    str(param_value),
                ],
                capture_output=True,
                timeout=5,
            )
            if result.returncode != 0:
                failed.append(param_name)

        # Reactivate even if some parameters failed, so the arm stays controlled
        req2 = SwitchController.Request()
        req2.activate_controllers = ["joint_trajectory_controller"]
        req2.strictness = SwitchController.Request.BEST_EFFORT
        future2 = switch_client.call_async(req2)
        while not future2.done():
            time.sleep(0.05)

        if failed:
            self._publish_status("error", f"failed to set: {failed}")
        else:
            self._publish_status("ready")
 
    def _do_move(self, positions: list, label: str):
        """Send a trajectory goal to the JTC."""
        from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
        from builtin_interfaces.msg import Duration
 
        self._publish_status("moving", label)
 
        pub = self._ros_node.create_publisher(
            JointTrajectory,
            "/joint_trajectory_controller/joint_trajectory",
            10,
        )
        time.sleep(0.5)  # wait for publisher to be discovered
 
        msg = JointTrajectory()
        msg.joint_names = JOINT_NAMES
        point = JointTrajectoryPoint()
        point.positions = positions
        point.velocities = [0.0] * 6
        point.time_from_start = Duration(sec=3, nanosec=0)
        msg.points = [point]
 
        pub.publish(msg)
        self._publish_status("trajectory_sent", label)
 
    def spin(self):
        """Block until interrupted."""
        self._publish_status("idle")
        try:
            while True:
                time.sleep(1.0)
        except KeyboardInterrupt:
            pass
 
 
def main():
    parser = argparse.ArgumentParser(description="PID Tuning Bridge Node")
    parser.add_argument("--connect", default="tcp/localhost:7447",
                        help="Zenoh router endpoint")
    args = parser.parse_args()
 
    bridge = TuningBridge(args.connect)
    bridge.spin()
 
 
if __name__ == "__main__":
    main()

Step 4: Run the unit tests (pure functions only, no ROS)

Run: cd /mnt/nvme_internal_drive/robotics/arms/ar4-physical-ai && python -m pytest tests/test_tuning_bridge.py -v 2>&1 | tail -10

Expected: 3 tests PASS.
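
The bridge sets gains by shelling out to ros2 param set, which spawns one process per parameter (24 per trial). An alternative is to call the controller's set_parameters service directly. The sketch below assumes the controller node is /joint_trajectory_controller as above and that another thread is already spinning the node, as TuningBridge does. The function name set_gains_via_service is hypothetical; the message and service types come from rcl_interfaces.

```python
import time


def set_gains_via_service(node, params: dict, timeout_sec: float = 5.0) -> bool:
    """Set all gain parameters in one SetParameters call.

    node   -- an rclpy node that is being spun elsewhere
    params -- flat dict such as {"gains.joint_1.p": 500.0, ...}
    Returns True only if every parameter was accepted.
    """
    # ROS imports stay local so this module still imports outside the sim container.
    from rcl_interfaces.msg import Parameter, ParameterType, ParameterValue
    from rcl_interfaces.srv import SetParameters

    client = node.create_client(
        SetParameters, "/joint_trajectory_controller/set_parameters"
    )
    if not client.wait_for_service(timeout_sec=timeout_sec):
        return False

    request = SetParameters.Request()
    request.parameters = [
        Parameter(
            name=name,
            value=ParameterValue(
                type=ParameterType.PARAMETER_DOUBLE,
                double_value=float(value),
            ),
        )
        for name, value in sorted(params.items())
    ]

    # Poll the future; the spinning thread completes it.
    future = client.call_async(request)
    deadline = time.monotonic() + timeout_sec
    while not future.done():
        if time.monotonic() > deadline:
            return False
        time.sleep(0.05)

    return all(result.successful for result in future.result().results)
```

Inside _do_reconfigure this would be called as set_gains_via_service(self._ros_node, build_controller_params(gains)) between the deactivate and activate requests. Whether the controller applies the new gains on reactivation depends on the controller version, so that is worth confirming in the sim.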

Step 5: Commit

git add ar4_skills/tuning/ tests/test_tuning_bridge.py
git commit -m "feat: add tuning bridge node with gain parsing and controller reconfiguration"

Task 4: Optuna PID Optimizer (Standalone, Zenoh-only)

Files:

  • Create: ar4_skills/tuning/pid_optimizer.py
  • Create: tests/test_pid_optimizer.py

Step 1: Write the failing test

Create tests/test_pid_optimizer.py:

"""Unit tests for the PID optimizer's cost function and gain sampling."""
import numpy as np
import pytest
 
 
def test_compute_steady_state_error():
    """Steady-state error is the per-joint mean absolute error over the supplied samples."""
    from ar4_skills.tuning.pid_optimizer import compute_steady_state_error
 
    # 6 joints, target all zeros, positions have some error
    positions = np.array([
        [0.01, 0.02, 0.005, 0.01, 0.03, 0.01],
        [0.01, 0.02, 0.005, 0.01, 0.03, 0.01],
        [0.01, 0.02, 0.005, 0.01, 0.03, 0.01],
    ])
    target = np.zeros(6)
    error = compute_steady_state_error(positions, target)
    np.testing.assert_allclose(error, [0.01, 0.02, 0.005, 0.01, 0.03, 0.01])
 
 
def test_compute_settling_time():
    """Settling time is when all joints stay within threshold."""
    from ar4_skills.tuning.pid_optimizer import compute_settling_time
 
    # Timestamps at 0.1s intervals
    timestamps = np.arange(0, 3.0, 0.1)
    # Joint starts at 0.05 rad error, settles to 0.005 rad at t=1.5s
    positions = np.zeros((len(timestamps), 1))
    positions[:15, 0] = 0.05  # first 1.5s: error > threshold
    positions[15:, 0] = 0.005  # after 1.5s: error < threshold
 
    target = np.zeros(1)
    t = compute_settling_time(timestamps, positions, target, threshold=0.01)
    assert abs(t - 1.5) < 0.15  # within one sample
 
 
def test_compute_cost():
    """Cost combines steady-state error and settling time."""
    from ar4_skills.tuning.pid_optimizer import compute_cost
 
    ss_errors = np.array([0.01, 0.02, 0.005])
    settling_time = 2.5
    cost = compute_cost(ss_errors, settling_time, w1=10.0, w2=1.0)
    expected = 10.0 * np.sum(ss_errors) + 1.0 * settling_time
    assert abs(cost - expected) < 1e-6
 
 
def test_sample_gains():
    """Sampled gains should be within bounds for all joints."""
    from ar4_skills.tuning.pid_optimizer import sample_gains
    import optuna
 
    study = optuna.create_study(direction="minimize")
    trial = study.ask()
    gains = sample_gains(trial)
    assert len(gains) == 6
    for jname, jgains in gains.items():
        assert 100 <= jgains["p"] <= 10000
        assert 10 <= jgains["i"] <= 2000
        assert 5 <= jgains["d"] <= 500
        assert 10 <= jgains["i_clamp"] <= 500

Step 2: Run to verify failure

Run: python -m pytest tests/test_pid_optimizer.py -v 2>&1 | tail -10

Expected: FAIL — module not found.

Step 3: Create the optimizer

Create ar4_skills/tuning/pid_optimizer.py:

"""
Optuna PID Optimizer — standalone, communicates only via Zenoh.
 
Runs TPE optimization over 24 PID parameters (P, I, D, i_clamp x 6 joints).
Evaluates cost = w1 * steady_state_error + w2 * settling_time.
 
Usage:
    python -m ar4_skills.tuning.pid_optimizer \
        --connect tcp/localhost:7447 \
        --n-trials 300 \
        --db data/tuning/optuna_study.db
"""
 
import argparse
import json
import os
import time
import threading
 
import numpy as np
 
JOINT_NAMES = [f"joint_{i}" for i in range(1, 7)]
HOME_POSE = np.zeros(6)
REACH_POSE = np.array([0.5, 0.3, -0.4, 0.8, -0.5, 0.3])
 
# Search space bounds
BOUNDS = {
    "p": (100.0, 10000.0),
    "i": (10.0, 2000.0),
    "d": (5.0, 500.0),
    "i_clamp": (10.0, 500.0),
}
 
# Cost weights
W1 = 10.0  # steady-state error
W2 = 1.0   # settling time
 
 
def compute_steady_state_error(
    positions: np.ndarray, target: np.ndarray
) -> np.ndarray:
    """Mean absolute error per joint over the given position samples.
 
    Args:
        positions: (N, n_joints) array of joint positions
        target: (n_joints,) target positions
 
    Returns: (n_joints,) mean absolute error per joint
    """
    return np.mean(np.abs(positions - target), axis=0)
 
 
def compute_settling_time(
    timestamps: np.ndarray,
    positions: np.ndarray,
    target: np.ndarray,
    threshold: float = 0.01,
) -> float:
    """Time after which all joints stay within threshold of target.
 
    Args:
        timestamps: (N,) monotonic timestamps in seconds
        positions: (N, n_joints) joint positions
        target: (n_joints,) target positions
        threshold: max acceptable error in radians
 
    Returns: settling time in seconds (or max timestamp if never settled)
    """
    errors = np.abs(positions - target)
    all_settled = np.all(errors < threshold, axis=1)
 
    # Find last index where NOT settled, settling time is just after
    not_settled = np.where(~all_settled)[0]
    if len(not_settled) == 0:
        return 0.0  # always settled
    last_unsettled = not_settled[-1]
    if last_unsettled >= len(timestamps) - 1:
        return timestamps[-1] - timestamps[0]  # never settled
    return timestamps[last_unsettled + 1] - timestamps[0]
 
 
def compute_cost(
    ss_errors: np.ndarray,
    settling_time: float,
    w1: float = W1,
    w2: float = W2,
) -> float:
    """Weighted cost: w1 * sum(errors) + w2 * settling_time."""
    return w1 * float(np.sum(ss_errors)) + w2 * settling_time
 
 
def sample_gains(trial) -> dict:
    """Sample PID gains for all 6 joints from Optuna trial.
 
    Returns: {"joint_1": {"p": ..., "i": ..., "d": ..., "i_clamp": ...}, ...}
    """
    gains = {}
    for jname in JOINT_NAMES:
        gains[jname] = {}
        for param, (lo, hi) in BOUNDS.items():
            gains[jname][param] = trial.suggest_float(
                f"{jname}_{param}", lo, hi, log=True
            )
    return gains
 
 
class OptunaOptimizer:
    """Runs Optuna trials, communicating with the sim via Zenoh."""
 
    def __init__(self, zenoh_connect: str, db_path: str):
        import zenoh
 
        self._lock = threading.Lock()
        self._status = None
        self._joint_positions = []
        self._joint_timestamps = []
 
        conf = zenoh.Config()
        if zenoh_connect:
            conf.insert_json5("connect/endpoints", json.dumps([zenoh_connect]))
        self._session = zenoh.open(conf)
 
        self._gains_pub = self._session.declare_publisher("ar4/tuning/gains")
        self._cmd_pub = self._session.declare_publisher("ar4/tuning/command")
        self._session.declare_subscriber("ar4/tuning/status", self._on_status)
        self._session.declare_subscriber("ar4/joint_states", self._on_joint_states)
 
        self._db_path = db_path
 
    def _on_status(self, sample):
        try:
            msg = json.loads(bytes(sample.payload).decode())
            with self._lock:
                self._status = msg.get("status", "")
        except (json.JSONDecodeError, UnicodeDecodeError):
            pass
 
    def _on_joint_states(self, sample):
        """Record arm joint positions from a joint_states sample.

        Only JSON payloads are handled. CDR-encoded sensor_msgs/JointState
        samples are not decoded yet and are silently dropped.
        """
        try:
            payload = bytes(sample.payload)
            try:
                msg = json.loads(payload.decode())
                positions = msg.get("position", [])
                ts = msg.get("ts", time.time())
            except (json.JSONDecodeError, UnicodeDecodeError):
                # TODO: decode CDR here (pycdr2 is in the requirements)
                return

            if len(positions) >= 8:
                # Assumes the two gripper joints come first, then the six arm joints
                arm_positions = positions[2:8]
                with self._lock:
                    self._joint_positions.append(arm_positions)
                    self._joint_timestamps.append(ts)
        except Exception:
            pass
 
    def _send_command(self, action: str):
        self._cmd_pub.put(json.dumps({"action": action}).encode())
 
    def _send_gains(self, gains: dict):
        self._gains_pub.put(json.dumps(gains).encode())
 
    def _wait_for_status(self, expected: str, timeout: float = 30.0) -> bool:
        start = time.time()
        while time.time() - start < timeout:
            with self._lock:
                if self._status == expected:
                    return True
            time.sleep(0.1)
        return False
 
    def _clear_observations(self):
        with self._lock:
            self._joint_positions.clear()
            self._joint_timestamps.clear()
 
    def _get_observations(self):
        with self._lock:
            return (
                np.array(self._joint_positions.copy()),
                np.array(self._joint_timestamps.copy()),
            )
 
    def objective(self, trial) -> float:
        """Single Optuna trial: set gains, hold home, move to reach, measure."""
        gains = sample_gains(trial)
 
        # 1. Send gains and reconfigure controller
        self._send_gains(gains)
        time.sleep(0.5)
        self._send_command("reconfigure")
        if not self._wait_for_status("ready", timeout=30.0):
            return 1e6  # penalty for failed reconfigure
 
        # 2. Phase 1: Home hold — observe for ~10s sim time
        self._send_command("move_to_home")
        self._clear_observations()
        time.sleep(15.0)  # ~10s sim time at ~60% RT factor
 
        positions_home, ts_home = self._get_observations()
        if len(positions_home) < 5:
            return 1e6  # not enough data
 
        home_ss_error = compute_steady_state_error(
            positions_home[-10:], HOME_POSE
        )
 
        # 3. Phase 2: Move to reach pose
        self._clear_observations()
        self._send_command("move_to_reach")
        time.sleep(25.0)  # ~15s sim time at ~60% RT factor
 
        positions_reach, ts_reach = self._get_observations()
        if len(positions_reach) < 5:
            return 1e6
 
        reach_ss_error = compute_steady_state_error(
            positions_reach[-10:], REACH_POSE
        )
        settling = compute_settling_time(
            ts_reach, positions_reach, REACH_POSE, threshold=0.01
        )
 
        # 4. Compute combined cost over both phases
        cost = compute_cost(
            np.concatenate([home_ss_error, reach_ss_error]),
            settling,
        )
 
        # Log per-joint errors for analysis
        trial.set_user_attr("home_ss_error", home_ss_error.tolist())
        trial.set_user_attr("reach_ss_error", reach_ss_error.tolist())
        trial.set_user_attr("settling_time", settling)
 
        return cost
 
    def run(self, n_trials: int = 300):
        """Run the Optuna study."""
        import optuna
 
        os.makedirs(os.path.dirname(self._db_path) or ".", exist_ok=True)
        storage = f"sqlite:///{self._db_path}"
        study = optuna.create_study(
            study_name="ar4_pid_tuning",
            storage=storage,
            direction="minimize",
            load_if_exists=True,
        )
 
        study.optimize(self.objective, n_trials=n_trials)
 
        # Write best gains, reconstructed from the flat Optuna parameter names
        best = study.best_trial
        best_gains = {}
        for jname in JOINT_NAMES:
            best_gains[jname] = {
                param: best.params[f"{jname}_{param}"]
                for param in BOUNDS
            }
 
        best_path = os.path.join(os.path.dirname(self._db_path), "best_gains.yaml")
        import yaml
        with open(best_path, "w") as f:
            yaml.dump(best_gains, f, default_flow_style=False)
 
        print(f"\nBest cost: {study.best_value:.6f}")
        print(f"Best gains saved to: {best_path}")
        return study
 
 
def main():
    parser = argparse.ArgumentParser(description="Optuna PID Optimizer")
    parser.add_argument("--connect", default="tcp/localhost:7447",
                        help="Zenoh router endpoint")
    parser.add_argument("--n-trials", type=int, default=300,
                        help="Number of Optuna trials")
    parser.add_argument("--db", default="data/tuning/optuna_study.db",
                        help="SQLite database path for Optuna study")
    args = parser.parse_args()
 
    optimizer = OptunaOptimizer(args.connect, args.db)
    optimizer.run(n_trials=args.n_trials)
 
 
if __name__ == "__main__":
    main()
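
The joint-state callback above only handles JSON and drops CDR payloads, yet a DDS bridge forwards sensor_msgs/JointState as CDR. The tech stack names pycdr2 for this. The sketch below instead hand-decodes the message with the standard library's struct module, a swapped-in technique chosen so the example has no dependencies. It assumes classic little-endian CDR with a 4-byte encapsulation header, and the names CdrReader and decode_joint_state are hypothetical.

```python
import struct


class CdrReader:
    """Tiny little-endian CDR reader. Alignment is measured from the first
    byte after the 4-byte encapsulation header."""

    def __init__(self, payload: bytes):
        # Bytes 0-1 identify the representation; 0x00 0x01 is plain CDR, little-endian.
        if len(payload) < 4 or payload[0:2] != b"\x00\x01":
            raise ValueError("expected a little-endian CDR payload")
        self._buf = payload[4:]
        self._pos = 0

    def _align(self, size: int) -> None:
        self._pos += -self._pos % size

    def _scalar(self, fmt: str, size: int):
        self._align(size)
        (value,) = struct.unpack_from("<" + fmt, self._buf, self._pos)
        self._pos += size
        return value

    def int32(self) -> int:
        return self._scalar("i", 4)

    def uint32(self) -> int:
        return self._scalar("I", 4)

    def string(self) -> str:
        length = self.uint32()  # includes the trailing NUL byte
        raw = self._buf[self._pos:self._pos + length]
        self._pos += length
        return raw.rstrip(b"\x00").decode()

    def float64_seq(self) -> list:
        count = self.uint32()
        if count == 0:
            return []  # no elements, so no 8-byte alignment padding either
        self._align(8)
        values = struct.unpack_from(f"<{count}d", self._buf, self._pos)
        self._pos += 8 * count
        return list(values)


def decode_joint_state(payload: bytes) -> dict:
    """Decode sensor_msgs/JointState: header, name[], position[], velocity[], effort[]."""
    r = CdrReader(payload)
    sec, nanosec = r.int32(), r.uint32()
    frame_id = r.string()
    names = [r.string() for _ in range(r.uint32())]
    return {
        "stamp": sec + nanosec * 1e-9,
        "frame_id": frame_id,
        "name": names,
        "position": r.float64_seq(),
        "velocity": r.float64_seq(),
        "effort": r.float64_seq(),
    }
```

In the callback, decode_joint_state(bytes(sample.payload)) could replace the bare return in the CDR branch. The header stamp may also be a better time base for settling time than time.time(), since the sleeps in objective() are wall-clock while the simulation runs slower than real time.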

Step 4: Run unit tests

Run: pip install optuna numpy && python -m pytest tests/test_pid_optimizer.py -v 2>&1 | tail -10

Expected: 4 tests PASS.
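
The optimizer picks arm joints with positions[2:8], which assumes the two gripper joints always come first in the message. JointState carries a name array, so selecting by name avoids depending on that order. A minimal sketch follows; select_arm_positions is a hypothetical helper, and it assumes the message exposes parallel name and position lists.

```python
def select_arm_positions(names, positions, joint_names):
    """Return positions ordered like joint_names, or None if a joint is missing."""
    index = {name: i for i, name in enumerate(names)}
    try:
        return [float(positions[index[joint]]) for joint in joint_names]
    except (KeyError, IndexError):
        return None
```

In _on_joint_states this would be called with the message's name and position fields and JOINT_NAMES, appending the result only when it is not None.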

Step 5: Commit

git add ar4_skills/tuning/pid_optimizer.py tests/test_pid_optimizer.py
git commit -m "feat: add Optuna PID optimizer with cost function and TPE sampling"

Task 5: Optimizer Dockerfile

Files:

  • Create: docker/Dockerfile.tuning
  • Create: ar4_skills/tuning/requirements.txt

Step 1: Create requirements file

Create ar4_skills/tuning/requirements.txt:

optuna>=4.0
eclipse-zenoh>=1.0.0
pycdr2>=0.4.0
numpy>=1.26
pyyaml>=6.0

Step 2: Create the Dockerfile

Create docker/Dockerfile.tuning:

FROM python:3.12-slim
 
WORKDIR /app
 
# Install dependencies
COPY ar4_skills/tuning/requirements.txt /tmp/requirements.txt
RUN pip install --no-cache-dir -r /tmp/requirements.txt
 
# Copy tuning code
COPY ar4_skills/tuning/ /app/ar4_skills/tuning/
COPY ar4_skills/__init__.py /app/ar4_skills/__init__.py
 
ENTRYPOINT ["python", "-m", "ar4_skills.tuning.pid_optimizer"]

Step 3: Add docker-compose service

Append to docker-compose.yaml:

  # Optuna PID tuner (standalone, Zenoh-only, no ROS)
  pid-tuner:
    build:
      context: .
      dockerfile: docker/Dockerfile.tuning
    network_mode: host
    ipc: host
    command: --connect tcp/localhost:7447 --n-trials 300 --db /data/tuning/optuna_study.db
    volumes:
      - ./data/tuning:/data/tuning:rw
    depends_on:
      - zenoh-router

Step 4: Build and verify

Run: docker compose build pid-tuner 2>&1 | tail -5

Expected: Successful build.

Step 5: Commit

git add docker/Dockerfile.tuning ar4_skills/tuning/requirements.txt docker-compose.yaml
git commit -m "feat: add standalone Optuna PID tuner Docker image"

Task 6: Integration Test — Full Pipeline

Files:

  • Create: scripts/run_pid_tuning.sh

Step 1: Create the run script

Create scripts/run_pid_tuning.sh:

#!/usr/bin/env bash
# Run the full Optuna PID tuning pipeline.
#
# Usage:
#   ./scripts/run_pid_tuning.sh [N_TRIALS]
#
# Prerequisites:
#   docker compose build overlay pid-tuner
 
set -euo pipefail
 
N_TRIALS=${1:-300}
 
echo "=== Starting Zenoh router ==="
docker compose up -d zenoh-router
sleep 3
 
echo "=== Starting headless simulation ==="
docker compose up -d sim-tabletop-headless
echo "Waiting 60s for Gazebo to initialize..."
sleep 60
 
echo "=== Starting Zenoh DDS bridge ==="
docker compose up -d zenoh-bridge
sleep 5
 
echo "=== Starting tuning bridge node ==="
docker compose exec -d sim-tabletop-headless \
    bash -c "source /entrypoint.sh && python -m ar4_skills.tuning.tuning_bridge_node --connect tcp/localhost:7447"
sleep 3
 
echo "=== Starting Optuna optimizer ($N_TRIALS trials) ==="
docker compose run --rm pid-tuner \
    --connect tcp/localhost:7447 \
    --n-trials "$N_TRIALS" \
    --db /data/tuning/optuna_study.db
 
echo "=== Tuning complete ==="
echo "Best gains: data/tuning/best_gains.yaml"
echo "Study DB: data/tuning/optuna_study.db"
 
# Cleanup
docker compose down zenoh-router zenoh-bridge sim-tabletop-headless

Step 2: Make executable

Run: chmod +x scripts/run_pid_tuning.sh

Step 3: Smoke test with 1 trial

Run: ./scripts/run_pid_tuning.sh 1

Expected: One trial completes, data/tuning/best_gains.yaml is written.

Step 4: Commit

git add scripts/run_pid_tuning.sh
git commit -m "feat: add run script for Optuna PID tuning pipeline"

Task 7: Write Best Gains to Controller Config

After the Optuna study completes (or whenever results look good):

Step 1: Review best gains

Run: cat data/tuning/best_gains.yaml

Step 2: Update controllers_gravity.yaml with the best gains

Manually copy the best values from data/tuning/best_gains.yaml into ar4_skills/config/controllers_gravity.yaml, replacing the current gains: section. Update the comment to say "tuning round 4 (Optuna-optimized)".
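
Copying 24 numbers by hand is error-prone, so a small formatter can produce the block to paste. The sketch below assumes the gains: section uses the nested layout implied by the parameter names used earlier (gains.joint_1.p and so on). The real file is not shown here, so the indentation depth is an assumption and render_gains_yaml is a hypothetical helper.

```python
GAIN_KEYS = ("p", "i", "d", "i_clamp")


def render_gains_yaml(gains: dict, indent: int = 4) -> str:
    """Format per-joint gains as a YAML 'gains:' block, indented by `indent` spaces."""
    pad = " " * indent
    lines = [f"{pad}gains:"]
    for joint in sorted(gains):
        lines.append(f"{pad}  {joint}:")
        for key in GAIN_KEYS:
            lines.append(f"{pad}    {key}: {gains[joint][key]:.4f}")
    return "\n".join(lines) + "\n"
```

Load data/tuning/best_gains.yaml into a dict, pass it to render_gains_yaml with the indent that matches the existing gains: key, and paste the result over the old section.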

Step 3: Verify in simulation

Run: docker compose up sim-tabletop-headless, then inspect /joint_states (for example with ros2 topic echo /joint_states) to confirm the arm holds its pose with the new gains.

Step 4: Commit

git add ar4_skills/config/controllers_gravity.yaml
git commit -m "feat: apply Optuna-optimized PID gains (round 4)"

Dependency Order

Task 1 (Zenoh config)
  └─> Task 2 (Docker infra)
        ├─> Task 3 (Bridge node)
        │     └─> Task 6 (Integration)
        └─> Task 4 (Optimizer)
              └─> Task 5 (Dockerfile)
                    └─> Task 6 (Integration)
                          └─> Task 7 (Apply results)

Tasks 3 and 4 can be developed in parallel since they have independent unit tests.
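
The same ordering can be checked mechanically. This sketch encodes the tree above as a predecessor map and uses the standard library's graphlib to group tasks into batches that can run in parallel. The function name execution_batches is hypothetical.

```python
from graphlib import TopologicalSorter

# Each task maps to the set of tasks it depends on, as drawn in the tree above.
DEPENDS_ON = {
    "Task 1": set(),
    "Task 2": {"Task 1"},
    "Task 3": {"Task 2"},
    "Task 4": {"Task 2"},
    "Task 5": {"Task 4"},
    "Task 6": {"Task 3", "Task 5"},
    "Task 7": {"Task 6"},
}


def execution_batches(depends_on: dict) -> list:
    """Group tasks into batches; every task in a batch has all its dependencies done."""
    sorter = TopologicalSorter(depends_on)
    sorter.prepare()
    batches = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        batches.append(ready)
        sorter.done(*ready)
    return batches
```

Tasks 3 and 4 land in the same batch, which matches the note that they can be developed in parallel.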