From feb8ef2a842d8be7a6a46c6519fe3c7f79ea1e1e Mon Sep 17 00:00:00 2001
From: Stefano Alberto Russo <stefano.russo@gmail.com>
Date: Fri, 17 Jan 2020 15:21:16 +0100
Subject: [PATCH] Improved support for running tasks on local compute.

---
 images/webapp/code/rosetta/base_app/models.py |  63 ++++++--
 .../base_app/templates/create_task.html       |  16 +-
 .../rosetta/base_app/templates/tasks.html     |  25 +--
 images/webapp/code/rosetta/base_app/views.py  | 146 ++++++++----------
 4 files changed, 131 insertions(+), 119 deletions(-)

diff --git a/images/webapp/code/rosetta/base_app/models.py b/images/webapp/code/rosetta/base_app/models.py
index 92385b0..14ba06c 100644
--- a/images/webapp/code/rosetta/base_app/models.py
+++ b/images/webapp/code/rosetta/base_app/models.py
@@ -1,4 +1,5 @@
 import uuid
+import enum
 
 from django.db import models
 from django.contrib.auth.models import User
@@ -11,6 +12,14 @@ import logging
 logger = logging.getLogger(__name__)
 
 
+# Task statuses
+class TaskStatuses(object):
+    created = 'created'
+    running = 'running'
+    stopped = 'stopped'
+    exited = 'exited'
+
+
 #=========================
 #  Profile 
 #=========================
@@ -42,18 +51,26 @@ class LoginToken(models.Model):
 #  Tasks 
 #=========================
 class Task(models.Model):
-    user     = models.ForeignKey(User, related_name='+', on_delete=models.CASCADE)
-    tid      = models.CharField('Task ID', max_length=64, blank=False, null=False)
-    uuid     = models.CharField('Task UUID', max_length=36, blank=False, null=False)
-    name     = models.CharField('Task name', max_length=36, blank=False, null=False)
-    type     = models.CharField('Task type', max_length=36, blank=False, null=False)
-    status   = models.CharField('Task status', max_length=36, blank=True, null=True)
-    created  = models.DateTimeField('Created on', default=timezone.now)
-    compute  = models.CharField('Task compute', max_length=36, blank=True, null=True)
-
-    tunneled    = models.BooleanField('Task tunneled', default=False)
+    user      = models.ForeignKey(User, related_name='+', on_delete=models.CASCADE)
+    tid       = models.CharField('Task ID', max_length=64, blank=False, null=False)
+    uuid      = models.CharField('Task UUID', max_length=36, blank=False, null=False)
+    name      = models.CharField('Task name', max_length=36, blank=False, null=False)
+    container = models.CharField('Task container', max_length=36, blank=False, null=False)
+    status    = models.CharField('Task status', max_length=36, blank=True, null=True)
+    created   = models.DateTimeField('Created on', default=timezone.now)
+    compute   = models.CharField('Task compute', max_length=36, blank=True, null=True)
     tunnel_port = models.IntegerField('Task tunnel port', blank=True, null=True)
 
+    def save(self, *args, **kwargs):
+        
+        try:
+            getattr(TaskStatuses, str(self.status))
+        except AttributeError:
+            raise Exception('Invalid status "{}"'.format(self.status))
+
+        # Call parent save
+        super(Task, self).save(*args, **kwargs)
+
 
     def __str__(self):
         return str('Task "{}" of user "{}" in status "{}" (TID "{}")'.format(self.name, self.user.email, self.status, self.tid))
@@ -65,8 +82,32 @@ class Task(models.Model):
         if out.exit_code != 0:
             raise Exception('Error: ' + out.stderr)
         return out.stdout
-        
+    
+    def update_status(self):
+        if self.compute == 'local':
+            
+            check_command = 'sudo docker inspect --format \'{{.State.Status}}\' ' + self.tid # or, .State.Running
+            out = os_shell(check_command, capture=True)
+            logger.debug('Status: "{}"'.format(out.stdout))
+            if out.exit_code != 0: 
+                if (('No such' in out.stderr) and (self.tid in out.stderr)):
+                    logger.debug('Task "{}" is not running in reality'.format(self.tid))
+                self.status = TaskStatuses.exited
+            else:
+                if out.stdout == 'running':
+                    self.status = TaskStatuses.running
+                    
+                elif out.stdout == 'exited':
+                    self.status = TaskStatuses.exited
+                    
+                else:
+                    raise Exception('Unknown task status: "{}"'.format(out.stdout))
+                
+            self.save()                   
 
+    @property
+    def short_uuid(self):
+        return self.uuid.split('-')[0]
 
 
 
diff --git a/images/webapp/code/rosetta/base_app/templates/create_task.html b/images/webapp/code/rosetta/base_app/templates/create_task.html
index d4950e8..7c56e65 100644
--- a/images/webapp/code/rosetta/base_app/templates/create_task.html
+++ b/images/webapp/code/rosetta/base_app/templates/create_task.html
@@ -29,17 +29,24 @@
             </td>
            </tr>
 
+           <!--
            <tr>
             <td><b>Task user</b></td>
             <td>
              <input type="text" name="name" value="" placeholder="metauser" size="23" disabled />
             </td>
-           </tr>
+           </tr>-->
+           
+           <!--<tr>
+            <td><b>Task password</b></td>
+            <td>
+             <input type="password" name="password" value="" placeholder="" size="23" disabled />
+            </td>
+           </tr> -->
 
            <tr>
-            <td><b>Task Type</b></td><td>
-              <!-- Dropdown with versions -->
-              <select name="type" >
+            <td><b>Task container</b></td><td>
+              <select name="container" >
               <option value="metadesktop" selected>Meta Desktop</option>
               <option value="astroccok">Astrocook</option>
               <option value="gadgetviewer">Gadget Viewer</option>
@@ -49,7 +56,6 @@
            
            <tr>
             <td><b>Computing resource</b></td><td>
-              <!-- Dropdown with versions -->
               <select name="computing" >
               <option value="builtin" selected>Local</option>
               <option value="demoslurm">Demo Slurm cluster</option>
diff --git a/images/webapp/code/rosetta/base_app/templates/tasks.html b/images/webapp/code/rosetta/base_app/templates/tasks.html
index 9a0b7f8..6db301d 100644
--- a/images/webapp/code/rosetta/base_app/templates/tasks.html
+++ b/images/webapp/code/rosetta/base_app/templates/tasks.html
@@ -21,23 +21,18 @@
           <table class="dashboard">
 
            <tr>
-            <td><b>Task name</b></td>
-            <td>{{ task.name }}</td>
-           </tr>
-
-           <tr>
-            <td><b>Task type</b></td>
-            <td>{{ task.type }}</td>
+            <td><b>Task id</b></td>
+            <td>{{ task.short_uuid }}</td>
            </tr>
 
            <tr>
-            <td><b>Task URL</b></td>
-            <td>https://metabox.online/task/{{ task.shortuuid }}</td>
+            <td><b>Task name</b></td>
+            <td>{{ task.name }}</td>
            </tr>
 
            <tr>
-            <td><b>Task user</b></td>
-            <td>metauser</td>
+            <td><b>Task container</b></td>
+            <td>{{ task.container }}</td>
            </tr>
 
            <tr>
@@ -50,12 +45,6 @@
             <td>{{ task.created }}</td>
            </tr>
            
-           <tr>
-            <td><b>Task tunneled</b></td>
-            <td>{{ task.tunneled }}</td>
-           
-           </tr>
-           
            <tr>
             <td><b>Task tunnel port</b></td>
             <td>{{ task.tunnel_port }}</td>
@@ -73,7 +62,7 @@
             
             {% endif%}
             <a href=?uuid={{task.uuid}}&action=delete>Delete</a>
-            {% if task.status == "Running" %}
+            {% if task.status == "running" %}
              | <a href=?uuid={{task.uuid}}&action=connect>Connect</a>
             {% else %}
              | <font color="#c0c0c0">Connect</font>
diff --git a/images/webapp/code/rosetta/base_app/views.py b/images/webapp/code/rosetta/base_app/views.py
index 6c2a567..2e036f7 100644
--- a/images/webapp/code/rosetta/base_app/views.py
+++ b/images/webapp/code/rosetta/base_app/views.py
@@ -20,7 +20,7 @@ from django.contrib.auth.models import User
 from django.contrib.auth import update_session_auth_hash
 
 # Project imports
-from .models import Profile, LoginToken, Task
+from .models import Profile, LoginToken, Task, TaskStatuses
 from .utils import send_email, format_exception, random_username, log_user_activity, timezonize, os_shell
 
 # Setup logging
@@ -410,7 +410,7 @@ def tasks(request):
                 logger.error('Error in deleting task with uuid="{}": "{}"'.format(uuid, e))
                 return render(request, 'error.html', {'data': data})  
         
-        elif action=='stop': # or delete,a nd if delete also remove object
+        elif action=='stop': # or delete,a and if delete also remove object
             try:
                 # Get the task (raises if none available including no permission)
                 task = Task.objects.get(user=request.user, uuid=uuid)
@@ -445,58 +445,54 @@ def tasks(request):
             task = Task.objects.get(user=request.user, uuid=uuid)
             
             # Create task tunnel
-            if task.tunneled:
-                # If the task is already tunneled, do nothing.
-                pass
-            
-            elif not task.tunneled and task.compute=='local':
-                # 1) Get task IP
-                task_ip_addr = task.ip_addr
-                logger.debug('task_ip_addr="{}"'.format(task_ip_addr))
-
-            
-                # 2) Get a free port fot the tunnel:
-                allocated_tunnel_ports = []
-                for other_task in Task.objects.all():
-                    if other_task.tunneled and other_task.tunnel_port:
-                        allocated_tunnel_ports.append(other_task.tunnel_port)
-                
-                for port in range(7000, 7006):
-                    if not port in allocated_tunnel_ports:
-                        tunnel_port = port
-                        break
-                if not tunnel_port:
-                    logger.error('Cannot find a free port for the tunnel for task "{}"'.format(task.tid))                      
-                    raise ErrorMessage('Cannot find a free port for the tunnel to the task')
-                
-                tunnel_command= 'nohup ssh -4 -o StrictHostKeyChecking=no -nNT -L 0.0.0.0:{}:{}:8590 localhost  &> /dev/null & '.format(tunnel_port, task_ip_addr)
-                
-                logger.debug(tunnel_command)
-                
-                subprocess.Popen(['nohup', 'tunnel_command'],
-                                 stdout=open('/dev/null', 'w'),
-                                 stderr=open('/dev/null', 'w'),
-                                 preexec_fn=os.setpgrp
-                                 )
-
-                task.tunneled=True
-                task.tunnel_port = tunnel_port
-                task.save()
-                
-                
-                
-                #out = os_shell(tunnel_command, capture=True)
-                #if out.exit_code != 0:
-                #    logger.error('Error when creating the tunnel for task "{}": "{}"'.format(task.tid, out.stderr))                      
-                #    raise ErrorMessage('Error when creating the tunnel for task')
-                
+            if task.compute=='local':
                 
+                # If there is no tunnel port allocated yet, find one                
+                if not task.tunnel_port:
+
+                    # Get a free port fot the tunnel:
+                    allocated_tunnel_ports = []
+                    for other_task in Task.objects.all():
+                        if other_task.tunnel_port and not other_task.status in [TaskStatuses.exited, TaskStatuses.stopped]:
+                            allocated_tunnel_ports.append(other_task.tunnel_port)
+                    
+                    for port in range(7000, 7006):
+                        if not port in allocated_tunnel_ports:
+                            tunnel_port = port
+                            break
+                    if not tunnel_port:
+                        logger.error('Cannot find a free port for the tunnel for task "{}"'.format(task.tid))                      
+                        raise ErrorMessage('Cannot find a free port for the tunnel to the task')
+
+                    task.tunnel_port = tunnel_port
+                    task.save()
+
+
+                # Check if the tunnel is active and if not create it
+                logger.debug('Checking if task "{}" has a running tunnel'.format(task.tid))
                 
+                out = os_shell('ps -ef | grep ":{}:{}:8590" | grep -v grep'.format(task.tunnel_port, task.ip_addr), capture=True)
+
+                if out.exit_code == 0:
+                    logger.debug('Task "{}" has a running tunnel, using it'.format(task.tid))
+                else:
+                    logger.debug('Task "{}" has no running tunnel, creating it'.format(task.tid))
+                    
+                    # Tunnel command
+                    tunnel_command= 'ssh -4 -o StrictHostKeyChecking=no -nNT -L 0.0.0.0:{}:{}:8590 localhost & '.format(task.tunnel_port, task.ip_addr)
+                    background_tunnel_command = 'nohup {} >/dev/null 2>&1 &'.format(tunnel_command)
+                    
+                    # Log
+                    logger.debug('Opening tunnel with command: {}'.format(background_tunnel_command))
+
+                    # Execute
+                    subprocess.Popen(background_tunnel_command, shell=True)
+                   
             else:
                 raise ErrorMessage('Connecting to tasks on compute "{}" is not supported yet'.format(task.compute))
 
 
-            # Ok, now redirect
+            # Ok, now redirect to the task through the tunnel
             from django.shortcuts import redirect
             return redirect('http://localhost:{}'.format(task.tunnel_port))
 
@@ -516,6 +512,10 @@ def tasks(request):
             logger.error('Error in getting Virtual Devices: "{}"'.format(e))
             return render(request, 'error.html', {'data': data})   
 
+    # Update task statuses
+    for task in tasks:
+        task.update_status()
+
     data['tasks'] = tasks
 
     return render(request, 'tasks.html', {'data': data})
@@ -538,13 +538,13 @@ def create_task(request):
     if data['name']:
         
         # Type
-        data['type'] = request.POST.get('type', None)
-        if not data['type']:
-            data['error'] = 'No type given'
+        data['container'] = request.POST.get('container', None)
+        if not data['container']:
+            data['error'] = 'No container given'
             return render(request, 'error.html', {'data': data})
 
-        if not data['type'] in SUPPORTED_TASK_TYPES:
-            data['error'] = 'No valid task type'
+        if not data['container'] in SUPPORTED_TASK_TYPES:
+            data['error'] = 'No valid task container'
             return render(request, 'error.html', {'data': data})
         
 
@@ -559,35 +559,20 @@ def create_task(request):
             #netifaces.ifaddresses('eth0')
             #backend_ip = netifaces.ifaddresses('eth0')[netifaces.AF_INET][0]['addr']       
 
-
-            # Get the IP address of the DNS service               
-            #inspect_json = json.loads(os_shell('sudo docker inspect metaboxonline-dns-one', capture=True).stdout)
-            #DNS_SERVICE_IP = inspect_json[0]['NetworkSettings']['IPAddress']    
-        
-            # The following does not work on WIndows
-            # Do not use .format as there are too many graph brackets    
-            #DNS_SERVICE_IP = os_shell('docker inspect --format \'{{ .NetworkSettings.IPAddress }}\' ' + PROJECT_NAME + '-' + service + '-' +instance, capture=True).stdout
-        
-            #if DNS_SERVICE_IP:
-            #    try:
-            #        socket.inet_aton(DNS_SERVICE_IP)
-            #    except socket.error:
-            #        raise Exception('Error, I could not find a valid IP address for the DNS service')
-
             # Init run command #--cap-add=NET_ADMIN --cap-add=NET_RAW 
             run_command  = 'sudo docker run  --network=rosetta_default --name rosetta-task-{}'.format( str_shortuuid)
 
             # Data volume
             run_command += ' -v {}/task-{}:/data'.format(TASK_DATA_DIR, str_shortuuid)
-            
-            # Ports TODO: remove or generate randomly
-            #run_command += ' -p 8590:8590 -p 5900:5900 -p 50381:22'
-            
+
             # Host name, image entry command
-            task_type = 'task-{}'.format(data['type'])
-            run_command += ' -h task-{} -d -t localhost:5000/rosetta/metadesktop'.format(str_shortuuid, task_type)
+            task_container = 'task-{}'.format(data['container'])
+            run_command += ' -h task-{} -d -t localhost:5000/rosetta/metadesktop'.format(str_shortuuid, task_container)
 
-            # Debug
+            # Create the model
+            task = Task.objects.create(user=request.user, name=data['name'], status=TaskStatuses.created, container=data['container'])
+                
+            # Run the task Debug
             logger.debug('Running new task with command="{}"'.format(run_command))
             out = os_shell(run_command, capture=True)
             if out.exit_code != 0:                        
@@ -595,24 +580,15 @@ def create_task(request):
             else:
                 logger.debug('Created task with id: "{}"'.format(out.stdout))
                 
-                # Create the model
-                task = Task.objects.create(user=request.user, name=data['name'], status='Created', type=data['type'])
-                
                 # Set fields
                 task.uuid   = str_uuid
                 task.tid    = out.stdout
-                task.status = 'Running'
                 task.compute = 'local'
+                task.status = TaskStatuses.running
                 
                 # Save
                 task.save()
 
-            # Create passwd file on Proxy
-            #out = os_shell('ssh -o StrictHostKeyChecking=no proxy "cd /shared/reyns/etc_apache2_sites_enabled/ && htpasswd -bc {}.htpasswd metauser {}"'.format(str_shortuuid, password), capture=True)
-            #if out.exit_code != 0:
-            #    logger.error(out.stderr) 
-
-
         except Exception as e:
             data['error'] = 'Error in creating new Task.'
             logger.error(e)
-- 
GitLab