Skip to content
Get SDK Access

Button Click Example (Python)

This is a small Python script that connects to an RCSP server and subscribes to the ButtonPushed device event of a connected device. Each time the device button is pressed it prints the press type (SinglePress, DoublePress, or LongPress). It is intentionally minimal — its job is to make the wire format and the event payload easy to read in code, alongside the RCSP specification.

The flow is:

  1. Open a TCP socket to the RCSP server (default localhost:45451).
  2. If --device-id is not given, send ListDevices and pick the first one.
  3. DeviceSubscribe to the DeviceEvents / ButtonPushed topic.
  4. Loop, printing each ButtonPushed event until interrupted (Ctrl-C).
  5. DeviceUnsubscribe on the way out.

The full script is available in the SDK’s examples/python folder as rcsp_button_click.py and also written out below.

To run the script, navigate to $ROKOKO_SDK_HOME/examples/python in a terminal, and execute:

Terminal window
python3 rcsp_button_click.py

This subscribes to button presses on the first connected device. To target a specific device, pass --device-id <id>. Press the device button to see events; press Ctrl-C to stop.

"""Minimal RCSP ButtonPushed example.
This script is a small, self-contained walkthrough of subscribing to a
device event from the Rokoko Command Server Protocol (see /reference/rcsp
on the docs site). It mirrors the style of `rcsp_cmd_example.py`: all
protocol code lives inline so the wire format and command envelope can be
read alongside the spec.
`ButtonPushed` is a *device-specific* publisher, so unlike the non-device
`Subscribe` command it is subscribed via `DeviceSubscribe`, which requires
a `DeviceId`. The flow is:
1. Open a TCP socket to the RCSP server (default localhost:45451).
2. If --device-id is not given, run `ListDevices` and pick the first one.
3. `DeviceSubscribe` to the `DeviceEvents` / `ButtonPushed` topic.
4. Loop, printing each ButtonPushed event until interrupted (Ctrl-C).
5. `DeviceUnsubscribe` on the way out.
Run it:
python3 rcsp_button_click.py
python3 rcsp_button_click.py --host localhost --device-id 1
"""
from __future__ import annotations
import argparse
import enum
import itertools
import json
import socket
import sys
from typing import Any
# ── Protocol constants ────────────────────────────────────────────────────────
# These mirror the `struct header` definition in the "Message structure"
# section of the RCSP spec. Every message on the wire is an 8-byte header
# followed by a JSON payload.
DEFAULT_PORT = 45451 # The server listens here.
HEADER_MARKER = 0xDC # Sentinel byte ("DeviceCommands"); every header starts with it.
HEADER_VERSION = 1 # Binary header format version.
HEADER_SIZE = 8 # Total bytes in the header.
# The device event we care about (device-specific publisher / topic).
DEVICE_EVENTS_PUBLISHER = "DeviceEvents"
BUTTON_PUSHED_TOPIC = "ButtonPushed"
class PayloadType(enum.IntEnum):
"""The four payload kinds the protocol carries (the header's `type` field)."""
COMMAND = 1 # client → server (always elicits exactly one response)
RESPONSE_OK = 2 # server → client
RESPONSE_ERROR = 3 # server → client
EVENT = 4 # server → client (pub/sub; may interleave with responses)
# ── Binary header ─────────────────────────────────────────────────────────────
# Layout (all little-endian):
# marker : uint8 (always 0xDC)
# header_version : uint8 (currently 1)
# header_size : uint8 (== 8)
# type : uint8 (PayloadType)
# payload_size : uint32 (length of the JSON body that follows)
def build_header(payload_type: PayloadType, payload_size: int) -> bytes:
return bytes([HEADER_MARKER, HEADER_VERSION, HEADER_SIZE, int(payload_type)]) \
+ payload_size.to_bytes(4, byteorder="little")
def parse_header(header: bytes) -> tuple[PayloadType, int]:
if len(header) != HEADER_SIZE:
raise RuntimeError(f"Short header: got {len(header)} bytes, expected {HEADER_SIZE}")
if header[0] != HEADER_MARKER:
raise RuntimeError(f"Bad marker: 0x{header[0]:02x} (expected 0x{HEADER_MARKER:02x})")
return PayloadType(header[3]), int.from_bytes(header[4:8], byteorder="little")
# ── Wire framing ──────────────────────────────────────────────────────────────
# TCP is a raw byte stream — it has no message boundaries of its own. We use
# the header's `payload_size` field to know exactly how many bytes of JSON to
# read for each message.
def _recv_exact(sock: socket.socket, nbytes: int) -> bytes:
"""Read exactly `nbytes` from the socket, looping until the buffer is full."""
buf = bytearray()
while len(buf) < nbytes:
chunk = sock.recv(nbytes - len(buf))
if not chunk:
raise ConnectionError("Server closed the connection mid-message")
buf.extend(chunk)
return bytes(buf)
def send_message(sock: socket.socket, payload_type: PayloadType, payload: dict[str, Any]) -> None:
body = json.dumps(payload).encode()
sock.sendall(build_header(payload_type, len(body)) + body)
def recv_message(sock: socket.socket) -> tuple[PayloadType, dict[str, Any]]:
payload_type, payload_size = parse_header(_recv_exact(sock, HEADER_SIZE))
payload = json.loads(_recv_exact(sock, payload_size).decode())
return payload_type, payload
# ── Command envelope ──────────────────────────────────────────────────────────
# Every command is wrapped in the `rcsp_cmd` object from the spec:
#
# { "Command": <str>, "TrackId": <str>, "Version": <num>, "Arguments": <opt obj> }
#
# The server echoes `TrackId` in the matching response so a client can pair
# requests with replies. We just use a monotonically increasing counter.
_track_ids = itertools.count(1)
def make_command(name: str, arguments: dict[str, Any] | None = None) -> dict[str, Any]:
cmd: dict[str, Any] = {
"Command": name,
"TrackId": str(next(_track_ids)),
"Version": 1,
}
if arguments is not None:
cmd["Arguments"] = arguments
return cmd
# ── Round-trip helper ─────────────────────────────────────────────────────────
# Per the spec, every command MUST get exactly one response (ok or error), and
# responses arrive in the same order commands were sent. Events MAY arrive
# between a command and its response, so we skip them while waiting.
def send_command(sock: socket.socket, name: str,
arguments: dict[str, Any] | None = None) -> dict[str, Any]:
"""Send a command and return the server's `Response` object.
Raises RuntimeError on a `response_error` payload.
"""
send_message(sock, PayloadType.COMMAND, make_command(name, arguments))
while True:
ptype, payload = recv_message(sock)
if ptype == PayloadType.EVENT:
continue
if ptype == PayloadType.RESPONSE_ERROR:
err = payload.get("Error", {})
raise RuntimeError(f"{name} failed: {err.get('Code')}: {err.get('Message')}")
if ptype == PayloadType.RESPONSE_OK:
# An "Ok with no data" response may omit "Response" entirely.
return payload.get("Response", {})
raise RuntimeError(f"Unexpected payload type from server: {ptype!r}")
# ── Event handling ──────────────────────────────────────────────────────────
# A ButtonPushed event arrives as an `event` payload wrapping the spec's
# event object:
#
# { "Publisher": <str>, "Topic": <str>, "EventData": <obj> }
#
# For DeviceEvents/ButtonPushed the EventData is `rcsp_device_event_data`,
# whose "ButtonPushType" is one of "SinglePress", "DoublePress", "LongPress".
def handle_button_event(event: dict[str, Any]) -> None:
if event.get("Publisher") != DEVICE_EVENTS_PUBLISHER:
return
if event.get("Topic") != BUTTON_PUSHED_TOPIC:
return
data = event.get("EventData", {}) or {}
push_type = data.get("ButtonPushType", "<unknown>")
device_id = data.get("DeviceId")
print(f"Button pushed on DeviceId={device_id}: {push_type}")
# ── Demo ──────────────────────────────────────────────────────────────────────
def main(host: str, port: int, device_id: int | None) -> int:
with socket.create_connection((host, port), timeout=5.0) as sock:
print(f"Connected to RCSP server at {host}:{port}\n")
if device_id is None:
devices_resp = send_command(sock, "ListDevices")
devices = devices_resp.get("Devices", [])
if not devices:
print("No devices connected — nothing to subscribe to.")
return 0
device_id = devices[0]["DeviceId"]
print(f"Using first device: DeviceId={device_id}")
# ButtonPushed is a device-specific publisher, so we use DeviceSubscribe
# and pass the DeviceId alongside the publisher/topic list.
subscription = {
"DeviceId": device_id,
"Publishers": [
{"Publisher": DEVICE_EVENTS_PUBLISHER, "Topics": [BUTTON_PUSHED_TOPIC]}
],
}
send_command(sock, "DeviceSubscribe", subscription)
print(f"Subscribed to {DEVICE_EVENTS_PUBLISHER}/{BUTTON_PUSHED_TOPIC}.")
print("Press the device button to see events. Ctrl-C to stop.\n")
# Events arrive unsolicited from now on. Drop the read timeout so we
# can sit and wait for button presses for as long as the user likes.
sock.settimeout(None)
try:
while True:
ptype, payload = recv_message(sock)
if ptype == PayloadType.EVENT:
handle_button_event(payload)
# Any stray responses (none expected here) are simply ignored.
except KeyboardInterrupt:
print("\nStopping...")
finally:
try:
send_command(sock, "DeviceUnsubscribe", subscription)
except Exception as e:
print(f"DeviceUnsubscribe failed (ignored): {e}", file=sys.stderr)
return 0
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Minimal RCSP ButtonPushed example.")
parser.add_argument("--host", default="localhost",
help="RCSP server host (default: localhost)")
parser.add_argument("--port", type=int, default=DEFAULT_PORT,
help=f"RCSP server port (default: {DEFAULT_PORT})")
parser.add_argument("--device-id", type=int, default=None,
help="DeviceId to subscribe to (default: first device returned by ListDevices)")
args = parser.parse_args()
sys.exit(main(args.host, args.port, args.device_id))