From c8aa3694093d8e1ef80b72febc6312725b8ab495 Mon Sep 17 00:00:00 2001 From: Stefano Alberto Russo <stefano.russo@gmail.com> Date: Wed, 29 Apr 2020 17:50:43 +0200 Subject: [PATCH] Added first agent stub. --- services/webapp/code/rosetta/base_app/api.py | 121 +++++++++++++++++- .../webapp/code/rosetta/base_app/models.py | 1 + .../webapp/code/rosetta/base_app/tasks.py | 72 ++++++++++- services/webapp/code/rosetta/urls.py | 5 +- 4 files changed, 193 insertions(+), 6 deletions(-) diff --git a/services/webapp/code/rosetta/base_app/api.py b/services/webapp/code/rosetta/base_app/api.py index 26590af..ad8f576 100644 --- a/services/webapp/code/rosetta/base_app/api.py +++ b/services/webapp/code/rosetta/base_app/api.py @@ -7,7 +7,7 @@ from rest_framework.response import Response from rest_framework import status, serializers, viewsets from rest_framework.views import APIView from .utils import format_exception -from .models import Profile +from .models import Profile, Task, TaskStatuses # Setup logging logger = logging.getLogger(__name__) @@ -205,6 +205,125 @@ class UserViewSet(viewsets.ModelViewSet): serializer_class = UserSerializer +class agent_api(PublicGETAPI): + + def _get(self, request): + try: + + task_uuid = request.GET.get('task_uuid', None) + if not task_uuid: + return HttpResponse('MISSING task_uuid') + + from django.core.exceptions import ValidationError + + try: + task = Task.objects.get(uuid=task_uuid) + except (Task.DoesNotExist, ValidationError): + return HttpResponse('Unknown task uuid "{}"'.format(task_uuid)) + + host_conn_string = 'http://172.21.0.1:8080' + + action = request.GET.get('action', None) + + if not action: + # Return the agent code + agent_code=''' +import logging +import socket +try: + from urllib.request import urlopen +except ImportError: + from urllib import urlopen + +# Setup logging +logger = logging.getLogger('Agent') +logging.basicConfig(level=logging.INFO) + +hostname = socket.gethostname() + +# Task id set by the API +task_uuid = "'''+ task_uuid +'''" + +# Log +logger.info('Reporting for task uuid: "{}"'.format(task_uuid)) + +# Get IP +ip = socket.gethostbyname(hostname) +logger.info(' - ip: "{}"'.format(ip)) + +# Get port +from random import randint +while True: + + # Get a random ephimeral port + port = randint(49152, 65535) + + # Check port is available + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + result = sock.connect_ex(('127.0.0.1', port)) + if result == 0: + print('Found not available ephimeral port ({}) , choosing another one...'.format(port)) + import time + time.sleep(1) + else: + break +logger.info(' - port: "{}"'.format(port)) + +response = urlopen("'''+host_conn_string+'''/api/v1/base/agent/?task_uuid={}&action=set_ip_port&ip={}&port={}".format(task_uuid, ip, port)) +response_content = response.read() +if response_content != 'OK': + logger.error(response_content) + logger.info('Not everything OK, exiting with status code =1') + sys.exit(1) +else: + logger.info('Everything OK') +print(port) +''' + + return HttpResponse(agent_code) + + + elif action=='set_ip_port': + + task_ip = request.GET.get('ip', None) + if not task_ip: + return HttpResponse('IP not valid (got "{}")'.format(task_ip)) + + task_port = request.GET.get('port', None) + if not task_port: + return HttpResponse('Port not valid (got "{}")'.format(task_port)) + + try: + int(task_port) + except (TypeError, ValueError): + return HttpResponse('Port not valid (got "{}")'.format(task_port)) + + # Set fields + task.status = TaskStatuses.running + task.ip = task_ip + #task.pid = task_pid + task.port = int(task_port) + task.save() + return HttpResponse('OK') + + + else: + return HttpResponse('Unknown action "{}"'.format(action)) + + + except Exception as e: + logger.error(e) + + + + + + + + + + + diff --git a/services/webapp/code/rosetta/base_app/models.py b/services/webapp/code/rosetta/base_app/models.py index 67ee3da..f32c7cf 100644 --- a/services/webapp/code/rosetta/base_app/models.py +++ b/services/webapp/code/rosetta/base_app/models.py @@ -25,6 +25,7 @@ logger = logging.getLogger(__name__) # Task statuses class TaskStatuses(object): created = 'created' + sumbitted = 'sumbitted' running = 'running' stopped = 'stopped' exited = 'exited' diff --git a/services/webapp/code/rosetta/base_app/tasks.py b/services/webapp/code/rosetta/base_app/tasks.py index cdf306e..73423c9 100644 --- a/services/webapp/code/rosetta/base_app/tasks.py +++ b/services/webapp/code/rosetta/base_app/tasks.py @@ -64,16 +64,80 @@ def start_task(task): # Save task.save() + + + elif task.computing.type == 'remote': logger.debug('Starting a remote task "{}"'.format(task.computing)) # Get computing host host = task.computing.get_conf_param('host') - # Get id_rsa - #id_rsa_file = task.computing.get_conf_param('id_rsa') - #if not id_rsa_file: - # raise Exception('This computing requires an id_rsa file but cannot find any') + # Get user keys + if task.computing.require_user_keys: + user_keys = Keys.objects.get(user=task.user, default=True) + else: + raise NotImplementedError('Remote tasks not requiring keys are not yet supported') + + # 1) Run the container on the host (non blocking) + + if task.container.type == 'singularity': + + + + # Set pass if any + if task.auth_pass: + authstring = ' export SINGULARITYENV_AUTH_PASS={} && '.format(task.auth_pass) + else: + authstring = '' + + import socket + hostname = socket.gethostname() + my_ip = socket.gethostbyname(hostname) + + run_command = 'ssh -i {} -4 -o StrictHostKeyChecking=no {} '.format(user_keys.private_key_file, host) + run_command+= '"wget {}:8080/api/v1/base/agent/?task_uuid={} -O /tmp/agent_{}.py && TASK_PORT=$(python /tmp/agent_{}.py) && '.format(my_ip, task.uuid, task.uuid, task.uuid) + run_command += 'export SINGULARITY_NOHTTPS=true && export SINGULARITYENV_TASK_PORT=$TASK_PORT && {} '.format(authstring) + run_command += 'exec nohup singularity run --pid --writable-tmpfs --containall --cleanenv ' + + # Set registry + if task.container.registry == 'docker_local': + registry = 'docker://dregistry:5000/' + elif task.container.registry == 'docker_hub': + registry = 'docker://' + else: + raise NotImplementedError('Registry {} not supported'.format(task.container.registry)) + + run_command+='{}{} &> /tmp/{}.log & echo \$!"'.format(registry, task.container.image, task.uuid) + logger.critical(run_command) + + else: + raise NotImplementedError('Container {} not supported'.format(task.container.type)) + + out = os_shell(run_command, capture=True) + if out.exit_code != 0: + raise Exception(out.stderr) + + logger.critical(out.stdout) + logger.critical(out.stderr) + + + # Save pid echoed by the command above + task_pid = out.stdout + + # Set fields + task.tid = task.uuid + #task.status = TaskStatuses.sumbitted + task.pid = task_pid + + # Save + task.save() + + elif task.computing.type == 'remoteOLD': + logger.debug('Starting a remote task "{}"'.format(task.computing)) + + # Get computing host + host = task.computing.get_conf_param('host') # Get user keys if task.computing.require_user_keys: diff --git a/services/webapp/code/rosetta/urls.py b/services/webapp/code/rosetta/urls.py index 500ab79..25b14bf 100644 --- a/services/webapp/code/rosetta/urls.py +++ b/services/webapp/code/rosetta/urls.py @@ -55,7 +55,10 @@ urlpatterns = [ # ViewSet APIs path('api/v1/base/login/', base_app_api.login_api.as_view(), name='login_api'), path('api/v1/base/logout/', base_app_api.logout_api.as_view(), name='logout_api'), - + + # Custom APIs + path('api/v1/base/agent/', base_app_api.agent_api.as_view(), name='agent_api'), + ] # This message here is quite useful when developing in autoreload mode -- GitLab