import time
import requests
import json
import threading
import shutil
import base64
import inspect
from tuoni.TuoniExceptions import *
from tuoni.TuoniListenerPlugin import *
from tuoni.TuoniListener import *
from tuoni.TuoniAgent import *
from tuoni.TuoniPayloadPlugin import *
from tuoni.TuoniAlias import *
from tuoni.TuoniUser import *
[docs]
class TuoniC2:
    """
    The primary class for establishing connections to and managing interactions with the Tuoni server. It provides functionality for controlling server operations and facilitating communication.
    """

    def __init__(self):
        # Bearer token returned by the server; stays None until login() succeeds.
        self._token: str = None
        # Base URL of the server, stored by login().
        self._url: str = None
        # TLS verification setting reused for every request after login().
        self._verify = False
        # Threads started by on_new_agent(); joined by let_it_run().
        self._monitoring_threads: list = []

    def login(self, url: str, username: str, password: str, verify: bool = False):
        """
        Login to the Tuoni server.

        Args:
            url (str): The URL of the Tuoni server to connect to.
            username (str): The username for authentication.
            password (str): The password for authentication.
            verify (bool): Passed to ``requests`` as ``verify``. Defaults to False
                (certificate checks disabled), which matches the previous
                hard-coded behaviour. The same value is reused for all
                subsequent requests made through this object.

        Raises:
            ExceptionTuoniAuthentication: If the server does not answer with HTTP 200.

        Examples:
            >>> tuoni_server = TuoniC2()
            >>> tuoni_server.login("https://localhost:8443", "my_user", "S3cr37")
        """
        # Avoid "//api/..." when the caller passes a URL with a trailing slash.
        url = url.rstrip("/")
        credentials = base64.b64encode(f"{username}:{password}".encode('utf-8')).decode('utf-8')
        headers = {"Authorization": "Basic " + credentials}
        response = requests.post(f"{url}/api/v1/auth/login", headers=headers, verify=verify)
        if response.status_code != 200:
            raise ExceptionTuoniAuthentication(response.text)
        self._token = response.text
        self._url = url
        self._verify = verify

    def _request_check(self):
        """Raise ExceptionTuoniAuthentication when no token has been stored yet."""
        if self._token is None:
            raise ExceptionTuoniAuthentication("You have not done the login")

    @staticmethod
    def _raise_request_exception(result_msg: str):
        """
        Raise ExceptionTuoniRequestFailed for a failed request.

        If the body is a JSON object with a "message" key, that message is used;
        otherwise the raw body text is used.
        """
        msg = result_msg
        try:
            data = json.loads(result_msg)
        except json.JSONDecodeError:
            data = None
        # Valid JSON is not necessarily an object (it may be a list, string, ...),
        # in which case there is no "message" key to look up.
        if isinstance(data, dict):
            msg = data.get("message", result_msg)
        raise ExceptionTuoniRequestFailed(msg)

    def _make_request(self, method: str, uri: str, **kwargs):
        """
        Send an authenticated request and return the ``requests`` response.

        Args:
            method (str): HTTP method name.
            uri (str): Path appended to the server URL stored by login().
            **kwargs: Forwarded to ``requests.request``.

        Raises:
            ExceptionTuoniAuthentication: If login() has not been done.
            ExceptionTuoniRequestFailed: If the status code is not 2xx.
        """
        self._request_check()
        # Use the TLS setting chosen at login unless the caller overrides it.
        kwargs.setdefault("verify", self._verify)
        headers = {"Authorization": f"Bearer {self._token}"}
        response = requests.request(method, f"{self._url}{uri}", headers=headers, **kwargs)
        # Any 2xx status is a success (e.g. 201 Created, 204 No Content).
        if not 200 <= response.status_code < 300:
            self._raise_request_exception(response.text)
        return response

    def request_get(self, uri: str, result_as_json: bool = True):
        """
        Send a GET request to the Tuoni server.

        Args:
            uri (str): The URI endpoint to send the request to.
            result_as_json (bool): If True, the server's response is treated as JSON and converted to a dictionary.

        Returns:
            str | dict | None: The server's response, either as a raw string or parsed JSON if `result_as_json` is True. None when the response body is empty.
        """
        response = self._make_request("GET", uri)
        if response.text == "":
            return None
        return json.loads(response.text) if result_as_json else response.text

    def request_get_file(self, uri: str, file_name: str):
        """
        Send a GET request to the Tuoni server and save the result to the filesystem.

        Args:
            uri (str): The URI endpoint to send the request to.
            file_name (str): The name of the file where the response will be saved.
        """
        response = self._make_request("GET", uri, stream=True)
        try:
            with open(file_name, 'wb') as out_file:
                # iter_content applies any content-encoding decoding, which
                # reading response.raw directly would skip.
                for chunk in response.iter_content(chunk_size=65536):
                    out_file.write(chunk)
        finally:
            # A streamed response keeps its connection until it is closed.
            response.close()

    def request_post(self, uri: str, json_data: dict = None, files: dict = None):
        """
        Send a POST request to the Tuoni server.

        Args:
            uri (str): The URI endpoint to send the request to.
            json_data (dict): A dictionary containing the JSON payload to include in the POST request.
            files (dict): A dictionary of files to upload with the POST request. Each value is a two-item sequence: file name, file content.

        Returns:
            dict: The server's response as a dictionary, or None when the response body is empty.

        Examples:
            >>> tuoni_server = TuoniC2()
            >>> tuoni_server.request_post("/api/v1/command-alias", {"name": "bofX","description": "Example","baseTemplate": "bof","fixedConfiguration": {"method": "go"}},{"bofFile" : ["some_bof.o", open("some_bof.o", "rb").read()]})
        """
        if files is None:
            response = self._make_request("POST", uri, json=json_data)
        else:
            # Multipart upload: the JSON payload travels as the "requestBody"
            # part next to one part per uploaded file.
            all_data = {}
            if json_data is not None:
                all_data["requestBody"] = (None, json.dumps(json_data), 'application/json')
            for var_name, file_info in files.items():
                all_data[var_name] = (file_info[0], file_info[1], 'application/octet-stream')
            response = self._make_request("POST", uri, files=all_data)
        return json.loads(response.text) if response.text else None

    def request_put(self, uri: str, json_data: dict = None):
        """
        Send a PUT request to the Tuoni server.

        Args:
            uri (str): The URI endpoint to send the request to.
            json_data (dict): A dictionary containing the JSON payload to include in the PUT request.

        Returns:
            dict: The server's response as a dictionary, or None when the response body is empty.
        """
        response = self._make_request("PUT", uri, json=json_data)
        return json.loads(response.text) if response.text else None

    def request_delete(self, uri: str, json_data: dict = None):
        """
        Send a DELETE request to the Tuoni server.

        Args:
            uri (str): The URI endpoint to send the request to.
            json_data (dict): A dictionary containing the JSON payload to include in the DELETE request.

        Returns:
            dict: The server's response as a dictionary, or None when the response body is empty.
        """
        response = self._make_request("DELETE", uri, json=json_data)
        return json.loads(response.text) if response.text else None

    def load_listener_plugins(self):
        """
        Retrieve the listener plugins.

        Returns:
            dict[str, TuoniListenerPlugin]: Available listener plugins keyed by plugin identifier.

        Examples:
            >>> http_listener_plugin = tuoni_server.load_listener_plugins()["shelldot.listener.agent-reverse-http"]
            >>> conf = http_listener_plugin.conf_examples["default"]
            >>> conf["sleep"] = 2
            >>> conf["instantResponses"] = True
            >>> listener = http_listener_plugin.create(conf)
        """
        # request_get returns None for an empty body; treat that as "no plugins".
        plugins_data = self.request_get("/api/v1/plugins/listeners") or {}
        return {
            plugin_data["identifier"]["id"]: TuoniListenerPlugin(plugin_data, self)
            for plugin_data in plugins_data.values()
        }

    def load_listeners(self):
        """
        Retrieve a list of listeners.

        Returns:
            list[TuoniListener]: A list of listeners reported by the server.
        """
        listeners_data = self.request_get("/api/v1/listeners") or {}
        return [TuoniListener(listener_data, self) for listener_data in listeners_data.values()]

    def load_payload_plugins(self):
        """
        Retrieve the payload plugins.

        Returns:
            dict[str, TuoniPayloadPlugin]: Available payload plugins keyed by plugin identifier.
        """
        plugins_data = self.request_get("/api/v1/plugins/payloads") or {}
        return {
            plugin_data["identifier"]["id"]: TuoniPayloadPlugin(plugin_data, self)
            for plugin_data in plugins_data.values()
        }

    def create_payload(self, payload_template: str, payload_listener: int, payload_conf: dict, encrypted: bool = True, payload_name: str = None):
        """
        Create a new payload.

        Args:
            payload_template (str): The payload template to use.
            payload_listener (int): The ID of the listener associated with this payload.
            payload_conf (dict): A dictionary containing the payload's configuration.
            encrypted (bool): Specifies whether traffic for this payload should be encrypted.
            payload_name (str): The name to assign to the payload.

        Returns:
            id: The unique ID of the created payload.

        Examples:
            >>> payload_id = tuoni_server.create_payload("shelldot.payload.windows-x64", listener_id, {"type": "executable"})
        """
        json_data = {
            "payloadTemplateId": payload_template,
            "configuration": payload_conf,
            "listenerId": payload_listener,
            "encrypted": encrypted,
            "name": payload_name
        }
        payload_data = self.request_post("/api/v1/payloads", json_data)
        return payload_data["id"]

    def download_payload(self, payload_id: int, file_name: str):
        """
        Download a payload.

        Args:
            payload_id (int): The unique ID of the payload to download.
            file_name (str): The name of the file to save the downloaded payload.
        """
        self.request_get_file(f"/api/v1/payloads/{payload_id}/download", file_name)

    def load_agents(self):
        """
        Retrieve a list of agents.

        Returns:
            list[TuoniAgent]: A list of agents returned by the active-agents endpoint.
        """
        agents_data = self.request_get("/api/v1/agents/active") or []
        return [TuoniAgent(agent_data, self) for agent_data in agents_data]

    def wait_new_agent(self, interval: int = 1, max_wait: int = 0):
        """
        Wait for a new agent to connect.

        Args:
            interval (int): The interval, in seconds, to check for new connections.
            max_wait (int): The maximum time to wait, in seconds. A value of 0 means to wait indefinitely.

        Returns:
            TuoniAgent: The newly connected agent, or None if `max_wait` elapsed first.
        """
        known_guids = {
            agent_data["guid"]
            for agent_data in self.request_get("/api/v1/agents/active") or []
        }
        # Measure real elapsed time so request latency counts towards max_wait
        # and a zero interval cannot loop forever.
        deadline = time.monotonic() + max_wait if max_wait > 0 else None
        while True:
            time.sleep(interval)
            for agent_data in self.request_get("/api/v1/agents/active") or []:
                if agent_data["guid"] not in known_guids:
                    return TuoniAgent(agent_data, self)
            if deadline is not None and time.monotonic() >= deadline:
                return None

    def on_new_agent(self, function, interval: int = 1):
        """
        Set a callback function to be triggered when a new agent connects.

        Args:
            function (func): The function to execute when a new agent connects.
            interval (int): The interval, in seconds, to check for new connections.

        Examples:
            >>> def new_agent_callback(agent):
            >>>     print(f"We got ourselves a new agent {agent.guid} from {agent.metadata['hostname']}")
            >>>
            >>> tuoni_server.on_new_agent(new_agent_callback)
        """
        monitor_thread = threading.Thread(target=self._monitor_for_new_agents, args=(function, interval), daemon=True)
        monitor_thread.start()
        self._monitoring_threads.append(monitor_thread)

    def _monitor_for_new_agents(self, function, interval: int = 1):
        """
        Poll the agents endpoint forever and call `function` for each unseen agent.

        Agents already present when monitoring starts are not reported. Each
        callback runs in its own thread.
        """
        seen_guids = {
            agent_data["guid"]
            for agent_data in self.request_get("/api/v1/agents") or []
        }
        while True:
            time.sleep(interval)
            for agent_data in self.request_get("/api/v1/agents") or []:
                if agent_data["guid"] not in seen_guids:
                    # Remember the agent first so it is reported only once.
                    seen_guids.add(agent_data["guid"])
                    agent = TuoniAgent(agent_data, self)
                    threading.Thread(target=function, args=(agent,)).start()

    def load_aliases(self):
        """
        Retrieve a list of aliases.

        Returns:
            list[TuoniAlias]: A list of aliases.
        """
        all_aliases = self.request_get("/api/v1/command-alias") or []
        return [TuoniAlias(alias_data, self) for alias_data in all_aliases]

    def add_alias(self, name, description, command_type, command_conf=None, files=None):
        """
        Add a new alias.

        Args:
            name (str): The name of the alias to create.
            description (str): A description of the alias.
            command_type (str | TuoniDefaultCommand): The base command to associate with the alias.
            command_conf (dict): Configuration settings for the alias. Defaults to an empty configuration.
            files (dict): File parameters associated with the alias.

        Returns:
            TuoniAlias: The newly created alias.

        Examples:
            >>> alias1 = tuoni_server.add_alias("ls1", "Alias made with python lib based on 'ls' default command class", TuoniCommandLs, {"depth": 1})
            >>> alias2 = tuoni_server.add_alias("ls2", "Alias made with python lib based on command string name", "ls", {"depth": 2})
            >>> alias3 = tuoni_server.add_alias("easm", "Alias for execute assembly default command class", TuoniCommandexecuteAssembly, {}, files={"executable": ["dotnet.exe",open("dotnet.exe", "rb").read()]})
            >>> alias4 = tuoni_server.add_alias("bof1", "Alias for bof based on command string name", "bof", files={"bofFile": ["bof.o",open("bof.o", "rb").read()]})
            >>> alias5 = tuoni_server.add_alias("bof2", "Alias for bof based on command ID", "2ac58f33-d35e-4afd-a1ac-00e460ceb9f4", files={"bofFile": ["bof.o",open("bof.o", "rb").read()]})
        """
        # A fresh dict per call; a `{}` default would be shared between calls.
        if command_conf is None:
            command_conf = {}
        if isinstance(command_type, TuoniDefaultCommand):
            # A command instance carries its own configuration and files,
            # which replace the ones passed as arguments.
            command_conf = command_type.command_conf
            files = command_type.files
            command_type = command_type.command_type
        if inspect.isclass(command_type) and issubclass(command_type, TuoniDefaultCommand):
            command_type = command_type._class_base_type
        json_data = {
            "name": name,
            "description": description,
            "baseTemplate": command_type,
            "fixedConfiguration": command_conf
        }
        alias_data = self.request_post("/api/v1/command-alias", json_data, files=files)
        return TuoniAlias(alias_data, self)

    def load_hosted(self):
        """
        Retrieve a list of hosted files.

        Returns:
            dict: A dictionary containing the details of hosted files.
        """
        return self.request_get("/api/v1/files")

    def add_hosted(self, filename, file_content):
        """
        Add a hosted file.

        Args:
            filename (str): The name of the file to host.
            file_content (bytes): The content of the file in bytes.

        Returns:
            str: The API URI for the uploaded file.

        Examples:
            >>> tuoni_server.add_hosted("/hosted/file/here.txt", b"HELLO WORLD")
        """
        return self.request_post("/api/v1/files", files={"file": [filename, file_content]})

    def delete_hosted(self, hosted):
        """
        Delete a hosted file.

        Args:
            hosted (str): Either a full API URI containing "/api/v1/files/" or
                the part that follows that prefix.

        Returns:
            dict: The updated list of hosted files after deletion.
        """
        if "/api/v1/files/" not in hosted:
            hosted = "/api/v1/files/" + hosted
        return self.request_delete(hosted)

    def load_users(self):
        """
        Retrieve a list of users.

        Returns:
            list[TuoniUser]: A list of users.
        """
        all_users = self.request_get("/api/v1/users") or []
        return [TuoniUser(user_data, self) for user_data in all_users]

    def add_user(self, username, password, authorities):
        """
        Add a new user.

        Args:
            username (str): The username for the new user.
            password (str): The initial password for the user.
            authorities (list[str]): A list of authorities or roles assigned to the user.

        Returns:
            TuoniUser: The newly created user.

        Examples:
            >>> user = tuoni_server.add_user("cool_new_user", "cool_new_password", ["MANAGE_LISTENERS","MANAGE_USERS","SEND_COMMANDS","MANAGE_PAYLOADS","MODIFY_FILES","VIEW_RESOURCES","MANAGE_AGENTS"])
        """
        json_data = {
            "username": username,
            "password": password,
            "authorities": authorities
        }
        user_data = self.request_post("/api/v1/users", json_data, None)
        return TuoniUser(user_data, self)

    def let_it_run(self):
        """
        Block execution and wait indefinitely if any callback functions are initialized,
        or until all monitoring threads have completed.
        """
        for monitoring_thread in self._monitoring_threads:
            monitoring_thread.join()