From 73c88b8f2150f41ff4db70ef769d626b7aa52806 Mon Sep 17 00:00:00 2001 From: Stefano Alberto Russo <stefano.russo@gmail.com> Date: Wed, 13 May 2020 17:55:09 +0200 Subject: [PATCH] Added hopped, remote computing support. Minor fixes. --- .../rosetta/core_app/computing_managers.py | 160 +++++++++++++++++- .../webapp/code/rosetta/core_app/models.py | 12 ++ .../core_app/templates/add_container.html | 5 +- .../templates/components/computing.html | 2 +- .../templates/components/container.html | 4 +- .../core_app/templates/create_task.html | 10 ++ .../templates/edit_computing_conf.html | 2 +- .../webapp/code/rosetta/core_app/views.py | 28 ++- 8 files changed, 208 insertions(+), 15 deletions(-) diff --git a/services/webapp/code/rosetta/core_app/computing_managers.py b/services/webapp/code/rosetta/core_app/computing_managers.py index 6efb0ad..118ee95 100644 --- a/services/webapp/code/rosetta/core_app/computing_managers.py +++ b/services/webapp/code/rosetta/core_app/computing_managers.py @@ -242,7 +242,6 @@ class RemoteComputingManager(ComputingManager): # Stop the task remotely stop_command = 'ssh -i {} -4 -o StrictHostKeyChecking=no {}@{} \'/bin/bash -c "kill -9 {}"\''.format(user_keys.private_key_file, user, host, task.pid) - logger.debug(stop_command) out = os_shell(stop_command, capture=True) if out.exit_code != 0: if not 'No such process' in out.stderr: @@ -265,7 +264,7 @@ class RemoteComputingManager(ComputingManager): host = task.computing.get_conf_param('host') user = task.computing.get_conf_param('user') - # Stop the task remotely + # View log remotely view_log_command = 'ssh -i {} -4 -o StrictHostKeyChecking=no {}@{} \'/bin/bash -c "cat \$HOME/{}.log"\''.format(user_keys.private_key_file, user, host, task.uuid) out = os_shell(view_log_command, capture=True) @@ -400,7 +399,6 @@ class SlurmComputingManager(ComputingManager): # Stop the task remotely stop_command = 'ssh -i {} -4 -o StrictHostKeyChecking=no {}@{} \'/bin/bash -c "scancel {}"\''.format(user_keys.private_key_file, user, host, task.pid) - logger.debug(stop_command) out = os_shell(stop_command, capture=True) if out.exit_code != 0: raise Exception(out.stderr) @@ -422,7 +420,7 @@ class SlurmComputingManager(ComputingManager): host = task.computing.get_conf_param('master') user = task.computing.get_conf_param('user') - # Stop the task remotely + # View log remotely view_log_command = 'ssh -i {} -4 -o StrictHostKeyChecking=no {}@{} \'/bin/bash -c "cat \$HOME/{}.log"\''.format(user_keys.private_key_file, user, host, task.uuid) out = os_shell(view_log_command, capture=True) @@ -433,6 +431,160 @@ class SlurmComputingManager(ComputingManager): +class RemotehopComputingManager(ComputingManager): + + def _start_task(self, task, **kwargs): + logger.debug('Starting a remote task "{}"'.format(task.computing)) + + # Get computing params + first_host = task.computing.get_conf_param('first_host') + first_user = task.computing.get_conf_param('first_user') + second_host = task.computing.get_conf_param('second_host') + second_user = task.computing.get_conf_param('second_user') + setup_command = task.computing.get_conf_param('setup_command') + + # De hard-code + use_agent = False + + # Get user keys + if task.computing.requires_user_keys: + user_keys = KeyPair.objects.get(user=task.user, default=True) + else: + raise NotImplementedError('Remote tasks not requiring keys are not yet supported') + + # Get webapp conn string + from.utils import get_webapp_conn_string + webapp_conn_string = get_webapp_conn_string() + + # Run the container on the host (non blocking) + if task.container.type == 'singularity': + + task.tid = task.uuid + task.save() + + # Set pass if any + if task.auth_pass: + authstring = ' export SINGULARITYENV_AUTH_PASS={} && '.format(task.auth_pass) + else: + authstring = '' + + run_command = 'ssh -i {} -4 -o StrictHostKeyChecking=no {}@{} '.format(user_keys.private_key_file, first_user, first_host) + run_command += '"ssh -4 -o StrictHostKeyChecking=no {}@{} /bin/bash -c \''.format(second_user, second_host) + + if use_agent: + run_command += '\'wget {}/api/v1/base/agent/?task_uuid={} -O \$HOME/agent_{}.py &> /dev/null && export BASE_PORT=\$(python \$HOME/agent_{}.py 2> \$HOME/{}.log) && '.format(webapp_conn_string, task.uuid, task.uuid, task.uuid, task.uuid) + if setup_command: + run_command += setup_command + ' && ' + run_command += '\'export SINGULARITY_NOHTTPS=true && export SINGULARITYENV_BASE_PORT=\$BASE_PORT && {} '.format(authstring) + run_command += 'exec nohup singularity run --pid --writable-tmpfs --containall --cleanenv ' + else: + run_command += ' : && ' # Trick to prevent some issues in exporting variables + if setup_command: + run_command += setup_command + ' && ' + run_command += 'export SINGULARITY_NOHTTPS=true && export SINGULARITYENV_BASE_PORT={} && {} '.format(task.port, authstring) + run_command += 'exec nohup singularity run --pid --writable-tmpfs --containall --cleanenv ' + + # Set registry + if task.container.registry == 'docker_local': + raise Exception('This computing resource does not support local Docker registries yet') + # Get local Docker registry conn string + from.utils import get_local_docker_registry_conn_string + local_docker_registry_conn_string = get_local_docker_registry_conn_string() + registry = 'docker://{}/'.format(local_docker_registry_conn_string) + elif task.container.registry == 'docker_hub': + registry = 'docker://' + else: + raise NotImplementedError('Registry {} not supported'.format(task.container.registry)) + + run_command+='{}{} &>> \$HOME/{}.log & echo \$!\'"'.format(registry, task.container.image, task.uuid) + + else: + raise NotImplementedError('Container {} not supported'.format(task.container.type)) + + out = os_shell(run_command, capture=True) + if out.exit_code != 0: + raise Exception(out.stderr) + + # Log + logger.debug('Shell exec output: "{}"'.format(out)) + + + # Load back the task to avoid concurrency problems in the agent call + task_uuid = task.uuid + task = Task.objects.get(uuid=task_uuid) + + # Save pid echoed by the command above + task_pid = out.stdout + + # Set fields + task.status = TaskStatuses.running + task.pid = task_pid + task.ip = second_host + + # Save + task.save() + + + def _stop_task(self, task, **kwargs): + + # Get user keys + if task.computing.requires_user_keys: + user_keys = KeyPair.objects.get(user=task.user, default=True) + else: + raise NotImplementedError('Remote tasks not requiring keys are not yet supported') + + # Get computing params + first_host = task.computing.get_conf_param('first_host') + first_user = task.computing.get_conf_param('first_user') + second_host = task.computing.get_conf_param('second_host') + second_user = task.computing.get_conf_param('second_user') + + # Stop the task remotely + stop_command = 'ssh -i {} -4 -o StrictHostKeyChecking=no {}@{} '.format(user_keys.private_key_file, first_user, first_host) + stop_command += '"ssh -4 -o StrictHostKeyChecking=no {}@{} '.format(second_user, second_host) + stop_command += 'kill -9 {}"'.format(task.pid) + + out = os_shell(stop_command, capture=True) + if out.exit_code != 0: + if not 'No such process' in out.stderr: + raise Exception(out.stderr) + + # Set task as stopped + task.status = TaskStatuses.stopped + task.save() + + + def _get_task_log(self, task, **kwargs): + + # Get user keys + if task.computing.requires_user_keys: + user_keys = KeyPair.objects.get(user=task.user, default=True) + else: + raise NotImplementedError('Remote tasks not requiring keys are not yet supported') + + # Get computing params + first_host = task.computing.get_conf_param('first_host') + first_user = task.computing.get_conf_param('first_user') + second_host = task.computing.get_conf_param('second_host') + second_user = task.computing.get_conf_param('second_user') + + # View log remotely + view_log_command = 'ssh -i {} -4 -o StrictHostKeyChecking=no {}@{} '.format(user_keys.private_key_file, first_user, first_host) + view_log_command += '"ssh -4 -o StrictHostKeyChecking=no {}@{} '.format(second_user, second_host) + view_log_command += 'cat \\\\\\$HOME/{}.log"'.format(task.uuid) + + out = os_shell(view_log_command, capture=True) + if out.exit_code != 0: + raise Exception(out.stderr) + else: + return out.stdout + + + + + + + diff --git a/services/webapp/code/rosetta/core_app/models.py b/services/webapp/code/rosetta/core_app/models.py index 0af86c0..c70db45 100644 --- a/services/webapp/code/rosetta/core_app/models.py +++ b/services/webapp/code/rosetta/core_app/models.py @@ -1,4 +1,5 @@ import uuid +import json from django.conf import settings from django.db import models from django.contrib.auth.models import User @@ -158,6 +159,12 @@ class Computing(models.Model): return ComputingSysConf.objects.get(computing=self).data except ComputingSysConf.DoesNotExist: return None + + + @property + def sys_conf_data_json(self): + return json.dumps(self.sys_conf_data) + @property def user_conf_data(self): @@ -166,6 +173,11 @@ class Computing(models.Model): except AttributeError: raise AttributeError('User conf data is not yet attached, please attach it before accessing.') + + @property + def user_conf_data_json(self): + return json.dumps(self.user_conf_data) + def attach_user_conf_data(self, user): if self.user and self.user != user: diff --git a/services/webapp/code/rosetta/core_app/templates/add_container.html b/services/webapp/code/rosetta/core_app/templates/add_container.html index 4ff7f72..2afe149 100644 --- a/services/webapp/code/rosetta/core_app/templates/add_container.html +++ b/services/webapp/code/rosetta/core_app/templates/add_container.html @@ -56,9 +56,8 @@ </tr> <tr> - <td><b>Ports</b></td> - <td> - <input type="text" name="container_ports" value="" placeholder="" size="5" /> + <td colspan=2><b>Default port(s)</b> + <input type="text" name="container_ports" value="" placeholder="" size="5" /> </td> </tr> diff --git a/services/webapp/code/rosetta/core_app/templates/components/computing.html b/services/webapp/code/rosetta/core_app/templates/components/computing.html index 44ef020..f5605df 100644 --- a/services/webapp/code/rosetta/core_app/templates/components/computing.html +++ b/services/webapp/code/rosetta/core_app/templates/components/computing.html @@ -77,7 +77,7 @@ <div style="padding:10px;"> <b>Type:</b> {{ computing.type.title }}<br/> - <b>Owner:</b> {% if computing.user %}{{ data.computing.user }}{% else %}Platform{% endif %}<br/> + <b>Owner:</b> {% if computing.user %}{{ computing.user }}{% else %}Platform{% endif %}<br/> <b>Supports:</b> {% if computing.supports_docker %}Docker <img src="/static/img/docker-logo.svg" style="height:18px; width:18px; margin-bottom:2px" />{% endif %} {% if computing.supports_singularity %}Singularity <img src="/static/img/singularity-logo.svg" style="height:18px; width:18px; margin-bottom:2px" />{% endif %} diff --git a/services/webapp/code/rosetta/core_app/templates/components/container.html b/services/webapp/code/rosetta/core_app/templates/components/container.html index 4422753..d8842b0 100644 --- a/services/webapp/code/rosetta/core_app/templates/components/container.html +++ b/services/webapp/code/rosetta/core_app/templates/components/container.html @@ -40,8 +40,8 @@ </tr> <tr> - <td><b>Ports</b></td> - <td>{{ container.ports }}</td> + <td colspan=2><b>Default port(s)</b> + {{ container.ports }}</td> </tr> <tr> diff --git a/services/webapp/code/rosetta/core_app/templates/create_task.html b/services/webapp/code/rosetta/core_app/templates/create_task.html index 533fff8..f7fb9bc 100644 --- a/services/webapp/code/rosetta/core_app/templates/create_task.html +++ b/services/webapp/code/rosetta/core_app/templates/create_task.html @@ -130,6 +130,16 @@ </tr> {% endif %} + {% if data.task_container.supports_dynamic_ports and data.task_computing.type == 'remotehop' %} + <tr> + <td valign="top" style="width:180px"><b>Set custom port </b></td> + <td> + <input type="text" name="task_base_port" value="" placeholder="" size="23" style="margin-bottom:5px"/><br> + <p style="line-height: 0.9"><font size=-1>This container supports dynamic ports and you can thus set a custom port (>5900) to avoid clashes with services already running on the computing resource.</font></p> + </td> + </tr> + {% endif %} + {% if data.task_container.supports_pass_auth %} <tr> <td valign="top"><b>Set task password</b></td> diff --git a/services/webapp/code/rosetta/core_app/templates/edit_computing_conf.html b/services/webapp/code/rosetta/core_app/templates/edit_computing_conf.html index 82092ba..10e0cb6 100644 --- a/services/webapp/code/rosetta/core_app/templates/edit_computing_conf.html +++ b/services/webapp/code/rosetta/core_app/templates/edit_computing_conf.html @@ -23,7 +23,7 @@ <tr> <td> - <textarea name="new_conf" style="height:300px; width:500px">{{ data.computing_conf_data_json}}</textarea> + <textarea name="new_conf" style="height:300px; width:500px">{{ data.computing_conf_data_json }}</textarea> </td> </tr> diff --git a/services/webapp/code/rosetta/core_app/views.py b/services/webapp/code/rosetta/core_app/views.py index c80aba8..3e710b8 100644 --- a/services/webapp/code/rosetta/core_app/views.py +++ b/services/webapp/code/rosetta/core_app/views.py @@ -393,7 +393,21 @@ def tasks(request): logger.debug('Task "{}" has no running tunnel, creating it'.format(task)) # Tunnel command - tunnel_command= 'ssh -4 -o StrictHostKeyChecking=no -nNT -L 0.0.0.0:{}:{}:{} localhost & '.format(task.tunnel_port, task.ip, task.port) + if task.computing.type == 'remotehop': + + # Get computing params + first_host = task.computing.get_conf_param('first_host') + first_user = task.computing.get_conf_param('first_user') + #second_host = task.computing.get_conf_param('second_host') + #second_user = task.computing.get_conf_param('second_user') + #setup_command = task.computing.get_conf_param('setup_command') + #base_port = task.computing.get_conf_param('base_port') + + tunnel_command= 'ssh -4 -o StrictHostKeyChecking=no -nNT -L 0.0.0.0:{}:{}:{} {}@{} & '.format(task.tunnel_port, task.ip, task.port, first_user, first_host) + + else: + tunnel_command= 'ssh -4 -o StrictHostKeyChecking=no -nNT -L 0.0.0.0:{}:{}:{} localhost & '.format(task.tunnel_port, task.ip, task.port) + background_tunnel_command = 'nohup {} >/dev/null 2>&1 &'.format(tunnel_command) # Log @@ -534,6 +548,10 @@ def create_task(request): task.auth_user = request.POST.get('auth_user', None) task.auth_pass = request.POST.get('auth_password', None) task.access_method = request.POST.get('access_method', None) + task_base_port = request.POST.get('task_base_port', None) + + if task_base_port: + task.port = task_base_port # Cheks if task.auth_pass and len(task.auth_pass) < 6: @@ -811,15 +829,17 @@ def computings(request): data['computing'] = Computing.objects.get(uuid=computing_uuid, user=request.user) except Computing.DoesNotExist: data['computing'] = Computing.objects.get(uuid=computing_uuid, user=None) + + # Attach user conf in any + data['computing'].attach_user_conf_data(request.user) + else: data['computings'] = list(Computing.objects.filter(user=None)) + list(Computing.objects.filter(user=request.user)) + # Attach user conf in any for computing in data['computings']: computing.attach_user_conf_data(request.user) - computing.user_conf_data_json = json.dumps(computing.user_conf_data) - computing.sys_conf_data_json = json.dumps(computing.sys_conf_data) - return render(request, 'computings.html', {'data': data}) -- GitLab