From 3f30fd3a31610e6712cf0c29998f55936dadb663 Mon Sep 17 00:00:00 2001
From: jlaura
Date: Thu, 28 Oct 2021 16:14:57 -0700
Subject: [PATCH] Adds parallel support for parallelization using ntasks (#611)

---
 autocnet/graph/asynchronous_funcs.py        |  19 +++-
 autocnet/graph/cluster_submit.py            | 100 +++++++++++---------
 autocnet/graph/network.py                   |  26 ++++-
 autocnet/graph/tests/test_cluster_submit.py |   3 +-
 4 files changed, 95 insertions(+), 53 deletions(-)

diff --git a/autocnet/graph/asynchronous_funcs.py b/autocnet/graph/asynchronous_funcs.py
index f7f8e05d..fe9fb13a 100644
--- a/autocnet/graph/asynchronous_funcs.py
+++ b/autocnet/graph/asynchronous_funcs.py
@@ -1,11 +1,12 @@
 import json
 import time
 
-from sqlalchemy import insert, update
+from sqlalchemy import insert
 from sqlalchemy.sql.expression import bindparam
 
 from autocnet.io.db.model import Points, Measures
 from autocnet.utils.serializers import object_hook
+from autocnet.transformation.spatial import reproject, og2oc
 
 def watch_insert_queue(queue, queue_name, counter_name, engine, stop_event, sleep_time=5):
     """
@@ -57,7 +58,8 @@ def watch_insert_queue(queue, queue_name, counter_name, engine, stop_event, slee
         measures = []
 
         # Pull the SRID dynamically from the model (database)
-        srid = Points.rectangular_srid
+        rect_srid = Points.rectangular_srid
+        lat_srid = Points.latitudinal_srid
 
         for i in range(0, read_length):
             msg = json.loads(queue.lpop(queue_name), object_hook=object_hook)
@@ -68,8 +70,17 @@ def watch_insert_queue(queue, queue_name, counter_name, engine, stop_event, slee
 
             # Since this avoids the ORM, need to map the table names manually
             msg['pointType'] = msg['pointtype']
-            msg['adjusted'] = f'SRID={srid};' + msg["adjusted"].wkt  # Geometries go in as EWKT
-
+            adjusted = msg['adjusted']
+
+            msg['adjusted'] = f'SRID={rect_srid};' + adjusted.wkt  # Geometries go in as EWKT
+            msg['apriori'] = f'SRID={rect_srid};' + adjusted.wkt
+
+            lon_og, lat_og, _ = reproject([adjusted.x, adjusted.y, adjusted.z],
+                                          Points.semimajor_rad, Points.semiminor_rad,
+                                          'geocent', 'latlon')
+            lon, lat = og2oc(lon_og, lat_og, Points.semimajor_rad, Points.semiminor_rad)
+            msg['geom'] = f'SRID={lat_srid};Point({lon} {lat})'
+
             # Measures are removed and manually added later
             point_measures = msg.pop('measures', [])
             if point_measures:
diff --git a/autocnet/graph/cluster_submit.py b/autocnet/graph/cluster_submit.py
index a6f765e2..9024766b 100644
--- a/autocnet/graph/cluster_submit.py
+++ b/autocnet/graph/cluster_submit.py
@@ -20,11 +20,13 @@ from autocnet.utils.utils import import_func
 from autocnet.utils.serializers import JsonEncoder, object_hook
 from autocnet.io.db.model import JobsHistory
 
 
-
 def parse_args(): # pragma: no cover
     parser = argparse.ArgumentParser()
     parser.add_argument('-r', '--host', help='The host URL for the redis queue to to pull messages from.')
     parser.add_argument('-p', '--port', help='The port for used by redis.')
+    parser.add_argument('-q', '--queue', default=False, action='store_true',
+                        help='If passed, run in queue mode, where this job runs until either \
+                              walltime is hit or the queue that is being processed is empty.')
     parser.add_argument('processing_queue', help='The name of the processing queue to draw messages from.')
     parser.add_argument('working_queue', help='The name of the queue to push messages to while they process.')
@@ -55,7 +57,6 @@ def _instantiate_row(msg, ncg):
     """
     # Get the dict mapping iterable keyword types to the objects
     objdict = ncg.apply_iterable_options
-    rowid = msg['id']
     obj = objdict[msg['along']]
     with ncg.session_scope() as session:
         res = session.query(obj).filter(getattr(obj, 'id')==msg['id']).one()
@@ -167,51 +168,64 @@ def manage_messages(args, queue):
         A py-Redis queue object
     """
 
-    # Pop the message from the left queue and push to the right queue; atomic operation
-    msg = transfer_message_to_work_queue(queue,
-                                         args['processing_queue'],
-                                         args['working_queue'])
-
-    if msg is None:
-        warnings.warn('Expected to process a cluster job, but the message queue is empty.')
-        return
-
-    # The key to remove from the working queue is the message. Essentially, find this element
-    # in the list where the element is the JSON representation of the message. Maybe swap to a hash?
-    remove_key = msg
-
-    #Convert the message from binary into a dict
-    msgdict = json.loads(msg, object_hook=object_hook)
-
-    # should replace this with some logging logic later
-    # rather than redirecting std out
-    stdout = StringIO()
-    with redirect_stdout(stdout):
-        # Apply the algorithm
-        response = process(msgdict)
-        # Should go to a logger someday!
-        print(response)
-
-    out = stdout.getvalue()
-    # print to get everything on the logs in the directory
-    print(out)
-
-    serializedDict = json.loads(msg)
-    results = msgdict['results'] if msgdict['results'] else [{"status" : "success"}]
-    success = True if "success" in results[0]["status"].split(" ")[0].lower() else False
-
-    jh = JobsHistory(jobId=int(os.environ["SLURM_JOB_ID"]), functionName=msgdict["func"], args={"args" : serializedDict["args"], "kwargs": serializedDict["kwargs"]}, results=msgdict["results"], logs=out, success=success)
+    processing = True
-    with response['kwargs']['Session']() as session:
-        session.add(jh)
-        session.commit()
-
-    finalize_message_from_work_queue(queue, args['working_queue'], remove_key)
+    while processing:
+        # Pop the message from the left queue and push to the right queue; atomic operation
+        msg = transfer_message_to_work_queue(queue,
+                                             args['processing_queue'],
+                                             args['working_queue'])
+
+        if msg is None:
+            if args['queue'] == False:
+                warnings.warn('Expected to process a cluster job, but the message queue is empty.')
+                return
+            elif args['queue'] == True:
+                print(f'Completed processing from queue: {queue}.')
+                return
+
+        # The key to remove from the working queue is the message. Essentially, find this element
+        # in the list where the element is the JSON representation of the message. Maybe swap to a hash?
+        remove_key = msg
+
+        #Convert the message from binary into a dict
+        msgdict = json.loads(msg, object_hook=object_hook)
+
+        # should replace this with some logging logic later
+        # rather than redirecting std out
+        stdout = StringIO()
+        with redirect_stdout(stdout):
+            # Apply the algorithm
+            response = process(msgdict)
+            # Should go to a logger someday!
+            print(response)
+
+        out = stdout.getvalue()
+        # print to get everything on the logs in the directory
+        print(out)
+
+        sys.stdout.flush()
+        stdout.flush()
+
+        #serializedDict = json.loads(msg)
+        #results = msgdict['results'] if msgdict['results'] else [{"status" : "success"}]
+        #success = True if "success" in results[0]["status"].split(" ")[0].lower() else False
+
+        #jh = JobsHistory(jobId=int(os.environ["SLURM_JOB_ID"]), functionName=msgdict["func"], args={"args" : serializedDict["args"], "kwargs": serializedDict["kwargs"]}, results=msgdict["results"], logs=out, success=success)
+
+        #with response['kwargs']['Session']() as session:
+            #session.add(jh)
+            #session.commit()
+
+        finalize_message_from_work_queue(queue, args['working_queue'], remove_key)
+
+        # Process only a single job, else draw the next message off the queue if available.
+        if args['queue'] == False:
+            processing = False
+
 
 
 def main(): # pragma: no cover
     args = vars(parse_args())
     # Get the message
     queue = StrictRedis(host=args['host'], port=args['port'], db=0)
     manage_messages(args, queue)
-
-
diff --git a/autocnet/graph/network.py b/autocnet/graph/network.py
index 7847c35e..3e5a0d3c 100644
--- a/autocnet/graph/network.py
+++ b/autocnet/graph/network.py
@@ -1658,8 +1658,10 @@ class NetworkCandidateGraph(CandidateGraph):
               on='edge',
               args=(),
               walltime='01:00:00',
+              jobname='AutoCNet',
               chunksize=1000,
               arraychunk=25,
+              ntasks=1,
               filters={},
               query_string='',
               reapply=False,
@@ -1706,6 +1708,14 @@ class NetworkCandidateGraph(CandidateGraph):
             The number of concurrent jobs to run per job array. e.g. chunksize=100 and
             arraychunk=25 gives the job array 1-100%25
 
+        ntasks : int
+            The number of tasks, distributed across the cluster on some set of nodes, to be run.
+            When running apply with ntasks, set ntasks to some integer greater than 1. The arraychunk and
+            chunksize arguments will then be ignored. In this mode, a number of non-communicating
+            CPUs equal to ntasks are allocated and these CPUs run the jobs. Changing from arrays to ntasks
+            also likely requires increasing the walltime of the job significantly, since fewer jobs
+            will each need to run for a longer duration.
+
         filters : dict
             Of simple filters to apply on database rows where the key is the attribute
             and the value used to check equivalency (e.g., attribute == value).
@@ -1810,20 +1820,26 @@ class NetworkCandidateGraph(CandidateGraph):
         isissetup = f'export ISISROOT={isisroot} && export ISISDATA={isisdata}'
         condasetup = f'conda activate {condaenv}'
         job = f'acn_submit -r={rhost} -p={rport} {processing_queue} {self.working_queue}'
-        command = f'{condasetup} && {isissetup} && {job}'
+        if ntasks > 1:
+            job += ' --queue'  # Use queue mode where jobs run until the queue is empty
+        command = f'{condasetup} && {isissetup} && srun {job}'
 
         if queue == None:
             queue = self.config['cluster']['queue']
 
         submitter = Slurm(command,
-                          job_name='AutoCNet',
+                          job_name=jobname,
                           mem_per_cpu=self.config['cluster']['processing_memory'],
                           time=walltime,
                           partition=queue,
+                          ntasks=ntasks,
                           output=log_dir+f'/autocnet.{function}-%j')
-        job_str = submitter.submit(array='1-{}%{}'.format(job_counter,arraychunk),
-                                   chunksize=chunksize,
-                                   exclude=exclude)
+        if ntasks > 1:
+            job_str = submitter.submit(exclude=exclude)
+        else:
+            job_str = submitter.submit(array='1-{}%{}'.format(job_counter,arraychunk),
+                                       chunksize=chunksize,
+                                       exclude=exclude)
         return job_str
 
     def generic_callback(self, msg):
diff --git a/autocnet/graph/tests/test_cluster_submit.py b/autocnet/graph/tests/test_cluster_submit.py
index 47cdfcf4..8c849855 100644
--- a/autocnet/graph/tests/test_cluster_submit.py
+++ b/autocnet/graph/tests/test_cluster_submit.py
@@ -16,7 +16,8 @@ from autocnet.io.db.model import Points, JobsHistory
 @pytest.fixture
 def args():
     arg_dict = {'working_queue':'working',
-                'processing_queue':'processing'}
+                'processing_queue':'processing',
+                'queue':False}
     return arg_dict
 
 @pytest.fixture
-- 
GitLab
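
For reference, a minimal usage sketch of the two submission modes this patch supports, written against the signature and docstring changes above. The method name `apply`, the function name `'place_points'`, and the `from_database` constructor are assumptions for illustration; only the keyword arguments (`walltime`, `chunksize`, `arraychunk`, `ntasks`, `jobname`) and the behaviour described in the new `ntasks` docstring come from the diff itself.

```python
# Hypothetical usage sketch -- `apply`, 'place_points', and `from_database` are
# assumed names; the keyword arguments mirror the signature changed in this patch.
from autocnet.graph.network import NetworkCandidateGraph

ncg = NetworkCandidateGraph.from_database()  # assumes an already-configured project

# Job-array mode (default, ntasks=1): submits a SLURM array such as
# 1-<job_counter>%<arraychunk>; each array task runs `acn_submit` once and
# processes a single message from the Redis processing queue.
ncg.apply('place_points',
          walltime='01:00:00',
          chunksize=1000,
          arraychunk=25)

# ntasks mode (ntasks > 1): arraychunk/chunksize are ignored; one job with
# `ntasks` non-communicating CPUs is submitted and each task runs
# `srun acn_submit --queue ...`, looping until the processing queue is empty
# or walltime is hit, so the walltime generally needs to be much longer.
ncg.apply('place_points',
          walltime='12:00:00',
          ntasks=50,
          jobname='place_points_ntasks')
```

On the worker side both modes use the same `acn_submit` entry point: the `--queue` flag added in `parse_args` keeps `manage_messages` looping instead of exiting after one message, and the submit side prefixes the command with `srun` so that SLURM launches one worker per allocated task.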