# Source code for tuoni.TuoniWebSocket

import json
import logging
import random
import ssl
import string
import threading

try:
    import websocket
    _WEBSOCKET_AVAILABLE = True
except ImportError:
    _WEBSOCKET_AVAILABLE = False


class TuoniWebSocket:
    """
    Real-time WebSocket client for the Tuoni server using SockJS/STOMP.

    Subscribes to server-pushed events (agent checkins, command results,
    listener changes, etc.) without polling.

    Requires the ``websocket-client`` package.

    Attributes:
        TOPIC_AGENT (str): New agent registered.
        TOPIC_AGENT_HEARTBEAT (str): Agent heartbeat/checkin.
        TOPIC_AGENT_ACTIVE (str): Agent active-status change.
        TOPIC_AGENT_METADATA (str): Agent metadata changed.
        TOPIC_COMMAND_SENT (str): Command dispatched to agent.
        TOPIC_COMMAND_RESULT (str): Command result received.
        TOPIC_COMMAND_TEMPLATE_NEW (str): New command template available.
        TOPIC_COMMAND_TEMPLATE_REMOVED (str): Command template removed.
        TOPIC_LISTENER_NEW (str): New listener created.
        TOPIC_LISTENER_REMOVED (str): Listener deleted.
        TOPIC_LISTENER_MODIFIED (str): Listener configuration changed.
        TOPIC_PAYLOAD_NEW (str): New payload created.
        TOPIC_PAYLOAD_ARCHIVED (str): Payload archived.
        TOPIC_USER_NEW (str): New user created.
        TOPIC_USER_MODIFIED (str): User modified.

    Examples:
        >>> ws = tuoni_server.connect_websocket()
        >>> results = []
        >>> ws.on_command_result(lambda data: results.append(data))
        >>> agent.send_command(TuoniCommandLs(".", 1))
        >>> # callback fires when agent returns the result
    """

    TOPIC_AGENT = "/topic/agent"
    TOPIC_AGENT_HEARTBEAT = "/topic/agent/heartbeat"
    TOPIC_AGENT_ACTIVE = "/topic/agent/active"
    TOPIC_AGENT_METADATA = "/topic/agent/metadata"
    TOPIC_COMMAND_SENT = "/topic/command/sent"
    TOPIC_COMMAND_RESULT = "/topic/command/result"
    TOPIC_COMMAND_TEMPLATE_NEW = "/topic/command-templates/new"
    TOPIC_COMMAND_TEMPLATE_REMOVED = "/topic/command-templates/removed"
    TOPIC_LISTENER_NEW = "/topic/listener/new"
    TOPIC_LISTENER_REMOVED = "/topic/listener/removed"
    TOPIC_LISTENER_MODIFIED = "/topic/listener/modified"
    TOPIC_PAYLOAD_NEW = "/topic/payload/new"
    TOPIC_PAYLOAD_ARCHIVED = "/topic/payload/archived"
    TOPIC_USER_NEW = "/topic/users/new"
    TOPIC_USER_MODIFIED = "/topic/users/modified"

    # Shared logger: failures that used to be swallowed silently are
    # reported here instead of being raised into the socket thread.
    _logger = logging.getLogger(__name__)

    def __init__(self, c2):
        """
        Constructor.

        Args:
            c2 (TuoniC2): The server connection to attach to.

        Raises:
            ImportError: If the ``websocket-client`` package is not installed.
        """
        if not _WEBSOCKET_AVAILABLE:
            raise ImportError(
                "websocket-client is required for WebSocket support. "
                "Install it with: pip install websocket-client"
            )
        self.c2 = c2
        self._ws = None
        self._stomp_connected = False
        self._callbacks = {}  # topic -> [callables]
        self._subscriptions = {}  # topic -> sub_id string
        self._sub_counter = 0
        self._lock = threading.Lock()
        self._connect_event = threading.Event()

    def _build_ws_url(self):
        """
        Build the SockJS raw-websocket URL from the server's base URL.

        The http(s) scheme is mapped to ws(s); any other scheme is kept
        as-is. A random three-digit server id and eight-character session id
        are placed in the path.

        Returns:
            str: ``<ws base>/events/<server_id>/<session_id>/websocket``.
        """
        # Strip trailing slashes so a base URL such as "https://host/" does
        # not produce "wss://host//events/...".
        base = self.c2._url.rstrip("/")
        if base.startswith("https://"):
            ws_base = "wss://" + base[len("https://"):]
        elif base.startswith("http://"):
            ws_base = "ws://" + base[len("http://"):]
        else:
            ws_base = base
        server_id = str(random.randint(0, 999)).zfill(3)
        session_id = "".join(
            random.choices(string.ascii_lowercase + string.digits, k=8)
        )
        return f"{ws_base}/events/{server_id}/{session_id}/websocket"

    def connect(self, timeout: float = 10) -> bool:
        """
        Connect to the server's WebSocket endpoint and complete the STOMP handshake.

        May be called again after a failure or a disconnect: any previous
        socket is closed and the connection state is reset first.

        Args:
            timeout (float): Seconds to wait for the STOMP CONNECTED frame.

        Returns:
            bool: True if the STOMP session was established within ``timeout`` seconds.

        Examples:
            >>> ws = TuoniWebSocket(tuoni_server)
            >>> if not ws.connect():
            >>>     print("WebSocket connection failed")
        """
        url = self._build_ws_url()
        headers = [f"Authorization: Bearer {self.c2._token}"]
        sslopt = {}
        if not self.c2._verify:
            sslopt = {"cert_reqs": ssl.CERT_NONE}

        # Detach the old socket *before* closing it so that its late
        # close/error callbacks are recognised as stale and ignored.
        with self._lock:
            previous, self._ws = self._ws, None
            self._stomp_connected = False
        # Without clearing, a second connect() would return immediately on
        # the event left set by the previous attempt.
        self._connect_event.clear()
        if previous is not None:
            try:
                previous.close()
            except Exception:
                self._logger.debug(
                    "Closing previous WebSocket failed", exc_info=True
                )

        self._ws = websocket.WebSocketApp(
            url,
            header=headers,
            on_open=self._on_open,
            on_message=self._on_message,
            on_error=self._on_error,
            on_close=self._on_close,
        )
        worker = threading.Thread(
            target=self._ws.run_forever,
            kwargs={"sslopt": sslopt},
            daemon=True,
        )
        worker.start()
        self._connect_event.wait(timeout=timeout)
        return self.is_connected

    # ------------------------------------------------------------------
    # websocket-client callbacks
    # ------------------------------------------------------------------

    def _on_open(self, ws):
        """Do nothing on socket open; STOMP CONNECT is sent on the SockJS 'o' frame."""
        pass  # Wait for SockJS 'o' frame before sending STOMP CONNECT

    def _on_message(self, ws, message):
        """
        Dispatch one SockJS frame by its leading type character.

        ``o`` sends the STOMP CONNECT frame, ``a`` carries a JSON array of
        STOMP frames, ``c`` marks the session closed. ``h`` (heartbeat) and
        anything else are ignored.
        """
        if not message:
            return
        msg_type = message[0]
        if msg_type == "o":
            # SockJS open — send STOMP CONNECT
            frame = (
                "CONNECT\n"
                f"Authorization:Bearer {self.c2._token}\n"
                "accept-version:1.1,1.0\n"
                "heart-beat:0,0\n"
                "\n\x00"
            )
            ws.send(json.dumps([frame]))
        elif msg_type == "a":
            try:
                frames = json.loads(message[1:])
            except ValueError:
                self._logger.warning("Discarding malformed SockJS array frame")
                return
            if not isinstance(frames, list):
                self._logger.warning("Discarding non-array SockJS payload")
                return
            for frame in frames:
                # Handle frames one by one so a single bad frame does not
                # drop the remaining frames of the same batch.
                try:
                    self._handle_stomp_frame(frame)
                except Exception:
                    self._logger.exception("Failed to process STOMP frame")
        elif msg_type == "c":
            with self._lock:
                self._stomp_connected = False
            # Wake a pending connect() instead of letting it run to timeout.
            self._connect_event.set()

    def _on_error(self, ws, error):
        """Log a transport error and release any thread waiting in connect()."""
        if ws is not self._ws:
            return  # callback from a socket that has been replaced
        self._logger.warning("WebSocket error: %s", error)
        self._connect_event.set()

    def _on_close(self, ws, status_code, msg):
        """Mark the session closed and release any thread waiting in connect()."""
        if ws is not self._ws:
            return  # callback from a socket that has been replaced
        with self._lock:
            self._stomp_connected = False
        # A close before CONNECTED means the handshake failed; signal it so
        # connect() returns False right away rather than after the timeout.
        self._connect_event.set()

    # ------------------------------------------------------------------
    # STOMP helpers
    # ------------------------------------------------------------------

    def _parse_stomp_frame(self, frame_str):
        """
        Split a raw STOMP frame into command, headers and body.

        Everything from the first NUL byte on is discarded. Header names and
        values are whitespace-stripped; when a header is repeated the first
        occurrence is kept.

        Args:
            frame_str (str): Raw frame text.

        Returns:
            tuple: ``(frame_type, headers, body)`` with ``headers`` a dict.
        """
        frame_str = frame_str.partition("\x00")[0]
        lines = frame_str.split("\n")
        frame_type = lines[0].strip()
        headers = {}
        body_start = len(lines)
        for index, line in enumerate(lines[1:], start=1):
            if line == "":
                # Blank line separates the headers from the body.
                body_start = index + 1
                break
            key, separator, value = line.partition(":")
            if separator:
                headers.setdefault(key.strip(), value.strip())
        body = "\n".join(lines[body_start:])
        return frame_type, headers, body

    def _send_stomp_subscribe(self, topic, sub_id):
        """Send a STOMP SUBSCRIBE frame; failures are logged, never raised."""
        try:
            frame = f"SUBSCRIBE\nid:{sub_id}\ndestination:{topic}\n\n\x00"
            self._ws.send(json.dumps([frame]))
        except Exception:
            self._logger.warning(
                "Failed to subscribe to %s", topic, exc_info=True
            )

    def _handle_stomp_frame(self, frame_str):
        """
        React to a single STOMP frame.

        CONNECTED flushes the queued subscriptions and marks the session
        active; MESSAGE runs every callback registered for its destination in
        its own daemon thread; ERROR releases a pending connect().
        """
        frame_type, headers, body = self._parse_stomp_frame(frame_str)
        if frame_type == "CONNECTED":
            # The lock is held across the sends and the flag update so that a
            # concurrent on_event() either queues before the flush or sends
            # for itself afterwards, never both and never neither.
            with self._lock:
                for topic, sub_id in list(self._subscriptions.items()):
                    self._send_stomp_subscribe(topic, sub_id)
                self._stomp_connected = True
            self._connect_event.set()
        elif frame_type == "MESSAGE":
            destination = headers.get("destination", "")
            try:
                data = json.loads(body) if body.strip() else None
            except ValueError:
                data = body  # not JSON: hand the raw text to the callbacks
            with self._lock:
                callbacks = list(self._callbacks.get(destination, []))
            for cb in callbacks:
                try:
                    threading.Thread(
                        target=cb, args=(data,), daemon=True
                    ).start()
                except RuntimeError:
                    self._logger.warning(
                        "Could not start callback thread for %s",
                        destination,
                        exc_info=True,
                    )
        elif frame_type == "ERROR":
            self._logger.warning(
                "STOMP ERROR frame received: %s",
                headers.get("message", body),
            )
            self._connect_event.set()

    # ------------------------------------------------------------------
    # Public subscription API
    # ------------------------------------------------------------------

    def on_event(self, topic: str, callback):
        """
        Subscribe to a STOMP topic and register a callback.

        If already connected the subscription is sent immediately; otherwise
        it is queued and sent once the STOMP session is established.

        Args:
            topic (str): STOMP topic (e.g. ``TuoniWebSocket.TOPIC_COMMAND_RESULT``).
            callback (callable): Called with the deserialized JSON payload when
                a message arrives. Runs in a daemon thread.

        Examples:
            >>> ws.on_event(TuoniWebSocket.TOPIC_AGENT, lambda data: print("new agent:", data))
        """
        with self._lock:
            if topic not in self._callbacks:
                # First callback for this topic: allocate a subscription id.
                self._callbacks[topic] = []
                sub_id = f"sub-{self._sub_counter}"
                self._sub_counter += 1
                self._subscriptions[topic] = sub_id
                if self._stomp_connected:
                    self._send_stomp_subscribe(topic, sub_id)
            self._callbacks[topic].append(callback)

    def on_new_agent(self, callback):
        """Register a callback for new agent registrations."""
        self.on_event(self.TOPIC_AGENT, callback)

    def on_agent_heartbeat(self, callback):
        """Register a callback for agent heartbeats."""
        self.on_event(self.TOPIC_AGENT_HEARTBEAT, callback)

    def on_agent_active_change(self, callback):
        """Register a callback for agent active-status changes."""
        self.on_event(self.TOPIC_AGENT_ACTIVE, callback)

    def on_command_result(self, callback):
        """Register a callback for command results."""
        self.on_event(self.TOPIC_COMMAND_RESULT, callback)

    def on_command_sent(self, callback):
        """Register a callback for commands dispatched to agents."""
        self.on_event(self.TOPIC_COMMAND_SENT, callback)

    def on_listener_new(self, callback):
        """Register a callback for new listeners."""
        self.on_event(self.TOPIC_LISTENER_NEW, callback)

    def on_listener_modified(self, callback):
        """Register a callback for listener modifications."""
        self.on_event(self.TOPIC_LISTENER_MODIFIED, callback)

    def on_payload_new(self, callback):
        """Register a callback for new payloads."""
        self.on_event(self.TOPIC_PAYLOAD_NEW, callback)

    def disconnect(self):
        """
        Send STOMP DISCONNECT and close the WebSocket.

        Both steps are best-effort: failures are logged and not raised.
        """
        if self._ws:
            with self._lock:
                if self._stomp_connected:
                    try:
                        self._ws.send(json.dumps(["DISCONNECT\n\n\x00"]))
                    except Exception:
                        self._logger.debug(
                            "Sending STOMP DISCONNECT failed", exc_info=True
                        )
                self._stomp_connected = False
            try:
                self._ws.close()
            except Exception:
                self._logger.debug("Closing WebSocket failed", exc_info=True)

    @property
    def is_connected(self) -> bool:
        """True if the STOMP session is active."""
        with self._lock:
            return self._stomp_connected