import os
import time
import requests
import json
import threading
import shutil
import base64
import inspect
from pathlib import Path
from tuoni.TuoniExceptions import *
from tuoni.TuoniListenerPlugin import *
from tuoni.TuoniListener import *
from tuoni.TuoniAgent import *
from tuoni.TuoniPayloadPlugin import *
from tuoni.TuoniAlias import *
from tuoni.TuoniUser import *
from tuoni.TuoniFile import *
from tuoni.TuoniDataHost import *
from tuoni.TuoniDataService import *
from tuoni.TuoniDataCredential import *
[docs]
class TuoniC2:
"""
The primary class for establishing connections to and managing interactions with the Tuoni server. It provides functionality for controlling server operations and facilitating communication.
Args:
verify (str | bool) [default = False]: A flag to enable or disable SSL verification. If a string is provided, it is treated as the path to a CA bundle file.
"""
def __init__(self, verify: str | bool = False):
self._token: str = None
self._url: str = None
self._monitoring_threads: list = []
self._set_verify(verify)
[docs]
def login(self, url: str, username: str, password: str):
"""
Login to the Tuoni server.
Args:
url (str): The URL of the Tuoni server to connect to.
username (str): The username for authentication.
password (str): The password for authentication.
Examples:
>>> tuoni_server = TuoniC2()
# Disabling SSL verification
>>> tuoni_server = TuoniC2(verify=False)
# Set path for CA bunle
>>> tuoni_server = TuoniC2(verify="/path/to/ca_bundle.pem")
# Login to the server
>>> tuoni_server.login("https://localhost:8443", "my_user", "S3cr37")
"""
headers = {
"Authorization": "Basic " + base64.b64encode(f"{username}:{password}".encode('utf-8')).decode('utf-8')
}
response = requests.post(f"{url}/api/v1/auth/login", headers=headers, verify=self._verify)
if response.status_code != 200:
raise ExceptionTuoniAuthentication(response.text)
self._token = response.text
self._url = url
def _request_check(self):
if self._token is None:
raise ExceptionTuoniAuthentication("You have not done the login")
def _set_verify(self, verify: str | bool):
self._verify = False
if isinstance(verify, bool) and verify:
self._verify = True
elif isinstance(verify, bool) and not verify:
self._verify = False
elif isinstance(verify, str):
if Path(verify).is_file():
self._verify = verify
else:
raise Exception("The path to the CA bundle is not valid")
@staticmethod
def _raise_request_exception(result_msg: str):
try:
data = json.loads(result_msg)
msg = data.get("message", result_msg)
except json.JSONDecodeError:
msg = result_msg
raise ExceptionTuoniRequestFailed(msg)
def _make_request(self, method: str, uri: str, **kwargs):
self._request_check()
headers = {"Authorization": f"Bearer {self._token}"}
response = requests.request(method, f"{self._url}{uri}", headers=headers, verify=self._verify, **kwargs)
if response.status_code != 200:
self._raise_request_exception(response.text)
return response
[docs]
def request_get(self, uri: str, result_as_json: bool = True):
"""
Send a GET request to the Tuoni server.
Args:
uri (str): The URI endpoint to send the request to.
result_as_json (bool): If True, the server's response is treated as JSON and converted to a dictionary.
Returns:
str | dict: The server's response, either as a raw string or a dictionary if `result_as_json` is True.
"""
response = self._make_request("GET", uri)
if response.text == "":
return None
return json.loads(response.text) if result_as_json else response.text
[docs]
def request_get_file(self, uri: str, file_name: str):
"""
Send a GET request to the Tuoni server and save the result to the filesystem.
Args:
uri (str): The URI endpoint to send the request to.
file_name (str): The name of the file where the response will be saved.
"""
response = self._make_request("GET", uri, stream=True)
with open(file_name, 'wb') as out_file:
shutil.copyfileobj(response.raw, out_file)
[docs]
def request_post(self, uri: str, json_data: dict = None, files: dict = None):
"""
Send a POST request to the Tuoni server.
Args:
uri (str): The URI endpoint to send the request to.
json_data (dict): A dictionary containing the JSON payload to include in the POST request.
files (dict): A dictionary of files to upload with the POST request.
Returns:
dict: The server's response as a dictionary.
Examples:
>>> tuoni = TuoniC2()
>>> tuoni_server.request_post(
>>> "/api/v1/command-alias",
>>> {
>>> "name": "bofX",
>>> "description": "Example",
>>> "baseTemplate": "bof",
>>> "fixedConfiguration": {"method": "go"}
>>> },
>>> {"bofFile" : ["some_bof.o", open("some_bof", "rb").read()]}
>>> )
"""
if files is None:
response = self._make_request("POST", uri, json=json_data)
else:
all_data = {}
if json_data is not None:
all_data["requestBody"] = (None, json.dumps(json_data), 'application/json')
for var_name, file_info in files.items():
all_data[var_name] = (file_info[0], file_info[1], 'application/octet-stream')
response = self._make_request("POST", uri, files=all_data)
return json.loads(response.text) if response.text else None
[docs]
def request_patch(self, uri: str, json_data: dict = None, files: dict = None):
"""
Send a PATCH request to the Tuoni server.
Args:
uri (str): The URI endpoint to send the request to.
json_data (dict): A dictionary containing the JSON payload to include in the POST request.
files (dict): A dictionary of files to upload with the POST request.
Returns:
dict: The server's response as a dictionary.
"""
if files is None:
response = self._make_request("PATCH", uri, json=json_data)
else:
all_data = {}
if json_data is not None:
all_data["requestBody"] = (None, json.dumps(json_data), 'application/json')
for var_name, file_info in files.items():
all_data[var_name] = (file_info[0], file_info[1], 'application/octet-stream')
response = self._make_request("PATCH", uri, files=all_data)
return json.loads(response.text) if response.text else None
[docs]
def request_put(self, uri: str, json_data: dict = None):
"""
Send a PUT request to the Tuoni server.
Args:
uri (str): The URI endpoint to send the request to.
json_data (dict): A dictionary containing the JSON payload to include in the PUT request.
Returns:
dict: The server's response as a dictionary.
"""
response = self._make_request("PUT", uri, json=json_data)
return json.loads(response.text) if response.text else None
[docs]
def request_delete(self, uri: str, json_data: dict = None):
"""
Send a DELETE request to the Tuoni server.
Args:
uri (str): The URI endpoint to send the request to.
json_data (dict): A dictionary containing the JSON payload to include in the DELETE request.
Returns:
dict: The server's response as a dictionary.
"""
response = self._make_request("DELETE", uri, json=json_data)
return json.loads(response.text) if response.text else None
[docs]
def load_listener_plugins(self):
"""
Retrieve a list of listener plugins.
Returns:
list[TuoniListenerPlugin]: A list of available listener plugins.
Examples:
>>> http_listener_plugin = tuoni_server.load_listener_plugins()[
>>> "shelldot.listener.agent-reverse-http"
>>> ]
>>> conf = http_listener_plugin.conf_examples["default"]
>>> conf["sleep"] = 2
>>> conf["instantResponses"] = True
>>> listener = http_listener_plugin.create(conf)
"""
plugins_data = self.request_get("/api/v1/plugins/listeners")
return {plugin_data["identifier"]["id"]: TuoniListenerPlugin(plugin_data, self) for plugin_data in plugins_data.values()}
[docs]
def load_listeners(self):
"""
Retrieve a list of listeners.
Returns:
list[TuoniListener]: A list of active listeners.
"""
listeners_data = self.request_get("/api/v1/listeners")
return [TuoniListener(listener_data, self) for listener_data in listeners_data.values()]
[docs]
def load_payload_plugins(self):
"""
Retrieve a list of payload plugins.
Returns:
list[TuoniPayloadPlugin]: A list of available payload plugins.
"""
plugins_data = self.request_get("/api/v1/plugins/payloads")
return {plugin_data["identifier"]["id"]: TuoniPayloadPlugin(plugin_data, self) for plugin_data in plugins_data.values()}
[docs]
def create_payload(self, payload_template: str, payload_listener: int, payload_conf: dict, encrypted: bool = True, payload_name: str = None):
"""
Create a new payload.
Args:
payload_template (str): The payload template to use.
payload_listener (int): The ID of the listener associated with this payload.
payload_conf (dict): A dictionary containing the payload's configuration.
encrypted (bool): Specifies whether traffic for this payload should be encrypted.
payload_name (str): The name to assign to the payload.
Returns:
id: The unique ID of the created payload.
Examples:
>>> payload_id = tuoni_server.create_payload(
>>> "shelldot.payload.windows-x64",
>>> listener_id,
>>> {"type": "executable"}
>>> )
"""
json_data = {
"payloadTemplateId": payload_template,
"configuration": payload_conf,
"listenerId": payload_listener,
"encrypted": encrypted,
"name": payload_name
}
payload_data = self.request_post("/api/v1/payloads", json_data)
return payload_data["id"]
[docs]
def download_payload(self, payload_id: int, file_name: str):
"""
Download a payload.
Args:
payload_id (int): The unique ID of the payload to download.
file_name (str): The name of the file to save the downloaded payload.
"""
self.request_get_file(f"/api/v1/payloads/{payload_id}/download", file_name)
[docs]
def load_agents(self, active = True, unactive = False):
"""
Retrieve a list of agents.
Args:
active (bool): Should active agents be returned
unactive (bool): Should unactive agents be returned
Returns:
list[TuoniAgent]: A list of agents.
"""
agents_data = []
if active and not unactive:
agents_data = self.request_get("/api/v1/agents/active")
elif not active and unactive:
agents_data = self.request_get("/api/v1/agents/inactive")
elif active and unactive:
agents_data = self.request_get("/api/v1/agents")
return [TuoniAgent(agent_data, self) for agent_data in agents_data]
[docs]
def wait_new_agent(self, interval: int = 1, max_wait: int = 0):
"""
Wait for a new agent to connect.
Args:
interval (int): The interval, in seconds, to check for new connections.
max_wait (int): The maximum time to wait, in seconds. A value of 0 means to wait indefinitely.
Returns:
TuoniAgent: The newly connected agent.
"""
original_agents = {agent_data["guid"] for agent_data in self.request_get("/api/v1/agents/active")}
while True:
time.sleep(interval)
agents_data = self.request_get("/api/v1/agents/active")
for agent_data in agents_data:
if agent_data["guid"] not in original_agents:
return TuoniAgent(agent_data, self)
if max_wait > 0:
max_wait -= interval
if max_wait <= 0:
return None
[docs]
def on_new_agent(self, function, interval: int = 1):
"""
Set a callback function to be triggered when a new agent connects.
Args:
function (func): The function to execute when a new agent connects.
interval (int): The interval, in seconds, to check for new connections.
Examples:
>>> def new_agent_callback(agent):
>>> print(
>>> f"We got outselves a new agent {agent.guid} from {agent.metadata['hostname']}"
>>> )
>>>
>>> tuoni_server.on_new_agent(new_agent_callback)
"""
monitor_thread = threading.Thread(target=self._monitor_for_new_agents, args=(function, interval), daemon=True)
monitor_thread.start()
self._monitoring_threads.append(monitor_thread)
def _monitor_for_new_agents(self, function, interval: int = 1):
original_agents = {agent_data["guid"] for agent_data in self.request_get("/api/v1/agents")}
while True:
time.sleep(interval)
agents_data = self.request_get("/api/v1/agents")
for agent_data in agents_data:
if agent_data["guid"] not in original_agents:
original_agents.add(agent_data["guid"])
agent = TuoniAgent(agent_data, self)
threading.Thread(target=function, args=(agent,)).start()
[docs]
def load_aliases(self):
"""
Retrieve a list of aliases.
Returns:
list[TuoniAlias]: A list of aliases.
"""
all_aliases = self.request_get("/api/v1/command-alias")
return [TuoniAlias(alias_data, self) for alias_data in all_aliases]
[docs]
def add_alias(self, name, description, command_type, command_conf={}, files = None):
"""
Add a new alias.
Args:
name (str): The name of the alias to create.
description (str): A description of the alias.
command_type (str | TuoniDefaultCommand): The base command to associate with the alias.
command_conf (dict): Configuration settings for the alias.
files (dict): File parameters associated with the alias.
Returns:
TuoniAlias: The newly created alias.
Examples:
>>> alias1 = tuoni_server.add_alias(
>>> "ls1",
>>> "Alias made with python lib based on 'ls' default command class",
>>> TuoniCommandLs,
>>> {"depth": 1}
>>> )
>>> alias2 = tuoni_server.add_alias(
>>> "ls2",
>>> "Alias made with python lib based on command string name",
>>> "ls",
>>> {"depth": 2}
>>> )
>>> alias3 = tuoni_server.add_alias(
>>> "easm",
>>> "Alias for execute assembly default command class",
>>> TuoniCommandexecuteAssembly,
>>> {},
>>> files = {
>>> "executable": ["dotnet.exe",open("dotnet.exe", "rb").read()]
>>> }
>>> )
>>> alias4 = tuoni_server.add_alias(
>>> "bof1",
>>> "Alias for bof based on command string name",
>>> "bof",
>>> files = {
>>> "bofFile": ["bof.o",open("bof.o", "rb").read()]
>>> }
>>> )
>>> alias5 = tuoni_server.add_alias(
>>> "bof2",
>>> "Alias for bof based on command ID",
>>> "2ac58f33-d35e-4afd-a1ac-00e460ceb9f4",
>>> files = {
>>> "bofFile": ["bof.o",open("bof.o", "rb").read()]
>>> }
>>> )
"""
if isinstance(command_type, TuoniDefaultCommand):
command_conf = command_type.command_conf
files = command_type.files
command_type = command_type.command_type
if inspect.isclass(command_type) and issubclass(command_type, TuoniDefaultCommand):
command_type = command_type._class_base_type
json_data = {
"name": name,
"description": description,
"baseTemplate": command_type,
"fixedConfiguration": command_conf
}
alias_data = self.request_post("/api/v1/command-alias", json_data, files = files)
return TuoniAlias(alias_data, self)
[docs]
def load_hosted(self):
"""
Retrieve a list of hosted files.
Returns:
list[TuoniFile]: A list of objects containing the details of hosted files.
"""
all_files = self.request_get("/api/v1/files")
return [TuoniFile(file_data, self) for file_data in all_files]
[docs]
def add_hosted(self, filename, file_content, original_filename = None):
"""
Add a hosted file.
Args:
filename (str): The name of the file to host.
file_content (bytes): The content of the file in bytes.
Returns:
str: The API URI for the uploaded file.
Examples:
>>> tuoni_server.add_hosted("/hosted/file/here.txt", b"HELLO WORLD")
"""
if original_filename is None:
original_filename = os.path.basename(filename)
return self.request_post("/api/v1/files", files = {filename: [original_filename, file_content]})
[docs]
def delete_hosted(self, hosted):
"""
Delete a hosted file.
Returns:
None
"""
if isinstance(hosted, TuoniFile):
self.request_delete("/api/v1/file/" + hosted.fileId)
else:
self.request_delete("/api/v1/file/" + hosted)
[docs]
def load_users(self):
"""
Retrieve a list of users.
Returns:
list[TuoniUser]: A list of users.
"""
all_users = self.request_get("/api/v1/users")
return [TuoniUser(user_data, self) for user_data in all_users]
[docs]
def add_user(self, username, password, authorities):
"""
Add a new user.
Args:
username (str): The username for the new user.
password (str): The initial password for the user.
authorities (list[str]): A list of authorities or roles assigned to the user.
Returns:
TuoniUser: The newly created user.
Examples:
>>> user = tuoni_server.add_user(
>>> "cool_new_user",
>>> "cool_new_password",
>>> [
>>> "MANAGE_LISTENERS",
>>> "MANAGE_USERS",
>>> "SEND_COMMANDS",
>>> "MANAGE_PAYLOADS",
>>> "MODIFY_FILES",
>>> "VIEW_RESOURCES",
>>> "MANAGE_AGENTS"
>>> ]
>>> )
"""
json_data = {
"username": username,
"password": password,
"authorities": authorities
}
user_data = self.request_post("/api/v1/users", json_data, None)
return TuoniUser(user_data, self)
[docs]
def load_datamodel_hosts(self, page = 0, pageSize = 256, filter = None):
"""
Retrieve a list of hosts.
Returns:
list[TuoniDataHost]: A list of hosts.
"""
if filter is None:
filter = ""
else:
filter = "&" + filter
all_hosts = self.request_get(f"/api/v1/discovery/hosts?page={page}&pageSize={pageSize}{filter}")
if "items" in all_hosts:
return [TuoniDataHost(host_data, self) for host_data in all_hosts["items"]]
return []
[docs]
def add_datamodel_host(self, address, name, note):
"""
Add a host data model entry.
Args:
address (str): The host address.
name (str): Given name to the host.
note (str): Additional notes.
Returns:
TuoniDataHost: The newly created host.
Examples:
>>> host = tuoni_server.add_datamodel_host("10.20.30.40", "WS1", "Open windows machine")
"""
json_data = {
"address": address,
"name": name,
"note": note
}
host_data = self.request_post("/api/v1/discovery/hosts", json_data, None)
return TuoniDataHost(host_data, self)
[docs]
def load_datamodel_services(self, page = 0, pageSize = 256, filter = None):
"""
Retrieve a list of services.
Returns:
list[TuoniDataService]: A list of services.
"""
if filter is None:
filter = ""
else:
filter = "&" + filter
all_services = self.request_get(f"/api/v1/discovery/services?page={page}&pageSize={pageSize}{filter}")
if "items" in all_services:
return [TuoniDataService(service_data, self) for service_data in all_services["items"]]
return []
[docs]
def add_datamodel_service(self, address, port, protocol, banner, note):
"""
Add a service data model entry.
Args:
address (str): The service address.
port (int): Port number.
protocol (str): Service protocol.
banner (str): Service banner.
note (str): Additional notes.
Returns:
TuoniDataService: The newly created service.
Examples:
>>> service = tuoni_server.add_datamodel_service("10.20.30.40", "443", "HTTPS", "", "")
"""
json_data = {
"address": address,
"port": port,
"protocol": protocol,
"banner": banner,
"note": note
}
service_data = self.request_post("/api/v1/discovery/services", json_data, None)
return TuoniDataService(service_data, self)
[docs]
def load_datamodel_credentials(self, page = 0, pageSize = 256, filter = None):
"""
Retrieve a list of credentials.
Returns:
list[TuoniDataCredential]: A list of credentials.
"""
if filter is None:
filter = ""
else:
filter = "&" + filter
all_credentials = self.request_get(f"/api/v1/discovery/credentials?page={page}&pageSize={pageSize}{filter}")
if "items" in all_credentials:
return [TuoniDataCredential(credential_data, self) for credential_data in all_credentials["items"]]
return []
[docs]
def add_datamodel_credential(self, username, password, host, realm, source, note):
"""
Add a credential data model entry.
Args:
username (str): The username.
password (str): The password.
host (str): Host where credential works.
realm (str): Credential realm.
source (str): Source of the credential.
note (str): Additional notes.
Returns:
TuoniDataCredential: The newly created credential.
Examples:
>>> credential = tuoni_server.add_datamodel_credential(
>>> "rick",
>>> "NeverGonnaBypassYourEDR",
>>> "10.20.30.40",
>>> "", "", ""
>>> )
"""
json_data = {
"username": username,
"password": password,
"host": host,
"realm": realm,
"source": source,
"note": note
}
credential_data = self.request_post("/api/v1/discovery/credentials", json_data, None)
return TuoniDataCredential(credential_data, self)
[docs]
def let_it_run(self):
"""
Block execution and wait indefinitely if any callback functions are initialized,
or until all monitoring threads have completed.
"""
for monitoring_thread in self._monitoring_threads:
monitoring_thread.join()