From 849ab494d89b9c475b265848187ceb673e3044f9 Mon Sep 17 00:00:00 2001
From: Stefano Alberto Russo <stefano.russo@gmail.com>
Date: Mon, 20 Jan 2020 16:22:14 +0100
Subject: [PATCH] Added support for running tasks on remote compute elements
 over ssh. Fixes in the stop/delete task lifecycle management.

---
 images/webapp/code/rosetta/base_app/models.py |  12 +-
 .../base_app/templates/create_task.html       |   7 +-
 .../rosetta/base_app/templates/tasks.html     |  23 +-
 images/webapp/code/rosetta/base_app/views.py  | 197 +++++++++++++-----
 4 files changed, 174 insertions(+), 65 deletions(-)

diff --git a/images/webapp/code/rosetta/base_app/models.py b/images/webapp/code/rosetta/base_app/models.py
index 14ba06c..8cf6770 100644
--- a/images/webapp/code/rosetta/base_app/models.py
+++ b/images/webapp/code/rosetta/base_app/models.py
@@ -59,6 +59,9 @@ class Task(models.Model):
     status = models.CharField('Task status', max_length=36, blank=True, null=True)
     created = models.DateTimeField('Created on', default=timezone.now)
     compute = models.CharField('Task compute', max_length=36, blank=True, null=True)
+    pid = models.IntegerField('Task pid', blank=True, null=True)
+    port = models.IntegerField('Task port', blank=True, null=True)
+    ip = models.CharField('Task ip address', max_length=36, blank=True, null=True)
     tunnel_port = models.IntegerField('Task tunnel port', blank=True, null=True)
 
     def save(self, *args, **kwargs):
@@ -75,14 +78,7 @@ class Task(models.Model):
     def __str__(self):
         return str('Task "{}" of user "{}" in status "{}" (TID "{}")'.format(self.name, self.user.email, self.status, self.tid))
 
-    @property
-    def ip_addr(self):
-        # TODO: if self.computing (or self.type) == "local":
-        out = os_shell('sudo docker inspect --format \'{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}\' ' + self.tid + ' | tail -n1', capture=True)
-        if out.exit_code != 0:
-            raise Exception('Error: ' + out.stderr)
-        return out.stdout
-
+
     def update_status(self):
 
         if self.compute == 'local':

diff --git a/images/webapp/code/rosetta/base_app/templates/create_task.html b/images/webapp/code/rosetta/base_app/templates/create_task.html
index 7c56e65..c3bf5cf 100644
--- a/images/webapp/code/rosetta/base_app/templates/create_task.html
+++ b/images/webapp/code/rosetta/base_app/templates/create_task.html
@@ -56,8 +56,9 @@
 
     <tr>
       <td><b>Computing resource</b></td><td>
-        <select name="computing" >
-          <option value="builtin" selected>Local</option>
+        <select name="compute" >
+          <option value="local" selected>Local</option>
+          <option value="demoremote">Demo remote</option>
           <option value="demoslurm">Demo Slurm cluster</option>
         </select>
       </td>
@@ -65,7 +66,7 @@
 
     <tr><td colspan=2>
       <table><tr><td style="border: 1px solid lightgray;" >
-        I understand that files saved or modified and not explicity saved to a persistent share, including system packages, will be LOST when the task ends.
+        I understand that files saved or modified and not explicitly saved to a persistent share, including system packages, will be LOST when the task ends.
       </td><td style="border: 1px solid lightgray;" >
         <input class="form-check-input" type="checkbox" value="" id="invalidCheck" required>
       </td></table>

diff --git a/images/webapp/code/rosetta/base_app/templates/tasks.html b/images/webapp/code/rosetta/base_app/templates/tasks.html
index 6db301d..25e9145 100644
--- a/images/webapp/code/rosetta/base_app/templates/tasks.html
+++ b/images/webapp/code/rosetta/base_app/templates/tasks.html
@@ -44,6 +44,21 @@
       <td><b>Task created</b></td>
       <td>{{ task.created }}</td>
     </tr>
+
+    <tr>
+      <td><b>Task pid</b></td>
+      <td>{{ task.pid }}</td>
+    </tr>
+
+    <tr>
+      <td><b>Task ip</b></td>
+      <td>{{ task.ip }}</td>
+    </tr>
+
+    <tr>
+      <td><b>Task port</b></td>
+      <td>{{ task.port }}</td>
+    </tr>
 
     <tr>
       <td><b>Task tunnel port</b></td>
@@ -54,14 +69,18 @@
       <td><b>Operations</b></td>
       <td>
 
-        {% if task.status == "Running" %}
+        {% if task.status == "running" %}
           <a href=?uuid={{task.uuid}}&action=stop>Stop</a> |
         {% else %}
           <!-- <a href=?uuid={{task.uuid}}&action=start>Start</a> | -->
           <font color="#c0c0c0">Stop</font> |
-        {% endif%}
+        {% endif %}
+        {% if task.status == "exited" or task.status == "stopped" %}
           <a href=?uuid={{task.uuid}}&action=delete>Delete</a>
+        {% else %}
+          <font color="#c0c0c0">Delete</font>
+        {% endif %}
 
         {% if task.status == "running" %}
           | <a href=?uuid={{task.uuid}}&action=connect>Connect</a>
         {% else %}

diff --git a/images/webapp/code/rosetta/base_app/views.py b/images/webapp/code/rosetta/base_app/views.py
index 2e036f7..45dae1f 100644
--- a/images/webapp/code/rosetta/base_app/views.py
+++ b/images/webapp/code/rosetta/base_app/views.py
@@ -388,18 +388,19 @@ def tasks(request):
 
     # Perform actions if required:
     if action and uuid:
+
+        # Get the task (raises if none available including no permission)
+        task = Task.objects.get(user=request.user, uuid=uuid)
+
         if action=='delete':
+            if task.status not in [TaskStatuses.stopped, TaskStatuses.exited]:
+                data['error'] = 'Can delete only tasks in the stopped state'
+                return render(request, 'error.html', {'data': data})
             try:
                 # Get the task (raises if none available including no permission)
                 task = Task.objects.get(user=request.user, uuid=uuid)
 
-                # Delete the Docker container
-                delete_command = 'sudo docker stop {} && sudo docker rm {}'.format(task.tid,task.tid)
-                out = os_shell(delete_command, capture=True)
-                if out.exit_code != 0:
-                    logger.error('Error when removing Docker container for task "{}": "{}"'.format(task.tid, out.stderr))
-
-                # Ok, delete
+                # Delete
                 task.delete()
 
                 # Unset uuid to load the list again
@@ -412,25 +413,57 @@
 
         elif action=='stop': # or delete,a and if delete also remove object
             try:
-                # Get the task (raises if none available including no permission)
-                task = Task.objects.get(user=request.user, uuid=uuid)
-                str_shortuuid = task.uuid.split('-')[0]
-
-                # Delete the Docker container
-                if standby_supported:
-                    stop_command = 'sudo docker stop {}'.format(task.tid)
+                if task.compute == 'local':
+                    str_shortuuid = task.uuid.split('-')[0]
+
+                    # Delete the Docker container
+                    if standby_supported:
+                        stop_command = 'sudo docker stop {}'.format(task.tid)
+                    else:
+                        stop_command = 'sudo docker stop {} && sudo docker rm {}'.format(task.tid,task.tid)
+
+                    out = os_shell(stop_command, capture=True)
+                    if out.exit_code != 0:
+                        raise Exception(out.stderr)
+
+                elif task.compute == 'demoremote':
+
+                    # Stop the task remotely
+                    stop_command = 'ssh -4 -o StrictHostKeyChecking=no slurmclusterworker-one "kill -9 {}"'.format(task.pid)
+                    logger.debug(stop_command)
+                    out = os_shell(stop_command, capture=True)
+                    if out.exit_code != 0:
+                        raise Exception(out.stderr)
+
                 else:
-                    stop_command = 'sudo docker stop {} && sudo docker rm {}'.format(task.tid,task.tid)
-
-                out = os_shell(stop_command, capture=True)
-                if out.exit_code != 0:
-                    raise Exception(out.stderr)
+                    data['error']= 'Don\'t know how to stop tasks on "{}" compute resource.'.format(task.compute)
+                    return render(request, 'error.html', {'data': data})
 
-                # Ok, delete
-                task.status = 'Stopped'
+                # Ok, save status as stopped
+                task.status = 'stopped'
                 task.save()
 
+                # Check if the tunnel is active and if so kill it
+                logger.debug('Checking if task "{}" has a running tunnel'.format(task.tid))
+                check_command = 'ps -ef | grep ":'+str(task.tunnel_port)+':'+str(task.ip)+':'+str(task.port)+'" | grep -v grep | awk \'{print $2}\''
+                logger.debug(check_command)
+                out = os_shell(check_command, capture=True)
+                logger.debug(out)
+                if out.exit_code == 0:
+                    logger.debug('Task "{}" has a running tunnel, killing it'.format(task.tid))
+                    tunnel_pid = out.stdout
+                    # Kill Tunnel command
+                    kill_tunnel_command= 'kill -9 {}'.format(tunnel_pid)
+
+                    # Log
+                    logger.debug('Killing tunnel with command: {}'.format(kill_tunnel_command))
+
+                    # Execute
+                    out = os_shell(kill_tunnel_command, capture=True)
+                    if out.exit_code != 0:
+                        raise Exception(out.stderr)
+
             except Exception as e:
                 data['error'] = 'Error in stopping the task'
                 logger.error('Error in stopping task with uuid="{}": "{}"'.format(uuid, e))
@@ -445,7 +478,7 @@
                 task = Task.objects.get(user=request.user, uuid=uuid)
 
                 # Create task tunnel
-                if task.compute=='local':
+                if task.compute in ['local', 'demoremote']:
 
                     # If there is no tunnel port allocated yet, find one
                     if not task.tunnel_port:
@@ -471,7 +504,7 @@
 
                     # Check if the tunnel is active and if not create it
                     logger.debug('Checking if task "{}" has a running tunnel'.format(task.tid))
-                    out = os_shell('ps -ef | grep ":{}:{}:8590" | grep -v grep'.format(task.tunnel_port, task.ip_addr), capture=True)
+                    out = os_shell('ps -ef | grep ":{}:{}:{}" | grep -v grep'.format(task.tunnel_port, task.ip, task.port), capture=True)
 
                     if out.exit_code == 0:
                         logger.debug('Task "{}" has a running tunnel, using it'.format(task.tid))
@@ -479,7 +512,7 @@
                         logger.debug('Task "{}" has no running tunnel, creating it'.format(task.tid))
 
                         # Tunnel command
-                        tunnel_command= 'ssh -4 -o StrictHostKeyChecking=no -nNT -L 0.0.0.0:{}:{}:8590 localhost & '.format(task.tunnel_port, task.ip_addr)
+                        tunnel_command= 'ssh -4 -o StrictHostKeyChecking=no -nNT -L 0.0.0.0:{}:{}:{} localhost & '.format(task.tunnel_port, task.ip, task.port)
                         background_tunnel_command = 'nohup {} >/dev/null 2>&1 &'.format(tunnel_command)
 
                         # Log
@@ -547,47 +580,107 @@ def create_task(request):
             data['error'] = 'No valid task container'
             return render(request, 'error.html', {'data': data})
 
+        compute = request.POST.get('compute', None)
+        logger.debug(compute)
+
+        if compute not in ['local', 'demoremote']:
+            data['error'] = 'Unknown compute resource "{}"'.format(compute)
+            return render(request, 'error.html', {'data': data})
+
+        # Generate the task uuid
+        str_uuid = str(uuid.uuid4())
+        str_shortuuid = str_uuid.split('-')[0]
+
+        # Create the task object
+        task = Task.objects.create(uuid      = str_uuid,
+                                   user      = request.user,
+                                   name      = data['name'],
+                                   status    = TaskStatuses.created,
+                                   container = data['container'],
+                                   compute   = compute)
+
+        # Actually start tasks
         try:
-
-            #Generate uuid
-            str_uuid = str(uuid.uuid4())
-            str_shortuuid = str_uuid.split('-')[0]
-
-            # Get our ip address
-            #import netifaces
-            #netifaces.ifaddresses('eth0')
-            #backend_ip = netifaces.ifaddresses('eth0')[netifaces.AF_INET][0]['addr']
+            if compute == 'local':
+
+                # Get our ip address
+                #import netifaces
+                #netifaces.ifaddresses('eth0')
+                #backend_ip = netifaces.ifaddresses('eth0')[netifaces.AF_INET][0]['addr']
+
+                # Init run command #--cap-add=NET_ADMIN --cap-add=NET_RAW
+                run_command = 'sudo docker run --network=rosetta_default --name rosetta-task-{}'.format( str_shortuuid)
+
+                # Data volume
+                run_command += ' -v {}/task-{}:/data'.format(TASK_DATA_DIR, str_shortuuid)
+
+                # Host name, image entry command
+                task_container = 'task-{}'.format(data['container'])
+                run_command += ' -h task-{} -d -t localhost:5000/rosetta/metadesktop'.format(str_shortuuid, task_container)
+
+                # Create the model
+                task = Task.objects.create(user=request.user, name=data['name'], status=TaskStatuses.created, container=data['container'])
+
+                # Run the task Debug
+                logger.debug('Running new task with command="{}"'.format(run_command))
+                out = os_shell(run_command, capture=True)
+                if out.exit_code != 0:
+                    raise Exception(out.stderr)
+                else:
+                    task_tid = out.stdout
+                    logger.debug('Created task with id: "{}"'.format(task_tid))
 
-            # Init run command #--cap-add=NET_ADMIN --cap-add=NET_RAW
-            run_command = 'sudo docker run --network=rosetta_default --name rosetta-task-{}'.format( str_shortuuid)
+                # Get task IP address
+                out = os_shell('sudo docker inspect --format \'{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}\' ' + task_tid + ' | tail -n1', capture=True)
+                if out.exit_code != 0:
+                    raise Exception('Error: ' + out.stderr)
+                task_ip = out.stdout
+
+                # Set fields
+                task.tid = task_tid
+                task.status = TaskStatuses.running
+                task.ip = task_ip
+                task.port = 8590
+
+                # Save
+                task.save()
 
-            # Data volume
-            run_command += ' -v {}/task-{}:/data'.format(TASK_DATA_DIR, str_shortuuid)
+            elif compute == 'demoremote':
+                logger.debug('Using Demo Remote as compute resource')
 
-            # Host name, image entry command
-            task_container = 'task-{}'.format(data['container'])
-            run_command += ' -h task-{} -d -t localhost:5000/rosetta/metadesktop'.format(str_shortuuid, task_container)
-            # Create the model
-            task = Task.objects.create(user=request.user, name=data['name'], status=TaskStatuses.created, container=data['container'])
+                # 1) Run the singularity container on slurmclusterworker-one (non blocking)
+                run_command = 'ssh -4 -o StrictHostKeyChecking=no slurmclusterworker-one "export SINGULARITY_NOHTTPS=true && exec nohup singularity run --pid --writable-tmpfs --containall --cleanenv docker://dregistry:5000/rosetta/metadesktop &> /dev/null & echo \$!"'
+                out = os_shell(run_command, capture=True)
+                if out.exit_code != 0:
+                    raise Exception(out.stderr)
 
-            # Run the task Debug
-            logger.debug('Running new task with command="{}"'.format(run_command))
-            out = os_shell(run_command, capture=True)
-            if out.exit_code != 0:
-                raise Exception(out.stderr)
-            else:
-                logger.debug('Created task with id: "{}"'.format(out.stdout))
+                # Save pid echoed by the command above
+                task_pid = out.stdout
+
+                # 2) Simulate the agent (i.e. report container IP and port)
+
+                # Get task IP address
+                out = os_shell('sudo docker inspect --format \'{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}\' slurmclusterworker-one | tail -n1', capture=True)
+                if out.exit_code != 0:
+                    raise Exception('Error: ' + out.stderr)
+                task_ip = out.stdout
 
-            # Set fields
-            task.uuid = str_uuid
-            task.tid = out.stdout
-            task.compute = 'local'
-            task.status = TaskStatuses.running
-
-            # Save
-            task.save()
+                # Set fields
+                task.tid = task.uuid
+                task.status = TaskStatuses.running
+                task.ip = task_ip
+                task.pid = task_pid
+                task.port = 8590
+
+                # Save
+                task.save()
+
+            else:
+                raise Exception('Consistency exception: invalid compute resource "{}"'.format(compute))
 
         except Exception as e:
             data['error'] = 'Error in creating new Task.'
--
GitLab
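
Note on the remote-execution pattern introduced by this patch (illustrative sketch, not part of the commit): a task is started on the compute element by launching a detached Singularity container over ssh and capturing the PID echoed by the remote shell, and it is later stopped by killing that PID over ssh. The sketch below mirrors that flow with plain subprocess instead of the project's os_shell helper; the worker host name and image URL are copied from the patch and are otherwise assumptions.

# Illustrative sketch only: mirrors the ssh/Singularity launch-and-kill pattern
# used in create_task() and in the 'stop' action of the patch above.
# Assumes passwordless ssh to the worker host and a registry reachable from it.
import subprocess

WORKER = 'slurmclusterworker-one'                       # host name used in the patch
IMAGE  = 'docker://dregistry:5000/rosetta/metadesktop'  # image used in the patch

def remote_start():
    """Start a detached Singularity container over ssh and return its remote PID."""
    remote_cmd = ('export SINGULARITY_NOHTTPS=true && '
                  'exec nohup singularity run --pid --writable-tmpfs --containall '
                  '--cleanenv {} &> /dev/null & echo $!'.format(IMAGE))
    out = subprocess.run(['ssh', '-4', '-o', 'StrictHostKeyChecking=no', WORKER, remote_cmd],
                         capture_output=True, text=True, check=True)
    return out.stdout.strip()

def remote_stop(pid):
    """Kill the remote task by PID, as the 'stop' action does for demoremote tasks."""
    subprocess.run(['ssh', '-4', '-o', 'StrictHostKeyChecking=no', WORKER,
                    'kill -9 {}'.format(pid)], check=True)

A caller would persist the returned PID on the Task model (as the patch does via task.pid) so that a later stop request can reach the right remote process.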