Skip to content
Snippets Groups Projects
Commit a9a55c01 authored by Jesse Mapel's avatar Jesse Mapel Committed by GitHub
Browse files

Moved kernel parsing logic from slicing notebook into ale utils. (#259)

* Moved SPK dependency tree logic to utils

* More tests

* Added ck frame function

* updated notebook

* Updated notebook
parent 9ea482aa
No related branches found
No related tags found
No related merge requests found
%% Cell type:code id: tags:
``` python
import spiceypy as spice
import pvl
import os
import re
import subprocess
from ale import util
from itertools import chain
import io
import networkx as nx
import tempfile
# These should be provided when running this script.
cube = "/work/users/sgstapleton/kernel_split/EN1072174528M.cub"
output_dir = "/work/users/sgstapleton/kernel_split/" # Output dir for created kernel files
output_dir = "/Users/jmapel/ale/nb_test" # Output dir for created kernel files
data_dir = "/usgs/cpkgs/isis3/data/" # Dir of where to pull original kernels from
ckslicer_loc = "/work/users/sgstapleton/kernel_split/ckslicer"
def get_reference_id(bc_filename):
brief = subprocess.check_output(["ckbrief", bc_filename])
reference = None
for line in brief.splitlines():
line = str(line)
if('Object:' in line):
reference = int(line.strip("''").split()[1])
return reference
ckslicer_loc = "/Users/jmapel/ale/ckslicer"
def add_light_time_correction(cube_info):
image_start_et = spice.scs2e(cube_info['SpacecraftID'], cube_info['SpacecraftClockCount'])
image_end_et = image_start_et + cube_info['ExposureDuration']
inst_state, inst_lt = spice.spkez(cube_info['SpacecraftID'], (image_start_et + image_end_et)/2, 'J2000', 'LT+S', 0)
target_state, target_lt = spice.spkez(cube_info['TargetID'], (image_start_et + image_end_et)/2, 'J2000', 'LT+S', 0)
lt_pad = max(abs(inst_lt), abs(target_lt)) + 15
# Eph time
padded_start_et = image_start_et - lt_pad
padded_end_et = image_end_et + lt_pad
# Padded times
padded_start_sclk = spice.sce2s(cube_info['SpacecraftID'], padded_start_et)
padded_end_sclk = spice.sce2s(cube_info['SpacecraftID'], padded_end_et)
padded_start_utc = spice.et2utc(padded_start_et, 'c', 3)
padded_end_utc = spice.et2utc(padded_end_et, 'c', 3)
cube_info.update(PaddedStartTimeSCLK = padded_start_sclk)
cube_info.update(PaddedEndTimeSCLK = padded_end_sclk)
cube_info.update(PaddedStartTimeUTC = padded_start_utc)
cube_info.update(PaddedEndTimeUTC = padded_end_utc)
def get_body_ids(spk_file, body_id):
brief = subprocess.check_output(["brief", "-c {}".format(spk_file)])
# Convert from bytes
brief = brief.decode("utf-8")
brief_io = io.StringIO(brief)
line = brief_io.readline()
edge_list = []
while(line):
if("w.r.t" in line):
bodies_results = re.findall('\((.*?)\)', line)
edge_list.append(bodies_results)
line = brief_io.readline()
G = nx.DiGraph()
G.add_edges_from(edge_list)
body_ids = []
for path in nx.all_simple_paths(G, body_id, '0'):
body_ids = body_ids + path
body_ids = set(body_ids)
return body_ids
def output_spkmerge_config(config_file, cube_info, output_dir):
with open(config_file, "w+") as config:
config.write('''LEAPSECONDS_KERNEL = {}\n'''.format(cube_info['LeapSecond'][0]))
for kern in cube_info['InstrumentPosition']:
basename = os.path.basename(kern).split('.')[0]
filename, file_extension = os.path.splitext(kern)
new_kern = output_dir + basename + "_merged" + file_extension
config.write(''' SPK_KERNEL = {}\n'''.format(new_kern))
body_ids = get_body_ids(kern, str(cube_info['SpacecraftID']))
for body in body_ids:
if body != '0':
config.write(''' SOURCE_SPK_KERNEL = {}\n'''.format(kern))
config.write(''' INCLUDE_COMMENTS = no\n''')
config.write(''' BODIES = {}\n'''.format(body))
config.write(''' BEGIN_TIME = {}\n'''.format(cube_info['PaddedStartTimeUTC']))
config.write(''' END_TIME = {}\n'''.format(cube_info['PaddedEndTimeUTC']))
config.close()
```
%% Cell type:code id: tags:
``` python
# These are the processing steps. This will make use of the cube provided further up to create smaller,
# more manageable kernel files for ale testing purposes. This currently only handles ck and spk files.
# Get dictionary of kernel lists from cube
cube_info = util.generate_kernels_from_cube(cube, format_as = 'dict')
# Replace path variables with absolute paths for kernels
for kernel_list in cube_info:
for index, kern in enumerate(cube_info[kernel_list]):
if kern is not None:
cube_info[kernel_list][index] = data_dir + kern.strip('$')
# Create ordered list of kernels for furnishing
kernels = [kernel for kernel in chain.from_iterable(cube_info.values()) if isinstance(kernel, str)]
spice.furnsh(kernels)
# Loads cube as pvl to extract rest of data
cube_pvl = pvl.load(cube)
# Save other necesary info in cube_info dict
cube_info.update(SpacecraftClockCount = cube_pvl['IsisCube']['Instrument']['SpacecraftClockCount'])
cube_info.update(ExposureDuration = cube_pvl['IsisCube']['Instrument']['ExposureDuration'].value * 0.001)
cube_info.update(TargetID = spice.bods2c(cube_pvl['IsisCube']['Instrument']['TargetName']))
cube_info.update(SpacecraftID = spice.bods2c(cube_pvl['IsisCube']['Instrument']['SpacecraftName']))
# Add lighttime-corrected SCLK values to cube_info
add_light_time_correction(cube_info)
# For each binary ck kernel specified in cube, run the ckslicer, comment and to-transfer commands
for ck in cube_info['InstrumentPointing']:
if ck.endswith('.bc'):
# Get reference id associated with ck
reference_id = get_reference_id(ck)
ck_filename = os.path.basename(ck).split('.')[0]
ck_path, ck_file_extension = os.path.splitext(ck)
output_kern = output_dir + ck_filename + '_sliced' + ck_file_extension
for ck in [k for k in kernels if k.lower().endswith('.bc')]:
ck_path, ck_file_extension = os.path.splitext(ck)
ck_basename = os.path.basename(ck_path)
for frame in util.get_ck_frames(ck):
output_basename = os.path.join(output_dir, ck_basename + '_sliced' + str(frame))
output_kern = output_basename + ck_file_extension
output_comments = output_basename + '.cmt'
# Create new sliced ck kernel
ckslicer_command = [ckslicer_loc,
'-LSK {}'.format(cube_info['LeapSecond'][0]),
'-SCLK {}'.format(cube_info['SpacecraftClock'][0]),
'-INPUTCK {}'.format(ck),
'-OUTPUTCK {}'.format(output_kern),
'-ID {}'.format(reference_id),
'-ID {}'.format(frame),
'-TIMETYPE {}'.format('SCLK'),
'-START {}'.format(cube_info['PaddedStartTimeSCLK']),
'-STOP {}'.format(cube_info['PaddedEndTimeSCLK'])]
subprocess.call(ckslicer_command)
subprocess.run(ckslicer_command, check=True)
# Remove old comments from new ck kernel
commnt_command = ['commnt', '-d {}'.format(output_kern)]
subprocess.call(commnt_command)
subprocess.run(commnt_command, check=True)
# Makes a new txt file for the only comments that should be stored in the new ck file
with open(output_dir + "temp_commnts.txt","w+") as f:
f.write("This CK is for testing with the image: {}\n".format(cube))
f.write("\nThis CK was generated using the following command: {}\n".format(" ".join(ckslicer_command)))
with open(output_comments, 'w+') as comment_file:
comment_file.write("This CK is for testing with the image: {}\n".format(cube))
comment_file.write("\nThis CK was generated using the following command: {}\n")
comment_file.write(" ".join(ckslicer_command))
# Add new comments to new ck kernel
new_commnts_command = ["commnt", "-a {}".format(output_kern), "temp_commnts.txt"]
subprocess.call(new_commnts_command)
# Clean up temp comment file
os.remove(output_dir + "temp_commnts.txt")
new_commnts_command = ["commnt", "-a {}".format(output_kern), output_comments]
subprocess.run(new_commnts_command, check=True)
# Create the transfer file of the new ck kernel
subprocess.call(["toxfr", output_kern])
subprocess.run(["toxfr", output_kern], check=True)
# Create the config file for the spkmerge command
output_spkmerge_config(output_dir + "spk.config", cube_info, output_dir)
output_spk_basename = os.path.join(output_dir, os.path.basename(os.path.splitext(cube)[0]))
output_spk = output_spk_basename + '.bsp'
spk_dep_tree = util.create_spk_dependency_tree([k for k in kernels if k.lower().endswith('.bsp')])
config_string = util.spkmerge_config_string(spk_dep_tree,
output_spk,
[cube_info['TargetID'], cube_info['SpacecraftID'], 10],
cube_info['LeapSecond'][0],
cube_info['PaddedStartTimeUTC'],
cube_info['PaddedEndTimeUTC'])
with open(output_spk_basename + '.conf', 'w+') as spk_config:
spk_config.write(config_string)
# Create the new SPK
spkmerge_command = ["spkmerge", spk_config.name]
subprocess.run(spkmerge_command, check=True)
# Run the spkmerge command
spkmerge_command = ["spkmerge", output_dir + "spk.config"]
subprocess.call(spkmerge_command)
# Create the transfer file of the new SPK kernel
subprocess.run(["toxfr", output_spk], check=True)
```
%% Cell type:code id: tags:
``` python
```
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment