#!/usr/bin/env python # -*- coding: utf-8 -*- import json import sys import datetime import numpy as np from astropy.time import Time from util import functions as ufun from util import variables as uvars from util.master_listener_davide import thread from util.mini_os import obsController ### Focus only on the 8x8 pixel matrix centered on the centroid def focus_on_centroid(a, cx, cy, s=4): # a is an array-like matrix , s is an Int cx = int(cx)*32 + 32 # cx and cy are normalized in [-1,1] -> now are renormalized in [0,64] cy = int(cy)*32 + 32 if(s>cx or s>cy): s = min(cx,cy) if(s>len(a)-cx or s>len(a)-cy): s = min(s , len(a)-cx , len(a)-cy) a_focused = a[cx-s : cx+s , cy-s : cy+s] return a_focused class RtcObject(object): def __init__(self): self.counter = 0 self.todos = [] self.rotating_ncpa = False self.ncpa_file = "No file" self.json_state = {} self.thread = thread self.obsController = obsController def reset(self): print("reset") status = self.obsController.RTCTTResetBCU() return status def resurrect(self): print("import") from SHINS_SW_TEST.SHINS_TEST_DEBUG import SHINS_TEST_DEBUG print("init") status = SHINS_TEST_DEBUG(test_s="10", test_s1="true") # init time.sleep(1) print("setup start") status = SHINS_TEST_DEBUG(test_s="11") # setup and start print(status) return status def resurrect2(self): print('resurrect2') rtc_s = uvars.rtc_s_base.copy() time.sleep(10) print("init") status = self.obsController.RTCTTInit(rtc_s) print("Exit status: {}".format(status)) ufun.error_handler(status, "RTCTTIsInit") print("Wait 10 s...") time.sleep(10) # SETUP REAL-TIME RTCTT # print("RTCTT setup... Wait 15 s...") status = self.obsController.RTCTTSetup() print("Exit status: {}".format(status)) ufun.error_handler(status, "RTCTTSetup") # START REAL-TIME RTCTT # print("RTCTT start") time.sleep(10) status = self.obsController.RTCTTStart() print("Exit status: {}".format(status)) ufun.error_handler(status, "RTCTTStart") def uploadRTC(self, modal=True): rtc_s = uvars.rtc_s_base.copy() if modal: print("Enabling Modal Mode") else: print("Enabling Zonal Mode") rtc_s["TTM2CFILE"] = "ZONAL" # INIT REAL-TIME RTCTT # print("Enumerating RTC setup") for element, value in rtc_s.items(): print("Element : % 20s - Value % 20s " % (element, value)) status = self.obsController.RTCTTInit(rtc_s) print("Exit status: {}".format(status)) ufun.error_handler(status, "RTCTTIsInit") print("Wait 5 s...") time.sleep(5) # SETUP REAL-TIME RTCTT # print("RTCTT setup... Wait 15 s...") status = self.obsController.RTCTTSetup() print("Exit status: {}".format(status)) ufun.error_handler(status, "RTCTTSetup") # START REAL-TIME RTCTT # print("RTCTT start") status = self.obsController.RTCTTStart() print("Exit status: {}".format(status)) ufun.error_handler(status, "RTCTTStart") mode = "Modal" if modal else "Zonal" print("Enabling {} Mode... DONE".format(mode)) def zonalCommand(self, data): print("chiamata effettuata") normalize_strain = 1#6.666667 act = int(data[0]) strain = float(data[1]) print(strain) strain = strain/normalize_strain print("Act on actuator number {} with absolute intensity {}".format(act,strain)) reserved = [0,0] actuators = np.zeros(97).tolist() actuators[act-1] = strain print("vector for RTCTTModeUpload: ") # print(actuators) try: status = self.obsController.RTCTTModeUpload(reserved+actuators, absolute=False) except Exception as e: print(e) status = 1 return status def move_soul(self, data, apply_movement=False): self.soul_deltas = data print("Received data: {}".format(data)) PSFcx = float(data[0]) PSFcx = 0 if PSFcx < 0 else PSFcx PSFcx = 2048 if PSFcx > 2048 else PSFcx PSFcy = float(data[1]) PSFcy = 0 if PSFcy < 0 else PSFcy PSFcy = 2048 if PSFcy > 2048 else PSFcy deltaX = PSFcx - uvars.SOUL_TARGET_POS[0] deltaY = PSFcy - uvars.SOUL_TARGET_POS[1] print("Calling Here OffsetXYAO with SCICAM deltas Dx = {} , Dy = {}".format(deltaX , deltaY)) # GET CURRENT DROT # status, drotDict = self.obsController.getPosition("DEGREE", {"DROT":-9999.9}) drotPos = drotDict["DROT"] print("DROT pos is {}".format(drotPos)) # Transform to SOUL delta # step_x, step_y = ufun.transform_offset(disp = [deltaX,deltaY], thetaRot = drotPos , conv = "SOUL") print("Calculatd steps to apply to SOUL : {} , {}".format(step_x , step_y)) if apply_movement: max_step = 1 step_x = step_x if (abs(step_x) <= max_step) else max_step*np.sign(step_x) step_y = step_y if (abs(step_y) <= max_step) else max_step*np.sign(step_y) print("Applying these steps to SOUL : {} , {}".format(step_x , step_y)) try: status = self.obsController.OffsetXYAO(step_x , step_y , "LEFT") print("Function is returning NOW") except Exception as e: print(e) status = 1 else: from util.shinsLogger import logger resume = "Delta X = {} mm , Delta Y = {} mm".format(step_x,step_y) logger.info("--------------------------------------------------------------------------") logger.info("| {:^70s} |".format(resume)) logger.info("--------------------------------------------------------------------------") logger.debug("SCICAM deltas = ({},{}), DROT pos = {}, computed SOUL deltas = ({},{})".format(deltaX,deltaY,drotPos,np.round(step_x,3),np.round(step_y,3))) return status, [np.round(step_x,4), np.round(step_y,4)] def manageDerotation(self, data): action = str(data[0]) try: mode = str(data[1]) pos_ang = float(data[2]) pos_ang = pos_ang-360 if pos_ang>180 else pos_ang except Exception as e: # print(e) mode, pos_ang = None, None if(action == "Start_Tracking"): print("DAO_______________Derotation mode: {} with angle {}".format(mode,pos_ang)) ins_s = {"DROT_Mode":mode} async = self.obsController.begin_setupInstrument(ins_s) status = self.obsController.end_setupInstrument(async) if mode == "FIELD_FREE": print("Launching now a {} derotation".format(mode)) status = self.obsController.trackingStart2(0,0) elif mode == "FIELD_FIXED": print("Launching now a {} derotation".format(mode)) dateMjd= Time.now() status = self.obsController.trackingStart2(dateMjd.mjd, pos_ang) else: print("{} mode: NO derotation".format(mode)) status = self.obsController.trackingStop() elif(action == "Stop_Tracking"): print("DAO__________________________STOP Derotation!") status = self.obsController.trackingStop() else: print("Something went wrong, no action received") print("Reached end of command") return status def read(self, id): for todo in self.todos: if todo["id"] == id: return todo def create(self, data): todo = data self.todos.append(todo) return todo def update(self, id, data): todo = self.read(id) todo.update(data) return todo def delete(self, id): todo = self.read(id) self.todos.remove(todo) @property def centroid(self): status, [cx, cy] = self.obsController.getInternalSeqFloatVariable("RtcCentroidOrigin") return [cx, cy] @centroid.setter def centroid(self, cx_cy): cx, cy = cx_cy status = self.obsController.RTCTTSetCentroidPos(cx, cy) return status def flatten(self): modes = [0] * 96#39 status = self.obsController.RTCTTModeUpload(modes, absolute=True) status = self.load_ncpa(name="zeri.txt") return status def close_loop(self): status = self.obsController.RTCTTCloseLoop() return status def open_loop(self): status = self.obsController.RTCTTOpenLoop() return status def frequency(self, freq): """Set the loop frequency""" rtc_s = {"TTFRAMERATE": freq} status = self.obsController.RTCTTExecuteSetup(rtc_s) return status def tint(self, t): """Set integration time""" rtc_s = { "TTCAMTINT" : t, } status = self.obsController.RTCTTExecuteSetup(rtc_s) return status def patrol(self, b): """Set params to enable or disable patrol camera mode""" if b: print("Switch to patrol mode") rtc_patrol = { "TTM2CFILE": "20220603-Zernike-FullPupil-Arcetri-1u-rms-4rtc.dat", "TTWINCOORDX": "0", "TTWINCOORDY": "0", "TTWINCOLS": "640", "TTWINROWS": "512", "TTFRAMERATE": "200", "TTNSLOPEPIXELS": "16", "TTBIASFILE": "", "TTPIXELGAINFILE": "", "TTCAMTINT": "0.004995574", "TTPIXELGAINMODE": "FLATGAIN", } else: print("Switch to cropped mode") rtc_small = { "TTM2CFILE": "20220603-Zernike-FullPupil-Arcetri-1u-rms-4rtc.dat", "TTWINCOORDX": "224", "TTWINCOORDY": "212", "TTWINCOLS": "64", "TTWINROWS": "64", "TTFRAMERATE": "1000", "TTNSLOPEPIXELS": "4096", "TTBIASFILE": "2022-02-19-23-45-23_PixelBias_19.95°C_1000Hz-064x064.dat", "TTPIXELGAINFILE": "flat_gain_64x64.dat", "TTCAMTINT": "0.000994919", "TTPIXELGAINMODE": "FLATGAIN", } rtc_s = rtc_patrol if b else rtc_small status = self.obsController.RTCTTSwitchCcdMode(rtc_s) return status def temperature(self, temp): """Set the loop temperature""" status = self.obsController.RTCTTSendCameraCommand("set temperatures sensor "+temp) return status def pid(self, p): """Set the RTC PID""" rtc_s = { "TTPIDKP": str(p[0]), "TTPIDKI": str(p[1]), "TTPIDKD": str(p[2]), "TTPIDTF": str(p[3]), "TTPIDTS": str(p[4]), } rtc_s["TTPIDTS"]=str(1e-3) rtc_s["TTPIDTF"]=rtc_s["TTPIDTS"] print(rtc_s) status = self.obsController.RTCTTExecuteSetup(rtc_s) return status def threshold(self, t): """Set the RTC Threshold""" status = self.obsController.RTCTTWriteSingle(4416, t, "dsp") return status def get_threshold(self): """Get the RTC Threshold""" status, threshold = self.obsController.RTCTTReadSingle(4416, "dsp") return threshold ############## # Dark stuff # ############## def todos_dark(self): """List dark files""" return [ str(dark.name).decode('utf-8') for dark in sorted(uvars.dark_path.glob("*.dat")) ] def new_dark(self): """Create a new dark""" rtc_s = { "TTDIAGENABLED" : "true", # or "1" "TTDIAGDECIMATION" : "0", "TTPIXELENABLED" : "1", "TTPIXELDECIMATION" : "0", # if 0 master diagnostic stops "TTLOOPENABLED" : "false", "TTSAVEASBIAS" : "true", "TTUPDATEBIAS" : "true", "TTTESTTIME" : "1000000", } status, filename = self.obsController.RTCTTTeccamExpose(rtc_s) return filename def load_dark(self, name): """Load a dark file""" rtc_s = {"TTBIASFILE": name} status = self.obsController.RTCTTExecuteSetup(rtc_s) return status def get_current_dark(self): pass ############## # Gain stuff # ############## def todos_gain(self): """List gain files""" return [ str(gain.name).decode('utf-8') for gain in sorted(uvars.gain_path.glob("*.dat")) ] def new_gain(self, gain_type, centroid_size): """Create a new gain""" rtc_s = { "TTPIXELGAINMODE": gain_type, "TTPIXELGAINRADIUS": centroid_size, } status = self.obsController.RTCTTExecuteSetup(rtc_s) return status def load_gain(self, name): """Load a gain file""" rtc_s = { "TTPIXELGAINMODE": "FILEGAIN", "TTPIXELGAINFILE": name, } status = self.obsController.RTCTTExecuteSetup(rtc_s) return status ############## # Flat stuff # ############## def todos_flat(self): """List flat files""" return [ str(flat.name).decode('utf-8') for flat in sorted(uvars.flat_path.glob("*.txt")) ] def load_flat(self, name): """Load a flat file""" rtc_s = {"TTDMFLATFILE": name} status = self.obsController.RTCTTExecuteSetup(rtc_s) return status def tiptilt(self, modes=[0,0,0,0], absolute=False): print("Loaded Modes {}".format(modes)) status = self.obsController.RTCTTModeUpload(modes=modes, absolute=absolute) return status def teccam_custom_command(self, command): status, res = self.obsController.RTCTTSendCameraCommand(command) return res def save(self): data = self.data() now = Time.now().isot filename = "/data/rtc/rtc-panel-save-{}.json".format(now) with open(filename, 'w') as f: json.dump(data, f) return filename def save_flat(self): """Save the current flat in a file""" print("hereeee") flat_name = ufun.save_flat(self.obsController, self.thread.shape) print("doneee") print(flat_name) return flat_name def cloud_samples(self, s): print("prima : samples = {}".format(self.thread.cloud_samples)) status = self.thread.cloud_samples = s print("dopo : samples = {}".format(self.thread.cloud_samples)) return status def get_cloud_samples(self): return self.thread.cloud_samples ############## # NCPA stuff # ############## def todos_ncpa(self): """List ncpa files""" return [ str(ncpa.name).decode('utf-8') for ncpa in sorted(uvars.ncpa_path.glob("*.txt")) ] def load_ncpa(self, name): """Load a ncpa file""" # load saved file mode_file = open(str(uvars.ncpa_path)+name, 'r') data = mode_file.read() data_into_list = data.split('\n') mode_vector = [float(element) for element in data_into_list[:-1]] # edit mode vector obtained from loaded file print("v=[{}], len = {}".format(mode_vector, len(mode_vector))) angle_where_ncpa_was_optimized = float(mode_vector.pop()) print("angle read from ncpa file: {}".format(angle_where_ncpa_was_optimized)) print("v=[{}], len = {}".format(mode_vector, len(mode_vector))) mode_vector = mode_vector[2:] print("final mode vector=[{}], len = {}".format(mode_vector, len(mode_vector))) #apply correct rotation (if necessary) actual_drot_pos = float(self.obsController.getPosition("DEGREE", {"DROT":-9999})[1]["DROT"]) drot_diff = actual_drot_pos -angle_where_ncpa_was_optimized if(abs(drot_diff)>0.5): ruotati = ufun.rotate_modes(mode_vector, -1*drot_diff) else: ruotati = mode_vector riservati = [0,0] try: status = self.obsController.RTCTTModeUpload( riservati + list(ruotati), absolute=True) except Exception as e: print(e) status = 1 self.obsController.setContext({"NCPA_FILE":name}); print("Finished") return status def save_ncpa(self): """Save the current ncpa in a file""" ncpa_name = ufun.save_ncpa(os=self.obsController) print(ncpa_name) return ncpa_name def getNCPARotationStatus(self): print("Status ncpa rotation in arrivo... status is {}".format(self.rotating_ncpa)) return self.rotating_ncpa def rotateNCPA(self, action): if action == "start": print("START NCPA Rotation") self.rotating_ncpa = True self.obsController.setContext({"NCPA_ROT_STATUS":"Rotation ON"}); elif action == "stop": print("STOP NCPA Rotation") self.rotating_ncpa = False self.obsController.setContext({"NCPA_ROT_STATUS":"Rotation OFF"}); else: print("Wrong command in rotateNCPA") riservati = [0,0] sign = -1 # rotation direction sleep = 5#10 threshold = 0.5 ins_s = {"DROT":-9999} angle_where_ncpa_was_optimized = float(self.obsController.getPosition("DEGREE", ins_s)[1]["DROT"]) status, total_vector = self.obsController.getInternalSeqFloatVariable("RtcAppliedMode") mode_vector = total_vector[2:] position_of_last_update = angle_where_ncpa_was_optimized while self.rotating_ncpa: bearing = float(self.obsController.getPosition("DEGREE", ins_s)[1]["DROT"]) diff = bearing - position_of_last_update print("Bearing: {}; Diff wrt previous update: {}".format(bearing, diff)) if abs(diff) > threshold: print("Above threshold: rotating modes!") ruotati = ufun.rotate_modes(mode_vector, sign * (bearing - angle_where_ncpa_was_optimized) )[0:36] self.obsController.RTCTTModeUpload( riservati + list(ruotati), absolute=True) position_of_last_update = bearing else: print("Below threshold: no mode rotation") time.sleep(sleep) return 1 ############## # Plot stuff # ############## @property def rtc_image(self): dummy_64 = np.zeros(shape=(64,64)) # Diagostic stuff try: #print("trying to get the image") image = self.thread.img.T # transpose #print("there is an image. probably diagnostic is arriving") #print(image[40][40:60]) except AttributeError as e: print("no image") print("if NO image but diagnostic data from os are coming then try reset and resurrect rtc") print("can also try to request fps to the console on rtc panel, and then again reset+resurrect") image = dummy_64 except IndexError as e: print("too many indices") image = dummy_64 except Exception as e: print("Generic error") print(e) return image def data_cred(self, image, centroid): # # OS status, darkarray = self.obsController.getInternalSeqFloatVariable("RtcPixelBias") try: darkimage = np.array(darkarray).reshape(image.shape).T except Exception as e: darkimage = np.zeros(shape=(image.shape)) try: cx,cy = centroid image = image[2:-2 , 2:-2] darkimage = darkimage[2:-2 , 2:-2] darkedData = image - darkimage cred = { "data": image.tolist(), "dark": darkimage.tolist(), #"darkedData" = darkedData, "min": image.min(), #image[1:-1, 1:-1].min(), "max": image.max(), #image[1:-1, 1:-1].max(), "minDark" : darkedData.min(), #darkedData[3:, 3:].min(), "maxDark" : darkedData.max(), #darkedData[3:, 3:].max(), "cx": cx, "cy": cy, "image_shape": list(image.shape), } except Exception as e: print("error building the cred response") print(e) print("image shape : {} ".format(image.shape)) #print("image shape : {} , darkedData shape : {}".format(image.shape, darkedData.shape)) cred = None return cred def data_gain(self, imshape): # OS status, pixgain = self.obsController.getInternalSeqFloatVariable("RtcPixelGain") try: pixgain = np.array(pixgain).reshape(imshape).T except Exception as e: pixgain = np.zeros(shape=imshape)+1 try: gain = { "data": pixgain.reshape(imshape).tolist(), "min": pixgain[3:, 3:].min(), "max": pixgain[3:, 3:].max(), "image_shape": list(imshape), } except Exception as e: print("error building the gain response") print(e) gain = None return gain def data_dm(self): signs_list = [] with open(str(uvars.sign_map), "r") as f: for line in f: for x in line.split(): signs_list.append(int(x)) signs = np.array(signs_list) # Diagnostics shape = self.thread.shape # OS status, dmflat = self.obsController.getInternalSeqFloatVariable("RtcDmFlat") try: dm_shape_no_flat = (shape*signs) - (np.array(dmflat[0:97])) dm = { "actuators": uvars.actuators.tolist(), "shape": (shape*signs).tolist(), "flat": (np.array(dmflat[0:97])).tolist(), "min": round(shape.min(),2), "max": round(shape.max(),2), "min_no_flat" : round(min(dm_shape_no_flat),2), "max_no_flat" : round(max(dm_shape_no_flat),2), # "image_shape": list(imshape), "skp_cmd" : self.thread.tt_cmd[1] } except Exception as e: print("error building the dm response") print(e) dm = None return dm def data_cloud(self, imshape, centroid): # Diagnostics centroids_x = self.thread.centroid_x centroids_y = self.thread.centroid_y cloud = np.flip(self.thread.centroid_matrix.T, 1) cx,cy = centroid rmsX = np.std(centroids_x)*21 # px 2 mas, plate scale = 21 mas/px rmsY = np.std(centroids_y)*21 # px 2 mas wx = (np.max(centroids_x)-np.min(centroids_x))*21 # px to mas wy = (np.max(centroids_y)-np.min(centroids_y))*21 # px to mas try: cld = { "data": cloud.tolist(), "min": cloud.min(), "max": cloud.max(), "samples": len(self.thread.centroid_x), "cx": cx - self.thread.cx, "cy": cy - self.thread.cy, "wx": round(wx, 5), "wy": round(wy, 5), "image_shape": list(imshape), "RMS_X": round(rmsX, 5), "RMS_Y": round(rmsY, 5), "cloud_samples": self.thread.cloud_samples, } except Exception as e: print("error building the cloud response") print(e) cld = None return cld def data(self): # Diagnostics image = self.rtc_image imshape = image.shape # OS centroid = self.centroid data_cloud = self.data_cloud(imshape, centroid) data_cred = self.data_cred(image, centroid) cropped_cred = focus_on_centroid( np.array(data_cred["data"]) , data_cloud["cx"] ,data_cloud["cy"], s=4) flux_cred = np.sum(cropped_cred) data_cred["tot_flux"] = flux_cred # added data flux to jso cropped_cred_darked = focus_on_centroid( np.array(data_cred["data"])-np.array(data_cred["dark"]) , data_cloud["cx"] ,data_cloud["cy"], s=4) flux_cred_darked = np.sum(cropped_cred_darked) data_cred["tot_flux_darked"] = flux_cred_darked # added darked flux to json response = { "cred": data_cred, "gain": self.data_gain(imshape), "dm": self.data_dm(), # removed imshape as parameter "cloud": data_cloud, "cx": centroid[0], "cy": centroid[1], } #print(response["cred"]["tot_flux"]) self.json_state = response return response