# Source code for tuoni.TuoniJob
import time
from tuoni.TuoniEvent import *
from tuoni.TuoniExceptions import ExceptionTuoniDeleted
# [docs]
class TuoniJob:
    """
    A class that provides data and functionality for a job.

    Attributes:
        job_id (int): The unique identifier of the job.
        name (str): Name of the job.
        status (str): Status of the job.
        source (dict): Source for the job.
        supportedActions (list): What actions are allowed on the job.
        openResources (list): What resources are open related to this job.
        messages (list): Messages related to job.
        createEvent (TuoniEvent): Event about job creation.
        lastUpdateEvent (TuoniEvent): Event about last job update.
    """

    def __init__(self, conf, c2):
        """
        Constructor for the job class.

        Args:
            conf (dict): Data from the server.
            c2 (TuoniC2): The related server object that manages communication.
        """
        self.c2 = c2
        self._load_conf(conf)

    def __repr__(self):
        """
        Return a short, debugging-friendly description of the job.
        """
        # getattr keeps repr() usable on an instance whose loading failed.
        job_id = getattr(self, "job_id", None)
        name = getattr(self, "name", None)
        status = getattr(self, "status", None)
        return (
            f"{type(self).__name__}"
            f"(job_id={job_id!r}, name={name!r}, status={status!r})"
        )

    def _event_or_none(self, conf, key):
        """
        Wrap ``conf[key]`` in a TuoniEvent, or return None when the key is
        absent or its value is None.
        """
        payload = conf.get(key)
        if payload is None:
            return None
        return TuoniEvent(payload, self.c2)

    def _load_conf(self, conf):
        """
        Copy the job fields out of a server response onto this object.

        Every value is read before any attribute is written, so a response
        with a missing key raises KeyError without leaving the object
        half-updated.

        Args:
            conf (dict): Data from the server.

        Raises:
            KeyError: If a required field is missing from ``conf``.
        """
        job_id = conf["id"]
        name = conf["name"]
        status = conf["status"]
        source = conf["source"]
        supported_actions = conf["supportedActions"]
        open_resources = conf["openResources"]
        messages = conf["messages"]
        create_event = self._event_or_none(conf, "createEvent")
        last_update_event = self._event_or_none(conf, "lastUpdateEvent")

        self.job_id = job_id
        self.name = name
        self.status = status
        self.source = source
        self.supportedActions = supported_actions
        self.openResources = open_resources
        self.messages = messages
        self.createEvent = create_event
        self.lastUpdateEvent = last_update_event

    def _require_job_id(self):
        """
        Return the job id, refusing to continue when it is None.

        Raises:
            ExceptionTuoniDeleted: If ``job_id`` is None.
        """
        if self.job_id is None:
            raise ExceptionTuoniDeleted(
                "Job has no id (it was deleted or never loaded); "
                "the requested operation is unavailable."
            )
        return self.job_id

    def _post_action(self, action):
        """
        POST to ``/api/v1/job/{job_id}/{action}`` on the C2 server.

        The server response is intentionally not returned, matching the
        public action methods which return None.
        """
        job_id = self._require_job_id()
        self.c2.request_post(f"/api/v1/job/{job_id}/{action}")

    def reload(self):
        """
        Reload the job data from the C2 server.

        Raises:
            ExceptionTuoniDeleted: If ``job_id`` is None.
        """
        job_id = self._require_job_id()
        data = self.c2.request_get(f"/api/v1/job/{job_id}")
        self._load_conf(data)

    def restart(self):
        """
        Run restart operation on the job.

        Raises:
            ExceptionTuoniDeleted: If ``job_id`` is None.
        """
        self._post_action("restart")

    def pause(self):
        """
        Run pause operation on the job.

        Raises:
            ExceptionTuoniDeleted: If ``job_id`` is None.
        """
        self._post_action("pause")

    def resume(self):
        """
        Run resume operation on the job.

        Raises:
            ExceptionTuoniDeleted: If ``job_id`` is None.
        """
        self._post_action("resume")