#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# System modules
import os

# Third-party modules
from astropy.io import fits
from astropy.time import Time
class Status:
    """Cache of the most recent payload emitted for each event name."""

    def __init__(self):
        # Maps an event name to the last payload stored for it.
        self.last = {}

    def initial(self, socketio, names):
        """Emit the cached payload of every requested event name.

        Args:
            socketio: Object exposing ``emit(event, payload)``.
            names: Iterable of event names to replay.

        Names that have no cached payload yet are skipped instead of
        raising ``KeyError``.
        """
        for name in names:
            # Read this instance's own cache, not the module-level singleton.
            if name in self.last:
                socketio.emit(name, self.last[name])


# Module-level singleton shared by the emitter functions of this module.
status = Status()
def send_timestamp(socketio, sleep=1):
    """Emit the current Unix time on the 'timestamp' event, forever.

    Args:
        socketio: Object exposing ``emit(event, payload)`` and
            ``sleep(seconds)``.
        sleep: Seconds to wait between two consecutive emissions.

    This function never returns.
    """
    while True:
        socketio.emit('timestamp', Time.now().unix)
        # Pause between emissions so the loop does not spin at full speed.
        socketio.sleep(sleep)
def send_status(app, socketio, url, sleep=4, once=False):
    """Poll ``url`` and emit its JSON payload whenever it changes.

    The first response is always emitted and cached in ``status.last``.
    Unless ``once`` is true, the function then polls forever: every
    response refreshes the cache, but is emitted only when it differs
    from the previously emitted one.

    Args:
        app: Object exposing ``test_client()`` (presumably the Flask
            application -- confirm against the caller).
        socketio: Object exposing ``emit(event, payload)`` and
            ``sleep(seconds)``.
        url: Route to query; also used as the event name.
        sleep: Seconds to wait between two polls.
        once: When true, return right after the first emission.
    """
    name = url  # the event name currently mirrors the route

    # First time: always emit, so that there is a baseline to compare with.
    with app.test_client() as client:
        old = client.get(url).get_json()
    socketio.emit(name, old)
    status.last[name] = old

    while not once:
        with app.test_client() as client:
            res = client.get(url).get_json()
        status.last[name] = res

        # Only push to the clients when something actually changed.
        if res != old:
            socketio.emit(name, res)
            old = res

        socketio.sleep(sleep)
def send_webcam(app, socketio, url, sleep=1):
    """Fetch the payload served at ``url`` once and emit it as bytes.

    The "response" field of the JSON reply is encoded with ISO-8859-1,
    cached in ``status.last`` and emitted, both under ``url``.

    Args:
        app: Object exposing ``test_client()`` (presumably the Flask
            application -- confirm against the caller).
        socketio: Object exposing ``emit(event, payload)``.
        url: Route to query; also used as event name and cache key.
        sleep: Currently unused; kept so existing callers keep working.
    """
    with app.test_client() as client:
        res = client.get(url).get_json()

    # str.encode() already returns bytes, no further conversion is needed.
    img = res["response"].encode("ISO-8859-1")

    # Keyed by url, the same convention send_status() uses.
    status.last[url] = img
    socketio.emit(url, img)
def tail_f(socketio, num_lines=30, sleep=0.25, once=False):
    """Emit the tail of today's log file, then follow it like ``tail -f``.

    The last ``num_lines`` lines are emitted on the 'new_lines' event and
    cached in ``status.last['new_lines']``. Unless ``once`` is true, the
    file size is then polled forever and appended lines are emitted (and
    cached) as they show up.

    Args:
        socketio: Object exposing ``emit(event, payload)`` and
            ``sleep(seconds)``.
        num_lines: Number of trailing lines sent on the first emission.
        sleep: Seconds to wait between two size checks.
        once: When true, return right after the first emission.
    """
    # NOTE(review): `today` is not defined inside this function; it has to
    # be provided at module level -- confirm where it comes from.
    filename = f"./data/log/OARPAF.{today}.log"
    file_size = os.path.getsize(filename)

    # First time
    with open(filename, 'r') as f:
        lines = f.readlines()[-num_lines:]
    socketio.emit('new_lines', lines)  # Initial lines on first page load
    status.last['new_lines'] = lines

    while not once:
        current_size = os.path.getsize(filename)

        if current_size < file_size:
            # The file shrank: start over from its beginning at next check.
            file_size = 0
        elif current_size > file_size:
            # File has been appended
            with open(filename, 'r') as f:
                f.seek(file_size)
                new_lines = f.readlines()
            file_size = current_size

            if new_lines:
                socketio.emit('new_lines', new_lines)
                status.last['new_lines'] = new_lines

        socketio.sleep(sleep)  # Sleep before checking again
def send_binary(socketio, shape=(512, 512)):
    """Emit one random uint16 matrix on the 'data_bundle' event.

    The bundle carries the matrix shape, its dtype name, the raw bytes
    and the Unix time taken just before the bundle is built.

    Args:
        socketio: Object exposing ``emit(event, payload)`` and
            ``sleep(seconds)``.
        shape: Shape of the generated matrix. The default is an arbitrary
            size -- adjust it to what the clients expect.
    """
    import numpy as np

    dtype = "uint16"
    matrix = np.random.uniform(0, 65535, shape).astype(dtype)
    binary_data = matrix.tobytes()

    now = Time.now()
    data_bundle = {
        "shape": matrix.shape,
        "dtype": dtype,
        "data": binary_data,
        "timestamp": now.unix,
    }
    socketio.emit('data_bundle', data_bundle)

    # Debug output: seconds spent emitting and payload size in bytes.
    print((Time.now()-now).sec, "emitted")
    print(f"bin {len(binary_data)}")

    socketio.sleep(1)
def send_fits(socketio):
    """Emit the first HDU of ./temp.fits on the 'data_fits' event.

    The bundle carries the data shape, its dtype name, the raw bytes,
    the header cards (each one converted to a list) and the Unix time
    taken just before the data are serialized.

    Args:
        socketio: Object exposing ``emit(event, payload)``.
    """
    with fits.open("./temp.fits") as hdul:
        primary = hdul[0]
        matrix = primary.data
        header = primary.header

        now = Time.now()
        # Serialize while the file is still open.
        binary_data = matrix.tobytes()
        data_bundle = {
            "shape": matrix.shape,
            "dtype": matrix.dtype.name,
            "data": binary_data,
            "header": [list(c) for c in header.cards],
            "timestamp": now.unix,
        }

    socketio.emit('data_fits', data_bundle)

    # Debug output: seconds elapsed since `now` and payload size in bytes.
    print((Time.now()-now).sec, "emitted")
    print(f"bin {len(binary_data)}")