From 82cb71191fce2bf7b4d4dbdc37d37ac3f683715f Mon Sep 17 00:00:00 2001
From: Stefano Alberto Russo <stefano.russo@gmail.com>
Date: Wed, 29 Apr 2020 23:15:21 +0200
Subject: [PATCH] Fixes in running remote tasks.

---
 services/webapp/code/rosetta/base_app/api.py  |  9 +-
 .../webapp/code/rosetta/base_app/tasks.py     | 89 ++++++++++++++++---
 2 files changed, 85 insertions(+), 13 deletions(-)

diff --git a/services/webapp/code/rosetta/base_app/api.py b/services/webapp/code/rosetta/base_app/api.py
index ad8f576..bcc9f32 100644
--- a/services/webapp/code/rosetta/base_app/api.py
+++ b/services/webapp/code/rosetta/base_app/api.py
@@ -220,8 +220,12 @@ class agent_api(PublicGETAPI):
                 task = Task.objects.get(uuid=task_uuid)
             except (Task.DoesNotExist, ValidationError):
                 return HttpResponse('Unknown task uuid "{}"'.format(task_uuid))
-                
-            host_conn_string = 'http://172.21.0.1:8080'
+
+            import socket
+            hostname = socket.gethostname()
+            webapp_ip = socket.gethostbyname(hostname)
+
+            host_conn_string = 'http://{}:8080'.format(webapp_ip)
             
             action = request.GET.get('action', None)
             
@@ -299,6 +303,7 @@ print(port)
                     return HttpResponse('Port not valid (got "{}")'.format(task_port))
                   
                 # Set fields
+                logger.info('Setting task "{}" to ip "{}" and port "{}"'.format(task.uuid, task_ip, task_port))
                 task.status = TaskStatuses.running
                 task.ip     = task_ip
                 #task.pid    = task_pid
diff --git a/services/webapp/code/rosetta/base_app/tasks.py b/services/webapp/code/rosetta/base_app/tasks.py
index 73423c9..8cb820a 100644
--- a/services/webapp/code/rosetta/base_app/tasks.py
+++ b/services/webapp/code/rosetta/base_app/tasks.py
@@ -1,4 +1,4 @@
-from .models import TaskStatuses, Keys
+from .models import TaskStatuses, Keys, Task
 from .utils import os_shell
 from .exceptions import ErrorMessage, ConsistencyException
 
@@ -83,7 +83,8 @@ def start_task(task):
  
         if task.container.type == 'singularity':
 
-            
+            task.tid    = task.uuid
+            task.save()
 
             # Set pass if any
             if task.auth_pass:
@@ -93,13 +94,20 @@ def start_task(task):
 
             import socket
             hostname = socket.gethostname()
-            my_ip = socket.gethostbyname(hostname)
+            webapp_ip = socket.gethostbyname(hostname)
 
             run_command  = 'ssh -i {} -4 -o StrictHostKeyChecking=no {} '.format(user_keys.private_key_file, host)
-            run_command+= '"wget {}:8080/api/v1/base/agent/?task_uuid={} -O /tmp/agent_{}.py && TASK_PORT=$(python /tmp/agent_{}.py) && '.format(my_ip, task.uuid, task.uuid, task.uuid)
-            run_command += 'export SINGULARITY_NOHTTPS=true && export SINGULARITYENV_TASK_PORT=$TASK_PORT && {} '.format(authstring)
+            run_command+= '"wget {}:8080/api/v1/base/agent/?task_uuid={} -O /tmp/agent_{}.py &> /dev/null && export TASK_PORT=\$(python /tmp/agent_{}.py 2> /tmp/{}.log) && '.format(webapp_ip, task.uuid, task.uuid, task.uuid, task.uuid)
+            run_command += 'export SINGULARITY_NOHTTPS=true && export SINGULARITYENV_TASK_PORT=\$TASK_PORT && {} '.format(authstring)
             run_command += 'exec nohup singularity run --pid --writable-tmpfs --containall --cleanenv '
             
+            # ssh -i /rosetta/.ssh/id_rsa -4 -o StrictHostKeyChecking=no slurmclusterworker-one
+            # "wget 172.21.0.2:8080/api/v1/base/agent/?task_uuid=15a4320a-88b6-4ffc-8dd0-c80f9d18b292 -O /tmp/agent_15a4320a-88b6-4ffc-8dd0-c80f9d18b292.py &> /dev/null &&
+            # export TASK_PORT=\$(python /tmp/agent_15a4320a-88b6-4ffc-8dd0-c80f9d18b292.py) && export SINGULARITY_NOHTTPS=true && export SINGULARITYENV_TASK_PORT=\$TASK_PORT &&  export SINGULARITYENV_AUTH_PASS=testpass &&  
+            # exec nohup singularity run --pid --writable-tmpfs --containall --cleanenv
+            # docker://dregistry:5000/rosetta/metadesktop &> /tmp/15a4320a-88b6-4ffc-8dd0-c80f9d18b292.log & echo $!"
+            
+            
             # Set registry
             if task.container.registry == 'docker_local':
                 registry = 'docker://dregistry:5000/'
@@ -108,8 +116,7 @@ def start_task(task):
             else:
                 raise NotImplementedError('Registry {} not supported'.format(task.container.registry))
     
-            run_command+='{}{} &> /tmp/{}.log & echo \$!"'.format(registry, task.container.image, task.uuid)
-            logger.critical(run_command)
+            run_command+='{}{} &>> /tmp/{}.log & echo \$!"'.format(registry, task.container.image, task.uuid)
             
         else:
             raise NotImplementedError('Container {} not supported'.format(task.container.type))
@@ -118,15 +125,18 @@ def start_task(task):
         if out.exit_code != 0:
             raise Exception(out.stderr)
         
-        logger.critical(out.stdout)
-        logger.critical(out.stderr)
+        # Log        
+        logger.debug('Shell exec output: "{}"'.format(out))
+
+        # Load back the task to avoid  concurrency problems in the agent call
+        task_uuid = task.uuid
+        task = Task.objects.get(uuid=task_uuid)
 
- 
         # Save pid echoed by the command above
         task_pid = out.stdout
 
         # Set fields
-        task.tid    = task.uuid
+
         #task.status = TaskStatuses.sumbitted
         task.pid    = task_pid
  
@@ -198,6 +208,63 @@ def start_task(task):
         task.save()
 
 
+    #==============================
+    #  Slurm
+    #==============================
+    elif task.computing.type == 'slurm':
+        
+        logger.debug('Starting a remote task "{}"'.format(task.computing))
+
+        # Get computing host #Key Error ATM
+        host = 'slurmclustermaster-main' #task.computing.get_conf_param('host')
+
+        # Get user keys
+        if task.computing.require_user_keys:
+            user_keys = Keys.objects.get(user=task.user, default=True)
+        else:
+            raise NotImplementedError('Remote tasks not requiring keys are not yet supported')
+
+        # 1) Run the container on the host (non blocking)
+ 
+        if task.container.type == 'singularity':
+
+            
+
+            # Set pass if any
+            if task.auth_pass:
+                authstring = ' export SINGULARITYENV_AUTH_PASS={} && '.format(task.auth_pass)
+            else:
+                authstring = ''
+
+            import socket
+            hostname = socket.gethostname()
+            my_ip = socket.gethostbyname(hostname)
+
+            run_command  = 'ssh -i {} -4 -o StrictHostKeyChecking=no {} '.format(user_keys.private_key_file, host)
+            run_command+= '"echo \"wget {}:8080/api/v1/base/agent/?task_uuid={} -O /tmp/agent_{}.py && TASK_PORT=$(python /tmp/agent_{}.py) && '.format(my_ip, task.uuid, task.uuid, task.uuid)
+            run_command += 'export SINGULARITY_NOHTTPS=true && export SINGULARITYENV_TASK_PORT=$TASK_PORT && {} '.format(authstring)
+            run_command += 'exec nohup singularity run --pid --writable-tmpfs --containall --cleanenv '
+            
+            # Set registry
+            if task.container.registry == 'docker_local':
+                registry = 'docker://dregistry:5000/'
+            elif task.container.registry == 'docker_hub':
+                registry = 'docker://'
+            else:
+                raise NotImplementedError('Registry {} not supported'.format(task.container.registry))
+    
+            run_command+='{}{} &> /tmp/{}.log & echo \$!" > '.format(registry, task.container.image, task.uuid)
+
+            
+        else:
+            raise NotImplementedError('Container {} not supported'.format(task.container.type))
+
+        out = os_shell(run_command, capture=True)
+        if out.exit_code != 0:
+            raise Exception(out.stderr)
+
+
+
     else:
         raise Exception('Consistency exception: invalid computing resource "{}'.format(task.computing))
 
-- 
GitLab