diff --git a/ale/kernel_access.py b/ale/kernel_access.py new file mode 100644 index 0000000000000000000000000000000000000000..43f9c68c6fa3be0dee8c028ea294cdf71610c5ac --- /dev/null +++ b/ale/kernel_access.py @@ -0,0 +1,297 @@ +from glob import glob +from itertools import groupby +import os +from os import path +import warnings + + +from ale import spice_root +from ale.util import get_isis_preferences + +def get_metakernels(spice_dir=spice_root, missions=set(), years=set(), versions=set()): + """ + Given a root directory, get any subdirectory containing metakernels, + assume spice directory structure. + + Mostly doing filtering here, might be worth using Pandas? + + Parameters + ---------- + spice_dir : str + Path containing Spice directories downloaded from NAIF's website + + missions : set, str + Mission or set of missions to search for + + years : set, str, int + year or set of years to search for + + versions : set, str + version or set of versions to search for + """ + if not missions or missions == "all": + missions = set() + if not years or years == "all": + years = set() + if not versions or versions == "all": + versions = set() + + if isinstance(missions, str): + missions = {missions} + + if isinstance(years, str) or isinstance(years, int): + years = {str(years)} + else: + years = {str(year) for year in years} + + avail = { + 'count': 0, + 'data': [] + } + + missions = [m.lower() for m in missions] + if spice_dir is not None: + mission_dirs = list(filter(path.isdir, glob(path.join(spice_dir, '*')))) + else: + warnings.warn("Unable to search mission directories without" + + "ALESPICEROOT being set. Defaulting to empty list") + mission_dirs = [] + + for md in mission_dirs: + # Assuming spice root has the same name as the original on NAIF website" + mission = os.path.basename(md).split('-')[0].split('_')[0] + if missions and all([m not in mission.lower() for m in missions]): + continue + + metakernel_keys = ['mission', 'year', 'version', 'path'] + + # recursive glob to make metakernel search more robust to subtle directory structure differences + metakernel_paths = sorted(glob(os.path.join(md, '**','*.[Tt][Mm]'), recursive=True)) + + metakernels = [] + for k in metakernel_paths: + components = path.splitext(path.basename(k))[0].split('_') + [k] + if len(components) == 3: + components.insert(1, 'N/A') + + metakernels.append(dict(zip(metakernel_keys, components))) + + # naive filter, do we really need anything else? + if years: + metakernels = list(filter(lambda x:x['year'] in years or x['year'] == 'N/A', metakernels)) + if versions: + if versions == 'latest': + latest = [] + # Panda's groupby is overrated + for k, g in groupby(metakernels, lambda x:x['year']): + items = list(g) + latest.append(max(items, key=lambda x:x['version'])) + metakernels = latest + else: + metakernels = list(filter(lambda x:x['version'] in versions, metakernels)) + + avail['data'].extend(metakernels) + + avail['count'] = len(avail['data']) + + return avail + + +def generate_kernels_from_cube(cube, expand=False, format_as='list'): + """ + Parses a cube label to obtain the kernels from the Kernels group. + + Parameters + ---------- + cube : cube + Path to the cube to pull the kernels from. + expand : bool, optional + Whether or not to expand variables within kernel paths based on your IsisPreferences file. + See :func:`get_isis_preferences` for how the IsisPreferences file is found. + format_as : str, optional {'list', 'dict'} + How to return the kernels: either as a one-dimensional ordered list, or as a dictionary + of kernel lists. + + Returns + ------- + : list + One-dimensional ordered list of all kernels from the Kernels group in the cube. + : Dictionary + Dictionary of lists of kernels with the keys being the Keywords from the Kernels group of + cube itself, and the values being the values associated with that Keyword in the cube. + """ + # just work with full path + cube = os.path.abspath(cube) + cubelabel = pvl.load(cube) + + try: + kernel_group = cubelabel['IsisCube'] + except KeyError: + raise KeyError(f'{cubelabel}, Could not find kernels group, input cube [{cube}] may not be spiceinited') + + return get_kernels_from_isis_pvl(kernel_group, expand, format_as) + + +def get_kernels_from_isis_pvl(kernel_group, expand=True, format_as="list"): + # enforce key order + mk_paths = OrderedDict.fromkeys( + ['TargetPosition', 'InstrumentPosition', + 'InstrumentPointing', 'Frame', 'TargetAttitudeShape', + 'Instrument', 'InstrumentAddendum', 'LeapSecond', + 'SpacecraftClock', 'Extra']) + + + if isinstance(kernel_group, str): + kernel_group = pvl.loads(kernel_group) + + kernel_group = kernel_group["Kernels"] + + def load_table_data(key): + mk_paths[key] = kernel_group.get(key, None) + if isinstance(mk_paths[key], str): + mk_paths[key] = [mk_paths[key]] + while 'Table' in mk_paths[key]: mk_paths[key].remove('Table') + while 'Nadir' in mk_paths[key]: mk_paths[key].remove('Nadir') + + load_table_data('TargetPosition') + load_table_data('InstrumentPosition') + load_table_data('InstrumentPointing') + load_table_data('TargetAttitudeShape') + # the rest + mk_paths['Frame'] = [kernel_group.get('Frame', None)] + mk_paths['Instrument'] = [kernel_group.get('Instrument', None)] + mk_paths['InstrumentAddendum'] = [kernel_group.get('InstrumentAddendum', None)] + mk_paths['SpacecraftClock'] = [kernel_group.get('SpacecraftClock', None)] + mk_paths['LeapSecond'] = [kernel_group.get('LeapSecond', None)] + mk_paths['Clock'] = [kernel_group.get('Clock', None)] + mk_paths['Extra'] = [kernel_group.get('Extra', None)] + + # handles issue with OsirisRex instrument kernels being in a 2d list + if isinstance(mk_paths['Instrument'][0], list): + mk_paths['Instrument'] = np.concatenate(mk_paths['Instrument']).flat + + if (format_as == 'list'): + # get kernels as 1-d string list + kernels = [] + for kernel in chain.from_iterable(mk_paths.values()): + if isinstance(kernel, str): + kernels.append(kernel) + elif isinstance(kernel, list): + kernels.extend(kernel) + if expand: + isisprefs = get_isis_preferences() + if not "DataDirectory" in isisprefs: + warnings.warn("No IsisPreferences file found, is your ISISROOT env var set?") + + kernels = [expandvars(expandvars(k, isisprefs['DataDirectory'], case_sensitive=False)) for k in kernels] + # Ensure that the ISIS Addendum kernel is last in case it overrides + # some values from the default Instrument kernel + kernels = sorted(kernels, key=lambda x: "Addendum" in x) + return kernels + elif (format_as == 'dict'): + # return created dict + if expand: + isisprefs = get_isis_preferences() + for kern_list in mk_paths: + for index, kern in enumerate(mk_paths[kern_list]): + if kern is not None: + mk_paths[kern_list][index] = expandvars(expandvars(kern, isisprefs['DataDirectory'], case_sensitive=False)) + return mk_paths + else: + raise Exception(f'{format_as} is not a valid return format') + + +def find_kernels(cube, isis_data, format_as=dict): + """ + Find all kernels for a cube and return a json object with categorized kernels. + + Parameters + ---------- + + cube : str + Path to an ISIS cube + + isis_data : str + path to $ISISDATA + + format_as : obj + What type to return the kernels as, ISIS3-like dict/PVL or flat list + + Returns + ------- + : obj + Container with kernels + """ + def remove_dups(listofElements): + # Create an empty list to store unique elements + uniqueList = [] + + # Iterate over the original list and for each element + # add it to uniqueList, if its not already there. + for elem in listofElements: + if elem not in uniqueList: + uniqueList.append(elem) + + # Return the list of unique elements + return uniqueList + + cube_label = pvl.load(cube) + mission_lookup_table = get_isis_mission_translations(isis_data) + + mission_dir = mission_lookup_table[cube_label["IsisCube"]["Instrument"]["SpacecraftName"]] + mission_dir = path.join(isis_data, mission_dir.lower()) + + kernel_dir = path.join(mission_dir, "kernels") + base_kernel_dir = path.join(isis_data, "base", "kernels") + + kernel_types = [ name for name in os.listdir(kernel_dir) if os.path.isdir(os.path.join(kernel_dir, name)) ] + kernel_types.extend(name for name in os.listdir(base_kernel_dir) if os.path.isdir(os.path.join(base_kernel_dir, name))) + kernel_types = set(kernel_types) + + db_files = [] + for typ in kernel_types: + files = sorted(glob(path.join(kernel_dir, typ, "*.db"))) + base_files = sorted(glob(path.join(base_kernel_dir, typ, "*.db"))) + files = [list(it) for k,it in groupby(files, key=lambda f:os.path.basename(f).split(".")[0])] + base_files = [list(it) for k,it in groupby(base_files, key=lambda f:os.path.basename(f).split(".")[0])] + + for instrument_dbs in files: + db_files.append(read_pvl(sorted(instrument_dbs)[-1], True)) + for base_dbs in base_files: + db_files.append(read_pvl(sorted(base_dbs)[-1], True)) + + + kernels = {} + for f in db_files: + #TODO: Error checking + typ = f[0][0] + kernel_search_results = search_isis_db(f[0][1], cube_label, isis_data) + + if not kernel_search_results: + kernels[typ] = None + else: + try: + kernels[typ]["kernels"].extend(kernel_search_results["kernels"]) + if any(kernel_search_results.get("types", [None])): + kernels[typ]["types"].extend(kernel_search_results["types"]) + except: + kernels[typ] = {} + kernels[typ]["kernels"] = kernel_search_results["kernels"] + if any(kernel_search_results.get("types", [None])): + kernels[typ]["types"] = kernel_search_results["types"] + + for k,v in kernels.items(): + if v: + kernels[k]["kernels"] = remove_dups(v["kernels"]) + + if format_as == dict: + return kernels + elif format_as == list: + kernel_list = [] + for _,kernels in kernels.items(): + if kernels: + kernel_list.extend(kernels["kernels"]) + return kernel_list + else: + warnings.warn(f"{format_as} is not a valid format, returning as dict") + return kernels diff --git a/ale/util.py b/ale/util.py index c3458034848fee1e495b6f721cde9735651fbdb8..5e4d4a69bcbdd9d776da8d5d269c759fa0d0af29 100644 --- a/ale/util.py +++ b/ale/util.py @@ -2,21 +2,17 @@ import os from os import path from glob import glob -from itertools import filterfalse, groupby import warnings import pvl -import collections from collections import OrderedDict try: from collections.abc import Mapping except ImportError: from collections import Mapping -from itertools import chain from datetime import datetime import pytz -import numpy as np import subprocess import re @@ -25,97 +21,6 @@ from networkx.algorithms.shortest_paths.generic import shortest_path import spiceypy as spice -from ale import spice_root - -def get_metakernels(spice_dir=spice_root, missions=set(), years=set(), versions=set()): - """ - Given a root directory, get any subdirectory containing metakernels, - assume spice directory structure. - - Mostly doing filtering here, might be worth using Pandas? - - Parameters - ---------- - spice_dir : str - Path containing Spice directories downloaded from NAIF's website - - missions : set, str - Mission or set of missions to search for - - years : set, str, int - year or set of years to search for - - versions : set, str - version or set of versions to search for - """ - if not missions or missions == "all": - missions = set() - if not years or years == "all": - years = set() - if not versions or versions == "all": - versions = set() - - if isinstance(missions, str): - missions = {missions} - - if isinstance(years, str) or isinstance(years, int): - years = {str(years)} - else: - years = {str(year) for year in years} - - avail = { - 'count': 0, - 'data': [] - } - - missions = [m.lower() for m in missions] - if spice_dir is not None: - mission_dirs = list(filter(path.isdir, glob(path.join(spice_dir, '*')))) - else: - warnings.warn("Unable to search mission directories without" + - "ALESPICEROOT being set. Defaulting to empty list") - mission_dirs = [] - - for md in mission_dirs: - # Assuming spice root has the same name as the original on NAIF website" - mission = os.path.basename(md).split('-')[0].split('_')[0] - if missions and all([m not in mission.lower() for m in missions]): - continue - - metakernel_keys = ['mission', 'year', 'version', 'path'] - - # recursive glob to make metakernel search more robust to subtle directory structure differences - metakernel_paths = sorted(glob(os.path.join(md, '**','*.[Tt][Mm]'), recursive=True)) - - metakernels = [] - for k in metakernel_paths: - components = path.splitext(path.basename(k))[0].split('_') + [k] - if len(components) == 3: - components.insert(1, 'N/A') - - metakernels.append(dict(zip(metakernel_keys, components))) - - # naive filter, do we really need anything else? - if years: - metakernels = list(filter(lambda x:x['year'] in years or x['year'] == 'N/A', metakernels)) - if versions: - if versions == 'latest': - latest = [] - # Panda's groupby is overrated - for k, g in groupby(metakernels, lambda x:x['year']): - items = list(g) - latest.append(max(items, key=lambda x:x['version'])) - metakernels = latest - else: - metakernels = list(filter(lambda x:x['version'] in versions, metakernels)) - - avail['data'].extend(metakernels) - - avail['count'] = len(avail['data']) - - return avail - - def find_latest_metakernel(path, year): metakernel = None mks = sorted(glob(os.path.join(path,'*.[Tt][Mm]'))) @@ -189,115 +94,6 @@ def expandvars(path, env_dict=os.environ, default=None, case_sensitive=True): return re.sub(reVar, replace_var, path) -def generate_kernels_from_cube(cube, expand=False, format_as='list'): - """ - Parses a cube label to obtain the kernels from the Kernels group. - - Parameters - ---------- - cube : cube - Path to the cube to pull the kernels from. - expand : bool, optional - Whether or not to expand variables within kernel paths based on your IsisPreferences file. - See :func:`get_isis_preferences` for how the IsisPreferences file is found. - format_as : str, optional {'list', 'dict'} - How to return the kernels: either as a one-dimensional ordered list, or as a dictionary - of kernel lists. - - Returns - ------- - : list - One-dimensional ordered list of all kernels from the Kernels group in the cube. - : Dictionary - Dictionary of lists of kernels with the keys being the Keywords from the Kernels group of - cube itself, and the values being the values associated with that Keyword in the cube. - """ - # enforce key order - mk_paths = OrderedDict.fromkeys( - ['TargetPosition', 'InstrumentPosition', - 'InstrumentPointing', 'Frame', 'TargetAttitudeShape', - 'Instrument', 'InstrumentAddendum', 'LeapSecond', - 'SpacecraftClock', 'Extra']) - - # just work with full path - cube = os.path.abspath(cube) - cubelabel = pvl.load(cube) - - try: - kernel_group = cubelabel['IsisCube'] - except KeyError: - raise KeyError(f'{cubelabel}, Could not find kernels group, input cube [{cube}] may not be spiceinited') - - return get_kernels_from_isis_pvl(kernel_group, expand, format_as) - -def get_kernels_from_isis_pvl(kernel_group, expand=True, format_as="list"): - # enforce key order - mk_paths = OrderedDict.fromkeys( - ['TargetPosition', 'InstrumentPosition', - 'InstrumentPointing', 'Frame', 'TargetAttitudeShape', - 'Instrument', 'InstrumentAddendum', 'LeapSecond', - 'SpacecraftClock', 'Extra']) - - - if isinstance(kernel_group, str): - kernel_group = pvl.loads(kernel_group) - - kernel_group = kernel_group["Kernels"] - - def load_table_data(key): - mk_paths[key] = kernel_group.get(key, None) - if isinstance(mk_paths[key], str): - mk_paths[key] = [mk_paths[key]] - while 'Table' in mk_paths[key]: mk_paths[key].remove('Table') - while 'Nadir' in mk_paths[key]: mk_paths[key].remove('Nadir') - - load_table_data('TargetPosition') - load_table_data('InstrumentPosition') - load_table_data('InstrumentPointing') - load_table_data('TargetAttitudeShape') - # the rest - mk_paths['Frame'] = [kernel_group.get('Frame', None)] - mk_paths['Instrument'] = [kernel_group.get('Instrument', None)] - mk_paths['InstrumentAddendum'] = [kernel_group.get('InstrumentAddendum', None)] - mk_paths['SpacecraftClock'] = [kernel_group.get('SpacecraftClock', None)] - mk_paths['LeapSecond'] = [kernel_group.get('LeapSecond', None)] - mk_paths['Clock'] = [kernel_group.get('Clock', None)] - mk_paths['Extra'] = [kernel_group.get('Extra', None)] - - # handles issue with OsirisRex instrument kernels being in a 2d list - if isinstance(mk_paths['Instrument'][0], list): - mk_paths['Instrument'] = np.concatenate(mk_paths['Instrument']).flat - - if (format_as == 'list'): - # get kernels as 1-d string list - kernels = [] - for kernel in chain.from_iterable(mk_paths.values()): - if isinstance(kernel, str): - kernels.append(kernel) - elif isinstance(kernel, list): - kernels.extend(kernel) - if expand: - isisprefs = get_isis_preferences() - if not "DataDirectory" in isisprefs: - warnings.warn("No IsisPreferences file found, is your ISISROOT env var set?") - - kernels = [expandvars(expandvars(k, isisprefs['DataDirectory'], case_sensitive=False)) for k in kernels] - # Ensure that the ISIS Addendum kernel is last in case it overrides - # some values from the default Instrument kernel - kernels = sorted(kernels, key=lambda x: "Addendum" in x) - return kernels - elif (format_as == 'dict'): - # return created dict - if expand: - isisprefs = get_isis_preferences() - for kern_list in mk_paths: - for index, kern in enumerate(mk_paths[kern_list]): - if kern is not None: - mk_paths[kern_list][index] = expandvars(expandvars(kern, isisprefs['DataDirectory'], case_sensitive=False)) - return mk_paths - else: - raise Exception(f'{format_as} is not a valid return format') - def write_metakernel_from_cube(cube, mkpath=None): # add ISISPREF paths as path_symbols and path_values to avoid custom expand logic pvlprefs = get_isis_preferences() @@ -333,6 +129,7 @@ def write_metakernel_from_cube(cube, mkpath=None): return body + def get_ck_frames(kernel): """ Get all of the reference frames defined in a kernel. @@ -357,6 +154,7 @@ def get_ck_frames(kernel): # Sort the output list for testability return sorted(list(ids)) + def create_spk_dependency_tree(kernels): """ construct the dependency tree for the body states in a set of kernels. @@ -390,6 +188,7 @@ def create_spk_dependency_tree(kernels): return dep_tree + def spkmerge_config_string(dep_tree, output_spk, bodies, lsk, start, stop): """ Create the contents of an spkmerge config file that will produce a spk that @@ -438,6 +237,7 @@ def spkmerge_config_string(dep_tree, output_spk, bodies, lsk, start, stop): config += f" INCLUDE_COMMENTS = no\n" return config + def write_metakernel_from_kernel_list(kernels): """ Parameters @@ -474,7 +274,6 @@ def write_metakernel_from_kernel_list(kernels): return body - def duckpool(naifvar, start=0, length=10, default=None): """ Duck typing friendly version of spiceypy kernel pool functions. @@ -801,99 +600,3 @@ def search_isis_db(dbobj, labelobj, isis_data): if any(types): kernels["types"] = types return kernels - - -def find_kernels(cube, isis_data, format_as=dict): - """ - Find all kernels for a cube and return a json object with categorized kernels. - - Parameters - ---------- - - cube : str - Path to an ISIS cube - - isis_data : str - path to $ISISDATA - - format_as : obj - What type to return the kernels as, ISIS3-like dict/PVL or flat list - - Returns - ------- - : obj - Container with kernels - """ - def remove_dups(listofElements): - # Create an empty list to store unique elements - uniqueList = [] - - # Iterate over the original list and for each element - # add it to uniqueList, if its not already there. - for elem in listofElements: - if elem not in uniqueList: - uniqueList.append(elem) - - # Return the list of unique elements - return uniqueList - - cube_label = pvl.load(cube) - mission_lookup_table = get_isis_mission_translations(isis_data) - - mission_dir = mission_lookup_table[cube_label["IsisCube"]["Instrument"]["SpacecraftName"]] - mission_dir = path.join(isis_data, mission_dir.lower()) - - kernel_dir = path.join(mission_dir, "kernels") - base_kernel_dir = path.join(isis_data, "base", "kernels") - - kernel_types = [ name for name in os.listdir(kernel_dir) if os.path.isdir(os.path.join(kernel_dir, name)) ] - kernel_types.extend(name for name in os.listdir(base_kernel_dir) if os.path.isdir(os.path.join(base_kernel_dir, name))) - kernel_types = set(kernel_types) - - db_files = [] - for typ in kernel_types: - files = sorted(glob(path.join(kernel_dir, typ, "*.db"))) - base_files = sorted(glob(path.join(base_kernel_dir, typ, "*.db"))) - files = [list(it) for k,it in groupby(files, key=lambda f:os.path.basename(f).split(".")[0])] - base_files = [list(it) for k,it in groupby(base_files, key=lambda f:os.path.basename(f).split(".")[0])] - - for instrument_dbs in files: - db_files.append(read_pvl(sorted(instrument_dbs)[-1], True)) - for base_dbs in base_files: - db_files.append(read_pvl(sorted(base_dbs)[-1], True)) - - - kernels = {} - for f in db_files: - #TODO: Error checking - typ = f[0][0] - kernel_search_results = search_isis_db(f[0][1], cube_label, isis_data) - - if not kernel_search_results: - kernels[typ] = None - else: - try: - kernels[typ]["kernels"].extend(kernel_search_results["kernels"]) - if any(kernel_search_results.get("types", [None])): - kernels[typ]["types"].extend(kernel_search_results["types"]) - except: - kernels[typ] = {} - kernels[typ]["kernels"] = kernel_search_results["kernels"] - if any(kernel_search_results.get("types", [None])): - kernels[typ]["types"] = kernel_search_results["types"] - - for k,v in kernels.items(): - if v: - kernels[k]["kernels"] = remove_dups(v["kernels"]) - - if format_as == dict: - return kernels - elif format_as == list: - kernel_list = [] - for _,kernels in kernels.items(): - if kernels: - kernel_list.extend(kernels["kernels"]) - return kernel_list - else: - warnings.warn(f"{format_as} is not a valid format, returning as dict") - return kernels