Skip to content
Snippets Groups Projects
Commit 82cb7119 authored by Stefano Alberto Russo
Browse files

Fixes in running remote tasks.

parent 7b08187f
No related branches found
No related tags found
No related merge requests found
...@@ -221,7 +221,11 @@ class agent_api(PublicGETAPI): ...@@ -221,7 +221,11 @@ class agent_api(PublicGETAPI):
except (Task.DoesNotExist, ValidationError): except (Task.DoesNotExist, ValidationError):
return HttpResponse('Unknown task uuid "{}"'.format(task_uuid)) return HttpResponse('Unknown task uuid "{}"'.format(task_uuid))
host_conn_string = 'http://172.21.0.1:8080' import socket
hostname = socket.gethostname()
webapp_ip = socket.gethostbyname(hostname)
host_conn_string = 'http://{}:8080'.format(webapp_ip)
action = request.GET.get('action', None) action = request.GET.get('action', None)
...@@ -299,6 +303,7 @@ print(port) ...@@ -299,6 +303,7 @@ print(port)
return HttpResponse('Port not valid (got "{}")'.format(task_port)) return HttpResponse('Port not valid (got "{}")'.format(task_port))
# Set fields # Set fields
logger.info('Setting task "{}" to ip "{}" and port "{}"'.format(task.uuid, task_ip, task_port))
task.status = TaskStatuses.running task.status = TaskStatuses.running
task.ip = task_ip task.ip = task_ip
#task.pid = task_pid #task.pid = task_pid
......
from .models import TaskStatuses, Keys from .models import TaskStatuses, Keys, Task
from .utils import os_shell from .utils import os_shell
from .exceptions import ErrorMessage, ConsistencyException from .exceptions import ErrorMessage, ConsistencyException
...@@ -83,7 +83,8 @@ def start_task(task): ...@@ -83,7 +83,8 @@ def start_task(task):
if task.container.type == 'singularity': if task.container.type == 'singularity':
task.tid = task.uuid
task.save()
# Set pass if any # Set pass if any
if task.auth_pass: if task.auth_pass:
...@@ -93,13 +94,20 @@ def start_task(task): ...@@ -93,13 +94,20 @@ def start_task(task):
import socket import socket
hostname = socket.gethostname() hostname = socket.gethostname()
my_ip = socket.gethostbyname(hostname) webapp_ip = socket.gethostbyname(hostname)
run_command = 'ssh -i {} -4 -o StrictHostKeyChecking=no {} '.format(user_keys.private_key_file, host) run_command = 'ssh -i {} -4 -o StrictHostKeyChecking=no {} '.format(user_keys.private_key_file, host)
run_command+= '"wget {}:8080/api/v1/base/agent/?task_uuid={} -O /tmp/agent_{}.py && TASK_PORT=$(python /tmp/agent_{}.py) && '.format(my_ip, task.uuid, task.uuid, task.uuid) run_command+= '"wget {}:8080/api/v1/base/agent/?task_uuid={} -O /tmp/agent_{}.py &> /dev/null && export TASK_PORT=\$(python /tmp/agent_{}.py 2> /tmp/{}.log) && '.format(webapp_ip, task.uuid, task.uuid, task.uuid, task.uuid)
run_command += 'export SINGULARITY_NOHTTPS=true && export SINGULARITYENV_TASK_PORT=$TASK_PORT && {} '.format(authstring) run_command += 'export SINGULARITY_NOHTTPS=true && export SINGULARITYENV_TASK_PORT=\$TASK_PORT && {} '.format(authstring)
run_command += 'exec nohup singularity run --pid --writable-tmpfs --containall --cleanenv ' run_command += 'exec nohup singularity run --pid --writable-tmpfs --containall --cleanenv '
# ssh -i /rosetta/.ssh/id_rsa -4 -o StrictHostKeyChecking=no slurmclusterworker-one
# "wget 172.21.0.2:8080/api/v1/base/agent/?task_uuid=15a4320a-88b6-4ffc-8dd0-c80f9d18b292 -O /tmp/agent_15a4320a-88b6-4ffc-8dd0-c80f9d18b292.py &> /dev/null &&
# export TASK_PORT=\$(python /tmp/agent_15a4320a-88b6-4ffc-8dd0-c80f9d18b292.py) && export SINGULARITY_NOHTTPS=true && export SINGULARITYENV_TASK_PORT=\$TASK_PORT && export SINGULARITYENV_AUTH_PASS=testpass &&
# exec nohup singularity run --pid --writable-tmpfs --containall --cleanenv
# docker://dregistry:5000/rosetta/metadesktop &> /tmp/15a4320a-88b6-4ffc-8dd0-c80f9d18b292.log & echo $!"
# Set registry # Set registry
if task.container.registry == 'docker_local': if task.container.registry == 'docker_local':
registry = 'docker://dregistry:5000/' registry = 'docker://dregistry:5000/'
...@@ -108,8 +116,7 @@ def start_task(task): ...@@ -108,8 +116,7 @@ def start_task(task):
else: else:
raise NotImplementedError('Registry {} not supported'.format(task.container.registry)) raise NotImplementedError('Registry {} not supported'.format(task.container.registry))
run_command+='{}{} &> /tmp/{}.log & echo \$!"'.format(registry, task.container.image, task.uuid) run_command+='{}{} &>> /tmp/{}.log & echo \$!"'.format(registry, task.container.image, task.uuid)
logger.critical(run_command)
else: else:
raise NotImplementedError('Container {} not supported'.format(task.container.type)) raise NotImplementedError('Container {} not supported'.format(task.container.type))
...@@ -118,15 +125,18 @@ def start_task(task): ...@@ -118,15 +125,18 @@ def start_task(task):
if out.exit_code != 0: if out.exit_code != 0:
raise Exception(out.stderr) raise Exception(out.stderr)
logger.critical(out.stdout) # Log
logger.critical(out.stderr) logger.debug('Shell exec output: "{}"'.format(out))
# Load back the task to avoid concurrency problems in the agent call
task_uuid = task.uuid
task = Task.objects.get(uuid=task_uuid)
# Save pid echoed by the command above # Save pid echoed by the command above
task_pid = out.stdout task_pid = out.stdout
# Set fields # Set fields
task.tid = task.uuid
#task.status = TaskStatuses.sumbitted #task.status = TaskStatuses.sumbitted
task.pid = task_pid task.pid = task_pid
...@@ -198,6 +208,63 @@ def start_task(task): ...@@ -198,6 +208,63 @@ def start_task(task):
task.save() task.save()
#==============================
# Slurm
#==============================
elif task.computing.type == 'slurm':
logger.debug('Starting a remote task "{}"'.format(task.computing))
# Get computing host #Key Error ATM
host = 'slurmclustermaster-main' #task.computing.get_conf_param('host')
# Get user keys
if task.computing.require_user_keys:
user_keys = Keys.objects.get(user=task.user, default=True)
else:
raise NotImplementedError('Remote tasks not requiring keys are not yet supported')
# 1) Run the container on the host (non blocking)
if task.container.type == 'singularity':
# Set pass if any
if task.auth_pass:
authstring = ' export SINGULARITYENV_AUTH_PASS={} && '.format(task.auth_pass)
else:
authstring = ''
import socket
hostname = socket.gethostname()
my_ip = socket.gethostbyname(hostname)
run_command = 'ssh -i {} -4 -o StrictHostKeyChecking=no {} '.format(user_keys.private_key_file, host)
run_command+= '"echo \"wget {}:8080/api/v1/base/agent/?task_uuid={} -O /tmp/agent_{}.py && TASK_PORT=$(python /tmp/agent_{}.py) && '.format(my_ip, task.uuid, task.uuid, task.uuid)
run_command += 'export SINGULARITY_NOHTTPS=true && export SINGULARITYENV_TASK_PORT=$TASK_PORT && {} '.format(authstring)
run_command += 'exec nohup singularity run --pid --writable-tmpfs --containall --cleanenv '
# Set registry
if task.container.registry == 'docker_local':
registry = 'docker://dregistry:5000/'
elif task.container.registry == 'docker_hub':
registry = 'docker://'
else:
raise NotImplementedError('Registry {} not supported'.format(task.container.registry))
run_command+='{}{} &> /tmp/{}.log & echo \$!" > '.format(registry, task.container.image, task.uuid)
else:
raise NotImplementedError('Container {} not supported'.format(task.container.type))
out = os_shell(run_command, capture=True)
if out.exit_code != 0:
raise Exception(out.stderr)
else: else:
raise Exception('Consistency exception: invalid computing resource "{}'.format(task.computing)) raise Exception('Consistency exception: invalid computing resource "{}'.format(task.computing))
......
0% Loading Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment