Skip to content
Snippets Groups Projects
Commit fe5f2405 authored by Romolo Politi's avatar Romolo Politi
Browse files

add the show method, close #1

parent a5e66e13
No related branches found
No related tags found
No related merge requests found
Version 0.1.0
First delivery
# Python-CCSDS Changelog
## Version 0.2.0
- Migration to Poetry
- Added class PacketType
- Add method \_\_str__ and \_\_repr__
- add metaclass Seriazable to obtain a dict from a class
- add a method show to the class ccsds to obtain a Panel object with a summary of the packet.
## Version 0.1.0
- First delivery
This diff is collapsed.
......@@ -2,10 +2,19 @@
name = "Python-CCSDS"
version = "0.1.0"
keywords =['python','ccsds','nasa','esa','packets']
descriprion = 'Python library to read the CCSDS packets'
description = 'Python library to read the CCSDS packets'
authors = ["Romolo Politi <romolo.politi@inaf.it>"]
license = "GNU GPL ver3"
license = "GPL-3.0-or-later"
readme = "README.md"
packages=[{include = "PyCCSDS", from = "src"}]
classifiers = [
"Programming Language :: Python :: 3",
"License :: OSI Approved :: GNU General Public License v3 (GPLv3)",
"Operating System :: OS Independent",
"Development Status :: 3 - Alpha",
"Topic :: Scientific/Engineering :: Astronomy",
"Topic :: Software Development :: Libraries :: Python Modules",
]
[tool.poetry.dependencies]
python = "^3.12"
......
from PyCCSDS.__main__ import CCSDS
......@@ -3,32 +3,71 @@ from bitstring import BitStream
import spiceypy as spice
from datetime import datetime, timedelta
from rich.console import Console
from PyCCSDS.dic2table import dict2table
import types
class PacketId:
__version__ = "0.2.0"
dateFormat = "%Y-%m-%dT%H:%M:%S.%fZ"
missions = {"bepicolombo": -121, "juice": -29}
def isclass(obj):
"""Return true if the obj is a class.
Class objects provide these attributes:
__doc__ documentation string
__module__ name of module in which this class was defined"""
return hasattr(obj,'serialize') and callable(obj.serialize)
class Seriazable:
def serialize(self)->dict:
ret={}
for item in self.__dict__.items():
if isclass(item[1]):
ret[item[0]]=item[1].serialize()
else:
ret[item[0]]=item[1]
return ret
class PacketType(Seriazable):
def __init__(self,tp):
self.type = tp
if tp == 0:
self.typeName = "Telemetry"
else:
self.typeName = "Telecommand"
pass
def __str__(self):
return self.typeName
def __repr__(self) -> str:
return self.__str__()
class PacketId(Seriazable):
def __init__(self,data):
pID = BitStream(hex=data).unpack('uint: 3, 2*bin: 1, bits: 11')
self.VersionNum = pID[0]
self.packetType = pID[1]
self.packetType = PacketType(int(pID[1]))
self.dataFieldHeaderFlag = pID[2]
self.Apid = pID[3].uint
self.Pid = pID[3][0:7].uint
self.Pcat = pID[3][7:].uint
pass
def serialize(self):
return [self.VersionNum, self.packetType, self.dataFieldHeaderFlag,
self.Apid, self.Pid, self.Pcat]
class SeqControl:
class SeqControl(Seriazable):
def __init__(self,data):
sq = BitStream(hex=data).unpack('bin:2,uint:14')
self.SegmentationFlag = sq[0]
self.SSC = sq[1]
def serialize(self):
return [self.SegmentationFlag, self.SSC]
class SourcePacketHeader:
class SourcePacketHeader(Seriazable):
def __init__(self,data):
# Read the Source Packet Header(48 bits)
# - Packet ID (16 bits)
......@@ -43,14 +82,13 @@ class SourcePacketHeader:
self.packetLength = BitStream(hex=data[8:12]).unpack('uint:16')[0]+1
# Based on BepiColombo SIMBIO-SYS
# ref: BC-SIM-GAF-IC-002 pag. 48
def serialize(self):
return [*self.packetId.serialize(), *self.sequeceControl.serialize(), self.packetLength]
class DataFieldHeader:
class DataFieldHeader(Seriazable):
def __init__(self,data,missionID,t0):
# Read The Data Field Header (80 bit)
dfhData = BitStream(hex=data).unpack('bin:1,uint:3,bin:4,3*uint:8,uint:1,uint:31,uint:16')
self.pusVersion = dfhData[1]
self.pusVersion = int(dfhData[1])
self.ServiceType = dfhData[3]
self.ServiceSubType = dfhData[4]
self.DestinationId = dfhData[5]
......@@ -61,19 +99,15 @@ class DataFieldHeader:
if self.Synchronization == 0:
self.UTCTime = self.scet2UTC(missionID,t0)
else:
self.UTCTime = '1970-01-01T00:00:00.00000Z'
self.UTCTime = datetime.strptime('1970-01-01T00:00:00.00000Z', dateFormat)
pass
def serialize(self):
return [self.pusVersion, self.ServiceType, self.ServiceSubType,
self.DestinationId, self.SCET, self.UTCTime]
def scet2UTC(self,missionID,t0):
if t0 == None:
et = spice.scs2e(missionID, "{}.{}".format(self.CorseTime, self.FineTime))
ScTime = spice.et2utc(et, 'ISOC', 5)
else:
dateFormat = "%Y-%m-%dT%H:%M:%S.%f"
dt=datetime.strptime(t0,dateFormat)
sc = self.CorseTime + self.FineTime*(2**(-16))
f=dt+timedelta(seconds=sc)
......@@ -88,18 +122,20 @@ class PackeDataField:
self.Data = data[20:]
pass
class CCSDS:
""" Reader for the CCSDS header """
def __init__(self, missionID, data,console:Console=None,t0= None):
if type(missionID) is str:
if missionID.lower() == 'bepicolombo':
missionID=-121
elif missionID.lower() == 'juice':
missionID=-29
def __init__(self, missionName, data,console:Console=Console(),t0= None):
# Check Mission id and Name
if isinstance(missionName, str):
if missionName.lower() not in missions:
console.print(f"WARNING: the Mission name is not valid. time converte setted to 1970-01-01 00:00:00",style="bold red")
t0 = datetime.strptime("1970-01-01T00:00:00.0000Z", dateFormat)
missionID=missions.get(missionName.lower(),None)
else:
if t0 == None:
print("WARNING: the Mission name is not valid. time converte setted to 1970-01-01 00:00:00")
t0 = "1970-01-01T00:00:00"
missionID=missionName
self.console=console
self.missionName=missionName
# Source Packet Header
self.SPH = SourcePacketHeader(data[0:12])
# Packet Data Field
......@@ -109,3 +145,17 @@ class CCSDS:
self.subService=self.PDF.DFHeader.ServiceSubType
self.Data=self.PDF.Data
def __str__(self)->str:
return f"{self.missionName.title()} - APID: {self.APID} - {'TM(' if self.SPH.packetId.packetType.type == 0 else 'TC('}{self.Service},{self.subService}) - Data: {self.Data}"
def __repr__(self) -> str:
return self.__str__()
def show(self):
from rich.panel import Panel
from rich.columns import Columns
c=Columns([Panel(dict2table(self.SPH.serialize()),title="Source Packet Header", border_style='magenta'),
Panel(dict2table(self.PDF.DFHeader.serialize(),reset=True),title="Data Field Header",border_style="magenta"),],
expand=False,equal=False)
p=Panel(c,title=self.__str__(),style='yellow', expand=False)
return p
from rich.table import Table
from rich.align import Align
from re import sub
from functools import wraps
from rich.console import Console
console=Console()
def snake_case(s: str) -> str:
"""Convert a string to snake_case"""
return "_".join(
sub(
"([A-Z][a-z]+)", r" \1", sub("([A-Z]+)", r" \1", s.replace("-", " "))
).split()
).lower()
def sentence_case(s: str) -> str:
"""Convert a string to Sentence case"""
return sub(r"(_|-)+", " ", snake_case(s)).title()
def get_style(value)->str:
"""
Get the style for the value
"""
if isinstance(value, (int, float)):
return "cyan"
return "green"
def counter(f):
@wraps(f)
def wrapper(*args, **kwargs):
if not 'cnt' in globals():
global cnt
cnt = 0
else:
cnt += 1
return f(*args, **kwargs)
return wrapper
styles_array=['yellow','magenta','green','blue', 'red']
@counter
def dict2table(my_data:dict, grid:bool=True, title:str=None,reset:bool =False)->Table:
"""
Convert a dictionary to a table
"""
if reset:
global cnt
cnt=0
if grid:
external=Table.grid()
if title:
external.add_row(Align(f"{title}\n", style="italic", align="center"))
tb=Table.grid()
tb.padding=(0,3)
else:
tb=Table()
tb.title=title
console.log(f"cnt: {cnt}")
tb.add_column('Key',style=f"{styles_array[cnt]} bold")
tb.add_column('Value',style="")
for item in my_data.items():
if isinstance(item[1], dict):
elem=dict2table(item[1],grid=True)
else:
st=get_style(item[1])
elem=f"[{st}]{item[1]}[/]"
tb.add_row(sentence_case(item[0]),elem)
if grid:
external.add_row(tb)
return external
return tb
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment