Skip to content
Snippets Groups Projects
Commit 463bac9c authored by Stefano Alberto Russo's avatar Stefano Alberto Russo
Browse files

Moved to computing managers. minor fixes.

parent 37f9580d
Branches
Tags
No related merge requests found
......@@ -10,15 +10,71 @@ logger = logging.getLogger(__name__)
TASK_DATA_DIR = "/data"
def start_task(task):
class ComputingManager(object):
def start_task(self, task, **kwargs):
# Check for run task logic implementation
try:
self._start_task
except AttributeError:
raise NotImplementedError('Not implemented')
# Call actual run task logic
self._start_task(task, **kwargs)
def stop_task(self, task, **kwargs):
# Check for stop task logic implementation
try:
self._stop_task
except AttributeError:
raise NotImplementedError('Not implemented')
# Call actual stop task logic
self._stop_task(task, **kwargs)
# Ok, save status as deleted
task.status = 'stopped'
task.save()
# Check if the tunnel is active and if so kill it
logger.debug('Checking if task "{}" has a running tunnel'.format(task.tid))
check_command = 'ps -ef | grep ":'+str(task.tunnel_port)+':'+str(task.ip)+':'+str(task.port)+'" | grep -v grep | awk \'{print $2}\''
logger.debug(check_command)
out = os_shell(check_command, capture=True)
logger.debug(out)
if out.exit_code == 0:
logger.debug('Task "{}" has a running tunnel, killing it'.format(task.tid))
tunnel_pid = out.stdout
# Kill Tunnel command
kill_tunnel_command= 'kill -9 {}'.format(tunnel_pid)
# Log
logger.debug('Killing tunnel with command: {}'.format(kill_tunnel_command))
# Execute
os_shell(kill_tunnel_command, capture=True)
if out.exit_code != 0:
raise Exception(out.stderr)
def get_task_log(self, task, **kwargs):
# Check for get task log logic implementation
try:
self._get_task_log
except AttributeError:
raise NotImplementedError('Not implemented')
# Call actual get task log logic
return self._get_task_log(task, **kwargs)
# Handle proper config
if task.computing.type == 'local':
# Get our ip address
#import netifaces
#netifaces.ifaddresses('eth0')
#backend_ip = netifaces.ifaddresses('eth0')[netifaces.AF_INET][0]['addr']
class LocalComputingManager(ComputingManager):
def _start_task(self, task):
# Init run command #--cap-add=NET_ADMIN --cap-add=NET_RAW
run_command = 'sudo docker run --network=rosetta_default --name rosetta-task-{}'.format( task.id)
......@@ -39,8 +95,10 @@ def start_task(task):
# Host name, image entry command
run_command += ' -h task-{} -d -t {}{}'.format(task.id, registry_string, task.container.image)
# Run the task Debug
# Debug
logger.debug('Running new task with command="{}"'.format(run_command))
# Run the task
out = os_shell(run_command, capture=True)
if out.exit_code != 0:
raise Exception(out.stderr)
......@@ -48,7 +106,6 @@ def start_task(task):
task_tid = out.stdout
logger.debug('Created task with id: "{}"'.format(task_tid))
# Get task IP address
out = os_shell('sudo docker inspect --format \'{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}\' ' + task_tid + ' | tail -n1', capture=True)
if out.exit_code != 0:
......@@ -65,9 +122,34 @@ def start_task(task):
task.save()
def _stop_task(self, task):
# Delete the Docker container
standby_supported = False
if standby_supported:
stop_command = 'sudo docker stop {}'.format(task.tid)
else:
stop_command = 'sudo docker stop {} && sudo docker rm {}'.format(task.tid,task.tid)
out = os_shell(stop_command, capture=True)
if out.exit_code != 0:
raise Exception(out.stderr)
def _get_task_log(self, task, **kwargs):
# View the Docker container log (attach)
view_log_command = 'sudo docker logs {}'.format(task.tid,)
logger.debug(view_log_command)
out = os_shell(view_log_command, capture=True)
if out.exit_code != 0:
raise Exception(out.stderr)
else:
return out.stdout
elif task.computing.type == 'remote':
class RemoteComputingManager(ComputingManager):
def _start_task(self, task, **kwargs):
logger.debug('Starting a remote task "{}"'.format(task.computing))
# Get computing host
......@@ -143,11 +225,8 @@ def start_task(task):
# Save
task.save()
elif task.computing.type == 'remoteOLD':
logger.debug('Starting a remote task "{}"'.format(task.computing))
# Get computing host
host = task.computing.get_conf_param('host')
def _stop_task(self, task, **kwargs):
# Get user keys
if task.computing.require_user_keys:
......@@ -155,64 +234,43 @@ def start_task(task):
else:
raise NotImplementedError('Remote tasks not requiring keys are not yet supported')
# 1) Run the container on the host (non blocking)
if task.container.type == 'singularity':
# Get computing host
host = task.computing.get_conf_param('host')
# Set pass if any
if task.auth_pass:
authstring = ' export SINGULARITYENV_AUTH_PASS={} && '.format(task.auth_pass)
else:
authstring = ''
# Stop the task remotely
stop_command = 'ssh -i {} -4 -o StrictHostKeyChecking=no {} "kill -9 {}"'.format(user_keys.private_key_file, host, task.pid)
logger.debug(stop_command)
out = os_shell(stop_command, capture=True)
if out.exit_code != 0:
if not 'No such process' in out.stderr:
raise Exception(out.stderr)
run_command = 'ssh -i {} -4 -o StrictHostKeyChecking=no {} '.format(user_keys.private_key_file, host)
run_command += '"export SINGULARITY_NOHTTPS=true && {} '.format(authstring)
run_command += 'exec nohup singularity run --pid --writable-tmpfs --containall --cleanenv '
# Set registry
if task.container.registry == 'docker_local':
registry = 'docker://dregistry:5000/'
elif task.container.registry == 'docker_hub':
registry = 'docker://'
else:
raise NotImplementedError('Registry {} not supported'.format(task.container.registry))
run_command+='{}{} &> /tmp/{}.log & echo \$!"'.format(registry, task.container.image, task.uuid)
def _get_task_log(self, task, **kwargs):
# Get computing host
host = task.computing.get_conf_param('host')
# Get id_rsa
if task.computing.require_user_keys:
user_keys = Keys.objects.get(user=task.user, default=True)
id_rsa_file = user_keys.private_key_file
else:
raise NotImplementedError('Container {} not supported'.format(task.container.type))
raise NotImplementedError('temote with no keys not yet')
out = os_shell(run_command, capture=True)
# View the Singularity container log
view_log_command = 'ssh -i {} -4 -o StrictHostKeyChecking=no {} "cat /tmp/{}.log"'.format(id_rsa_file, host, task.uuid)
logger.debug(view_log_command)
out = os_shell(view_log_command, capture=True)
if out.exit_code != 0:
raise Exception(out.stderr)
# Save pid echoed by the command above
task_pid = out.stdout
else:
return out.stdout
# 2) Simulate the agent (i.e. report container IP and port port)
# Get task IP address
out = os_shell('sudo docker inspect --format \'{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}\' '+host+' | tail -n1', capture=True)
if out.exit_code != 0:
raise Exception('Error: ' + out.stderr)
task_ip = out.stdout
# Set fields
task.tid = task.uuid
task.status = TaskStatuses.running
task.ip = task_ip
task.pid = task_pid
task.port = int(task.container.service_ports.split(',')[0])
# Save
task.save()
#==============================
# Slurm
#==============================
elif task.computing.type == 'slurm':
class SlurmComputingManager(ComputingManager):
def _start_task(self, task, **kwargs):
logger.debug('Starting a remote task "{}"'.format(task.computing))
# Get computing host #Key Error ATM
......@@ -271,67 +329,18 @@ def start_task(task):
raise Exception(out.stderr)
def _stop_task(self, task, **kwargs):
raise NotImplementedError('Not implemented')
def _get_task_log(self, task, **kwargs):
raise NotImplementedError('Not implemented')
else:
raise Exception('Consistency exception: invalid computing resource "{}'.format(task.computing))
def stop_task(task):
if task.computing.type == 'local':
# Delete the Docker container
standby_supported = False
if standby_supported:
stop_command = 'sudo docker stop {}'.format(task.tid)
else:
stop_command = 'sudo docker stop {} && sudo docker rm {}'.format(task.tid,task.tid)
out = os_shell(stop_command, capture=True)
if out.exit_code != 0:
raise Exception(out.stderr)
elif task.computing.type == 'remote':
# Get user keys
if task.computing.require_user_keys:
user_keys = Keys.objects.get(user=task.user, default=True)
else:
raise NotImplementedError('Remote tasks not requiring keys are not yet supported')
# Get computing host
host = task.computing.get_conf_param('host')
# Stop the task remotely
stop_command = 'ssh -i {} -4 -o StrictHostKeyChecking=no {} "kill -9 {}"'.format(user_keys.private_key_file, host, task.pid)
logger.debug(stop_command)
out = os_shell(stop_command, capture=True)
if out.exit_code != 0:
if not 'No such process' in out.stderr:
raise Exception(out.stderr)
else:
raise Exception('Don\'t know how to stop tasks on "{}" computing resource.'.format(task.computing))
# Ok, save status as deleted
task.status = 'stopped'
task.save()
# Check if the tunnel is active and if so kill it
logger.debug('Checking if task "{}" has a running tunnel'.format(task.tid))
check_command = 'ps -ef | grep ":'+str(task.tunnel_port)+':'+str(task.ip)+':'+str(task.port)+'" | grep -v grep | awk \'{print $2}\''
logger.debug(check_command)
out = os_shell(check_command, capture=True)
logger.debug(out)
if out.exit_code == 0:
logger.debug('Task "{}" has a running tunnel, killing it'.format(task.tid))
tunnel_pid = out.stdout
# Kill Tunnel command
kill_tunnel_command= 'kill -9 {}'.format(tunnel_pid)
# Log
logger.debug('Killing tunnel with command: {}'.format(kill_tunnel_command))
# Execute
os_shell(kill_tunnel_command, capture=True)
if out.exit_code != 0:
raise Exception(out.stderr)
......@@ -108,23 +108,10 @@ class Command(BaseCommand):
#==============================
# Demo remote computing
#==============================
demo_remote_computing = Computing.objects.create(user = None,
name = 'Demo remote',
type = 'remote',
require_sys_conf = True,
require_user_conf = False,
require_user_keys = False)
ComputingSysConf.objects.create(computing = demo_remote_computing,
data = {'host': 'slurmclusterworker-one'})
#==============================
# Demo remote (auth) computing
# Demo remote computing
#==============================
demo_remote_auth_computing = Computing.objects.create(user = None,
name = 'Demo remote (auth)',
name = 'Demo remote',
type = 'remote',
require_sys_conf = True,
require_user_conf = True,
......
......@@ -200,6 +200,12 @@ class Computing(models.Model):
param_value = self.user_conf_data[param]
return param_value
@property
def manager(self):
from . import computing_managers
ComputingManager = getattr(computing_managers, '{}ComputingManager'.format(self.type.title()))
return ComputingManager()
class ComputingSysConf(models.Model):
uuid = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
......
......@@ -83,6 +83,20 @@
{% endif %}
</td>
</tr>
</table>
<br />
<h3>Keys</h3>
<table class="dashboard">
<tr>
<td valign="top">
<b>Default public key</b>
</td>
<td>
<pre style="max-width:300px; height:">{{ data.default_public_key }}</pre>
</td>
</tr>
</table>
......
......@@ -71,11 +71,7 @@
<font color="#c0c0c0">Stop</font> |
{% endif %}
{% if task.status == "exited" or task.status == "stopped" %}
<a href="?uuid={{task.uuid}}&action=delete&details=False">Delete</a>
{% else %}
<font color="#c0c0c0">Delete</font>
{% endif %}
{% if task.status == "running" %}
| <a href="?uuid={{task.uuid}}&action=connect">Connect</a>
| <a href="/task_log/?uuid={{task.uuid}}&action=viewlog">View Log</a>
......
......@@ -24,7 +24,7 @@
{% include "components/task.html" with task=task %}
{% endfor %}
<br />
<a href="/create_task">Add new...</a>
<a href="/create_task">Create new...</a>
{% endif %}
<br/>
......
......@@ -9,7 +9,6 @@ from django.shortcuts import redirect
from .models import Profile, LoginToken, Task, TaskStatuses, Container, Computing, Keys
from .utils import send_email, format_exception, timezonize, os_shell, booleanize, debug_param
from .decorators import public_view, private_view
from .tasks import start_task, stop_task
from .exceptions import ErrorMessage
# Setup logging
......@@ -180,6 +179,10 @@ def account(request):
if edit and edit.upper() == 'NONE':
edit = None
# Set data.default_public_key
with open(Keys.objects.get(user=request.user, default=True).public_key_file) as f:
data['default_public_key'] = f.read()
# Edit values
if edit and value:
try:
......@@ -269,8 +272,10 @@ def tasks(request):
if action=='delete':
if task.status not in [TaskStatuses.stopped, TaskStatuses.exited]:
data['error'] = 'Can delete only tasks in the stopped state'
return render(request, 'error.html', {'data': data})
try:
task.computing.manager.stop_task(task)
except:
pass
try:
# Get the task (raises if none available including no permission)
task = Task.objects.get(user=request.user, uuid=uuid)
......@@ -287,57 +292,51 @@ def tasks(request):
return render(request, 'error.html', {'data': data})
elif action=='stop': # or delete,a and if delete also remove object
stop_task(task)
task.computing.manager.stop_task(task)
elif action=='connect':
# Create task tunnel
if task.computing.type in ['local', 'remote', 'slurm']:
# If there is no tunnel port allocated yet, find one
if not task.tunnel_port:
# Get a free port fot the tunnel:
allocated_tunnel_ports = []
for other_task in Task.objects.all():
if other_task.tunnel_port and not other_task.status in [TaskStatuses.exited, TaskStatuses.stopped]:
allocated_tunnel_ports.append(other_task.tunnel_port)
for port in range(7000, 7006):
if not port in allocated_tunnel_ports:
tunnel_port = port
break
if not tunnel_port:
logger.error('Cannot find a free port for the tunnel for task "{}"'.format(task.tid))
raise ErrorMessage('Cannot find a free port for the tunnel to the task')
task.tunnel_port = tunnel_port
task.save()
# Check if the tunnel is active and if not create it
logger.debug('Checking if task "{}" has a running tunnel'.format(task.tid))
out = os_shell('ps -ef | grep ":{}:{}:{}" | grep -v grep'.format(task.tunnel_port, task.ip, task.port), capture=True)
if out.exit_code == 0:
logger.debug('Task "{}" has a running tunnel, using it'.format(task.tid))
else:
logger.debug('Task "{}" has no running tunnel, creating it'.format(task.tid))
# Tunnel command
tunnel_command= 'ssh -4 -o StrictHostKeyChecking=no -nNT -L 0.0.0.0:{}:{}:{} localhost & '.format(task.tunnel_port, task.ip, task.port)
background_tunnel_command = 'nohup {} >/dev/null 2>&1 &'.format(tunnel_command)
# Log
logger.debug('Opening tunnel with command: {}'.format(background_tunnel_command))
# Execute
subprocess.Popen(background_tunnel_command, shell=True)
# If there is no tunnel port allocated yet, find one
if not task.tunnel_port:
# Get a free port fot the tunnel:
allocated_tunnel_ports = []
for other_task in Task.objects.all():
if other_task.tunnel_port and not other_task.status in [TaskStatuses.exited, TaskStatuses.stopped]:
allocated_tunnel_ports.append(other_task.tunnel_port)
for port in range(7000, 7006):
if not port in allocated_tunnel_ports:
tunnel_port = port
break
if not tunnel_port:
logger.error('Cannot find a free port for the tunnel for task "{}"'.format(task.tid))
raise ErrorMessage('Cannot find a free port for the tunnel to the task')
task.tunnel_port = tunnel_port
task.save()
# Check if the tunnel is active and if not create it
logger.debug('Checking if task "{}" has a running tunnel'.format(task.tid))
out = os_shell('ps -ef | grep ":{}:{}:{}" | grep -v grep'.format(task.tunnel_port, task.ip, task.port), capture=True)
if out.exit_code == 0:
logger.debug('Task "{}" has a running tunnel, using it'.format(task.tid))
else:
raise ErrorMessage('Connecting to tasks on computing "{}" is not supported yet'.format(task.computing))
logger.debug('Task "{}" has no running tunnel, creating it'.format(task.tid))
# Tunnel command
tunnel_command= 'ssh -4 -o StrictHostKeyChecking=no -nNT -L 0.0.0.0:{}:{}:{} localhost & '.format(task.tunnel_port, task.ip, task.port)
background_tunnel_command = 'nohup {} >/dev/null 2>&1 &'.format(tunnel_command)
# Log
logger.debug('Opening tunnel with command: {}'.format(background_tunnel_command))
# Execute
subprocess.Popen(background_tunnel_command, shell=True)
# Ok, now redirect to the task through the tunnel
return redirect('http://localhost:{}'.format(task.tunnel_port))
......@@ -462,7 +461,11 @@ def create_task(request):
task.computing.attach_user_conf_data(task.user)
# Start the task
start_task(task)
#try:
task.computing.manager.start_task(task)
#except:
# task.delete()
# raise
# Set step
data['step'] = 'created'
......@@ -510,41 +513,7 @@ def task_log(request):
# Get the log
try:
if task.computing.type == 'local':
# View the Docker container log (attach)
view_log_command = 'sudo docker logs {}'.format(task.tid,)
logger.debug(view_log_command)
out = os_shell(view_log_command, capture=True)
if out.exit_code != 0:
raise Exception(out.stderr)
else:
data['log'] = out.stdout
elif task.computing.type == 'remote':
# Get computing host
host = task.computing.get_conf_param('host')
# Get id_rsa
if task.computing.require_user_keys:
user_keys = Keys.objects.get(user=task.user, default=True)
id_rsa_file = user_keys.private_key_file
else:
raise NotImplementedError('temote with no keys not yet')
# View the Singularity container log
view_log_command = 'ssh -i {} -4 -o StrictHostKeyChecking=no {} "cat /tmp/{}.log"'.format(id_rsa_file, host, task.uuid)
logger.debug(view_log_command)
out = os_shell(view_log_command, capture=True)
if out.exit_code != 0:
raise Exception(out.stderr)
else:
data['log'] = out.stdout
else:
data['error']= 'Don\'t know how to view task logs on "{}" computing resource.'.format(task.computing)
return render(request, 'error.html', {'data': data})
data['log'] = task.computing.manager.get_task_log(task)
except Exception as e:
data['error'] = 'Error in viewing task log'
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment