diff --git a/services/webapp/code/rosetta/base_app/tasks.py b/services/webapp/code/rosetta/base_app/computing_managers.py similarity index 72% rename from services/webapp/code/rosetta/base_app/tasks.py rename to services/webapp/code/rosetta/base_app/computing_managers.py index bd95299755ddf985b81f3a7a457fb78effcd0f8c..dc797c2ac48286846e6eec956d59854d52136caf 100644 --- a/services/webapp/code/rosetta/base_app/tasks.py +++ b/services/webapp/code/rosetta/base_app/computing_managers.py @@ -10,15 +10,71 @@ logger = logging.getLogger(__name__) TASK_DATA_DIR = "/data" -def start_task(task): +class ComputingManager(object): + + def start_task(self, task, **kwargs): + + # Check for run task logic implementation + try: + self._start_task + except AttributeError: + raise NotImplementedError('Not implemented') + + # Call actual run task logic + self._start_task(task, **kwargs) + + + def stop_task(self, task, **kwargs): + + # Check for stop task logic implementation + try: + self._stop_task + except AttributeError: + raise NotImplementedError('Not implemented') + + # Call actual stop task logic + self._stop_task(task, **kwargs) + + # Ok, save status as deleted + task.status = 'stopped' + task.save() + + # Check if the tunnel is active and if so kill it + logger.debug('Checking if task "{}" has a running tunnel'.format(task.tid)) + check_command = 'ps -ef | grep ":'+str(task.tunnel_port)+':'+str(task.ip)+':'+str(task.port)+'" | grep -v grep | awk \'{print $2}\'' + logger.debug(check_command) + out = os_shell(check_command, capture=True) + logger.debug(out) + if out.exit_code == 0: + logger.debug('Task "{}" has a running tunnel, killing it'.format(task.tid)) + tunnel_pid = out.stdout + # Kill Tunnel command + kill_tunnel_command= 'kill -9 {}'.format(tunnel_pid) + + # Log + logger.debug('Killing tunnel with command: {}'.format(kill_tunnel_command)) + + # Execute + os_shell(kill_tunnel_command, capture=True) + if out.exit_code != 0: + raise Exception(out.stderr) + + + def get_task_log(self, task, **kwargs): + + # Check for get task log logic implementation + try: + self._get_task_log + except AttributeError: + raise NotImplementedError('Not implemented') + + # Call actual get task log logic + return self._get_task_log(task, **kwargs) - # Handle proper config - if task.computing.type == 'local': - # Get our ip address - #import netifaces - #netifaces.ifaddresses('eth0') - #backend_ip = netifaces.ifaddresses('eth0')[netifaces.AF_INET][0]['addr'] +class LocalComputingManager(ComputingManager): + + def _start_task(self, task): # Init run command #--cap-add=NET_ADMIN --cap-add=NET_RAW run_command = 'sudo docker run --network=rosetta_default --name rosetta-task-{}'.format( task.id) @@ -39,8 +95,10 @@ def start_task(task): # Host name, image entry command run_command += ' -h task-{} -d -t {}{}'.format(task.id, registry_string, task.container.image) - # Run the task Debug + # Debug logger.debug('Running new task with command="{}"'.format(run_command)) + + # Run the task out = os_shell(run_command, capture=True) if out.exit_code != 0: raise Exception(out.stderr) @@ -48,7 +106,6 @@ def start_task(task): task_tid = out.stdout logger.debug('Created task with id: "{}"'.format(task_tid)) - # Get task IP address out = os_shell('sudo docker inspect --format \'{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}\' ' + task_tid + ' | tail -n1', capture=True) if out.exit_code != 0: @@ -65,9 +122,34 @@ def start_task(task): task.save() + def _stop_task(self, task): + + # Delete the Docker container + standby_supported = False + if standby_supported: + stop_command = 'sudo docker stop {}'.format(task.tid) + else: + stop_command = 'sudo docker stop {} && sudo docker rm {}'.format(task.tid,task.tid) + + out = os_shell(stop_command, capture=True) + if out.exit_code != 0: + raise Exception(out.stderr) + + def _get_task_log(self, task, **kwargs): + + # View the Docker container log (attach) + view_log_command = 'sudo docker logs {}'.format(task.tid,) + logger.debug(view_log_command) + out = os_shell(view_log_command, capture=True) + if out.exit_code != 0: + raise Exception(out.stderr) + else: + return out.stdout - elif task.computing.type == 'remote': +class RemoteComputingManager(ComputingManager): + + def _start_task(self, task, **kwargs): logger.debug('Starting a remote task "{}"'.format(task.computing)) # Get computing host @@ -143,11 +225,8 @@ def start_task(task): # Save task.save() - elif task.computing.type == 'remoteOLD': - logger.debug('Starting a remote task "{}"'.format(task.computing)) - # Get computing host - host = task.computing.get_conf_param('host') + def _stop_task(self, task, **kwargs): # Get user keys if task.computing.require_user_keys: @@ -155,64 +234,43 @@ def start_task(task): else: raise NotImplementedError('Remote tasks not requiring keys are not yet supported') - # 1) Run the container on the host (non blocking) - - if task.container.type == 'singularity': + # Get computing host + host = task.computing.get_conf_param('host') - # Set pass if any - if task.auth_pass: - authstring = ' export SINGULARITYENV_AUTH_PASS={} && '.format(task.auth_pass) - else: - authstring = '' + # Stop the task remotely + stop_command = 'ssh -i {} -4 -o StrictHostKeyChecking=no {} "kill -9 {}"'.format(user_keys.private_key_file, host, task.pid) + logger.debug(stop_command) + out = os_shell(stop_command, capture=True) + if out.exit_code != 0: + if not 'No such process' in out.stderr: + raise Exception(out.stderr) - run_command = 'ssh -i {} -4 -o StrictHostKeyChecking=no {} '.format(user_keys.private_key_file, host) - run_command += '"export SINGULARITY_NOHTTPS=true && {} '.format(authstring) - run_command += 'exec nohup singularity run --pid --writable-tmpfs --containall --cleanenv ' - - # Set registry - if task.container.registry == 'docker_local': - registry = 'docker://dregistry:5000/' - elif task.container.registry == 'docker_hub': - registry = 'docker://' - else: - raise NotImplementedError('Registry {} not supported'.format(task.container.registry)) - - run_command+='{}{} &> /tmp/{}.log & echo \$!"'.format(registry, task.container.image, task.uuid) - + + def _get_task_log(self, task, **kwargs): + # Get computing host + host = task.computing.get_conf_param('host') + + # Get id_rsa + if task.computing.require_user_keys: + user_keys = Keys.objects.get(user=task.user, default=True) + id_rsa_file = user_keys.private_key_file else: - raise NotImplementedError('Container {} not supported'.format(task.container.type)) + raise NotImplementedError('temote with no keys not yet') - out = os_shell(run_command, capture=True) + # View the Singularity container log + view_log_command = 'ssh -i {} -4 -o StrictHostKeyChecking=no {} "cat /tmp/{}.log"'.format(id_rsa_file, host, task.uuid) + logger.debug(view_log_command) + out = os_shell(view_log_command, capture=True) if out.exit_code != 0: raise Exception(out.stderr) - - # Save pid echoed by the command above - task_pid = out.stdout + else: + return out.stdout - # 2) Simulate the agent (i.e. report container IP and port port) - - # Get task IP address - out = os_shell('sudo docker inspect --format \'{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}\' '+host+' | tail -n1', capture=True) - if out.exit_code != 0: - raise Exception('Error: ' + out.stderr) - task_ip = out.stdout - - # Set fields - task.tid = task.uuid - task.status = TaskStatuses.running - task.ip = task_ip - task.pid = task_pid - task.port = int(task.container.service_ports.split(',')[0]) - - # Save - task.save() - #============================== - # Slurm - #============================== - elif task.computing.type == 'slurm': - +class SlurmComputingManager(ComputingManager): + + def _start_task(self, task, **kwargs): logger.debug('Starting a remote task "{}"'.format(task.computing)) # Get computing host #Key Error ATM @@ -271,67 +329,18 @@ def start_task(task): raise Exception(out.stderr) + def _stop_task(self, task, **kwargs): + raise NotImplementedError('Not implemented') + + + def _get_task_log(self, task, **kwargs): + raise NotImplementedError('Not implemented') + + - else: - raise Exception('Consistency exception: invalid computing resource "{}'.format(task.computing)) -def stop_task(task): - if task.computing.type == 'local': - - # Delete the Docker container - standby_supported = False - if standby_supported: - stop_command = 'sudo docker stop {}'.format(task.tid) - else: - stop_command = 'sudo docker stop {} && sudo docker rm {}'.format(task.tid,task.tid) - - out = os_shell(stop_command, capture=True) - if out.exit_code != 0: - raise Exception(out.stderr) - - elif task.computing.type == 'remote': - # Get user keys - if task.computing.require_user_keys: - user_keys = Keys.objects.get(user=task.user, default=True) - else: - raise NotImplementedError('Remote tasks not requiring keys are not yet supported') - # Get computing host - host = task.computing.get_conf_param('host') - # Stop the task remotely - stop_command = 'ssh -i {} -4 -o StrictHostKeyChecking=no {} "kill -9 {}"'.format(user_keys.private_key_file, host, task.pid) - logger.debug(stop_command) - out = os_shell(stop_command, capture=True) - if out.exit_code != 0: - if not 'No such process' in out.stderr: - raise Exception(out.stderr) - else: - raise Exception('Don\'t know how to stop tasks on "{}" computing resource.'.format(task.computing)) - - # Ok, save status as deleted - task.status = 'stopped' - task.save() - - # Check if the tunnel is active and if so kill it - logger.debug('Checking if task "{}" has a running tunnel'.format(task.tid)) - check_command = 'ps -ef | grep ":'+str(task.tunnel_port)+':'+str(task.ip)+':'+str(task.port)+'" | grep -v grep | awk \'{print $2}\'' - logger.debug(check_command) - out = os_shell(check_command, capture=True) - logger.debug(out) - if out.exit_code == 0: - logger.debug('Task "{}" has a running tunnel, killing it'.format(task.tid)) - tunnel_pid = out.stdout - # Kill Tunnel command - kill_tunnel_command= 'kill -9 {}'.format(tunnel_pid) - - # Log - logger.debug('Killing tunnel with command: {}'.format(kill_tunnel_command)) - - # Execute - os_shell(kill_tunnel_command, capture=True) - if out.exit_code != 0: - raise Exception(out.stderr) diff --git a/services/webapp/code/rosetta/base_app/management/commands/base_app_populate.py b/services/webapp/code/rosetta/base_app/management/commands/base_app_populate.py index 68d997f3cbfe050bc372505e3b3ae242b7b714e2..bad2e58bb28083927e812c4e125884276f368e36 100644 --- a/services/webapp/code/rosetta/base_app/management/commands/base_app_populate.py +++ b/services/webapp/code/rosetta/base_app/management/commands/base_app_populate.py @@ -108,23 +108,10 @@ class Command(BaseCommand): #============================== - # Demo remote computing - #============================== - demo_remote_computing = Computing.objects.create(user = None, - name = 'Demo remote', - type = 'remote', - require_sys_conf = True, - require_user_conf = False, - require_user_keys = False) - ComputingSysConf.objects.create(computing = demo_remote_computing, - data = {'host': 'slurmclusterworker-one'}) - - - #============================== - # Demo remote (auth) computing + # Demo remote computing #============================== demo_remote_auth_computing = Computing.objects.create(user = None, - name = 'Demo remote (auth)', + name = 'Demo remote', type = 'remote', require_sys_conf = True, require_user_conf = True, diff --git a/services/webapp/code/rosetta/base_app/models.py b/services/webapp/code/rosetta/base_app/models.py index f32c7cfd0b03dfc34cfb2e81c4b54002d9bbd3da..263750bb11c873b7aae9bab1cd5d74fc1caa3aa2 100644 --- a/services/webapp/code/rosetta/base_app/models.py +++ b/services/webapp/code/rosetta/base_app/models.py @@ -200,6 +200,12 @@ class Computing(models.Model): param_value = self.user_conf_data[param] return param_value + @property + def manager(self): + from . import computing_managers + ComputingManager = getattr(computing_managers, '{}ComputingManager'.format(self.type.title())) + return ComputingManager() + class ComputingSysConf(models.Model): uuid = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) diff --git a/services/webapp/code/rosetta/base_app/templates/account.html b/services/webapp/code/rosetta/base_app/templates/account.html index 918cc0cff8b7624864396ab07bdaa3b7b908fc56..865dbac83fb77c993eea8d26e082b2430932a7da 100644 --- a/services/webapp/code/rosetta/base_app/templates/account.html +++ b/services/webapp/code/rosetta/base_app/templates/account.html @@ -83,6 +83,20 @@ {% endif %} </td> </tr> + </table> + + <br /> + <h3>Keys</h3> + <table class="dashboard"> + + <tr> + <td valign="top"> + <b>Default public key</b> + </td> + <td> + <pre style="max-width:300px; height:">{{ data.default_public_key }}</pre> + </td> + </tr> </table> diff --git a/services/webapp/code/rosetta/base_app/templates/components/task.html b/services/webapp/code/rosetta/base_app/templates/components/task.html index 2eb422af13c4456c9bd06973dba0a8da5b9bdc75..bbebbdcf863dbe89af9ef5b3700ac4bf334b3575 100644 --- a/services/webapp/code/rosetta/base_app/templates/components/task.html +++ b/services/webapp/code/rosetta/base_app/templates/components/task.html @@ -71,11 +71,7 @@ <font color="#c0c0c0">Stop</font> | {% endif %} - {% if task.status == "exited" or task.status == "stopped" %} <a href="?uuid={{task.uuid}}&action=delete&details=False">Delete</a> - {% else %} - <font color="#c0c0c0">Delete</font> - {% endif %} {% if task.status == "running" %} | <a href="?uuid={{task.uuid}}&action=connect">Connect</a> | <a href="/task_log/?uuid={{task.uuid}}&action=viewlog">View Log</a> diff --git a/services/webapp/code/rosetta/base_app/templates/tasks.html b/services/webapp/code/rosetta/base_app/templates/tasks.html index 65696bc78e88f626f00dcdab85720be7a66a40dd..46e808cddd025993f7afc005ff25c1bece7da111 100644 --- a/services/webapp/code/rosetta/base_app/templates/tasks.html +++ b/services/webapp/code/rosetta/base_app/templates/tasks.html @@ -24,7 +24,7 @@ {% include "components/task.html" with task=task %} {% endfor %} <br /> - <a href="/create_task">Add new...</a> + <a href="/create_task">Create new...</a> {% endif %} <br/> diff --git a/services/webapp/code/rosetta/base_app/views.py b/services/webapp/code/rosetta/base_app/views.py index 2782259f5ff9bac1235eb53e255cc6beb498cef2..3cf6b9062c6998b4ed0a8be8d9ab7deb731a751e 100644 --- a/services/webapp/code/rosetta/base_app/views.py +++ b/services/webapp/code/rosetta/base_app/views.py @@ -9,7 +9,6 @@ from django.shortcuts import redirect from .models import Profile, LoginToken, Task, TaskStatuses, Container, Computing, Keys from .utils import send_email, format_exception, timezonize, os_shell, booleanize, debug_param from .decorators import public_view, private_view -from .tasks import start_task, stop_task from .exceptions import ErrorMessage # Setup logging @@ -180,6 +179,10 @@ def account(request): if edit and edit.upper() == 'NONE': edit = None + # Set data.default_public_key + with open(Keys.objects.get(user=request.user, default=True).public_key_file) as f: + data['default_public_key'] = f.read() + # Edit values if edit and value: try: @@ -269,8 +272,10 @@ def tasks(request): if action=='delete': if task.status not in [TaskStatuses.stopped, TaskStatuses.exited]: - data['error'] = 'Can delete only tasks in the stopped state' - return render(request, 'error.html', {'data': data}) + try: + task.computing.manager.stop_task(task) + except: + pass try: # Get the task (raises if none available including no permission) task = Task.objects.get(user=request.user, uuid=uuid) @@ -287,57 +292,51 @@ def tasks(request): return render(request, 'error.html', {'data': data}) elif action=='stop': # or delete,a and if delete also remove object - stop_task(task) + task.computing.manager.stop_task(task) elif action=='connect': - - # Create task tunnel - if task.computing.type in ['local', 'remote', 'slurm']: - - # If there is no tunnel port allocated yet, find one - if not task.tunnel_port: - - # Get a free port fot the tunnel: - allocated_tunnel_ports = [] - for other_task in Task.objects.all(): - if other_task.tunnel_port and not other_task.status in [TaskStatuses.exited, TaskStatuses.stopped]: - allocated_tunnel_ports.append(other_task.tunnel_port) - - for port in range(7000, 7006): - if not port in allocated_tunnel_ports: - tunnel_port = port - break - if not tunnel_port: - logger.error('Cannot find a free port for the tunnel for task "{}"'.format(task.tid)) - raise ErrorMessage('Cannot find a free port for the tunnel to the task') - - task.tunnel_port = tunnel_port - task.save() - - - # Check if the tunnel is active and if not create it - logger.debug('Checking if task "{}" has a running tunnel'.format(task.tid)) - - out = os_shell('ps -ef | grep ":{}:{}:{}" | grep -v grep'.format(task.tunnel_port, task.ip, task.port), capture=True) - - if out.exit_code == 0: - logger.debug('Task "{}" has a running tunnel, using it'.format(task.tid)) - else: - logger.debug('Task "{}" has no running tunnel, creating it'.format(task.tid)) - - # Tunnel command - tunnel_command= 'ssh -4 -o StrictHostKeyChecking=no -nNT -L 0.0.0.0:{}:{}:{} localhost & '.format(task.tunnel_port, task.ip, task.port) - background_tunnel_command = 'nohup {} >/dev/null 2>&1 &'.format(tunnel_command) - - # Log - logger.debug('Opening tunnel with command: {}'.format(background_tunnel_command)) - - # Execute - subprocess.Popen(background_tunnel_command, shell=True) - + + # If there is no tunnel port allocated yet, find one + if not task.tunnel_port: + + # Get a free port fot the tunnel: + allocated_tunnel_ports = [] + for other_task in Task.objects.all(): + if other_task.tunnel_port and not other_task.status in [TaskStatuses.exited, TaskStatuses.stopped]: + allocated_tunnel_ports.append(other_task.tunnel_port) + + for port in range(7000, 7006): + if not port in allocated_tunnel_ports: + tunnel_port = port + break + if not tunnel_port: + logger.error('Cannot find a free port for the tunnel for task "{}"'.format(task.tid)) + raise ErrorMessage('Cannot find a free port for the tunnel to the task') + + task.tunnel_port = tunnel_port + task.save() + + + # Check if the tunnel is active and if not create it + logger.debug('Checking if task "{}" has a running tunnel'.format(task.tid)) + + out = os_shell('ps -ef | grep ":{}:{}:{}" | grep -v grep'.format(task.tunnel_port, task.ip, task.port), capture=True) + + if out.exit_code == 0: + logger.debug('Task "{}" has a running tunnel, using it'.format(task.tid)) else: - raise ErrorMessage('Connecting to tasks on computing "{}" is not supported yet'.format(task.computing)) - + logger.debug('Task "{}" has no running tunnel, creating it'.format(task.tid)) + + # Tunnel command + tunnel_command= 'ssh -4 -o StrictHostKeyChecking=no -nNT -L 0.0.0.0:{}:{}:{} localhost & '.format(task.tunnel_port, task.ip, task.port) + background_tunnel_command = 'nohup {} >/dev/null 2>&1 &'.format(tunnel_command) + + # Log + logger.debug('Opening tunnel with command: {}'.format(background_tunnel_command)) + + # Execute + subprocess.Popen(background_tunnel_command, shell=True) + # Ok, now redirect to the task through the tunnel return redirect('http://localhost:{}'.format(task.tunnel_port)) @@ -462,7 +461,11 @@ def create_task(request): task.computing.attach_user_conf_data(task.user) # Start the task - start_task(task) + #try: + task.computing.manager.start_task(task) + #except: + # task.delete() + # raise # Set step data['step'] = 'created' @@ -510,41 +513,7 @@ def task_log(request): # Get the log try: - if task.computing.type == 'local': - - # View the Docker container log (attach) - view_log_command = 'sudo docker logs {}'.format(task.tid,) - logger.debug(view_log_command) - out = os_shell(view_log_command, capture=True) - if out.exit_code != 0: - raise Exception(out.stderr) - else: - data['log'] = out.stdout - - elif task.computing.type == 'remote': - - # Get computing host - host = task.computing.get_conf_param('host') - - # Get id_rsa - if task.computing.require_user_keys: - user_keys = Keys.objects.get(user=task.user, default=True) - id_rsa_file = user_keys.private_key_file - else: - raise NotImplementedError('temote with no keys not yet') - - # View the Singularity container log - view_log_command = 'ssh -i {} -4 -o StrictHostKeyChecking=no {} "cat /tmp/{}.log"'.format(id_rsa_file, host, task.uuid) - logger.debug(view_log_command) - out = os_shell(view_log_command, capture=True) - if out.exit_code != 0: - raise Exception(out.stderr) - else: - data['log'] = out.stdout - - else: - data['error']= 'Don\'t know how to view task logs on "{}" computing resource.'.format(task.computing) - return render(request, 'error.html', {'data': data}) + data['log'] = task.computing.manager.get_task_log(task) except Exception as e: data['error'] = 'Error in viewing task log'