Source code for tuoni.TuoniC2

import os
import time
import requests
import json
import threading
import shutil
import base64
import inspect
from pathlib import Path
from urllib.parse import quote
from tuoni.TuoniExceptions import *
from tuoni.TuoniListenerPlugin import *
from tuoni.TuoniListener import *
from tuoni.TuoniAgent import *
from tuoni.TuoniPayloadPlugin import *
from tuoni.TuoniPayload import *
from tuoni.TuoniAlias import *
from tuoni.TuoniUser import *
from tuoni.TuoniFile import *
from tuoni.TuoniDataHost import *
from tuoni.TuoniDataService import *
from tuoni.TuoniDataCredential import *
from tuoni.TuoniJob import *
from tuoni.TuoniCommandPlugin import *
from tuoni.TuoniCommandTemplate import *


[docs] class TuoniC2: """ The primary class for establishing connections to and managing interactions with the Tuoni server. It provides functionality for controlling server operations and facilitating communication. Args: verify (str | bool) [default = False]: A flag to enable or disable SSL verification. If a string is provided, it is treated as the path to a CA bundle file. """ def __init__(self, verify: str | bool = False): self._token: str = None self._url: str = None self._monitoring_threads: list = [] self._set_verify(verify)
[docs] def login(self, url: str, username: str, password: str): """ Login to the Tuoni server. Args: url (str): The URL of the Tuoni server to connect to. username (str): The username for authentication. password (str): The password for authentication. Examples: >>> tuoni_server = TuoniC2() # Disabling SSL verification >>> tuoni_server = TuoniC2(verify=False) # Set path for CA bunle >>> tuoni_server = TuoniC2(verify="/path/to/ca_bundle.pem") # Login to the server >>> tuoni_server.login("https://localhost:8443", "my_user", "S3cr37") """ headers = { "Authorization": "Basic " + base64.b64encode(f"{username}:{password}".encode('utf-8')).decode('utf-8') } response = requests.post(f"{url}/api/v1/auth/login", headers=headers, verify=self._verify) if response.status_code < 200 or response.status_code >= 300: raise ExceptionTuoniAuthentication(response.text) self._token = response.text self._url = url
def _request_check(self): if self._token is None: raise ExceptionTuoniAuthentication("You have not done the login") def _set_verify(self, verify: str | bool): self._verify = False if isinstance(verify, bool) and verify: self._verify = True elif isinstance(verify, bool) and not verify: self._verify = False elif isinstance(verify, str): if Path(verify).is_file(): self._verify = verify else: raise Exception("The path to the CA bundle is not valid") @staticmethod def _raise_request_exception(result_msg: str): try: data = json.loads(result_msg) msg = data.get("message", result_msg) except json.JSONDecodeError: msg = result_msg raise ExceptionTuoniRequestFailed(msg) def _make_request(self, method: str, uri: str, **kwargs): self._request_check() headers = {"Authorization": f"Bearer {self._token}"} response = requests.request(method, f"{self._url}{uri}", headers=headers, verify=self._verify, **kwargs) if response.status_code < 200 or response.status_code >= 300: self._raise_request_exception(response.text) return response
[docs] def request_get(self, uri: str, result_as_json: bool = True, result_as_bytes: bool = False): """ Send a GET request to the Tuoni server. Args: uri (str): The URI endpoint to send the request to. result_as_json (bool): If True, the server's response is treated as JSON and converted to a dictionary. result_as_bytes (bool): If True, the server's response is returned as a bytes. Returns: str | dict | bytes: The server's response, either as a raw string or a dictionary if `result_as_json` is True or as bytes if `result_as_bytes` is True. """ response = self._make_request("GET", uri) if len(response.content) == 0: return None if result_as_json: return json.loads(response.text) if result_as_bytes: return response.content return response.text
[docs] def request_get_file(self, uri: str, file_name: str): """ Send a GET request to the Tuoni server and save the result to the filesystem. Args: uri (str): The URI endpoint to send the request to. file_name (str): The name of the file where the response will be saved. """ response = self._make_request("GET", uri, stream=True) with open(file_name, 'wb') as out_file: shutil.copyfileobj(response.raw, out_file)
[docs] def request_post(self, uri: str, json_data: dict = None, files: dict = None): """ Send a POST request to the Tuoni server. Args: uri (str): The URI endpoint to send the request to. json_data (dict): A dictionary containing the JSON payload to include in the POST request. files (dict): A dictionary of files to upload with the POST request. Returns: dict: The server's response as a dictionary. Examples: >>> tuoni = TuoniC2() >>> tuoni_server.request_post( >>> "/api/v1/command-alias", >>> { >>> "name": "bofX", >>> "description": "Example", >>> "baseTemplate": "bof", >>> "fixedConfiguration": {"method": "go"} >>> }, >>> {"bofFile" : ["some_bof.o", open("some_bof", "rb").read()]} >>> ) """ if files is None: response = self._make_request("POST", uri, json=json_data) else: all_data = {} if json_data is not None: all_data["requestBody"] = (None, json.dumps(json_data), 'application/json') for var_name, file_info in files.items(): all_data[var_name] = (file_info[0], file_info[1], 'application/octet-stream') response = self._make_request("POST", uri, files=all_data) return json.loads(response.text) if response.text else None
[docs] def request_patch(self, uri: str, json_data: dict = None, files: dict = None): """ Send a PATCH request to the Tuoni server. Args: uri (str): The URI endpoint to send the request to. json_data (dict): A dictionary containing the JSON payload to include in the POST request. files (dict): A dictionary of files to upload with the POST request. Returns: dict: The server's response as a dictionary. """ if files is None: response = self._make_request("PATCH", uri, json=json_data) else: all_data = {} if json_data is not None: all_data["requestBody"] = (None, json.dumps(json_data), 'application/json') for var_name, file_info in files.items(): all_data[var_name] = (file_info[0], file_info[1], 'application/octet-stream') response = self._make_request("PATCH", uri, files=all_data) return json.loads(response.text) if response.text else None
[docs] def request_put(self, uri: str, json_data: dict = None): """ Send a PUT request to the Tuoni server. Args: uri (str): The URI endpoint to send the request to. json_data (dict): A dictionary containing the JSON payload to include in the PUT request. Returns: dict: The server's response as a dictionary. """ response = self._make_request("PUT", uri, json=json_data) return json.loads(response.text) if response.text else None
[docs] def request_put_multipart(self, uri: str, files: dict): """ Send a PUT request with multipart/form-data to the Tuoni server. Args: uri (str): The URI endpoint to send the request to. files (dict): A dictionary of files to upload. Each key is the form field name and the value is a list of [filename, file_content]. Returns: dict: The server's response as a dictionary, or None. """ all_data = {} for var_name, file_info in files.items(): all_data[var_name] = (file_info[0], file_info[1], "application/octet-stream") response = self._make_request("PUT", uri, files=all_data) return json.loads(response.text) if response.text else None
[docs] def request_delete(self, uri: str, json_data: dict = None): """ Send a DELETE request to the Tuoni server. Args: uri (str): The URI endpoint to send the request to. json_data (dict): A dictionary containing the JSON payload to include in the DELETE request. Returns: dict: The server's response as a dictionary. """ response = self._make_request("DELETE", uri, json=json_data) return json.loads(response.text) if response.text else None
[docs] def load_listener_plugins(self): """ Retrieve a list of listener plugins. Returns: list[TuoniListenerPlugin]: A list of available listener plugins. Examples: >>> http_listener_plugin = tuoni_server.load_listener_plugins()[ >>> "shelldot.listener.agent-reverse-http" >>> ] >>> conf = http_listener_plugin.conf_examples["default"] >>> conf["sleep"] = 2 >>> conf["instantResponses"] = True >>> listener = http_listener_plugin.create(conf) """ plugins_data = self.request_get("/api/v1/plugins/listeners") return {plugin_data["identifier"]["id"]: TuoniListenerPlugin(plugin_data, self) for plugin_data in plugins_data.values()}
[docs] def load_listeners(self): """ Retrieve a list of listeners. Returns: list[TuoniListener]: A list of active listeners. """ listeners_data = self.request_get("/api/v1/listeners") return [TuoniListener(listener_data, self) for listener_data in listeners_data.values()]
[docs] def load_payload_plugins(self): """ Retrieve a list of payload plugins. Returns: list[TuoniPayloadPlugin]: A list of available payload plugins. """ plugins_data = self.request_get("/api/v1/plugins/payloads") return {plugin_data["identifier"]["id"]: TuoniPayloadPlugin(plugin_data, self) for plugin_data in plugins_data.values()}
[docs] def create_payload(self, payload_template: str, payload_listener: int, payload_conf: dict, encrypted: bool = True, payload_name: str = None): """ Create a new payload. Args: payload_template (str): The payload template to use. payload_listener (int): The ID of the listener associated with this payload. payload_conf (dict): A dictionary containing the payload's configuration. encrypted (bool): Specifies whether traffic for this payload should be encrypted. payload_name (str): The name to assign to the payload. Returns: int: The unique ID of the created payload. Examples: >>> payload_id = tuoni_server.create_payload( >>> "shelldot.payload.windows-x64", >>> listener_id, >>> {"type": "executable"} >>> ) """ json_data = { "payloadTemplateId": payload_template, "configuration": payload_conf, "listenerId": payload_listener, "encrypted": encrypted, "name": payload_name } payload_data = self.request_post("/api/v1/payloads", json_data) return payload_data["id"]
[docs] def download_payload(self, payload_id: int, file_name: str): """ Download a payload. Args: payload_id (int): The unique ID of the payload to download. file_name (str): The name of the file to save the downloaded payload. """ self.request_get_file(f"/api/v1/payloads/{payload_id}/download", file_name)
[docs] def rename_payload(self, payload_id: int, new_name: str): """ Rename a payload. Args: payload_id (int): The unique ID of the payload to rename. new_name (str): The new name for the payload. """ self.request_patch(f"/api/v1/payloads/{payload_id}", {"name": new_name})
[docs] def load_payloads(self, include_non_active: bool = False): """ Retrieve a list of payloads. Args: include_non_active (bool): If True, archived and unavailable payloads are included. Returns: list[TuoniPayload]: A list of payloads. Examples: >>> payloads = tuoni_server.load_payloads() >>> for p in payloads: >>> print(p.payload_id, p.name) """ param = "true" if include_non_active else "false" payloads_data = self.request_get(f"/api/v1/payloads?includeNonActive={param}") if payloads_data is None: return [] return [TuoniPayload(p, self) for p in payloads_data]
[docs] def load_payload(self, payload_id: int): """ Retrieve a single payload by its ID. Args: payload_id (int): The unique ID of the payload. Returns: TuoniPayload: The requested payload. Examples: >>> payload = tuoni_server.load_payload(42) >>> print(payload.status) """ payload_data = self.request_get(f"/api/v1/payloads/{payload_id}") return TuoniPayload(payload_data, self)
[docs] def load_agents(self, active = True, unactive = False): """ Retrieve a list of agents. Args: active (bool): Should active agents be returned unactive (bool): Should unactive agents be returned Returns: list[TuoniAgent]: A list of agents. """ agents_data = [] if active and not unactive: agents_data = self.request_get("/api/v1/agents/active") elif not active and unactive: agents_data = self.request_get("/api/v1/agents/inactive") elif active and unactive: agents_data = self.request_get("/api/v1/agents") return [TuoniAgent(agent_data, self) for agent_data in agents_data]
[docs] def wait_new_agent(self, interval: int = 1, max_wait: int = 0): """ Wait for a new agent to connect. Args: interval (int): The interval, in seconds, to check for new connections. max_wait (int): The maximum time to wait, in seconds. A value of 0 means to wait indefinitely. Returns: TuoniAgent: The newly connected agent. """ original_agents = {agent_data["guid"] for agent_data in self.request_get("/api/v1/agents/active")} while True: time.sleep(interval) agents_data = self.request_get("/api/v1/agents/active") for agent_data in agents_data: if agent_data["guid"] not in original_agents: return TuoniAgent(agent_data, self) if max_wait > 0: max_wait -= interval if max_wait <= 0: return None
[docs] def on_new_agent(self, function, interval: int = 1): """ Set a callback function to be triggered when a new agent connects. Args: function (func): The function to execute when a new agent connects. interval (int): The interval, in seconds, to check for new connections. Examples: >>> def new_agent_callback(agent): >>> print( >>> f"We got outselves a new agent {agent.guid} from {agent.metadata['hostname']}" >>> ) >>> >>> tuoni_server.on_new_agent(new_agent_callback) """ monitor_thread = threading.Thread(target=self._monitor_for_new_agents, args=(function, interval), daemon=True) monitor_thread.start() self._monitoring_threads.append(monitor_thread)
def _monitor_for_new_agents(self, function, interval: int = 1): original_agents = {agent_data["guid"] for agent_data in self.request_get("/api/v1/agents")} while True: time.sleep(interval) agents_data = self.request_get("/api/v1/agents") for agent_data in agents_data: if agent_data["guid"] not in original_agents: original_agents.add(agent_data["guid"]) agent = TuoniAgent(agent_data, self) threading.Thread(target=function, args=(agent,)).start()
[docs] def load_command_plugins(self): """ Retrieve a list of command plugins. Returns: list[TuoniCommandPlugin]: A list of available command plugins. """ plugins_data = self.request_get("/api/v1/plugins/commands") return {plugin_data["identifier"]["id"]: TuoniCommandPlugin(plugin_data, self) for plugin_data in plugins_data.values()}
[docs] def load_command_templates(self): """ Retrieve a list of command templates. Returns: list[TuoniCommandTemplate]: A list of available command templates. """ templates_data = self.request_get("/api/v1/command-templates") return [TuoniCommandTemplate(template_data, self) for template_data in templates_data]
[docs] def load_command_template(self, guid): """ Retrieve a specific command template by its unique identifier. Args: guid (str): The unique identifier of the command template to retrieve. Returns: TuoniCommandTemplate: The requested command template, or None if not found. """ template_data = self.request_get(f"/api/v1/command-templates/{guid}") if template_data: return TuoniCommandTemplate(template_data, self) return None
[docs] def load_aliases(self): """ Retrieve a list of aliases. Returns: list[TuoniAlias]: A list of aliases. """ all_aliases = self.request_get("/api/v1/command-alias") return [TuoniAlias(alias_data, self) for alias_data in all_aliases]
[docs] def add_alias(self, name, description, command_type, command_conf=None, files = None): """ Add a new alias. Args: name (str): The name of the alias to create. description (str): A description of the alias. command_type (str | TuoniDefaultCommand): The base command to associate with the alias. command_conf (dict): Configuration settings for the alias. files (dict): File parameters associated with the alias. Returns: TuoniAlias: The newly created alias. Examples: >>> alias1 = tuoni_server.add_alias( >>> "ls1", >>> "Alias made with python lib based on 'ls' default command class", >>> TuoniCommandLs, >>> {"depth": 1} >>> ) >>> alias2 = tuoni_server.add_alias( >>> "ls2", >>> "Alias made with python lib based on command string name", >>> "ls", >>> {"depth": 2} >>> ) >>> alias3 = tuoni_server.add_alias( >>> "easm", >>> "Alias for execute assembly default command class", >>> TuoniCommandexecuteAssembly, >>> {}, >>> files = { >>> "executable": ["dotnet.exe",open("dotnet.exe", "rb").read()] >>> } >>> ) >>> alias4 = tuoni_server.add_alias( >>> "bof1", >>> "Alias for bof based on command string name", >>> "bof", >>> files = { >>> "bofFile": ["bof.o",open("bof.o", "rb").read()] >>> } >>> ) >>> alias5 = tuoni_server.add_alias( >>> "bof2", >>> "Alias for bof based on command ID", >>> "2ac58f33-d35e-4afd-a1ac-00e460ceb9f4", >>> files = { >>> "bofFile": ["bof.o",open("bof.o", "rb").read()] >>> } >>> ) """ if isinstance(command_type, TuoniDefaultCommand): if command_conf is None: command_conf = command_type.command_conf if files is None: files = command_type.files command_type = command_type.command_type if inspect.isclass(command_type) and issubclass(command_type, TuoniDefaultCommand): command_type = command_type._class_base_type if command_conf is None: command_conf = {} json_data = { "name": name, "description": description, "baseTemplate": command_type, "fixedConfiguration": command_conf } alias_data = self.request_post("/api/v1/command-alias", json_data, files = files) return TuoniAlias(alias_data, self)
[docs] def load_hosted(self): """ Retrieve a list of hosted files. Returns: list[TuoniFile]: A list of objects containing the details of hosted files. """ all_files = self.request_get("/api/v1/files") return [TuoniFile(file_data, self) for file_data in all_files]
[docs] def load_hosted_by_path(self, path: str): """ Retrieve a hosted file by its path. Args: path (str): The hosted path of the file (e.g. ``/somedir/file.txt``). Returns: TuoniFile: The hosted file. Examples: >>> f = tuoni_server.load_hosted_by_path("/payloads/agent.exe") """ encoded = quote(path.lstrip("/"), safe="/") file_data = self.request_get(f"/api/v1/file/by-path/{encoded}") return TuoniFile(file_data, self)
[docs] def delete_hosted_by_path(self, path: str): """ Delete a hosted file by its path. Args: path (str): The hosted path of the file (e.g. ``/somedir/file.txt``). Examples: >>> tuoni_server.delete_hosted_by_path("/payloads/agent.exe") """ encoded = quote(path.lstrip("/"), safe="/") self.request_delete(f"/api/v1/files/{encoded}")
[docs] def add_hosted(self, filename, file_content, original_filename = None): """ Add a hosted file. Args: filename (str): The name of the file to host. file_content (bytes): The content of the file in bytes. Returns: str: The API URI for the uploaded file. Examples: >>> tuoni_server.add_hosted("/hosted/file/here.txt", b"HELLO WORLD") """ if original_filename is None: original_filename = os.path.basename(filename) return self.request_post("/api/v1/files", files = {filename: [original_filename, file_content]})
[docs] def delete_hosted(self, hosted): """ Delete a hosted file. Returns: None """ if isinstance(hosted, TuoniFile): self.request_delete("/api/v1/file/" + hosted.fileId) else: self.request_delete("/api/v1/file/" + hosted)
[docs] def load_users(self): """ Retrieve a list of users. Returns: list[TuoniUser]: A list of users. """ all_users = self.request_get("/api/v1/users") return [TuoniUser(user_data, self) for user_data in all_users]
[docs] def add_user(self, username, password, authorities): """ Add a new user. Args: username (str): The username for the new user. password (str): The initial password for the user. authorities (list[str]): A list of authorities or roles assigned to the user. Returns: TuoniUser: The newly created user. Examples: >>> user = tuoni_server.add_user( >>> "cool_new_user", >>> "cool_new_password", >>> [ >>> "MANAGE_LISTENERS", >>> "MANAGE_USERS", >>> "SEND_COMMANDS", >>> "MANAGE_PAYLOADS", >>> "MODIFY_FILES", >>> "VIEW_RESOURCES", >>> "MANAGE_AGENTS" >>> ] >>> ) """ json_data = { "username": username, "password": password, "authorities": authorities } user_data = self.request_post("/api/v1/users", json_data, None) return TuoniUser(user_data, self)
[docs] def load_datamodel_hosts(self, page = 0, pageSize = 256, filter = None): """ Retrieve a list of hosts. Args: page (int): Page of the result. pageSize (int): How many results per page. filter (str): Additional filters. Returns: list[TuoniDataHost]: A list of hosts. """ if filter is None: filter = "" else: filter = "&" + filter all_hosts = self.request_get(f"/api/v1/discovery/hosts?page={page}&pageSize={pageSize}{filter}") if "items" in all_hosts: return [TuoniDataHost(host_data, self) for host_data in all_hosts["items"]] return []
[docs] def add_datamodel_host(self, address, name, note): """ Add a host data model entry. Args: address (str): The host address. name (str): Given name to the host. note (str): Additional notes. Returns: TuoniDataHost: The newly created host. Examples: >>> host = tuoni_server.add_datamodel_host("10.20.30.40", "WS1", "Open windows machine") """ json_data = { "address": address, "name": name, "note": note } host_data = self.request_post("/api/v1/discovery/hosts", json_data, None) return TuoniDataHost(host_data, self)
[docs] def load_datamodel_services(self, page = 0, pageSize = 256, filter = None): """ Retrieve a list of services. Args: page (int): Page of the result. pageSize (int): How many results per page. filter (str): Additional filters. Returns: list[TuoniDataService]: A list of services. """ if filter is None: filter = "" else: filter = "&" + filter all_services = self.request_get(f"/api/v1/discovery/services?page={page}&pageSize={pageSize}{filter}") if "items" in all_services: return [TuoniDataService(service_data, self) for service_data in all_services["items"]] return []
[docs] def add_datamodel_service(self, address, port, protocol, banner, note): """ Add a service data model entry. Args: address (str): The service address. port (int): Port number. protocol (str): Service protocol. banner (str): Service banner. note (str): Additional notes. Returns: TuoniDataService: The newly created service. Examples: >>> service = tuoni_server.add_datamodel_service("10.20.30.40", "443", "HTTPS", "", "") """ json_data = { "address": address, "port": port, "protocol": protocol, "banner": banner, "note": note } service_data = self.request_post("/api/v1/discovery/services", json_data, None) return TuoniDataService(service_data, self)
[docs] def load_datamodel_credentials(self, page = 0, pageSize = 256, filter = None): """ Retrieve a list of credentials. Args: page (int): Page of the result. pageSize (int): How many results per page. filter (str): Additional filters. Returns: list[TuoniDataCredential]: A list of credentials. """ if filter is None: filter = "" else: filter = "&" + filter all_credentials = self.request_get(f"/api/v1/discovery/credentials?page={page}&pageSize={pageSize}{filter}") if "items" in all_credentials: return [TuoniDataCredential(credential_data, self) for credential_data in all_credentials["items"]] return []
[docs] def add_datamodel_credential(self, username, password, host, realm, source, note): """ Add a credential data model entry. Args: username (str): The username. password (str): The password. host (str): Host where credential works. realm (str): Credential realm. source (str): Source of the credential. note (str): Additional notes. Returns: TuoniDataCredential: The newly created credential. Examples: >>> credential = tuoni_server.add_datamodel_credential( >>> "rick", >>> "NeverGonnaBypassYourEDR", >>> "10.20.30.40", >>> "", "", "" >>> ) """ json_data = { "username": username, "password": password, "host": host, "realm": realm, "source": source, "note": note } credential_data = self.request_post("/api/v1/discovery/credentials", json_data, None) return TuoniDataCredential(credential_data, self)
[docs] def load_jobs(self, page = 0, pageSize = 256, sort_col = "id", sort_order = "asc", inactives = False): """ Retrieve a list of jobs. Args: page (int): Page of the result (only used when ``inactives=True``). pageSize (int): How many results per page (only used when ``inactives=True``). sort_col (str): Based of what field to sort sort_order (str): Order of sort inactives (bool): Should also inactive jobs be returned Returns: list[TuoniJob]: A list of jobs. """ sort_param = f"sort={sort_col}:{sort_order}" if inactives: all_jobs = self.request_get(f"/api/v1/jobs/all?page={page}&pageSize={pageSize}&{sort_param}") if all_jobs is None: return [] if isinstance(all_jobs, dict) and "items" in all_jobs: return [TuoniJob(job_data, self) for job_data in all_jobs["items"]] return [] active_jobs = self.request_get(f"/api/v1/jobs/active?{sort_param}") if active_jobs is None: return [] return [TuoniJob(job_data, self) for job_data in active_jobs]
[docs] def load_settings(self): """ Retrieve all server settings. Returns: dict: A dict with ``server`` and ``plugin`` keys, each containing setting objects. Examples: >>> settings = tuoni_server.load_settings() >>> for key, s in settings["server"].items(): >>> print(key, s["currentValue"]) """ return self.request_get("/api/v1/settings")
[docs] def update_settings(self, settings: dict): """ Update multiple server settings at once. Args: settings (dict): A mapping of setting key to new value. Follows the same structure as ``AllSettingsUpdateRequest`` on the server. Returns: dict: The updated settings. """ return self.request_patch("/api/v1/settings", settings)
[docs] def load_setting(self, key: str): """ Retrieve a single server setting by key. Args: key (str): The setting key. Returns: dict: The setting object including ``currentValue``, ``defaultValue``, ``type``, etc. Examples: >>> s = tuoni_server.load_setting("tuoni.websocket.command-result-max-size") >>> print(s["currentValue"]) """ return self.request_get(f"/api/v1/setting/{key}")
[docs] def update_setting(self, key: str, value): """ Update a single server setting. Args: key (str): The setting key. value: The new value for the setting (must be JSON-serialisable). Returns: dict: The updated setting object. Examples: >>> tuoni_server.update_setting("tuoni.agent.default-sleep", 5) """ return self.request_put(f"/api/v1/setting/{key}", {"value": value})
[docs] def connect_websocket(self, timeout: float = 10): """ Connect to the Tuoni server's real-time WebSocket endpoint. Requires the ``websocket-client`` package (``pip install websocket-client``). Args: timeout (float): Seconds to wait for the STOMP handshake to complete. Returns: TuoniWebSocket: A connected WebSocket client ready for subscriptions. Raises: Exception: If the connection cannot be established within ``timeout`` seconds. Examples: >>> ws = tuoni_server.connect_websocket() >>> results = [] >>> ws.on_command_result(lambda data: results.append(data)) """ from tuoni.TuoniWebSocket import TuoniWebSocket ws = TuoniWebSocket(self) connected = ws.connect(timeout=timeout) if not connected: raise Exception("Failed to connect to Tuoni WebSocket within timeout") return ws
[docs] def let_it_run(self): """ Block execution and wait indefinitely if any callback functions are initialized, or until all monitoring threads have completed. """ for monitoring_thread in self._monitoring_threads: monitoring_thread.join()