diff --git a/ale/util.py b/ale/util.py new file mode 100644 index 0000000000000000000000000000000000000000..6460aab1fac41393d2d7136d4896fd9ab362b8e5 --- /dev/null +++ b/ale/util.py @@ -0,0 +1,874 @@ +import os +from os import path + +from glob import glob +from itertools import filterfalse, groupby +import warnings + +import pvl + +import collections +from collections import OrderedDict +try: + from collections.abc import Mapping +except ImportError: + from collections import Mapping +from itertools import chain +from datetime import datetime +import pytz + +import subprocess +import re +import networkx as nx +from networkx.algorithms.shortest_paths.generic import shortest_path + +import spiceypy as spice + +from ale import spice_root + +def get_metakernels(spice_dir=spice_root, missions=set(), years=set(), versions=set()): + """ + Given a root directory, get any subdirectory containing metakernels, + assume spice directory structure. + + Mostly doing filtering here, might be worth using Pandas? + + Parameters + ---------- + spice_dir : str + Path containing Spice directories downloaded from NAIF's website + + missions : set, str + Mission or set of missions to search for + + years : set, str, int + year or set of years to search for + + versions : set, str + version or set of versions to search for + """ + if not missions or missions == "all": + missions = set() + if not years or years == "all": + years = set() + if not versions or versions == "all": + versions = set() + + if isinstance(missions, str): + missions = {missions} + + if isinstance(years, str) or isinstance(years, int): + years = {str(years)} + else: + years = {str(year) for year in years} + + avail = { + 'count': 0, + 'data': [] + } + + missions = [m.lower() for m in missions] + if spice_dir is not None: + mission_dirs = list(filter(path.isdir, glob(path.join(spice_dir, '*')))) + else: + warnings.warn("Unable to search mission directories without" + + "ALESPICEROOT being set. Defaulting to empty list") + mission_dirs = [] + + for md in mission_dirs: + # Assuming spice root has the same name as the original on NAIF website" + mission = os.path.basename(md).split('-')[0].split('_')[0] + if missions and all([m not in mission.lower() for m in missions]): + continue + + metakernel_keys = ['mission', 'year', 'version', 'path'] + + # recursive glob to make metakernel search more robust to subtle directory structure differences + metakernel_paths = sorted(glob(os.path.join(md, '**','*.[Tt][Mm]'), recursive=True)) + + metakernels = [] + for k in metakernel_paths: + components = path.splitext(path.basename(k))[0].split('_') + [k] + if len(components) == 3: + components.insert(1, 'N/A') + + metakernels.append(dict(zip(metakernel_keys, components))) + + # naive filter, do we really need anything else? + if years: + metakernels = list(filter(lambda x:x['year'] in years or x['year'] == 'N/A', metakernels)) + if versions: + if versions == 'latest': + latest = [] + # Panda's groupby is overrated + for k, g in groupby(metakernels, lambda x:x['year']): + items = list(g) + latest.append(max(items, key=lambda x:x['version'])) + metakernels = latest + else: + metakernels = list(filter(lambda x:x['version'] in versions, metakernels)) + + avail['data'].extend(metakernels) + + avail['count'] = len(avail['data']) + + return avail + + +def find_latest_metakernel(path, year): + metakernel = None + mks = sorted(glob(os.path.join(path,'*.[Tt][Mm]'))) + if not mks: + raise Exception(f'No metakernels found in {path}.') + for mk in mks: + if str(year) in os.path.basename(mk): + metakernel = mk + if not metakernel: + raise Exception(f'No metakernels found in {path} for {year}.') + return metakernel + + + +def dict_merge(dct, merge_dct): + for k, v in merge_dct.items(): + if (k in dct and isinstance(dct[k], dict) + and isinstance(merge_dct[k], Mapping)): + dict_merge(dct[k], merge_dct[k]) + else: + dct[k] = merge_dct[k] + + return dct + + +def get_isis_preferences(isis_preferences=None): + """ + Returns ISIS Preference file as a pvl object + """ + argprefs = {} + if isis_preferences: + if isinstance(isis_preferences, dict): + argprefs = isis_preferences + else: + argprefs = read_pvl(isis_preferences) + + try: + homeprefs = read_pvl(os.path.join(os.path.expanduser("~"), '.Isis', 'IsisPreferences')) + except FileNotFoundError as e: + homeprefs = {} + + try: + isisrootprefs_path = os.path.join(os.environ["ISISROOT"], 'IsisPreferences') + isisrootprefs = read_pvl(isisrootprefs_path) + except (FileNotFoundError, KeyError) as e: + isisrootprefs = {} + + finalprefs = dict_merge(dict_merge(isisrootprefs, homeprefs), argprefs) + + return finalprefs + + +def dict_to_lower(d): + return {k.lower():v if not isinstance(v, dict) else dict_to_lower(v) for k,v in d.items()} + + +def expandvars(path, env_dict=os.environ, default=None, case_sensitive=True): + user_dict = env_dict if case_sensitive else dict_to_lower(env_dict) + + def replace_var(m): + group0 = m.group(0) if case_sensitive else m.group(0).lower() + group1 = m.group(1) if case_sensitive else m.group(1).lower() + + return user_dict.get(m.group(2) or group1, group0 if default is None else default) + reVar = r'\$(\w+|\{([^}]*)\})' + return re.sub(reVar, replace_var, path) + + +def generate_kernels_from_cube(cube, expand=False, format_as='list'): + """ + Parses a cube label to obtain the kernels from the Kernels group. + + Parameters + ---------- + cube : cube + Path to the cube to pull the kernels from. + expand : bool, optional + Whether or not to expand variables within kernel paths based on your IsisPreferences file. + format_as : str, optional {'list', 'dict'} + How to return the kernels: either as a one-dimensional ordered list, or as a dictionary + of kernel lists. + + Returns + ------- + : list + One-dimensional ordered list of all kernels from the Kernels group in the cube. + : Dictionary + Dictionary of lists of kernels with the keys being the Keywords from the Kernels group of + cube itself, and the values being the values associated with that Keyword in the cube. + """ + # enforce key order + mk_paths = OrderedDict.fromkeys( + ['TargetPosition', 'InstrumentPosition', + 'InstrumentPointing', 'Frame', 'TargetAttitudeShape', + 'Instrument', 'InstrumentAddendum', 'LeapSecond', + 'SpacecraftClock', 'Extra']) + + # just work with full path + cube = os.path.abspath(cube) + cubelabel = pvl.load(cube) + + try: + kernel_group = cubelabel['IsisCube'] + except KeyError: + raise KeyError(f'{cubelabel}, Could not find kernels group, input cube [{cube}] may not be spiceinited') + + return get_kernels_from_isis_pvl(kernel_group, expand, format_as) + +def get_kernels_from_isis_pvl(kernel_group, expand=True, format_as="list"): + # enforce key order + mk_paths = OrderedDict.fromkeys( + ['TargetPosition', 'InstrumentPosition', + 'InstrumentPointing', 'Frame', 'TargetAttitudeShape', + 'Instrument', 'InstrumentAddendum', 'LeapSecond', + 'SpacecraftClock', 'Extra']) + + + if isinstance(kernel_group, str): + kernel_group = pvl.loads(kernel_group) + + kernel_group = kernel_group["Kernels"] + + def load_table_data(key): + mk_paths[key] = kernel_group.get(key, None) + if isinstance(mk_paths[key], str): + mk_paths[key] = [mk_paths[key]] + while 'Table' in mk_paths[key]: mk_paths[key].remove('Table') + while 'Nadir' in mk_paths[key]: mk_paths[key].remove('Nadir') + + load_table_data('TargetPosition') + load_table_data('InstrumentPosition') + load_table_data('InstrumentPointing') + load_table_data('TargetAttitudeShape') + # the rest + mk_paths['Frame'] = [kernel_group.get('Frame', None)] + mk_paths['Instrument'] = [kernel_group.get('Instrument', None)] + mk_paths['InstrumentAddendum'] = [kernel_group.get('InstrumentAddendum', None)] + mk_paths['SpacecraftClock'] = [kernel_group.get('SpacecraftClock', None)] + mk_paths['LeapSecond'] = [kernel_group.get('LeapSecond', None)] + mk_paths['Clock'] = [kernel_group.get('Clock', None)] + mk_paths['Extra'] = [kernel_group.get('Extra', None)] + + if (format_as == 'list'): + # get kernels as 1-d string list + kernels = [kernel for kernel in chain.from_iterable(mk_paths.values()) if isinstance(kernel, str)] + if expand: + isisprefs = get_isis_preferences() + + if not "DataDirectory" in isisprefs: + warnings.warn("No IsisPreferences file found, is your ISISROOT env var set?") + + kernels = [expandvars(expandvars(k, dict_to_lower(isisprefs['DataDirectory']))) for k in kernels] + return kernels + elif (format_as == 'dict'): + # return created dict + if expand: + isisprefs = get_isis_preferences() + for kern_list in mk_paths: + for index, kern in enumerate(mk_paths[kern_list]): + if kern is not None: + mk_paths[kern_list][index] = expandvars(expandvars(kern, dict_to_lower(isisprefs['DataDirectory']))) + return mk_paths + else: + raise Exception(f'{format_as} is not a valid return format') + +def write_metakernel_from_cube(cube, mkpath=None): + # add ISISPREF paths as path_symbols and path_values to avoid custom expand logic + pvlprefs = get_isis_preferences() + + kernels = generate_kernels_from_cube(cube) + + # make sure kernels are mk strings + kernels = ["'"+k+"'" for k in kernels] + + paths = OrderedDict(pvlprefs['DataDirectory']) + path_values = ["'"+os.path.expandvars(path)+"'" for path in paths.values()] + path_symbols = ["'"+symbol.lower()+"'" for symbol in paths.keys()] + + body = '\n\n'.join([ + 'KPL/MK', + f'Metakernel Generated from an ISIS cube: {cube}', + '\\begindata', + 'PATH_VALUES = (', + '\n'.join(path_values), + ')', + 'PATH_SYMBOLS = (', + '\n'.join(path_symbols), + ')', + 'KERNELS_TO_LOAD = (', + '\n'.join(kernels), + ')', + '\\begintext' + ]) + + if mkpath is not None: + with open(mkpath, 'w') as f: + f.write(body) + + return body + +def get_ck_frames(kernel): + """ + Get all of the reference frames defined in a kernel. + + Parameters + ---------- + kernel : str + The path to the kernel + + Returns + ------- + ids : list + The set of reference frames IDs defined in the kernel + """ + ckbrief = subprocess.run(["ckbrief", "-t {}".format(kernel)], + capture_output=True, + check=True, + text=True) + ids = set() + for id in re.findall(r'^(-?[0-9]+)', ckbrief.stdout, flags=re.MULTILINE): + ids.add(int(id)) + # Sort the output list for testability + return sorted(list(ids)) + +def create_spk_dependency_tree(kernels): + """ + construct the dependency tree for the body states in a set of kernels. + + Parameters + ---------- + kernels : list + The list of kernels to evaluate the dependencies in. If two + kernels in this list contain the same information for the same + pair of bodies, then the later kernel in the list will be + identified in the kernel property for that edge in dep_tree. + + Returns + ------- + dep_tree : nx.DiGraph + The dependency tree for the kernels. Nodes are bodies. There is + an edge from one node to another if the state of the body of the + source node is defined relative to the state of the body of the + destination node. The kernel edge property identifies what kernel + the information for the edge is defined in. + """ + dep_tree = nx.DiGraph() + + for kernel in kernels: + brief = subprocess.run(["brief", "-c {}".format(kernel)], + capture_output=True, + check=True, + text=True) + for body, rel_body in re.findall(r'\((.*)\).*w\.r\.t\..*\((.*)\)', brief.stdout): + dep_tree.add_edge(int(body), int(rel_body), kernel=kernel) + + return dep_tree + +def spkmerge_config_string(dep_tree, output_spk, bodies, lsk, start, stop): + """ + Create the contents of an spkmerge config file that will produce a spk that + completely defines the state of a list of bodies for a time range. + + Parameters + ---------- + dep_tree : nx.DiGraph + Dependency tree from create_kernel_dependency_tree that contains + information about what the state of different bodies are relative + to and where that information is stored. + output_spk : str + The path to the SPK that will be output by spkmerge + bodies : list + The list of body ID codes that need to be defined in the kernel + created by spkmerge + lsk : str + The absolute path to the leap second kernel to use + start : str + The UTC start time for the kernel created by spkmerge + stop : str + The UTC stop time for the kernel created by spkmerge + + Returns + ------- + : str + The contents of an spkmerge config file that will produce a kernel that + defines the state of the input bodies for the input time range. + """ + input_kernels = set() + all_bodies = set(bodies) + for body in bodies: + # Everything is ultimately defined relative to + # SOLAR SYSTEM BARYCENTER (0) so find the path to it + dep_path = shortest_path(dep_tree, body, 0) + all_bodies.update(dep_path) + for i in range(len(dep_path) - 1): + input_kernels.add(dep_tree[dep_path[i]][dep_path[i+1]]['kernel']) + config = f"LEAPSECONDS_KERNEL = {lsk}\n" + config += f"SPK_KERNEL = {output_spk}\n" + config += f" BODIES = {', '.join([str(b) for b in all_bodies])}\n" + config += f" BEGIN_TIME = {start}\n" + config += f" END_TIME = {stop}\n" + for kernel in input_kernels: + config += f" SOURCE_SPK_KERNEL = {kernel}\n" + config += f" INCLUDE_COMMENTS = no\n" + return config + +def write_metakernel_from_kernel_list(kernels): + """ + Parameters + ---------- + kernels : str + list of kernel paths + + Returns + ------- + : str + Returns string representation of a Naif Metakernel file + """ + + kernels = [os.path.abspath(k) for k in kernels] + common_prefix = os.path.commonprefix(kernels) + + kernels = ["'"+"$PREFIX"+k[len(common_prefix):]+"'" for k in kernels] + body = '\n\n'.join([ + 'KPL/MK', + f'Metakernel Generated from a kernel list by Ale', + '\\begindata', + 'PATH_VALUES = (', + "'"+common_prefix+"'", + ')', + 'PATH_SYMBOLS = (', + "'PREFIX'", + ')', + 'KERNELS_TO_LOAD = (', + '\n'.join(kernels), + ')', + '\\begintext' + ]) + + return body + + + +def duckpool(naifvar, start=0, length=10, default=None): + """ + Duck typing friendly version of spiceypy kernel pool functions. + + Parameters + ---------- + naifvar : str + naif var string to query pool for + + start : int + Index of first value + + length : int + max number of values returned + + default : obj + Default value to return if key is not found in kernel pool + + Returns + ------- + : obj + Spice value returned from spiceypy if found, default value otherwise + + """ + for f in [spice.gdpool, spice.gcpool, spice.gipool]: + try: + val = f(naifvar, start, length) + return val[0] if len(val) == 1 else val + except: + continue + return default + + +def query_kernel_pool(matchstr="*"): + """ + Collect multiple keywords from the naif kernel pool based on a + template string + + Parameters + ---------- + matchstr : str + matchi_c formatted str + + Returns + ------- + : dict + python dictionary of naif keywords in {keyword:value} format. + """ + + try: + svars = spice.gnpool(matchstr, 0, 100) + except Exception as e: + warnings.warn(f"kernel search for {matchstr} failed with {e}") + svars = [] + + svals = [duckpool(v) for v in svars] + return dict(zip(svars, svals)) + + +def read_pvl(path, use_jank=False): + """ + Syntax sugar, used to load a pvl object file from path + + Parameters + ---------- + + path : str + Path to Pvl file + + use_jank : bool + If true, uses faster but less reliable JBFPvlParser, else uses standard PVL parser. + + """ + with open(path) as f: + preftext = f.read().replace('EndGroup', 'End_Group').replace("EndObject", "End_Object") + if use_jank: + pvlprefs = JBFPvlParser(open(path).read()) + else: + pvlprefs = pvl.loads(preftext) + + return pvlprefs + + +def get_isis_mission_translations(isis_data): + """ + Use ISIS translation files and return a lookup table. + + Parameters + ---------- + + isis_data : str + path to $ISIS3DATA + + Returns + ------- + + : dict + Dictionary mapping label mission strings to ISIS3 mission strings + + """ + mission_translation_file = read_pvl(os.path.join(isis_data, "base", "translations", "MissionName2DataDir.trn")) + # For some reason this file takes the form [value, key] for mission name -> data dir + lookup = [l[::-1] for l in mission_translation_file["MissionName"].getlist("Translation")] + return dict(lookup) + + +def JBFPvlParser(lines): + """ + Janky But Faster PVL Parser(TM) + + Only really supports ISIS's Kernel DB files. This is because KernelDB files are sometimes very large for smithed kernels. + This should bring the parsing time for those DB files from minutes to seconds. + Still needs nested object/group support and it should be able to read most PVL files. + + Parameters + ---------- + + lines : str + string body of PVL file. + + Returns : PVLModule + object representing the parsed PVL + + + """ + def JBFKeywordParser(lines): + keyword = lines[0].split("=")[0] + value = lines[0].split("=")[1]+"".join(l.strip() for l in lines[1:]) + + if "(" in value and ")" in value: + value = value.replace("(", "").replace(")", "").split(",") + value = tuple([v.replace("\"", "") for v in value]) + else: + value = value.strip() + + return keyword.strip(), value + + + if isinstance(lines, str): + lines = lines.split("\n") + + items = [] + lines = [l.strip() for l in lines if l.strip()] + metadata = [] + for i,l in enumerate(lines): + if "group = " in l.lower(): + metadata.append([i, "group_start"]) + elif "object = " in l.lower(): + metadata.append([i, "object_start"]) + elif "=" in l: + metadata.append([i, "keyword"]) + elif "end_group" in l.lower() or "endgroup" in l.lower(): + metadata.append([i, "group_end"]) + elif "end_object" in l.lower() or "endobject" in l.lower(): + metadata.append([i, "object_end"]) + + imeta = 0 + while imeta < len(metadata): + element_start_line, element_type = metadata[imeta] + + if element_type == "keyword": + next_element_start = metadata[imeta+1][0] if imeta+1<len(metadata) else len(lines)+1 + element_lines = lines[element_start_line:next_element_start] + items.append(JBFKeywordParser(element_lines)) + imeta+=1 + elif element_type == "group_start": + group_name = lines[element_start_line].split('=')[1].strip() + + next_meta = [(i,m) for i,m in enumerate(metadata[imeta:]) if m[1] == "group_end"][0] + next_group_start = next_meta[1][0] + + group_lines = lines[element_start_line+1:next_group_start] + items.append((group_name, JBFPvlParser(group_lines))) + imeta += next_meta[0] + elif element_type == "object_start": + # duplicate code but whatever + group_name = lines[element_start_line].split('=')[1].strip() + + next_meta = [(i,m) for i,m in enumerate(metadata[imeta:]) if m[1] == "object_end"][0] + next_group_start = next_meta[1][0] + + group_lines = lines[element_start_line+1:next_group_start] + items.append((group_name, JBFPvlParser(group_lines))) + imeta += next_meta[0] + elif element_type == "object_end" or element_type == "group_end": + imeta+=1 + + return pvl.PVLModule(items) + + +def search_isis_db(dbobj, labelobj, isis_data): + """ + Given an PVL obj of a KernelDB file and an Isis Label for a cube, find the best kernel + to attach to the cube. + + The Logic here is a bit gross, but it matches ISIS's implementation very closely. + + + Parameters + ---------- + dbobj : PVLModule + ISIS3 KernelDB file as a loaded PVLModule + + labelobj : PVLModule + Cube label as loaded PVLModule + + isis_data : str + path to $ISISDATA + + Returns + ------- + : dict + dictionary containing kernel list and optionally the kernel type if relevant. + + """ + if not dbobj: + return + + quality = dict(e[::-1] for e in enumerate(["predicted", "nadir", "reconstructed", "smithed"])) + + utc_start_time = labelobj["IsisCube"]["Instrument"]["StartTime"] + utc_stop_time = labelobj["IsisCube"]["Instrument"]["StopTime"] + + run_time = None + dependencies = None + kernels = [] + typ = None + types = [] + + # flag is set when a kernel is found matching the start time but not stop time + # and therefore a second pair needs to be found + partial_match = False + + # Flag is set when kernels encapsulating the entire image time is found + full_match = False + + for selection in dbobj.getlist("Selection"): + files = selection.getlist("File") + + # selection criteria + matches = selection.getlist("Match") + times = selection.getlist("Time") + + if not files: + raise Exception(f"No File found in {selection}") + + files = [path.join(*file) if isinstance(file, list) else file for file in files] + + for i,time in enumerate(times): + isis_time_format = '%Y %b %d %H:%M:%S.%f TDB' + other_isis_time_format = '"%Y %b %d %H:%M:%S.%f TDB"' + + try: + time = [datetime.strptime(time[0].strip(), isis_time_format), + datetime.strptime(time[1].strip(), isis_time_format)] + except Exception as e: + time = [datetime.strptime(time[0].strip(), other_isis_time_format), + datetime.strptime(time[1].strip(), other_isis_time_format)] + + time[0] = pytz.utc.localize(time[0]) + time[1] = pytz.utc.localize(time[1]) + start_time_in_range = utc_start_time >= time[0] and utc_start_time <= time[1] + stop_time_in_range = utc_stop_time >= time[0] and utc_stop_time <= time[1] + times[i] = stop_time_in_range, stop_time_in_range + + for i,match in enumerate(matches): + matches[i] = labelobj["IsisCube"][match[0].strip()][match[1].strip()].lower().strip() == match[2].lower().strip() + + if any(matches if matches else [True]): + for i,f in enumerate(files): + if isinstance(f, tuple): + f = os.path.join(*[e.strip() for e in f]) + + full_path = os.path.join(isis_data, f).replace("$", "").replace("\"", "") + if "{" in full_path: + start = full_path.find("{") + stop = full_path.find("}") + full_path = full_path[:start] + "?"*(stop-start-1) + full_path[stop+1:] + if '?' in full_path: + full_path = sorted(glob(full_path))[-1] + files[i] = full_path + + if times: + have_start_match, have_stop_match = list(map(list, zip(*times))) + typ = selection.get("Type", None) + typ = typ.lower().strip() if typ else None + + current_quality = max([quality[t.lower().strip()] for t in types if t]) if any(types) else 0 + + if any(have_start_match) and any(have_stop_match): + # best case, the image is fully encapsulated in the kernel + full_match = True + if quality[typ] >= current_quality: + kernels = files + types = [selection.get("Type", None)] + elif any(have_start_match): + kernels.extend(files) + types.append(selection.get("Type", None)) + partial_match = True + elif any(have_stop_match): + if partial_match: + if quality[typ] >= current_quality: + kernels.extend(files) + types.append(selection.get("Type", None)) + full_match = True + else: + full_match = True + kernels = files + types = [selection.get("Type", None)] + + if partial_match: + # this can only be true if a kernel matching start time was found + # but not the end time + raise Exception("Could not find kernels encapsulating the full image time") + + kernels = {"kernels" : kernels} + if any(types): + kernels["types"] = types + return kernels + + +def find_kernels(cube, isis_data, format_as=dict): + """ + Find all kernels for a cube and return a json object with categorized kernels. + + Parameters + ---------- + + cube : str + Path to an ISIS cube + + isis_data : str + path to $ISISDATA + + format_as : obj + What type to return the kernels as, ISIS3-like dict/PVL or flat list + + Returns + ------- + : obj + Container with kernels + """ + def remove_dups(listofElements): + # Create an empty list to store unique elements + uniqueList = [] + + # Iterate over the original list and for each element + # add it to uniqueList, if its not already there. + for elem in listofElements: + if elem not in uniqueList: + uniqueList.append(elem) + + # Return the list of unique elements + return uniqueList + + cube_label = pvl.load(cube) + mission_lookup_table = get_isis_mission_translations(isis_data) + + mission_dir = mission_lookup_table[cube_label["IsisCube"]["Instrument"]["SpacecraftName"]] + mission_dir = path.join(isis_data, mission_dir.lower()) + + kernel_dir = path.join(mission_dir, "kernels") + base_kernel_dir = path.join(isis_data, "base", "kernels") + + kernel_types = [ name for name in os.listdir(kernel_dir) if os.path.isdir(os.path.join(kernel_dir, name)) ] + kernel_types.extend(name for name in os.listdir(base_kernel_dir) if os.path.isdir(os.path.join(base_kernel_dir, name))) + kernel_types = set(kernel_types) + + db_files = [] + for typ in kernel_types: + files = sorted(glob(path.join(kernel_dir, typ, "*.db"))) + base_files = sorted(glob(path.join(base_kernel_dir, typ, "*.db"))) + files = [list(it) for k,it in groupby(files, key=lambda f:os.path.basename(f).split(".")[0])] + base_files = [list(it) for k,it in groupby(base_files, key=lambda f:os.path.basename(f).split(".")[0])] + + for instrument_dbs in files: + db_files.append(read_pvl(sorted(instrument_dbs)[-1], True)) + for base_dbs in base_files: + db_files.append(read_pvl(sorted(base_dbs)[-1], True)) + + + kernels = {} + for f in db_files: + #TODO: Error checking + typ = f[0][0] + kernel_search_results = search_isis_db(f[0][1], cube_label, isis_data) + + if not kernel_search_results: + kernels[typ] = None + else: + try: + kernels[typ]["kernels"].extend(kernel_search_results["kernels"]) + if any(kernel_search_results.get("types", [None])): + kernels[typ]["types"].extend(kernel_search_results["types"]) + except: + kernels[typ] = {} + kernels[typ]["kernels"] = kernel_search_results["kernels"] + if any(kernel_search_results.get("types", [None])): + kernels[typ]["types"] = kernel_search_results["types"] + + for k,v in kernels.items(): + if v: + kernels[k]["kernels"] = remove_dups(v["kernels"]) + + if format_as == dict: + return kernels + elif format_as == list: + kernel_list = [] + for _,kernels in kernels.items(): + if kernels: + kernel_list.extend(kernels["kernels"]) + return kernel_list + else: + warnings.warn(f"{format_as} is not a valid format, returning as dict") + return kernels diff --git a/tests/pytests/test_util.py b/tests/pytests/test_util.py new file mode 100644 index 0000000000000000000000000000000000000000..50c05902a74a68ce85da902c9264aef2f46a3e75 --- /dev/null +++ b/tests/pytests/test_util.py @@ -0,0 +1,541 @@ +import os +from os.path import join +from importlib import reload +import subprocess +import networkx as nx + +import pytest +import tempfile +import spiceypy as spice +import pvl +from unittest import mock +from unittest.mock import MagicMock, patch + +from collections import OrderedDict + +import ale +from ale import util + +@pytest.fixture +def cube_kernels(): + return """ + Object = IsisCube + Group = Instrument + StartTime = 2016-332T05:40:45.020 + StopTime = 2016-332T05:40:46.820 + InstrumentId = fake + SpacecraftName = fake + End_Group + + Group = Kernels + TargetAttitudeShape = $base/attitudeshape + TargetPosition = ($messenger/targetposition0, $messenger/targetposition1) + Instrument = $messenger/instrument + InstrumentPointing = (Table, $messenger/instrumentpointing0, $messenger/instrumentpointing1) + SpacecraftClock = $base/clock + InstrumentPosition = $messenger/instrumentposition + InstrumentAddendum = Null + ShapeModel = Null + End_Group + End_Object + End + """ + + +@pytest.fixture +def pvl_one_group(): + return """ + Group = Test + t = t1 + EndGroup + """ + +@pytest.fixture +def pvl_two_group(): + return """ + Group = Data + b = b2 + a = a2 + r = r2 + End_Group + + Group = Test + t = t2 + End_Group + """ + +@pytest.fixture +def pvl_three_group(): + # intentionally mixup End_Group and EndGroup since + # isis likes to mix them up as well + return """ + Group = Data + b = b3 + a = a3 + r = r3 + EndGroup + + Group = Test + t = t3 + EndGroup + + Group = Settings + delsystem32 = yes + End_Group + """ + +@pytest.fixture +def pvl_four_group(): + # Mock of the DataDirectory group + return """ + Group = DataDirectory + Base = $ISIS3DATA/base + Messenger = $ISIS3DATA/messenger + EndGroup + """ + +def test_pvl_parser(pvl_three_group): + obj = util.JBFPvlParser(pvl_three_group) + assert obj["Data"]["a"] == "a3" + assert obj["Test"]["t"] == "t3" + assert obj["Settings"]["delsystem32"] == "yes" + + +def test_find_kernels(cube_kernels, tmpdir): + ck_db = """ + Object = Pointing + Group = Selection + Time = ( "2016 JAN 01 00:00:00.000000 TDB", "2016 DEC 31 00:00:00.000000 TDB" ) + Type = Reconstructed + File = $MRO/fake + End_Group + End_Object + """ + + ik_db = """ + Object = instrument + Group = Selection + Match = ("Instrument", "InstrumentId", "fake") + File = ("fake", "not/a/real/file") + End_Group + End_Object + """ + translation = """ + Group = MissionName + InputKey = SpacecraftName + InputGroup = "IsisCube,Instrument" + InputPosition = (IsisCube, Instrument) + Translation = (fake, "fake") + End_Group + """ + + tmpdir.mkdir("fake").mkdir("kernels").mkdir("ik") + tmpdir.mkdir("base").mkdir("kernels").mkdir("ck") + tmpdir.mkdir("base", "translations") + + ck_db_file = tmpdir.join("base", "kernels", "ck", "kernel.01.db") + ik_db_file = tmpdir.join("fake", "kernels", "ik", "kernel.01.db") + translation_file = tmpdir.join("base", "translations", "MissionName2DataDir.trn") + cube_file = tmpdir.join("test.cub") + + with open(translation_file, "w") as f: + f.write(translation) + + with open(ck_db_file, "w") as f: + f.write(ck_db) + + with open(ik_db_file, "w") as f: + f.write(ik_db) + + with open(cube_file, "w") as cube: + cube.write(cube_kernels) + + print(pvl.load(str(cube_file))) + kernels = util.find_kernels(str(cube_file), str(tmpdir)) + assert kernels == {'Pointing': {'kernels': [str(tmpdir / 'MRO/fake')], 'types': ['Reconstructed']}, 'instrument': {'kernels': [str(tmpdir / 'fake/not/a/real/file')]}} + + +def test_kernel_from_cube_list(cube_kernels): + with tempfile.NamedTemporaryFile('r+') as cube: + cube.write(cube_kernels) + cube.flush() + kernels = util.generate_kernels_from_cube(cube.name) + assert kernels == ['$messenger/targetposition0', '$messenger/targetposition1','$messenger/instrumentposition', '$messenger/instrumentpointing0', '$messenger/instrumentpointing1', '$base/attitudeshape', '$messenger/instrument', '$base/clock'] + +def test_kernel_from_cube_list_expanded(monkeypatch, tmpdir, pvl_four_group, cube_kernels): + monkeypatch.setenv('ISISROOT', str(tmpdir)) + monkeypatch.setenv('ISIS3DATA', '/test/path') + + with open(tmpdir.join('IsisPreferences'), 'w+') as pvl_isisroot_file: + pvl_isisroot_file.write(pvl_four_group) + pvl_isisroot_file.flush() + + with tempfile.NamedTemporaryFile('r+') as cube: + cube.write(cube_kernels) + cube.flush() + kernels = util.generate_kernels_from_cube(cube.name, expand=True) + assert kernels == ['/test/path/messenger/targetposition0', '/test/path/messenger/targetposition1', '/test/path/messenger/instrumentposition', '/test/path/messenger/instrumentpointing0', '/test/path/messenger/instrumentpointing1', '/test/path/base/attitudeshape', '/test/path/messenger/instrument', '/test/path/base/clock'] + +def test_kernel_from_cube_dict(cube_kernels): + with tempfile.NamedTemporaryFile('r+') as cube: + cube.write(cube_kernels) + cube.flush() + kernels = util.generate_kernels_from_cube(cube.name, format_as='dict') + assert kernels == OrderedDict([('TargetPosition', ['$messenger/targetposition0', '$messenger/targetposition1']), ('InstrumentPosition', ['$messenger/instrumentposition']), ('InstrumentPointing', ['$messenger/instrumentpointing0', '$messenger/instrumentpointing1']), ('Frame', [None]), ('TargetAttitudeShape', ['$base/attitudeshape']), ('Instrument', ['$messenger/instrument']), ('InstrumentAddendum', [None]), ('LeapSecond', [None]), ('SpacecraftClock', ['$base/clock']), ('Extra', [None]), ('Clock', [None])]) + +def test_kernel_from_cube_dict_expanded(monkeypatch, tmpdir, pvl_four_group, cube_kernels): + monkeypatch.setenv('ISISROOT', str(tmpdir)) + monkeypatch.setenv('ISIS3DATA', '/test/path') + + with open(tmpdir.join('IsisPreferences'), 'w+') as pvl_isisroot_file: + pvl_isisroot_file.write(pvl_four_group) + pvl_isisroot_file.flush() + + with tempfile.NamedTemporaryFile('r+') as cube: + cube.write(cube_kernels) + cube.flush() + kernels = util.generate_kernels_from_cube(cube.name, expand=True, format_as='dict') + assert kernels == OrderedDict([('TargetPosition', ['/test/path/messenger/targetposition0', '/test/path/messenger/targetposition1']), ('InstrumentPosition', ['/test/path/messenger/instrumentposition']), ('InstrumentPointing', ['/test/path/messenger/instrumentpointing0', '/test/path/messenger/instrumentpointing1']), ('Frame', [None]), ('TargetAttitudeShape', ['/test/path/base/attitudeshape']), ('Instrument', ['/test/path/messenger/instrument']), ('InstrumentAddendum', [None]), ('LeapSecond', [None]), ('SpacecraftClock', ['/test/path/base/clock']), ('Extra', [None]), ('Clock', [None])]) + +def test_kernel_from_cube_no_kernel_group(): + with pytest.raises(KeyError): + with tempfile.NamedTemporaryFile('w+') as cube: + cube.write('') + cube.flush() + util.generate_kernels_from_cube(cube.name) + +def test_get_preferences_arg(tmpdir, pvl_one_group): + with open(tmpdir.join('IsisPrefrences'), 'w+') as pvl_file: + pvl_file.write(pvl_one_group) + pvl_file.flush() + + pvl_obj = util.get_isis_preferences(pvl_file.name) + pvl_obj_from_dict = util.get_isis_preferences({**pvl_obj}) + + assert pvl_obj['Test']['t'] == 't1' + assert pvl_obj == pvl_obj_from_dict + +def test_get_prefernces_arg_isisroot(monkeypatch, tmpdir, pvl_one_group, pvl_two_group): + monkeypatch.setenv('ISISROOT', str(tmpdir)) + + with open(tmpdir.join('IsisPreferences'), 'w+') as pvl_isisroot_file: + with open(tmpdir.join('arg_prefs.pvl'), 'w+') as pvl_arg_file: + pvl_arg_file.write(pvl_two_group) + pvl_arg_file.flush() + + pvl_isisroot_file.write(pvl_one_group) + pvl_isisroot_file.flush() + + pvl_obj = util.get_isis_preferences(pvl_arg_file.name) + + assert pvl_obj['Test']['t'] == 't2' + assert pvl_obj['Data']['b'] == 'b2' + assert pvl_obj['Data']['a'] == 'a2' + assert pvl_obj['Data']['r'] == 'r2' + +def test_dict_to_lower(): + data = {'F': {'O' : {'O': 1}, 'B': {'A': 2}}} + expected = {'f': {'o' : {'o': 1}, 'b': {'a': 2}}} + assert util.dict_to_lower(data) == expected + +def test_get_prefernces_arg_isisroot_home(monkeypatch, tmpdir, pvl_one_group, pvl_two_group, pvl_three_group): + tmpdir.mkdir('.Isis') + monkeypatch.setenv('ISISROOT', str(tmpdir)) + monkeypatch.setenv('HOME', str(tmpdir)) + + with open(tmpdir.join('arg_prefs.pvl'), 'w+') as pvl_arg_file: + with open(tmpdir.join('.Isis', 'IsisPreferences'), 'w+') as pvl_home_file: + with open(tmpdir.join('IsisPreferences'), 'w+') as pvl_isisroot_file: + pvl_arg_file.write(pvl_one_group) + pvl_arg_file.flush() + + pvl_home_file.write(pvl_three_group) + pvl_home_file.flush() + + pvl_isisroot_file.write(pvl_two_group) + pvl_isisroot_file.flush() + + pvl_obj = util.get_isis_preferences(pvl_arg_file.name) + + assert pvl_obj['Test']['t'] == 't1' + assert pvl_obj['Data']['b'] == 'b3' + + +@pytest.mark.parametrize('filename', [os.path.join('.Isis', 'IsisPreferences'), 'IsisPreferences']) +def test_get_prefrences_malformed_files(monkeypatch, tmpdir, filename): + monkeypatch.setenv('ISISROOT', str(tmpdir)) + monkeypatch.setenv('HOME', str(tmpdir)) + tmpdir.mkdir('.Isis') + + with pytest.raises(pvl.parser.LexerError): + with open(tmpdir.join(filename), 'w+') as brokenpref: + brokenpref.write('Totally not PVL') + brokenpref.flush() + util.get_isis_preferences() + + with pytest.raises(pvl.parser.LexerError): + util.get_isis_preferences(tmpdir.join(filename)) + + +@pytest.mark.parametrize('string,expected,case_sensitive', [('$bar/baz', '/bar/baz', False), ('$bar/$foo/baz', '/bar//foo/baz', True), ('$BAR/$FOO/baz', '/bar//foo/baz', False)]) +def test_expand_vars(string, expected, case_sensitive): + user_vars = {'foo': '/foo', 'bar': '/bar'} + result = util.expandvars(string, env_dict=user_vars, case_sensitive=case_sensitive) + assert result == expected + + +@pytest.mark.parametrize('search_kwargs,expected', + [({'years':'2009', 'versions':'v01'}, {'count':1, 'data':[{'path':join('foo-b-v01', 'foo_2009_v01.tm'), 'year':'2009', 'mission':'foo', 'version':'v01'}]}), + ({'versions':'v02', 'years':2010}, {'count': 1, 'data': [{'path':join('bar-b-v01', 'bar_2010_v02.tm'), 'year':'2010', 'mission':'bar', 'version': 'v02'}]})]) +def test_get_metakernels(tmpdir, search_kwargs, expected): + tmpdir.mkdir('foo-b-v01') + tmpdir.mkdir('bar-b-v01') + + open(tmpdir.join('foo-b-v01', 'foo_2009_v01.tm'), 'w').close() + open(tmpdir.join('bar-b-v01', 'bar_2010_v02.tm'), 'w').close() + + search_result = util.get_metakernels(str(tmpdir), **search_kwargs) + # we can't know the tmpdir at parameterization, append it here + for r in expected['data']: + r['path'] = str(tmpdir.join(r['path'])) + + assert search_result == expected + +@pytest.mark.parametrize('search_kwargs, expected', + [({'years':'2009', 'versions':'v01'}, {'count':0, 'data':[]})]) +def test_get_metakernels_no_alespiceroot(monkeypatch, search_kwargs, expected): + monkeypatch.delenv('ALESPICEROOT', raising=False) + reload(ale) + with pytest.warns(UserWarning, match="Unable to search mission directories without" + + "ALESPICEROOT being set. Defaulting to empty list"): + search_result = ale.util.get_metakernels(**search_kwargs) + print(search_result) + monkeypatch.setenv('ALESPICEROOT', '/foo/bar') + reload(ale) + + assert search_result == expected + +@pytest.mark.parametrize('search_kwargs', [{'years':'2010'}, {'years':2010}, {'years': [2010]}, {'years': ['2010']}, {'years': set(['2010', '1999', '1776'])}, + {'missions':'bar', 'versions':'v20'}, {'missions': ['bar'], 'versions':'v20'}, {'missions': 'bar', 'versions':['v20', 'v03']}, {'missions':set(['bar']),'years': 2010, 'versions': 'latest'} ]) +def test_get_metakernels_search_args(tmpdir, search_kwargs): + tmpdir.mkdir('foo-b-v01') + tmpdir.mkdir('bar-b-v01') + + open(tmpdir.join('foo-b-v01', 'foo_2009_v01.tm'), 'w').close() + open(tmpdir.join('bar-b-v01', 'bar_9009_v01.tm'), 'w').close() + open(tmpdir.join('bar-b-v01', 'bar_2009_v10.tm'), 'w').close() + + test_mk = tmpdir.join('bar-b-v01', 'bar_2010_v20.tm') + open(test_mk, 'w').close() + + search_result = util.get_metakernels(str(tmpdir), **search_kwargs) + + expected = { + 'count' : 1, + 'data' : [{ + 'year' : '2010', + 'mission' : 'bar', + 'version': 'v20', + 'path': test_mk + }] + } + + assert search_result == expected + +@pytest.mark.parametrize('search_kwargs,expected_count', [({'years':'2010'}, 2), ({'years': ['1990', '2009']}, 4), ({'years':'9009'}, 1), ({'years':'all'}, 7), ({'years':[]},7), ({'versions':'latest'}, 6), ({'versions':'all'},7), ({'versions':[]}, 7), ({'versions':None}, 7), ({'versions':['v20']}, 2), ({'versions':['v10', 'v01']}, 4), ({'missions': 'foo'}, 3), ({'missions':'bar'},3), ({'missions':'baz'},1), ({'missions':'all'}, 7), ({'missions':['foo', 'bar'], 'versions': 'v01', 'years': + 2009}, 1), ({}, 7), ({'versions': 'latest', 'missions':'foo'}, 2), ({'missions': 'not_real'}, 0)]) +def test_get_metakernels_search_counts(tmpdir, search_kwargs, expected_count): + tmpdir.mkdir('foo-b-v01') + tmpdir.mkdir('bar-b-v01') + tmpdir.mkdir('baz-b-v100') + + open(tmpdir.join('foo-b-v01', 'foo_2009_v01.tm'), 'w').close() + open(tmpdir.join('foo-b-v01', 'foo_2009_v20.tm'), 'w').close() + open(tmpdir.join('foo-b-v01', 'foo_2010_v20.tm'), 'w').close() + open(tmpdir.join('bar-b-v01', 'bar_9009_v01.tm'), 'w').close() + open(tmpdir.join('bar-b-v01', 'bar_2009_v10.tm'), 'w').close() + open(tmpdir.join('bar-b-v01', 'bar_2010_v02.tm'), 'w').close() + open(tmpdir.join('baz-b-v100', 'baz_1990_v10.tm'), 'w').close() + + + search_result = util.get_metakernels(str(tmpdir), **search_kwargs) + assert search_result['count'] == expected_count + +def test_create_spk_dependency_tree(): + de430_output =""" +BRIEF -- Version 4.0.0, September 8, 2010 -- Toolkit Version N0066 + + +Summary for: de430.bsp + +Bodies: MERCURY BARYCENTER (1) w.r.t. SOLAR SYSTEM BARYCENTER (0) + VENUS BARYCENTER (2) w.r.t. SOLAR SYSTEM BARYCENTER (0) + EARTH BARYCENTER (3) w.r.t. SOLAR SYSTEM BARYCENTER (0) + MARS BARYCENTER (4) w.r.t. SOLAR SYSTEM BARYCENTER (0) + JUPITER BARYCENTER (5) w.r.t. SOLAR SYSTEM BARYCENTER (0) + SATURN BARYCENTER (6) w.r.t. SOLAR SYSTEM BARYCENTER (0) + URANUS BARYCENTER (7) w.r.t. SOLAR SYSTEM BARYCENTER (0) + NEPTUNE BARYCENTER (8) w.r.t. SOLAR SYSTEM BARYCENTER (0) + PLUTO BARYCENTER (9) w.r.t. SOLAR SYSTEM BARYCENTER (0) + SUN (10) w.r.t. SOLAR SYSTEM BARYCENTER (0) + MERCURY (199) w.r.t. MERCURY BARYCENTER (1) + VENUS (299) w.r.t. VENUS BARYCENTER (2) + MOON (301) w.r.t. EARTH BARYCENTER (3) + EARTH (399) w.r.t. EARTH BARYCENTER (3) + Start of Interval (ET) End of Interval (ET) + ----------------------------- ----------------------------- + 1549 DEC 31 00:00:00.000 2650 JAN 25 00:00:00.000 +""" + msgr_20040803_20150430_od431sc_2_output =""" +BRIEF -- Version 4.0.0, September 8, 2010 -- Toolkit Version N0066 + + +Summary for: msgr_20040803_20150430_od431sc_2.bsp + +Body: MESSENGER (-236) w.r.t. MERCURY BARYCENTER (1) + Start of Interval (ET) End of Interval (ET) + ----------------------------- ----------------------------- + 2008 JAN 13 19:18:06.919 2008 JAN 15 18:52:08.364 + 2008 OCT 05 06:12:17.599 2008 OCT 07 11:08:05.670 + 2009 SEP 28 05:25:44.180 2009 OCT 01 14:07:55.870 + 2011 MAR 15 11:20:36.100 2015 APR 30 19:27:09.351 + +Body: MESSENGER (-236) w.r.t. VENUS BARYCENTER (2) + Start of Interval (ET) End of Interval (ET) + ----------------------------- ----------------------------- + 2006 OCT 16 19:26:46.293 2006 OCT 31 22:15:29.222 + 2007 MAY 29 09:28:46.998 2007 JUN 13 15:30:49.997 + +Body: MESSENGER (-236) w.r.t. SUN (10) + Start of Interval (ET) End of Interval (ET) + ----------------------------- ----------------------------- + 2004 AUG 10 04:06:05.612 2005 JUL 26 22:39:53.204 + 2005 AUG 09 16:04:35.204 2006 OCT 16 19:26:46.293 + 2006 OCT 31 22:15:29.222 2007 MAY 29 09:28:46.998 + 2007 JUN 13 15:30:49.997 2008 JAN 13 19:18:06.919 + 2008 JAN 15 18:52:08.364 2008 OCT 05 06:12:17.599 + 2008 OCT 07 11:08:05.670 2009 SEP 28 05:25:44.180 + 2009 OCT 01 14:07:55.870 2011 MAR 15 11:20:36.100 + +Body: MESSENGER (-236) w.r.t. EARTH (399) + Start of Interval (ET) End of Interval (ET) + ----------------------------- ----------------------------- + 2004 AUG 03 07:14:39.393 2004 AUG 10 04:06:05.612 + 2005 JUL 26 22:39:53.204 2005 AUG 09 16:04:35.204 + +Bodies: MERCURY BARYCENTER (1) w.r.t. SOLAR SYSTEM BARYCENTER (0) + EARTH BARYCENTER (3) w.r.t. SOLAR SYSTEM BARYCENTER (0) + Start of Interval (ET) End of Interval (ET) + ----------------------------- ----------------------------- + 2004 AUG 03 07:14:39.393 2015 APR 30 19:27:09.351 + +""" + de430_mock = MagicMock(spec=subprocess.CompletedProcess) + de430_mock.stdout = de430_output + msgr_20040803_20150430_od431sc_2_mock = MagicMock(spec=subprocess.CompletedProcess) + msgr_20040803_20150430_od431sc_2_mock.stdout = msgr_20040803_20150430_od431sc_2_output + + expected_tree = nx.DiGraph() + expected_tree.add_edge(1, 0, kernel='msgr_20040803_20150430_od431sc_2.bsp') + expected_tree.add_edge(2, 0, kernel='de430.bsp') + expected_tree.add_edge(3, 0, kernel='msgr_20040803_20150430_od431sc_2.bsp') + expected_tree.add_edge(4, 0, kernel='de430.bsp') + expected_tree.add_edge(5, 0, kernel='de430.bsp') + expected_tree.add_edge(6, 0, kernel='de430.bsp') + expected_tree.add_edge(7, 0, kernel='de430.bsp') + expected_tree.add_edge(8, 0, kernel='de430.bsp') + expected_tree.add_edge(9, 0, kernel='de430.bsp') + expected_tree.add_edge(10, 0, kernel='de430.bsp') + expected_tree.add_edge(199, 1, kernel='de430.bsp') + expected_tree.add_edge(299, 2, kernel='de430.bsp') + expected_tree.add_edge(301, 3, kernel='de430.bsp') + expected_tree.add_edge(399, 3, kernel='de430.bsp') + expected_tree.add_edge(-236, 1, kernel='msgr_20040803_20150430_od431sc_2.bsp') + expected_tree.add_edge(-236, 2, kernel='msgr_20040803_20150430_od431sc_2.bsp') + expected_tree.add_edge(-236, 10, kernel='msgr_20040803_20150430_od431sc_2.bsp') + expected_tree.add_edge(-236, 399, kernel='msgr_20040803_20150430_od431sc_2.bsp') + + with patch('subprocess.run', side_effect = [de430_mock, msgr_20040803_20150430_od431sc_2_mock]) as run_mock: + dep_tree = util.create_spk_dependency_tree(['de430.bsp', 'msgr_sc_EN1072174528M.bsp']) + run_mock.assert_any_call(["brief", "-c de430.bsp"], + capture_output=True, + check=True, + text=True) + run_mock.assert_any_call(["brief", "-c msgr_sc_EN1072174528M.bsp"], + capture_output=True, + check=True, + text=True) + assert nx.is_isomorphic(dep_tree, expected_tree) + +def test_spkmerge_config_string(): + dep_tree = nx.DiGraph() + dep_tree.add_edge(1, 0, kernel='msgr_20040803_20150430_od431sc_2.bsp') + dep_tree.add_edge(2, 0, kernel='de430.bsp') + dep_tree.add_edge(3, 0, kernel='msgr_20040803_20150430_od431sc_2.bsp') + dep_tree.add_edge(4, 0, kernel='de430.bsp') + dep_tree.add_edge(5, 0, kernel='de430.bsp') + dep_tree.add_edge(6, 0, kernel='de430.bsp') + dep_tree.add_edge(7, 0, kernel='de430.bsp') + dep_tree.add_edge(8, 0, kernel='de430.bsp') + dep_tree.add_edge(9, 0, kernel='de430.bsp') + dep_tree.add_edge(10, 0, kernel='de430.bsp') + dep_tree.add_edge(199, 1, kernel='de430.bsp') + dep_tree.add_edge(299, 2, kernel='de430.bsp') + dep_tree.add_edge(301, 3, kernel='de430.bsp') + dep_tree.add_edge(399, 3, kernel='de430.bsp') + dep_tree.add_edge(-236, 1, kernel='msgr_20040803_20150430_od431sc_2.bsp') + dep_tree.add_edge(-236, 2, kernel='msgr_20040803_20150430_od431sc_2.bsp') + dep_tree.add_edge(-236, 10, kernel='msgr_20040803_20150430_od431sc_2.bsp') + dep_tree.add_edge(-236, 399, kernel='msgr_20040803_20150430_od431sc_2.bsp') + + config_string = util.spkmerge_config_string(dep_tree, + 'theoutput.bsp', + [-236, 199, 10], + 'thelsk.tls', + 'the start UTC string', + 'the stop UTC string') + assert config_string == """LEAPSECONDS_KERNEL = thelsk.tls +SPK_KERNEL = theoutput.bsp + BODIES = 0, 1, 199, 10, -236 + BEGIN_TIME = the start UTC string + END_TIME = the stop UTC string + SOURCE_SPK_KERNEL = de430.bsp + INCLUDE_COMMENTS = no + SOURCE_SPK_KERNEL = msgr_20040803_20150430_od431sc_2.bsp + INCLUDE_COMMENTS = no +""" or config_string == """LEAPSECONDS_KERNEL = thelsk.tls +SPK_KERNEL = theoutput.bsp + BODIES = 0, 1, 199, 10, -236 + BEGIN_TIME = the start UTC string + END_TIME = the stop UTC string + SOURCE_SPK_KERNEL = msgr_20040803_20150430_od431sc_2.bsp + INCLUDE_COMMENTS = no + SOURCE_SPK_KERNEL = de430.bsp + INCLUDE_COMMENTS = no +""" + +def test_get_ck_frames(): + msgr_0905_v02_output = """ +CKBRIEF -- Version 6.1.0, June 27, 2014 -- Toolkit Version N0066 + + +Summary for: msgr_0905_v02.bc + +Objects Interval Begin ET Interval End ET AV +-------- ------------------------ ------------------------ --- +-236001 NEED LSK AND SCLK FILES NEED LSK AND SCLK FILES Y +-236002 NEED LSK AND SCLK FILES NEED LSK AND SCLK FILES Y +-236002 NEED LSK AND SCLK FILES NEED LSK AND SCLK FILES Y +-236000 NEED LSK AND SCLK FILES NEED LSK AND SCLK FILES Y +-236002 NEED LSK AND SCLK FILES NEED LSK AND SCLK FILES Y + +""" + msgr_0905_v02_mock = MagicMock(spec=subprocess.CompletedProcess) + msgr_0905_v02_mock.stdout = msgr_0905_v02_output + with patch('subprocess.run', side_effect = [msgr_0905_v02_mock]) as run_mock: + frames = util.get_ck_frames('msgr_0905_v02.bc') + run_mock.assert_any_call(["ckbrief", "-t msgr_0905_v02.bc"], + capture_output=True, + check=True, + text=True) + assert frames == [-236002, -236001, -236000]