Newer
Older
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Third-party modules
from astropy.time import Time
import time
import os
def send_timestamp(socketio):
while True:
now = Time.now()
socketio.emit('timestamp', now.unix)
time.sleep(1)
name = url # maybe different?
while True:
with app.test_client() as client:
res = client.get(url).get_json()
Davide Ricci
committed
time.sleep(1)
def send_webcam(app, socketio, url):
name = url # maybe different?
with app.test_client() as client:
res = client.get(url).get_json()
img = res["response"].encode("ISO-8859-1")
socketio.emit(url, bytes(img))
time.sleep(1)
def tail_f(socketio, num_lines=30):
filename = f"./data/log/OARPAF.{today}.log"
file_size = os.path.getsize(filename)
while True:
current_size = os.path.getsize(filename)
if current_size < file_size:
# File has been truncated or deleted, reset file pointer
file_size = 0
with open(filename, 'r') as f:
lines = f.readlines()[-num_lines:]
elif current_size > file_size:
# File has been appended
with open(filename, 'r') as f:
f.seek(file_size)
new_lines = f.readlines()
file_size = current_size
if new_lines:
socketio.emit('new_lines', new_lines)
else:
# File remains unchanged
pass
socketio.sleep(0.2) # Sleep for 2 seconds before checking again
socketio.start_background_task(tail_f, socketio=socketio)
socketio.start_background_task(send_timestamp, socketio=socketio)
socketio.start_background_task(send_status, app=app, socketio=socketio,
socketio.start_background_task(send_status, app=app, socketio=socketio,
socketio.start_background_task(send_status, app=app, socketio=socketio,
socketio.start_background_task(send_status, app=app, socketio=socketio,
url="/api/environment/status", timeout=40)
socketio.start_background_task(send_status, app=app, socketio=socketio,
socketio.start_background_task(send_webcam, app=app, socketio=socketio,
url="/api/webcam/snapshot")