Set LED Color Example (Python)
This is a small Python script that connects to an RCSP server and sets the
RGB color of an LED on a connected device via the SetColorLed device
command. It is intentionally minimal — its job is to make the wire format
and the command envelope easy to read in code, alongside the
RCSP specification.
The flow is:
- Open a TCP socket to the RCSP server (default localhost:45451).
- If --device-id is not given, send ListDevices and pick the first one.
- Send a SetColorLed device command with the chosen LedId and RGB values.
Download
Section titled “Download”
The full script is available in the SDK’s examples/python folder as
rcsp_set_color_led_example.py and also written out below.
To run the script, navigate to $ROKOKO_SDK_HOME/examples/python in a terminal, and execute:
python3 rcsp_set_color_led_example.py 255 0 0
This lights the RingLed of the first connected device red. To target a
specific device, pass --device-id <id>; to set a different LED, pass
--led-id <name>. The three positional arguments are the Red, Green, and
Blue components, each an integer between 0 and 255.
The full script
Section titled “The full script”
"""Minimal RCSP SetColorLed example.

This script is a small, self-contained walkthrough of the `SetColorLed`
device command from the Rokoko Command Server Protocol (see /reference/rcsp
on the docs site). It mirrors the style of `rcsp_cmd_example.py`: all
protocol code lives inline so the wire format and command envelope can be
read alongside the spec.

  1. Open a TCP socket to the RCSP server (default localhost:45451).
  2. If --device-id is not given, run `ListDevices` and pick the first one.
  3. Send `SetColorLed` with {DeviceId, LedId, Red, Green, Blue}.

Run it:

    python3 rcsp_set_color_led_example.py 255 0 0              # red on first device
    python3 rcsp_set_color_led_example.py --device-id 1 0 255 0
"""
from __future__ import annotations
import argparseimport enumimport itertoolsimport jsonimport socketimport sysfrom typing import Any
# ── Protocol constants ────────────────────────────────────────────────────────
# These mirror the `struct header` definition in the "Message structure"
# section of the RCSP spec. Every message on the wire is an 8-byte header
# followed by a JSON payload.
# Port the RCSP server listens on.
DEFAULT_PORT = 45451

# Sentinel byte ("DeviceCommands"); every header starts with it.
HEADER_MARKER = 0xDC

# Binary header format version.
HEADER_VERSION = 1

# Total bytes in the header.
HEADER_SIZE = 8
class PayloadType(enum.IntEnum):
    """The four payload kinds the protocol carries (the header's `type` field)."""

    # client → server (always elicits exactly one response)
    COMMAND = 1

    # server → client
    RESPONSE_OK = 2

    # server → client
    RESPONSE_ERROR = 3

    # server → client (pub/sub; may interleave with responses)
    EVENT = 4
# ── Binary header ─────────────────────────────────────────────────────────────
# Layout (all little-endian):
#   marker         : uint8   (always 0xDC)
#   header_version : uint8   (currently 1)
#   header_size    : uint8   (== 8)
#   type           : uint8   (PayloadType)
#   payload_size   : uint32  (length of the JSON body that follows)
def build_header(payload_type: PayloadType, payload_size: int) -> bytes:
    """Build the binary header that precedes a payload on the wire.

    Args:
        payload_type: Value written into the header's `type` byte.
        payload_size: Length in bytes of the body that will follow the header.

    Returns:
        Four single-byte fields (marker, header version, header size, type)
        followed by `payload_size` as a 4-byte little-endian unsigned integer.

    Raises:
        OverflowError: If `payload_size` is negative or does not fit in 4 bytes.
    """
    fixed_fields = bytes((HEADER_MARKER, HEADER_VERSION, HEADER_SIZE, int(payload_type)))
    size_field = payload_size.to_bytes(4, byteorder="little")
    return fixed_fields + size_field
def parse_header(header: bytes) -> tuple[PayloadType, int]:
    """Decode a raw header into its payload type and payload size.

    Only the length, the marker byte, the type byte and the size field are
    examined; the header-version and header-size bytes are not checked.

    Args:
        header: The raw header bytes; must be exactly `HEADER_SIZE` long.

    Returns:
        A `(payload_type, payload_size)` pair, where the size is read from
        bytes 4..7 as a little-endian unsigned integer.

    Raises:
        RuntimeError: If the length is wrong or the first byte is not the marker.
        ValueError: If the type byte is not a `PayloadType` member.
    """
    if len(header) != HEADER_SIZE:
        raise RuntimeError(f"Short header: got {len(header)} bytes, expected {HEADER_SIZE}")
    if header[0] != HEADER_MARKER:
        raise RuntimeError(f"Bad marker: 0x{header[0]:02x} (expected 0x{HEADER_MARKER:02x})")

    kind = PayloadType(header[3])
    body_size = int.from_bytes(header[4:8], byteorder="little")
    return kind, body_size
# ── Wire framing ──────────────────────────────────────────────────────────────
# TCP is a raw byte stream — it has no message boundaries of its own. We use
# the header's `payload_size` field to know exactly how many bytes of JSON to
# read for each message.
def _recv_exact(sock: socket.socket, nbytes: int) -> bytes:
    """Read exactly `nbytes` from the socket, looping until all have arrived.

    A single `recv` may return fewer bytes than requested, so it is called
    repeatedly, each time asking only for what is still outstanding.

    Args:
        sock: Connected socket to read from.
        nbytes: Number of bytes to read; zero or less returns `b""` without
            touching the socket.

    Returns:
        The `nbytes` bytes read, in arrival order.

    Raises:
        ConnectionError: If `recv` returns no data before `nbytes` were read.
    """
    pieces = []
    missing = nbytes
    while missing > 0:
        piece = sock.recv(missing)
        if not piece:
            # An empty read means the peer closed its end of the stream.
            raise ConnectionError("Server closed the connection mid-message")
        pieces.append(piece)
        missing -= len(piece)
    return b"".join(pieces)
def send_message(sock: socket.socket, payload_type: PayloadType, payload: dict[str, Any]) -> None:
    """Serialize `payload` to JSON and send it framed by a header.

    Args:
        sock: Connected socket to write to.
        payload_type: Payload kind recorded in the header.
        payload: JSON-serializable object forming the message body.
    """
    encoded = json.dumps(payload).encode()
    header = build_header(payload_type, len(encoded))
    # Header and body go out in a single sendall call.
    sock.sendall(header + encoded)
def recv_message(sock: socket.socket) -> tuple[PayloadType, dict[str, Any]]:
    """Read one framed message from the socket and decode its JSON body.

    Args:
        sock: Connected socket to read from.

    Returns:
        A `(payload_type, payload)` pair: the type taken from the header and
        the parsed JSON body.
    """
    raw_header = _recv_exact(sock, HEADER_SIZE)
    payload_type, payload_size = parse_header(raw_header)
    # The header states exactly how many body bytes follow.
    raw_body = _recv_exact(sock, payload_size)
    return payload_type, json.loads(raw_body.decode())
# ── Command envelope ──────────────────────────────────────────────────────────
# Every command is wrapped in the `rcsp_cmd` object from the spec:
#
#   { "Command": <str>, "TrackId": <str>, "Version": <num>, "Arguments": <opt obj> }
#
# The server echoes `TrackId` in the matching response so a client can pair
# requests with replies. We just use a monotonically increasing counter.
# Process-wide source of TrackId values: 1, 2, 3, ...
_track_ids = itertools.count(1)


def make_command(name: str, arguments: dict[str, Any] | None = None) -> dict[str, Any]:
    """Wrap a command name and optional arguments in the command envelope.

    Each call consumes the next value of the module-level counter, so every
    envelope gets a distinct `TrackId`.

    Args:
        name: Command name, stored under "Command".
        arguments: Command arguments; the "Arguments" key is omitted only
            when this is None (an empty dict is still included).

    Returns:
        A dict with "Command", "TrackId" (the counter value as a string),
        "Version" (always 1) and, if given, "Arguments".
    """
    envelope: dict[str, Any] = {
        "Command": name,
        "TrackId": str(next(_track_ids)),
        "Version": 1,
    }
    if arguments is None:
        return envelope
    envelope["Arguments"] = arguments
    return envelope
# ── Round-trip helper ─────────────────────────────────────────────────────────
# Per the spec, every command MUST get exactly one response (ok or error), and
# responses arrive in the same order commands were sent. Events MAY arrive
# between a command and its response, so we skip them while waiting.
def send_command(sock: socket.socket, name: str, arguments: dict[str, Any] | None = None) -> dict[str, Any]:
    """Send one command and wait for the server's answer to it.

    Event messages received while waiting are discarded; the first non-event
    message is treated as the answer.

    Args:
        sock: Connected socket to the server.
        name: Command name to send.
        arguments: Optional command arguments.

    Returns:
        The "Response" object of an ok response, or an empty dict when the
        response carries no "Response" key.

    Raises:
        RuntimeError: On an error response (the message includes the error's
            "Code" and "Message"), or on any other unexpected payload type.
    """
    send_message(sock, PayloadType.COMMAND, make_command(name, arguments))

    # Drain any events that arrive ahead of the response.
    ptype, body = recv_message(sock)
    while ptype == PayloadType.EVENT:
        ptype, body = recv_message(sock)

    if ptype == PayloadType.RESPONSE_OK:
        # An "Ok with no data" response may omit "Response" entirely.
        return body.get("Response", {})
    if ptype == PayloadType.RESPONSE_ERROR:
        err = body.get("Error", {})
        raise RuntimeError(f"{name} failed: {err.get('Code')}: {err.get('Message')}")
    raise RuntimeError(f"Unexpected payload type from server: {ptype!r}")
# ── Demo ──────────────────────────────────────────────────────────────────────
def _color_component(value: str) -> int:
    """Convert a command-line string to a color component (argparse `type=`).

    Args:
        value: Raw argument text as supplied on the command line.

    Returns:
        The parsed integer, guaranteed to lie in 0..255.

    Raises:
        argparse.ArgumentTypeError: If `value` is not an integer or is
            outside 0..255.
    """
    try:
        n = int(value)
    except ValueError:
        # Without this, argparse falls back to its generic
        # "invalid _color_component value" message, which exposes the
        # converter's private name instead of explaining what is expected.
        raise argparse.ArgumentTypeError(
            f"color component must be an integer 0..255, got {value!r}"
        ) from None
    if not 0 <= n <= 255:
        raise argparse.ArgumentTypeError(f"color component must be 0..255, got {n}")
    return n
def main(host: str, port: int, device_id: int | None, led_id: str,
         red: int, green: int, blue: int) -> int:
    """Connect to the server and set one LED's color on one device.

    Args:
        host: Server host name or address.
        port: Server TCP port.
        device_id: Device to target; when None, `ListDevices` is sent and the
            first entry of its "Devices" list is used.
        led_id: Value sent as "LedId".
        red: Value sent as "Red".
        green: Value sent as "Green".
        blue: Value sent as "Blue".

    Returns:
        0, both after a successful `SetColorLed` and when no devices are
        listed. Connection and command errors are not caught here.
    """
    # The 5-second timeout is set by create_connection on the returned socket.
    with socket.create_connection((host, port), timeout=5.0) as sock:
        print(f"Connected to RCSP server at {host}:{port}\n")

        if device_id is None:
            devices = send_command(sock, "ListDevices").get("Devices", [])
            if not devices:
                print("No devices connected — nothing to do.")
                return 0
            device_id = devices[0]["DeviceId"]
            print(f"Using first device: DeviceId={device_id}")

        print(f"Setting {led_id} on DeviceId={device_id} to RGB=({red}, {green}, {blue})")
        color_arguments = {
            "DeviceId": device_id,
            "LedId": led_id,
            "Red": red,
            "Green": green,
            "Blue": blue,
        }
        send_command(sock, "SetColorLed", color_arguments)
        print("Ok.")

    return 0
if __name__ == "__main__":
    # Command-line entry point: parse the options, then exit with main()'s
    # return value as the process status.
    parser = argparse.ArgumentParser(description="Minimal RCSP SetColorLed example.")
    parser.add_argument(
        "--host",
        default="localhost",
        help="RCSP server host (default: localhost)",
    )
    parser.add_argument(
        "--port",
        type=int,
        default=DEFAULT_PORT,
        help=f"RCSP server port (default: {DEFAULT_PORT})",
    )
    parser.add_argument(
        "--device-id",
        type=int,
        default=None,
        help="DeviceId to target (default: first device returned by ListDevices)",
    )
    parser.add_argument(
        "--led-id",
        default="RingLed",
        help="LedId to set (default: RingLed)",
    )
    # The three color components are required positionals, validated to 0..255.
    parser.add_argument("red", type=_color_component, help="Red component (0..255)")
    parser.add_argument("green", type=_color_component, help="Green component (0..255)")
    parser.add_argument("blue", type=_color_component, help="Blue component (0..255)")
    args = parser.parse_args()

    sys.exit(
        main(
            host=args.host,
            port=args.port,
            device_id=args.device_id,
            led_id=args.led_id,
            red=args.red,
            green=args.green,
            blue=args.blue,
        )
    )