Commit 1bf972c6 authored by Kelvin Rodriguez

fixed bugs in io_tes.py

parent 41bab951
@@ -29,7 +29,7 @@ class Tes(object):
     """
-    def __init__(self, input_data, var_file = None):
+    def __init__(self, input_data, var_file = None, data_set=None):
         """
         Read the .spc file, parse the label, and extract the spectra
@@ -201,9 +201,12 @@ class Tes(object):
         if isinstance(input_data, pd.DataFrame):
             self.dataset = None
-            for key in tes_columns.keys():
-                if len(set(tes_columns[key]).intersection(set(input_data.columns))) > 2 :
-                    self.dataset = key
+            if not data_set:
+                for key in tes_columns.keys():
+                    if len(set(tes_columns[key]).intersection(set(input_data.columns))) > 3 :
+                        self.dataset = key
+            else:
+                self.dataset = data_set
             self.label = None
             self.data = input_data
@@ -259,3 +262,68 @@ class Tes(object):
         df = expand_bitstrings(df, dataset.upper())
         self.data = df
+
+def join(tes_data):
+    """
+    Given a list of Tes objects, merges them into a single dataframe using
+    SPACECRAFT_CLOCK_START_COUNT (sclk_time) as the index.
+
+    Parameters
+    ----------
+    tes_data : iterable
+        A Python iterable of Tes objects
+
+    Returns
+    -------
+    : dataframe
+        A pandas dataframe containing the merged data
+    : list
+        A list of Tes objects holding the records that found no match
+    """
+    if not hasattr(tes_data, '__iter__') and not isinstance(tes_data, Tes):
+        raise TypeError("Input data must be a Tes dataset or an iterable of Tes datasets, got {}".format(type(tes_data)))
+    elif not hasattr(tes_data, '__iter__'):
+        tes_data = [tes_data]
+
+    if len(tes_data) == 0:
+        warn("Input iterable is empty")
+
+    if not all([isinstance(obj, Tes) for obj in tes_data]):
+        # Get the list of types and the indices of elements that caused the error
+        types = [type(obj) for obj in tes_data]
+        error_idx = [i for i, obj in enumerate(tes_data) if not isinstance(obj, Tes)]
+        raise TypeError("Input data must be Tes datasets, input array has non Tes objects at indices: {} \
+                         for inputs of type: {}".format(error_idx, types))
+
+    single_key_sets = {'ATM', 'POS', 'TLM', 'OBS'}
+    compound_key_sets = {'BOL', 'CMP', 'GEO', 'IFG', 'PCT', 'RAD'}
+    # A comprehension so every key gets its own empty DataFrame;
+    # dict.fromkeys would share a single DataFrame across all keys
+    dfs = {key: DataFrame() for key in single_key_sets | compound_key_sets}
+
+    # Organize the data based on datasets
+    for ds in tes_data:
+        # Find a way to do this in place?
+        dfs[ds.dataset] = dfs[ds.dataset].append(ds.data)
+
+    # Remove any dataframes that are empty
+    empty_dfs = [key for key in dfs.keys() if dfs[key].empty]
+    for key in empty_dfs:
+        dfs.pop(key, None)
+
+    single_key_dfs = [dfs[key] for key in dfs.keys() if key in single_key_sets]
+    compound_key_dfs = [dfs[key] for key in dfs.keys() if key in compound_key_sets]
+    all_dfs = single_key_dfs + compound_key_dfs
+
+    # Union of every sclk_time seen; anything missing from the final merge is an outlier
+    keyspace = functools.reduce(lambda left, right: left | right, [set(df['sclk_time']) for df in all_dfs])
+    single_key_merged = functools.reduce(lambda left, right: pd.merge(left, right, on=["sclk_time"]), single_key_dfs)
+    compound_key_merged = functools.reduce(lambda left, right: pd.merge(left, right, on=["sclk_time", "detector"]), compound_key_dfs)
+    merged = single_key_merged.merge(compound_key_merged, on="sclk_time")
+
+    outlier_idx = keyspace - set(merged["sclk_time"])
+    outliers = [Tes(tds.data[tds.data['sclk_time'].isin(outlier_idx)], data_set=tds.dataset) for tds in tes_data]
+    return merged, [tds for tds in outliers if not tds.data.empty]
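For context, a minimal usage sketch of the new data_set keyword and the join function added above. The file paths are hypothetical; any set of TES .tab tables with overlapping sclk_time values would do:

from glob import glob
from plio.io.io_tes import Tes, join

# Hypothetical inputs: a handful of TES tables from one mgst folder
tes_objs = [Tes(f) for f in glob('/path/to/mgst_0001/*.tab')]

# The new data_set kwarg skips column-based dataset detection when the
# caller already knows which table a dataframe came from
rad = Tes(tes_objs[0].data, data_set='RAD')

# Merge everything on sclk_time; rows with no match come back as outliers
merged, outliers = join(tes_objs)
print(merged.shape, len(outliers))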
The diff also includes a standalone ingest script (its filename is not shown on this page):

import os
import argparse
import functools
import json
from glob import glob
from warnings import warn

import pandas as pd
from pandas import DataFrame
from pymongo import MongoClient

from plio.io.io_tes import Tes
def join_tes(tes_data, init_dfs=None):
    """
    Merges a list of Tes objects into a single dataframe using
    SPACECRAFT_CLOCK_START_COUNT (sclk_time) as the index, returning the
    merged dataframe and a list of Tes objects for records with no match.
    """
    if not hasattr(tes_data, '__iter__') and not isinstance(tes_data, Tes):
        raise TypeError("Input data must be a Tes dataset or an iterable of Tes datasets, got {}".format(type(tes_data)))
    elif not hasattr(tes_data, '__iter__'):
        tes_data = [tes_data]

    if len(tes_data) == 0:
        warn("Input iterable is empty")

    if not all([isinstance(obj, Tes) for obj in tes_data]):
        # Get the list of types and the indices of elements that caused the error
        types = [type(obj) for obj in tes_data]
        error_idx = [i for i, obj in enumerate(tes_data) if not isinstance(obj, Tes)]
        raise TypeError("Input data must be Tes datasets, input array has non Tes objects at indices: {} \
                         for inputs of type: {}".format(error_idx, types))

    single_key_sets = {'ATM', 'POS', 'TLM', 'OBS'}
    compound_key_sets = {'BOL', 'CMP', 'GEO', 'IFG', 'PCT', 'RAD'}
    # A comprehension so every key gets its own empty DataFrame;
    # dict.fromkeys would share a single DataFrame across all keys
    dfs = {key: DataFrame() for key in single_key_sets | compound_key_sets}

    for ds in tes_data:
        # Find a way to do this in place?
        dfs[ds.dataset] = dfs[ds.dataset].append(ds.data)

    # Remove any dataframes that are empty
    empty_dfs = [key for key in dfs.keys() if dfs[key].empty]
    for key in empty_dfs:
        dfs.pop(key, None)

    single_key_dfs = [dfs[key] for key in dfs.keys() if key in single_key_sets]
    compound_key_dfs = [dfs[key] for key in dfs.keys() if key in compound_key_sets]
    all_dfs = single_key_dfs + compound_key_dfs

    keyspace = functools.reduce(lambda left, right: left | right, [set(df['sclk_time']) for df in all_dfs])
    single_key_merged = functools.reduce(lambda left, right: pd.merge(left, right, on=["sclk_time"]), single_key_dfs)
    compound_key_merged = functools.reduce(lambda left, right: pd.merge(left, right, on=["sclk_time", "detector"]), compound_key_dfs)
    merged = single_key_merged.merge(compound_key_merged, on="sclk_time")

    outlier_idx = keyspace - set(merged["sclk_time"])
    # Pass data_set explicitly so outliers keep their dataset label
    outliers = [Tes(tds.data[tds.data['sclk_time'].isin(outlier_idx)], data_set=tds.dataset) for tds in tes_data]
    return merged, [tds for tds in outliers if not tds.data.empty]
def clamp_longitude(angle):
    """
    Returns the angle clamped to the range [-180, 180); the original
    data is in the range [0, 360], but mongo uses [-180, 180].

    Parameters
    ----------
    angle : float
        The angle to clamp

    Returns
    -------
    : float
        The clamped angle
    """
    return ((angle + 180) % 360) - 180
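A quick check of the wrap-around behavior, following directly from the formula:

# ((angle + 180) % 360) - 180 maps [0, 360] onto [-180, 180)
assert clamp_longitude(0) == 0
assert clamp_longitude(90) == 90
assert clamp_longitude(180) == -180   # 180 wraps to the negative bound
assert clamp_longitude(270) == -90
assert clamp_longitude(359.5) == -0.5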
def to_mongodb(data_dir, out_dir, sl):
    """
    Ingests a slice of the TES mgst folders into the local MongoDB `tes`
    database, writing any unmatched outlier tables to out_dir as HDF5.
    """
    folders = [folder for folder in os.listdir(data_dir) if folder[:4] == "mgst"]
    folders = sorted(folders, key=lambda x: int(x[5:]))[sl]
    print("First 20 folders:")
    print("\n".join(folders[:20]))

    num_files = len(glob(data_dir + 'mgst_*/*.tab'))
    print("Number of files: {}".format(num_files))

    outliers = []
    client = MongoClient('localhost', 27017)
    print(client.server_info())
    db = client.tes

    processed = 0
    json_objs = []
    for folder in folders:
        files = glob(data_dir + folder + '/*.tab')
        length = len(files)
        print("On folder {} with {} files.".format(folder, length))
        print("COMPLETE: {}/{} {}".format(processed, num_files, processed / num_files))

        # Carry the previous iteration's outliers forward so they get another
        # chance to find matches in the next batch of files
        tes_datasets = [Tes(file) for file in files] + outliers
        dfs, outliers = join_tes(tes_datasets)
        print("Num records: {}".format(dfs.shape[0]))
        print("Num outliers: {}".format(len(outliers)))

        try:
            json_objs = json.loads(dfs.to_json(orient='records'))
            del dfs
            print("Num json objs: {}".format(len(json_objs)))

            # Attach a GeoJSON point so mongo can index the records spatially
            for dictionary in json_objs:
                dictionary["loc"] = {
                    "type": "Point",
                    "coordinates": [clamp_longitude(dictionary["longitude"]), dictionary["latitude"]]
                }
            db.point_data.insert_many(json_objs, bypass_document_validation=True)
        except Exception as e:
            print("Had exception during processing: {}".format(e))

        json_objs = None
        processed = processed + length
        print()

    # Write whatever never matched to disk, one numbered file per dataset
    single_key_sets = {'ATM', 'POS', 'TLM', 'OBS'}
    compound_key_sets = {'BOL', 'CMP', 'GEO', 'IFG', 'PCT', 'RAD'}
    counts = dict.fromkeys(single_key_sets | compound_key_sets, 0)
    for tes in outliers:
        counts[tes.dataset] = counts[tes.dataset] + 1
        # DataFrame.to_hdf (not to_hdf5) is the pandas HDF5 writer
        tes.data.to_hdf(out_dir + "/" + tes.dataset + str(counts[tes.dataset]) + ".h5", key=tes.dataset)
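The loc field written above is a GeoJSON Point, which MongoDB can only query geospatially once a 2dsphere index exists. A minimal sketch, not part of this commit (the query point and radius are arbitrary):

from pymongo import MongoClient, GEOSPHERE

client = MongoClient('localhost', 27017)
db = client.tes

# Index the GeoJSON points written by to_mongodb
db.point_data.create_index([("loc", GEOSPHERE)])

# Example: all records within ~100 km of an arbitrary point
cursor = db.point_data.find({
    "loc": {
        "$nearSphere": {
            "$geometry": {"type": "Point", "coordinates": [137.4, -4.6]},
            "$maxDistance": 100000  # meters
        }
    }
})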
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('data_dir', action='store',
                        help='The location of the MGST folders for TES (e.g. /scratch/jlaura/tes/tes_data/)')
    parser.add_argument('to', action='store', type=int,
                        help='End of the Python style slice of folders to process. \
                              Folders are ordered (e.g. [mgst_1100, mgst_1101, ...])')
    parser.add_argument('from', action='store', type=int,
                        help='Start of the Python style slice of folders to process.')
    parser.add_argument('out_dir', action='store', help='The location of where to place outliers.')
    args = vars(parser.parse_args())
    to_mongodb(args["data_dir"], args["out_dir"], slice(args["from"], args["to"]))
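For reference, the equivalent direct call from Python (the output directory here is hypothetical):

# Process the first 100 mgst folders; outliers land in /tmp/tes_outliers
to_mongodb('/scratch/jlaura/tes/tes_data/', '/tmp/tes_outliers', slice(0, 100))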