Plugins and Protocol
In the previous chapter we created a Sandbox with a minimal set of functionalities.
As the number of features grows, so does the complexity of the system. SEE has been designed to contain development costs while providing high flexibility and promoting code reuse.
As shown previously, the Context allows event handlers to be subscribed and Events to be triggered. This is commonly referred to as the Observer pattern.
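To make the pattern concrete, here is a minimal sketch of an observable, independent of SEE. The Observable class and its handler signature (event name plus keyword arguments) are simplifications made up for illustration; the real Context delivers a richer Event object to its handlers.

```python
class Observable:
    """Toy stand-in for the Context: maps event names to handlers."""

    def __init__(self):
        self._handlers = {}

    def subscribe(self, event, handler):
        # Remember the handler under the name of the event it observes.
        self._handlers.setdefault(event, []).append(handler)

    def trigger(self, event, **kwargs):
        # Call every handler subscribed to this event, in subscription order.
        for handler in list(self._handlers.get(event, [])):
            handler(event, **kwargs)


received = []


def on_run_sample(event, **kwargs):
    received.append((event, kwargs))


observable = Observable()
observable.subscribe('run_sample', on_run_sample)
observable.trigger('run_sample', sample='/tmp/sample.exe')
observable.trigger('unrelated_event')  # nobody subscribed: nothing happens
```

The code that triggers an event does not need to know who is listening, which is what lets features be added or removed without touching it.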
Rather than adding new features, let’s encapsulate the ones we already provided in a reusable module.
The Plugins
If the Context is the observable, the Hooks, or plugins, play the observer’s role. They register their handlers for the desired Events, using them as a means of synchronisation.
The Environment class accepts an arbitrary number of Hooks. Once initialized, each Hook receives a reference to the Context and its own specific configuration.
The VNC Hook follows as an example. To make it more generic, we let the user configure the Event at which the VNC connection is opened.

"""VNC Hook.

Opens a VNC connection with the Guest at the given event.

Configuration:

    {
        "start_vnc": "event_at_which_starting_vnc"
    }

"""

import subprocess

from see import Hook


class VNCHook(Hook):
    def __init__(self, parameters):
        super().__init__(parameters)

        # Subscribe only if the user configured an Event for the Hook.
        if 'start_vnc' in self.configuration:
            self.context.subscribe_async(self.configuration['start_vnc'], self.vnc_handler)

    def vnc_handler(self, event):
        self.logger.info("Event %s: starting VNC connection.", event)

        command = ('virt-viewer', '--connect', 'qemu:///system', self.identifier)
        subprocess.call(command)
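The docstring above only describes the Hook’s own options. As a rough sketch, the snippet below builds a Hooks configuration that enables the VNC Hook right after power on and writes it to a JSON file. The overall layout (a "hooks" list whose entries carry a "name" and a "configuration") and the import path plugins.vnc.VNCHook are assumptions made for illustration and should be checked against the SEE documentation.

```python
import json
import os
import tempfile

hooks_configuration = {
    "hooks": [
        {
            # Hypothetical import path of the VNCHook class shown above.
            "name": "plugins.vnc.VNCHook",
            # Hook-specific options, as described in the Hook docstring.
            "configuration": {"start_vnc": "post_poweron"},
        }
    ]
}

# Write the configuration to a temporary location for the example.
path = os.path.join(tempfile.mkdtemp(), "hooks.json")
with open(path, "w") as config_file:
    json.dump(hooks_configuration, config_file, indent=4)
```

Changing the value of start_vnc to another Event name moves the moment at which the viewer opens without touching the Hook code.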
The Protocol
In the natural sciences, a protocol is a predefined written procedural method for the design and implementation of experiments. When writing test cases to analyse unknown samples, we define a protocol as a sequence of instructions that act on the Context and cause several Events to be triggered.
With a well defined set of protocols and a collection of independent plugins, we can assemble a vast number of test cases without writing any specific code.
The protocol acts on the Context, which is observed by the Hooks; the Events are the transport mechanism for the protocol.
import time

TIMEOUT = 60
RUNTIME = 60


def protocol(context, sample_path, execution_command):
    context.poweron()

    wait_for_ip_address(context, TIMEOUT)

    context.trigger('run_sample', sample=sample_path, command=execution_command)

    time.sleep(RUNTIME)

    context.pause()
    context.trigger('snapshots_capture')
    context.resume()

    context.shutdown()

    context.trigger('start_analysis')
    context.trigger('wait_analysis')


def wait_for_ip_address(context, timeout):
    timestamp = time.time()

    while time.time() - timestamp < timeout:
        if context.ip4_address is not None:
            context.trigger('ip_address', address=context.ip4_address)
            return

        time.sleep(1)

    raise TimeoutError("Waiting for IP address")
The example above is straightforward. After powering on the Sandbox, we wait for its IP address to become available. We then inject the sample and let it run for a given amount of time. With the VM paused, we notify the plugins that it is about to be shut down, letting them capture any necessary information. Once the VM is powered off, we proceed to analyse the gathered information.
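The waiting step is an instance of polling with a timeout. A generic sketch of that idea follows; the wait_until helper and its parameters are hypothetical names and not part of SEE. It uses time.monotonic rather than time.time so that system clock adjustments do not affect the deadline.

```python
import time


def wait_until(predicate, timeout, interval=1.0):
    """Poll predicate() until it returns a truthy value or timeout seconds elapse."""
    deadline = time.monotonic() + timeout

    while time.monotonic() < deadline:
        value = predicate()
        if value:
            return value
        # Sleep between polls to avoid spinning on the CPU.
        time.sleep(interval)

    raise TimeoutError("condition not met within %s seconds" % timeout)


# Usage with a fake address source that becomes available on the third poll.
answers = iter([None, None, '192.168.122.10'])
address = wait_until(lambda: next(answers), timeout=5, interval=0.01)
```

With a real Context, the predicate would simply return the guest address, and the caller would trigger the ip_address Event with the returned value.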
The Event sequence is the following.
Triggered by the Context.poweron method:
- pre_poweron
- post_poweron
Triggered by the wait_for_ip_address function once the IP address is available:
- ip_address
Triggered in order to start the sample:
- run_sample
Triggered by the Context.pause method:
- pre_pause
- post_pause
Triggered in order to take snapshots of the virtual machine state:
- snapshots_capture
Triggered by the Context.resume method:
- pre_resume
- post_resume
Triggered by the Context.shutdown method:
- pre_shutdown
- post_shutdown
Triggered in order to start analysis plugins:
- start_analysis
- wait_analysis
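The sequence above can be checked without a virtual machine by running the same steps against a fake Context that only records Event names. RecordingContext is a hypothetical stand-in written for this sketch; it assumes, as the list suggests, that each state-changing method emits a pre_ and a post_ Event.

```python
class RecordingContext:
    """Fake Context that records Event names instead of driving a VM."""

    def __init__(self):
        self.events = []
        self.ip4_address = '192.168.122.10'  # pretend the guest is already up

    def trigger(self, event, **kwargs):
        self.events.append(event)

    def _state_change(self, name):
        # Assumption: every state change is wrapped by pre_/post_ Events.
        self.trigger('pre_' + name)
        self.trigger('post_' + name)

    def poweron(self):
        self._state_change('poweron')

    def pause(self):
        self._state_change('pause')

    def resume(self):
        self._state_change('resume')

    def shutdown(self):
        self._state_change('shutdown')


def dry_run_protocol(context):
    # Same steps as the protocol, without sleeping or waiting on a guest.
    context.poweron()
    context.trigger('ip_address', address=context.ip4_address)
    context.trigger('run_sample')
    context.pause()
    context.trigger('snapshots_capture')
    context.resume()
    context.shutdown()
    context.trigger('start_analysis')
    context.trigger('wait_analysis')


context = RecordingContext()
dry_run_protocol(context)
recorded_events = context.events
```

A recorder of this kind can also serve in unit tests for a Hook, to confirm that it subscribes to an Event the protocol really triggers.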
To conclude the chapter, we show the new script. The sample path, its execution command, and the Hooks configuration path have been parametrised.
Refer to the Documentation to configure the Hooks.
#!/usr/bin/env python3

import time
import argparse

from see import Environment
from see.context import QEMUContextFactory


TIMEOUT = 60
RUNTIME = 60


def main():
    arguments = parse_arguments()

    context_factory = QEMUContextFactory(arguments.context)

    with Environment(context_factory, arguments.hooks) as environment:
        protocol(environment.context, arguments.sample, arguments.command)


def protocol(context, sample_path, execution_command):
    context.poweron()

    wait_for_ip_address(context, TIMEOUT)

    context.trigger('run_sample', sample=sample_path, command=execution_command)

    time.sleep(RUNTIME)

    # Pause the guest while the Hooks take their snapshots.
    context.pause()
    context.trigger('snapshots_capture')
    context.resume()

    context.shutdown()

    context.trigger('start_analysis')
    context.trigger('wait_analysis')


def wait_for_ip_address(context, timeout):
    timestamp = time.time()

    while time.time() - timestamp < timeout:
        if context.ip4_address is not None:
            context.trigger('ip_address', address=context.ip4_address)
            return

        # Avoid busy-waiting while the guest acquires its address.
        time.sleep(1)

    raise TimeoutError("Waiting for IP address")


def parse_arguments():
    parser = argparse.ArgumentParser(description='Execute a sample within a Sandbox.')

    parser.add_argument('context', help='path to Context JSON configuration')
    parser.add_argument('sample', help='path to Sample to execute')
    parser.add_argument('-k', '--hooks', default={}, help='path to Hooks JSON configuration')
    parser.add_argument('-c', '--command', default='start {sample}',
                        help="""command used to start the sample.
                        The string {sample} will be expanded to the actual file name within the guest.
                        Example: 'notepad.exe {sample}'""")

    return parser.parse_args()


if __name__ == '__main__':
    main()