diff --git a/autocnet/graph/cluster_submit.py b/autocnet/graph/cluster_submit.py index 14059e034d8347db96448773471177f8e441c538..a6f765e26186f8ce3113edd707ab6819b3b6882c 100644 --- a/autocnet/graph/cluster_submit.py +++ b/autocnet/graph/cluster_submit.py @@ -7,6 +7,9 @@ import json import sys import warnings +from io import StringIO +from contextlib import redirect_stdout + from redis import StrictRedis from autocnet.graph.network import NetworkCandidateGraph @@ -15,6 +18,7 @@ from autocnet.graph.edge import NetworkEdge from autocnet.io.db.model import Points, Measures, Overlay from autocnet.utils.utils import import_func from autocnet.utils.serializers import JsonEncoder, object_hook +from autocnet.io.db.model import JobsHistory def parse_args(): # pragma: no cover @@ -167,7 +171,7 @@ def manage_messages(args, queue): msg = transfer_message_to_work_queue(queue, args['processing_queue'], args['working_queue']) - + if msg is None: warnings.warn('Expected to process a cluster job, but the message queue is empty.') return @@ -175,14 +179,32 @@ def manage_messages(args, queue): # The key to remove from the working queue is the message. Essentially, find this element # in the list where the element is the JSON representation of the message. Maybe swap to a hash? remove_key = msg - + #Convert the message from binary into a dict - msg = json.loads(msg, object_hook=object_hook) - - # Apply the algorithm - response = process(msg) - # Should go to a logger someday! - print(response) + msgdict = json.loads(msg, object_hook=object_hook) + + # should replace this with some logging logic later + # rather than redirecting std out + stdout = StringIO() + with redirect_stdout(stdout): + # Apply the algorithm + response = process(msgdict) + # Should go to a logger someday! + print(response) + + out = stdout.getvalue() + # print to get everything on the logs in the directory + print(out) + + serializedDict = json.loads(msg) + results = msgdict['results'] if msgdict['results'] else [{"status" : "success"}] + success = True if "success" in results[0]["status"].split(" ")[0].lower() else False + + jh = JobsHistory(jobId=int(os.environ["SLURM_JOB_ID"]), functionName=msgdict["func"], args={"args" : serializedDict["args"], "kwargs": serializedDict["kwargs"]}, results=msgdict["results"], logs=out, success=success) + + with response['kwargs']['Session']() as session: + session.add(jh) + session.commit() finalize_message_from_work_queue(queue, args['working_queue'], remove_key) @@ -191,5 +213,5 @@ def main(): # pragma: no cover # Get the message queue = StrictRedis(host=args['host'], port=args['port'], db=0) manage_messages(args, queue) - + diff --git a/autocnet/graph/network.py b/autocnet/graph/network.py index 2398ed9ae112a1d5f12aff4e1c56d0e8683dee7f..7847c35ea520ae9ab09835314b3707e20ca2bfd3 100644 --- a/autocnet/graph/network.py +++ b/autocnet/graph/network.py @@ -2117,6 +2117,7 @@ class NetworkCandidateGraph(CandidateGraph): sourceimages = sourcesession.execute(query_string).fetchall() # Change for SQLAlchemy >= 1.4, results are now row objects + sourceimages = [sourceimage._asdict() for sourceimage in sourceimages] with self.session_scope() as destinationsession: destinationsession.execute(Images.__table__.insert(), sourceimages) diff --git a/autocnet/graph/tests/test_cluster_submit.py b/autocnet/graph/tests/test_cluster_submit.py index bf36c0e0e7b081c727eaff2ab1ea80e87ebdf52b..47cdfcf48cffebcf81d6116ff773686f5bcbdc04 100644 --- a/autocnet/graph/tests/test_cluster_submit.py +++ b/autocnet/graph/tests/test_cluster_submit.py @@ -1,4 +1,6 @@ import json +import os +from unittest import mock from unittest.mock import patch import numpy as np @@ -8,7 +10,7 @@ from autocnet.utils.serializers import JsonEncoder, object_hook from autocnet.graph import cluster_submit from autocnet.graph.node import NetworkNode from autocnet.graph.edge import NetworkEdge -from autocnet.io.db.model import Points +from autocnet.io.db.model import Points, JobsHistory @pytest.fixture @@ -20,45 +22,73 @@ def args(): @pytest.fixture def simple_message(): return json.dumps({"job":"do some work", - "success":False}, cls=JsonEncoder + 'args' : ["arg1", "arg2"], + 'kwargs' : {"k1" : "foo", "k2" : "bar"}, + 'func':'autocnet.place_points', + 'results' :[{"status" : 'success'}] }, cls=JsonEncoder ) @pytest.fixture def complex_message(): return json.dumps({'job':'do some complex work', 'arr':np.ones(5), - 'func':lambda x:x}, cls=JsonEncoder) + 'results' :[{"status" : 'success'}], + 'args' : ["arg1", "arg2"], + 'kwargs' : {"k1" : "foo", "k2" : "bar"}, + 'func':'autocnet.place_points'}, cls=JsonEncoder) -def test_manage_simple_messages(args, queue, simple_message, mocker, capfd): +def test_manage_simple_messages(args, queue, simple_message, mocker, capfd, ncg): queue.rpush(args['processing_queue'], simple_message) - response_msg = {'success':True, 'results':'Things were good.'} + response_msg = {'success':True, 'results':'Things were good.', 'kwargs' : {'Session' : ncg.Session}} mocker.patch('autocnet.graph.cluster_submit.process', return_value=response_msg) - + mocker.patch.dict(os.environ, {"SLURM_JOB_ID": "1000"}) + cluster_submit.manage_messages(args, queue) # Check that logging to stdout is working out, err = capfd.readouterr() - assert out == str(response_msg) + '\n' + assert out.strip() == str(response_msg).strip() # Check that the messages are finalizing assert queue.llen(args['working_queue']) == 0 -def test_manage_complex_messages(args, queue, complex_message, mocker, capfd): +def test_manage_complex_messages(args, queue, complex_message, mocker, capfd, ncg): queue.rpush(args['processing_queue'], complex_message) - response_msg = {'success':True, 'results':'Things were good.'} + response_msg = {'success':True, 'results':'Things were good.', 'kwargs' : {'Session' : ncg.Session}} mocker.patch('autocnet.graph.cluster_submit.process', return_value=response_msg) - + mocker.patch.dict(os.environ, {"SLURM_JOB_ID": "1000"}) + cluster_submit.manage_messages(args, queue) # Check that logging to stdout is working out, err = capfd.readouterr() - assert out == str(response_msg) + '\n' + assert out.strip() == str(response_msg).strip() # Check that the messages are finalizing assert queue.llen(args['working_queue']) == 0 + +def test_job_history(args, queue, complex_message, mocker, capfd, ncg): + queue.rpush(args['processing_queue'], complex_message) + + response_msg = {'success':True, + 'args' : ["arg1", "arg2"], + 'kwargs' : {"k1" : "foo", "k2" : "bar", "Session" : ncg.Session}} + mocker.patch('autocnet.graph.cluster_submit.process', return_value=response_msg) + mocker.patch.dict(os.environ, {"SLURM_JOB_ID": "1000"}) + + cluster_submit.manage_messages(args, queue) + + message_json = json.loads(complex_message) + with ncg.Session() as session: + resp = session.query(JobsHistory).first() + assert resp.functionName == "autocnet.place_points" + assert resp.jobId == 1000 + assert resp.args == {"args" : message_json["args"], "kwargs" : message_json["kwargs"]} + assert resp.logs.strip() == str(response_msg).strip() + def test_transfer_message_to_work_queue(args, queue, simple_message): queue.rpush(args['processing_queue'], simple_message) cluster_submit.transfer_message_to_work_queue(queue, args['processing_queue'], args['working_queue']) diff --git a/autocnet/io/db/model.py b/autocnet/io/db/model.py index 82c7076a2a699bea89e75d8ce9c4b4ebfd733b5c..f3c10dc098126597099f9f481d5ca7641d15c325 100644 --- a/autocnet/io/db/model.py +++ b/autocnet/io/db/model.py @@ -5,7 +5,8 @@ import sqlalchemy from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import (Column, String, Integer, Float, \ ForeignKey, Boolean, LargeBinary, \ - UniqueConstraint, event) + UniqueConstraint, event, DateTime + ) from sqlalchemy.dialects.postgresql import ARRAY, JSONB from sqlalchemy.orm import relationship, backref from sqlalchemy_utils import database_exists, create_database @@ -609,8 +610,42 @@ class Measures(BaseMixin, Base): v = MeasureType(v) self._measuretype = v + +class JobsHistory(BaseMixin, Base): + __tablename__ = 'jobs_history' + id = Column(Integer, primary_key=True, autoincrement=True) + jobId = Column("jobId", Integer) + functionName = Column("functionName", String) + args = Column(JSONB) + results = Column(JSONB) + logs = Column(String) + success = Column(Boolean, default=False) + + +class MeasuresHistory(BaseMixin, Base): + __tablename__ = 'measures_history' + id = Column(Integer, primary_key=True, autoincrement=True) + fk = Column(Integer) + eventTime = Column(DateTime) + executedBy = Column(String) + event = Column(String) + before = Column(JSONB) + after = Column(JSONB) + + +class PointsHistory(BaseMixin, Base): + __tablename__ = 'points_history' + id = Column(Integer, primary_key=True, autoincrement=True) + fk = Column(Integer) + eventTime = Column(DateTime) + executedBy = Column(String) + event = Column(String) + before = Column(JSONB) + after = Column(JSONB) + + def try_db_creation(engine, config): - from autocnet.io.db.triggers import valid_point_function, valid_point_trigger, valid_geom_function, valid_geom_trigger, ignore_image_function, ignore_image_trigger + from autocnet.io.db import triggers # Create the database if not database_exists(engine.url): @@ -619,12 +654,19 @@ def try_db_creation(engine, config): # Trigger that watches for points that should be active/inactive # based on the point count. if not sqlalchemy.inspect(engine).has_table("points"): - event.listen(Base.metadata, 'before_create', valid_point_function) - event.listen(Measures.__table__, 'after_create', valid_point_trigger) - event.listen(Base.metadata, 'before_create', valid_geom_function) - event.listen(Images.__table__, 'after_create', valid_geom_trigger) - event.listen(Base.metadata, 'before_create', ignore_image_function) - event.listen(Images.__table__, 'after_create', ignore_image_trigger) + event.listen(Base.metadata, 'before_create', triggers.valid_point_function) + event.listen(Measures.__table__, 'after_create', triggers.valid_point_trigger) + event.listen(Base.metadata, 'before_create', triggers.valid_geom_function) + event.listen(Images.__table__, 'after_create', triggers.valid_geom_trigger) + event.listen(Base.metadata, 'before_create', triggers.ignore_image_function) + event.listen(Images.__table__, 'after_create', triggers.ignore_image_trigger) + event.listen(Points.__table__, 'before_create', triggers.jsonb_delete_func) + + for ddl in triggers.generate_history_triggers(Measures): + event.listen(Measures.__table__, 'after_create', ddl) + + for ddl in triggers.generate_history_triggers(Points): + event.listen(Points.__table__, 'after_create', ddl) Base.metadata.bind = engine @@ -645,4 +687,8 @@ def try_db_creation(engine, config): Edges.__table__, Costs.__table__, Matches.__table__, Cameras.__table__, Points.__table__, Measures.__table__, Images.__table__, - Keypoints.__table__, CandidateGroundPoints.__table__]) + Keypoints.__table__, CandidateGroundPoints.__table__, + JobsHistory.__table__, MeasuresHistory.__table__, PointsHistory.__table__]) + + + diff --git a/autocnet/io/db/tests/test_model.py b/autocnet/io/db/tests/test_model.py index 701393bfc488d6dfdb0dd798b122dba6e5943961..96ae81c4fee9b6b34283ada8f0cc7e95c1c206ce 100644 --- a/autocnet/io/db/tests/test_model.py +++ b/autocnet/io/db/tests/test_model.py @@ -34,6 +34,16 @@ def test_cameras_exists(tables): def test_measures_exists(tables): assert model.Measures.__tablename__ in tables +def test_points_history_exists(tables): + assert model.PointsHistory.__tablename__ in tables + +def test_measures_history_exists(tables): + assert model.MeasuresHistory.__tablename__ in tables + +def test_job_history_exists(tables): + assert model.JobsHistory.__tablename__ in tables + + def test_create_camera_without_image(session): with pytest.raises(sqlalchemy.exc.IntegrityError): model.Cameras.create(session, **{'image_id':1}) @@ -128,8 +138,78 @@ def test_update_point_geom(session, data, new_adjusted, expected): resp = session.query(model.Points).filter(model.Points.id == p.id).first() assert resp.geom == expected -def test_measures_exists(tables): - assert model.Measures.__tablename__ in tables +def test_point_trigger(session): + original = 3 + new_type = 2 + + data = {'pointtype':original, 'adjusted' : Point(1,10000,1)} + + with session as s: + p = model.Points.create(s, **data) + + p.pointtype = new_type + s.commit() + s.delete(p) + s.commit() + + resp = session.query(model.PointsHistory).filter(model.PointsHistory.fk == p.id) + + assert resp[0].event == "insert" + assert resp[0].before == None + assert resp[0].after["pointType"] == original + + assert resp[1].event == "update" + assert resp[1].before['pointType'] == original + assert resp[1].after["pointType"] == new_type + + assert resp[2].event == "delete" + assert resp[2].before['pointType'] == new_type + assert resp[2].after == None + s.close() + +@pytest.mark.xfail(reason="Unknown issue on GitHub actions, passes locally") +def test_measure_trigger(session): + original = 3 + new_type = 2 + + with session as s: + point_data = {'pointtype':original, 'adjusted' : Point(1,10000,1)} + p = model.Points.create(s, **point_data) + + measure_data = {'id' : 100 ,'sample': original, 'line': 10, 'pointid': p.id, 'serial': 'measure', '_measuretype' : 2} + m = model.Measures(**measure_data) + s.add(m) + s.commit() + + measures = s.query(model.Measures).all() + points = s.query(model.Points).all() + print(measures, len(measures)) + print(points, len(points)) + + m.sample = new_type + s.commit() + s.delete(p) + s.delete(m) + s.commit() + + measures = session.query(model.Measures).all() + points = session.query(model.Points).all() + print(measures, len(measures)) + print(points, len(points)) + + resp = session.query(model.MeasuresHistory).filter(model.MeasuresHistory.fk == m.id).all() + print(resp, len(resp)) + assert resp[0].event == "insert" + assert resp[0].before == None + assert resp[0].after["sample"] == original + + assert resp[1].event == "update" + assert resp[1].before['sample'] == original + assert resp[1].after["sample"] == new_type + + assert resp[2].event == "delete" + assert resp[2].before['sample'] == new_type + assert resp[2].after == None def test_null_footprint(session): i = model.Images.create(session, geom=None, diff --git a/autocnet/io/db/triggers.py b/autocnet/io/db/triggers.py index 1b314d7986b2c5ad86a6e1c590cf92bf6287cc66..0711506825b3e7eb21d90ed90805e7a39089177e 100644 --- a/autocnet/io/db/triggers.py +++ b/autocnet/io/db/triggers.py @@ -86,3 +86,126 @@ CREATE TRIGGER image_ignored FOR EACH ROW EXECUTE PROCEDURE ignore_image(); """) + + +# several funcs and an operator needed to get json diff working. +jsonb_delete_func = DDL(""" +SET search_path = 'public'; + +CREATE OR REPLACE FUNCTION jsonb_delete_left(a jsonb, b text) +RETURNS jsonb AS +$BODY$ + SELECT COALESCE( + ( + SELECT ('{' || string_agg(to_json(key) || ':' || value, ',') || '}') + FROM jsonb_each(a) + WHERE key <> b + ) + , '{}')::jsonb; +$BODY$ +LANGUAGE sql IMMUTABLE STRICT; +COMMENT ON FUNCTION jsonb_delete_left(jsonb, text) IS 'delete key in second argument from first argument'; + +CREATE OPERATOR - ( PROCEDURE = jsonb_delete_left, LEFTARG = jsonb, RIGHTARG = text); +COMMENT ON OPERATOR - (jsonb, text) IS 'delete key from left operand'; + +-- + +CREATE OR REPLACE FUNCTION jsonb_delete_left(a jsonb, b text[]) +RETURNS jsonb AS +$BODY$ + SELECT COALESCE( + ( + SELECT ('{' || string_agg(to_json(key) || ':' || value, ',') || '}') + FROM jsonb_each(a) + WHERE key <> ALL(b) + ) + , '{}')::jsonb; +$BODY$ +LANGUAGE sql IMMUTABLE STRICT; +COMMENT ON FUNCTION jsonb_delete_left(jsonb, text[]) IS 'delete keys in second argument from first argument'; + +CREATE OPERATOR - ( PROCEDURE = jsonb_delete_left, LEFTARG = jsonb, RIGHTARG = text[]); +COMMENT ON OPERATOR - (jsonb, text[]) IS 'delete keys from left operand'; + +-- + +CREATE OR REPLACE FUNCTION jsonb_delete_left(a jsonb, b jsonb) +RETURNS jsonb AS +$BODY$ + SELECT COALESCE( + ( + SELECT ('{' || string_agg(to_json(key) || ':' || value, ',') || '}') + FROM jsonb_each(a) + WHERE NOT ('{' || to_json(key) || ':' || value || '}')::jsonb <@ b + ) + , '{}')::jsonb; +$BODY$ +LANGUAGE sql IMMUTABLE STRICT; +COMMENT ON FUNCTION jsonb_delete_left(jsonb, jsonb) IS 'delete matching pairs in second argument from first argument'; + +CREATE OPERATOR - ( PROCEDURE = jsonb_delete_left, LEFTARG = jsonb, RIGHTARG = jsonb); +COMMENT ON OPERATOR - (jsonb, jsonb) IS 'delete matching pairs from left operand'; +""") + + +def generate_history_triggers(table): + tablename = table.__tablename__ + + history_update_function = DDL(f""" + CREATE OR REPLACE FUNCTION {tablename}_history_update() + RETURNS TRIGGER AS $$ + DECLARE + js_new jsonb := row_to_json(NEW)::jsonb; + js_old jsonb := row_to_json(OLD)::jsonb; + BEGIN + INSERT INTO {tablename}_history(fk, "eventTime", "executedBy", event, before, after) + VALUES((js_old->>'id')::int, CURRENT_TIMESTAMP, SESSION_USER, 'update', js_old - js_new, js_new - js_old); + RETURN NEW; + END; + + $$ LANGUAGE plpgsql; + """) + + history_insert_function = DDL(f""" + CREATE OR REPLACE FUNCTION {tablename}_history_insert() + RETURNS TRIGGER AS $$ + DECLARE + js_new jsonb := row_to_json(NEW)::jsonb; + BEGIN + INSERT INTO {tablename}_history(fk, "eventTime", "executedBy", event, after) + VALUES((js_new->>'id')::int, CURRENT_TIMESTAMP, SESSION_USER, 'insert', js_new); + RETURN NEW; + END; + $$ LANGUAGE plpgsql; + """) + + history_delete_function = DDL(f""" + CREATE OR REPLACE FUNCTION {tablename}_history_delete() + RETURNS TRIGGER AS $$ + DECLARE + js_old jsonb := row_to_json(OLD)::jsonb; + BEGIN + INSERT INTO {tablename}_history(fk, "eventTime", "executedBy", event, before) + VALUES((js_old->>'id')::int, CURRENT_TIMESTAMP, SESSION_USER, 'delete', js_old); + RETURN NEW; + END; + $$ LANGUAGE plpgsql; + """) + + history_insert_trigger = DDL(f""" + CREATE TRIGGER {tablename}_history_insert AFTER INSERT ON {tablename} + FOR EACH ROW EXECUTE PROCEDURE {tablename}_history_insert(); + """) + + history_delete_trigger = DDL(f""" + CREATE TRIGGER {tablename}_history_delete AFTER DELETE ON {tablename} + FOR EACH ROW EXECUTE PROCEDURE {tablename}_history_delete(); + """) + + history_update_trigger = DDL(f""" + CREATE TRIGGER {tablename}_history_update AFTER UPDATE ON {tablename} + FOR EACH ROW EXECUTE PROCEDURE {tablename}_history_update(); + """) + + return history_update_function, history_insert_function, history_delete_function, history_insert_trigger, history_delete_trigger, history_update_trigger \ No newline at end of file