diff --git a/plio/io/io_tes.py b/plio/io/io_tes.py
index 66a6cd141d77ca7c281168e0fab7dd6b09d97c83..18d2cc55cd7eb4e31d90685f2a8c83e93e5f8e60 100644
--- a/plio/io/io_tes.py
+++ b/plio/io/io_tes.py
@@ -6,8 +6,8 @@ import sys
 import functools
 import json
 
+from os import path
 from plio.io.io_json import read_json
-
 from plio.utils._tes2numpy import tes_dtype_map
 from plio.utils._tes2numpy import tes_columns
 from plio.utils._tes2numpy import tes_scaling_factors
@@ -199,11 +199,22 @@ class Tes(object):
             else:
                 return df
 
+        if isinstance(input_data, pd.DataFrame):
+            self.dataset = None
+            for key in tes_columns.keys():
+                if len(set(tes_columns[key]).intersection(set(input_data.columns))) > 2:
+                    self.dataset = key
+
+            self.label = None
+            self.data = input_data
+            return
+
         self.label = pvl.load(input_data)
         nrecords = self.label['TABLE']['ROWS']
         nbytes_per_rec = self.label['RECORD_BYTES']
         data_start = self.label['LABEL_RECORDS'] * self.label['RECORD_BYTES']
         dataset = self.label['TABLE']['^STRUCTURE'].split('.')[0]
+        self.dataset = dataset
 
         numpy_dtypes = tes_dtype_map
         columns = tes_columns
@@ -218,16 +229,20 @@ class Tes(object):
         # Read Radiance array if applicable
         if dataset.upper() == 'RAD': # pragma: no cover
-            with open('{}.var'.format(path.splitext(f)[0]) , 'rb') as file:
-                buffer = file.read()
+            if not var_file:
+                filename, file_extension = path.splitext(input_data)
+                var_file = filename + ".var"
+
+            with open(var_file, "rb") as var:
+                buffer = var.read()
 
             def process_rad(index):
                 if index is -1:
                     return None
 
                 length = np.frombuffer(buffer[index:index+2], dtype='>u2')[0]
                 exp = np.frombuffer(buffer[index+2:index+4], dtype='>i2')[0]
-
-                radarr = np.frombuffer(buffer[index+4:index+4+length-2], dtype='>i2') * (2**(exp-15))
+                scale = 2**(int(exp)-15)
+                radarr = np.frombuffer(buffer[index+4:index+4+length-2], dtype='>i2') * scale
                 if np.frombuffer(buffer[index+4+length-2:index+4+length], dtype='>u2')[0] != length:
                     warnings.warn("Last element did not match the length for file index {} in file {}".format(index, f))
                 return radarr
diff --git a/test.py b/test.py
new file mode 100644
index 0000000000000000000000000000000000000000..11ae228614151d79a6418d6594c945c4e61f8d4e
--- /dev/null
+++ b/test.py
@@ -0,0 +1,143 @@
+import sys
+import os
+
+import plio
+from plio.io.io_tes import Tes
+
+from glob import glob
+from os import path
+
+import functools
+import pandas as pd
+from pymongo import MongoClient
+from warnings import warn
+from collections import Iterable
+import pandas as pd
+from pandas import DataFrame
+
+import functools
+import json
+
+def join_tes(tes_data, init_dfs=None):
+    """
+    Merge an iterable of Tes datasets into a single DataFrame keyed on
+    sclk_time; records that cannot be joined are returned as outliers.
+    """
+    if not hasattr(tes_data, '__iter__') and not isinstance(tes_data, Tes):
+        raise TypeError("Input data must be a Tes dataset or an iterable of Tes datasets, got {}".format(type(tes_data)))
+    elif not hasattr(tes_data, '__iter__'):
+        tes_data = [tes_data]
+
+    if len(tes_data) == 0:
+        warn("Input iterable is empty")
+
+    if not all([isinstance(obj, Tes) for obj in tes_data]):
+        # Get the list of types and the indices of elements that caused the error
+        types = [type(obj) for obj in tes_data]
+        error_idx = [i for i, x in enumerate([isinstance(obj, Tes) for obj in tes_data]) if x == False]
+
+        raise TypeError("Input data must be Tes datasets, input array has non-Tes objects at indices: {}\
+                         for inputs of type: {}".format(error_idx, types))
+
+    single_key_sets = {'ATM', 'POS', 'TLM', 'OBS'}
+    compound_key_sets = {'BOL', 'CMP', 'GEO', 'IFG', 'PCT', 'RAD'}
+    dfs = dict.fromkeys(single_key_sets | compound_key_sets, DataFrame())
+
+    for ds in tes_data:
+        # Find a way to do this in place?
+        dfs[ds.dataset] = dfs[ds.dataset].append(ds.data)
+
+    # remove any dataframes that are empty
+    empty_dfs = [key for key in dfs.keys() if dfs[key].empty]
+    for key in empty_dfs:
+        dfs.pop(key, None)
+
+
+    single_key_dfs = [dfs[key] for key in dfs.keys() if key in single_key_sets]
+    compound_key_dfs = [dfs[key] for key in dfs.keys() if key in compound_key_sets]
+    all_dfs = single_key_dfs+compound_key_dfs
+
+    keyspace = functools.reduce(lambda left,right: left|right, [set(df['sclk_time']) for df in all_dfs])
+
+    single_key_merged = functools.reduce(lambda left,right: pd.merge(left, right, on=["sclk_time"]), single_key_dfs)
+    compound_key_merged = functools.reduce(lambda left,right: pd.merge(left, right, on=["sclk_time", "detector"]), compound_key_dfs)
+    merged = single_key_merged.merge(compound_key_merged, on="sclk_time")
+
+    outlier_idx = keyspace-set(merged["sclk_time"])
+    outliers = [Tes(tds.data[tds.data['sclk_time'].isin(outlier_idx)]) for tds in tes_data]
+    return merged, [tds for tds in outliers if not tds.data.empty]
+
+
+def clamp_longitude(angle):
+    """
+    Returns the angle limited to the range [-180, 180]; the original
+    data is in the range [0, 360] but mongo uses [-180, 180].
+
+    Parameters
+    ----------
+    angle : float
+        The angle to clamp
+
+    Returns
+    -------
+    : float
+      The clamped angle
+    """
+    return ((angle + 180) % 360) - 180
+
+def to_mongodb(chunk_size=60):
+    data_dir = '/scratch/jlaura/tes/tes_data/'
+    folders = [folder for folder in os.listdir(data_dir) if folder[:4] == "mgst"]
+
+    search_len = len(data_dir) + 9
+    print("search len: {}".format(search_len))
+
+    folders = sorted(folders, key=lambda x:int(x[5:]))[4:]
+    print("First 20 folders:")
+    print("\n".join(folders[:20]))
+
+    num_files = len(glob(data_dir+'mgst_*/*.tab'))
+    print("Number of files: {}".format(num_files))
+
+    outliers = []
+    client = MongoClient('localhost', 27017)
+    print(client.server_info())
+
+    db = client.tes
+    processed = 0
+    json_objs = []
+    for folder in folders:
+        files = glob(data_dir+folder+'/*.tab')
+        length = len(files)
+        print("On folder {} with {} files.".format(folder, len(files)))
+        print("COMPLETE: {}/{} {}".format(processed, num_files, processed/num_files))
+        tes_datasets = [Tes(file) for file in files] + outliers
+        dfs, outliers = join_tes(tes_datasets)
+        print("Num records: {}".format(dfs.shape[0]))
+        print("Num outliers: {}".format(len(outliers)))
+        try:
+            json_objs = json.loads(dfs.to_json(orient='records'))
+
+            del dfs
+            print("Num json objs: {}".format(len(json_objs)))
+            for dictionary in json_objs:
+                dictionary["loc"] = {
+                    "type" : "Point",
+                    "coordinates" : [clamp_longitude(dictionary["longitude"]), dictionary["latitude"]]
+                }
+
+            db.point_data.insert_many(json_objs, bypass_document_validation=True)
+        except Exception as e:
+            print("Had exception during processing: {}".format(e))
+
+
+        json_objs = None
+        processed = processed + length
+        print()
+
+
+
+
+
+if __name__ == "__main__":
+    to_mongodb()