Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue-#115 Add support for TC transfer frames #153

Merged
merged 1 commit into from
Feb 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions ait/dsn/plugins/TCTF_Manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import ait.core
import ait.dsn.sle.tctf as tctf
from ait.core.server.plugins import Plugin


class TCTF_Manager(Plugin):
"""
Data Processing Pipeline that encodes payloads in TCTF protocol as described by CCSDS standards.
https://public.ccsds.org/Pubs/232x0b4.pdf

The managed parameters are loaded from the AIT config.yaml
See CCSDS Blue Book for information regarding managed parameters.


Sample Configuration Part I:
----------------------------
server:
plugins:
- plugin:
name: ait.dsn.plugins.TCTF_Manager.TCTF_Manager
inputs:
- command_stream

Sample Configuration Part II:
-----------------------------
default:
dsn:
tctf:
transfer_frame_version_number: 0
bypass_flag: 0
control_command_flag: 0
reserved: 0
uplink_spacecraft_id: 123
virtual_channel_id: 0
frame_sequence_number: 0
apply_error_correction_field: True
"""
def __init__(self, inputs=None, outputs=None, zmq_args=None,
command_subscriber=None, managed_parameters=None):
super().__init__(inputs, outputs, zmq_args)

config_prefix = 'dsn.sle.tctf.'
self.tf_version_num = ait.config.get(config_prefix+'transfer_frame_version_number', None)
self.bypass = ait.config.get(config_prefix+'bypass_flag', None)
self.cc = ait.config.get(config_prefix+'control_command_flag', None)
self.rsvd = ait.config.get(config_prefix+'reserved', None)
self.scID = ait.config.get(config_prefix+'uplink_spacecraft_id', None)
self.vcID = ait.config.get(config_prefix+'virtual_channel_id', None)
self.frame_seq_num = ait.config.get(config_prefix+'frame_sequence_number', None)
self.apply_ecf = ait.config.get(config_prefix+'apply_error_correction_field', None)

def process(self, data_field_byte_array, topic=None):
frame = tctf.TCTransFrame(tf_version_num=self.tf_version_num,
bypass=self.bypass, cc=self.cc,
rsvd=self.rsvd, scID=self.scID,
vcID=self.vcID,
frame_seq_num=self.frame_seq_num,
data_field=data_field_byte_array,
apply_ecf=self.apply_ecf)

encoded_frame = frame.encode()
ait.core.log.debug(f"TCTF_Manager: {encoded_frame}")

self.publish(encoded_frame)
self.frame_seq_num = (self.frame_seq_num + 1) % 255
return encoded_frame
1 change: 1 addition & 0 deletions ait/dsn/plugins/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

7 changes: 0 additions & 7 deletions ait/dsn/sle/frames.py
Original file line number Diff line number Diff line change
Expand Up @@ -585,10 +585,3 @@ def decode_dataField_Idle(self, datafield):

def encode(self):
pass


class TCTransFrame(object):
''''''
# TODO: Implement
# See C Space Data Link Protocol pg. 4-1 for further information
pass
245 changes: 245 additions & 0 deletions ait/dsn/sle/tctf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
from enum import Enum, auto
from collections import OrderedDict, namedtuple
from ait.core import log
from bitstring import BitArray
import binascii


class HeaderKeys(Enum):
"""HeaderKeys is an Enum used as a key in TCTF hashmaps"""
TRANSFER_FRAME_VERSION_NUM = auto()
BYPASS_FLAG = auto()
CONTROL_COMMAND_FLAG = auto()
RESERVED = auto()
SPACECRAFT_ID = auto()
VIRTUAL_CHANNEL_ID = auto()
FRAME_LENGTH = auto()
FRAME_SEQ_NUM = auto()


class ICD:
"""
Contains definitions and other useful values as defined in the CCSDS ICD, 4.1.2.1
https://public.ccsds.org/Pubs/232x0b4.pdf
"""
class Sizes(Enum):
"""
Defines various useful size constants derrived from the CCSDS ICD.
TCTFS in the SDLS Protocol may define values smaller than the ones described here.
"""
MAX_FRAME_OCTETS = 1024
MAX_FRAME_BIN = MAX_FRAME_OCTETS * 8

ECF_OCTETS = 2
ECF_BIN = ECF_OCTETS * 8

PRIMARY_HEADER_OCTETS = 5
PRIMARY_HEADER_BIN = PRIMARY_HEADER_OCTETS * 8

MAX_DATA_FIELD_NO_ECF_OCTETS = 1019
MAX_FRAME_NO_ECF_OCTETS = PRIMARY_HEADER_OCTETS + MAX_DATA_FIELD_NO_ECF_OCTETS

MAX_DATA_FIELD_ECF_OCTETS = MAX_DATA_FIELD_NO_ECF_OCTETS - ECF_OCTETS
MAX_FRAME_ECF_OCTETS = PRIMARY_HEADER_OCTETS + MAX_DATA_FIELD_ECF_OCTETS + ECF_OCTETS

MAX_DATA_FIELD_NO_ECF_BIN = MAX_DATA_FIELD_NO_ECF_OCTETS * 8
MAX_DATA_FIELD_ECF_BIN = MAX_DATA_FIELD_ECF_OCTETS * 8

class Header():
"""
INFO contains a map associating a HeaderKey to a tuple of bit_size and mandatory,
which defines the Header Field per CCSDS.
"""
Field = namedtuple('Field', ['bit_size', 'mandatory'])
INFO = OrderedDict()
INFO[HeaderKeys.TRANSFER_FRAME_VERSION_NUM] = Field(2, True)
INFO[HeaderKeys.BYPASS_FLAG] = Field(1, True)
INFO[HeaderKeys.CONTROL_COMMAND_FLAG] = Field(1, True)
INFO[HeaderKeys.RESERVED] = Field(2, True)
INFO[HeaderKeys.SPACECRAFT_ID] = Field(10, True)
INFO[HeaderKeys.VIRTUAL_CHANNEL_ID] = Field(6, True)
INFO[HeaderKeys.FRAME_LENGTH] = Field(10, True)
INFO[HeaderKeys.FRAME_SEQ_NUM] = Field(8, True)

class CRC:
"""
Contains crc_func.
The ICD describes some variation of CRC-16-CCITT, where
Name Poly Reverse Remainder Final XOR Check
crc-ccitt-false 0x11021 False 0xFFFF 0x0000 0x29B1
Which is used for generating the ECF section of the TCTF.
"""
crc_func = binascii.crc_hqx
Mejiro-McQueen marked this conversation as resolved.
Show resolved Hide resolved

class HeaderSlices:
"""
HeaderSlices contains SLICES, which maps a HeaderKey to a python slice.
The slices are used to extract sections of the frame binary.

See the TCTransFrame decode function
"""
SLICES = OrderedDict()
SLICES[HeaderKeys.TRANSFER_FRAME_VERSION_NUM] = slice(0, 2)
SLICES[HeaderKeys.BYPASS_FLAG] = slice(2, 3)
SLICES[HeaderKeys.CONTROL_COMMAND_FLAG] = slice(3, 4)
SLICES[HeaderKeys.RESERVED] = slice(4, 6)
SLICES[HeaderKeys.SPACECRAFT_ID] = slice(6, 16)
SLICES[HeaderKeys.VIRTUAL_CHANNEL_ID] = slice(16, 22)
SLICES[HeaderKeys.FRAME_LENGTH] = slice(22, 32)
SLICES[HeaderKeys.FRAME_SEQ_NUM] = slice(32, 41)


class TCTransFrame():
"""
An instance of TCTransFrame fully defines a TCTF.
Encoding is deferred until the encode method is called.

The static decode method will translate a TCTF binary into a named tuple.

apply_ecf defines whether the ECF field should be calculated and attached to the TCTF.

Parameters are expected to comply with CCSDS standard, and no validation is done on them.
"""
DecodedTCTF = namedtuple('DecodedTCTF', ["header_map", "payload", "ecf"])

def decode(data, has_ecf=None):
if has_ecf:
payload = data[5:-2]
ecf = data[-2:]
else:
payload = data[5:]
ecf = None

header = BitArray(data[0:5]).bin

decoded_header = OrderedDict()
for key in HeaderKeys:
slice = ICD.HeaderSlices.SLICES[key]
val_bin = header[slice]
val_eng = int(val_bin, 2)
decoded_header[key] = val_eng
log.debug(f"TCTransFrame => decode -> {key}, {slice}, Val_Bin:{val_bin}, bin_len={len(val_bin)}, decode={val_eng}")

return TCTransFrame.DecodedTCTF(decoded_header, payload, ecf)

def __init__(self, tf_version_num, bypass, cc, rsvd, scID, vcID,
frame_seq_num, data_field, apply_ecf=False):

"""
Initialize a TCTF.
Encoding the value is deferred until the encode method is called.

Parameters
----------
tf_version_num : TeleCommand Transfer Version Number*
bypass : Bypass Flag*
cc : Control Command Flag*
rsvd : Reserved Space*
scID : Spacecraft Identifier*
vcID : Virtual Channel Identifier*
frame_seq_num : Frame Sequence Number*
data_field : Transfer Frame Data Field*, a bytearray representing the TCTF payload.
apply_ecf : Flag used to apply ECF frames.

* See CCSDS ICD for more information regarding parameter meaning and usage.
"""
# Header Data
self.primary_header = OrderedDict()

# Data Field
self.encoded_data_field = data_field
self.size_data_field_bin = len(self.encoded_data_field) * 8

# ECF Data
self.apply_ecf = apply_ecf
self.encoded_crc = None
# Finalize ecf size
if self.apply_ecf:
self.size_ecf_bin = ICD.Sizes.ECF_BIN.value
else:
self.size_ecf_bin = 0

# Final Frame Data
self.encoded_frame = None
self.size_frame_bin = sum([ICD.Sizes.PRIMARY_HEADER_BIN.value,
self.size_data_field_bin,
self.size_ecf_bin])
# Set Primary Header and defer encoding
self.set_primary_header(tf_version_num, bypass, cc, rsvd, scID, vcID,
frame_seq_num)

def encode(self):
"""
When called, returns bytes representing the TCTF.
If called a second time, returns the previously computed TCTF.
"""
# Exit early if we have previously encoded
if self.encoded_frame:
return self.encoded_frame

# Finalize primaryheader per ICD
self.encode_primary_header()

# Attach Payload
frame_no_crc_bytes = self.encoded_primary_header + self.encoded_data_field

# Attach CRC as ECF
self.encode_ecf(frame_no_crc_bytes)
self.encoded_frame = frame_no_crc_bytes + self.encoded_crc

return self.encoded_frame

def set_primary_header(self, tf_version_num, bypass, cc, rsvd, scID, vcID,
frame_seq_num):
"""
Prepares the TCTF instance for generating the TCTF.
"""
# Finalize frame size
frame_len = int(self.size_frame_bin / 8)-1
log.debug((f"TCTransFrame => set_primary_header -> "
f"FRAMELENGTH: {frame_len}"))

# Insertion order is critical. Must match ICD order.
self.primary_header[HeaderKeys.TRANSFER_FRAME_VERSION_NUM] = tf_version_num
self.primary_header[HeaderKeys.BYPASS_FLAG] = bypass
self.primary_header[HeaderKeys.CONTROL_COMMAND_FLAG] = cc
self.primary_header[HeaderKeys.RESERVED] = rsvd
self.primary_header[HeaderKeys.SPACECRAFT_ID] = scID
self.primary_header[HeaderKeys.VIRTUAL_CHANNEL_ID] = vcID
self.primary_header[HeaderKeys.FRAME_LENGTH] = frame_len
self.primary_header[HeaderKeys.FRAME_SEQ_NUM] = frame_seq_num

def encode_primary_header(self):
"""
Returns bytes representing the primary header.
"""
new_header = BitArray()
for header_field, header_data in self.primary_header.items():
size = ICD.Header.INFO[header_field].bit_size
padded_segment = format(header_data, f'0{size}b')
segment = BitArray(bin=padded_segment)
new_header.append(segment)
log.debug((f"TCTransFrame => encode_primary_header -> "
f"{header_field} ::=> "
f"Encoding: {segment}, "
f"Length: {len(segment)}"))
self.encoded_primary_header = new_header.bytes
log.debug((f"TCTransFrame => encode_primary_header -> "
f"Header= {new_header}, "
f"Size= {len(new_header)}"))
return self.primary_header

def encode_ecf(self, frame_no_crc_bytes):
"""
Returns bytes representing the ECF segment of the TCTF.
If the apply_ecf flag is not set, an empty bytearray is returned.
"""
if self.apply_ecf:
crc_val = ICD.CRC.crc_func(frame_no_crc_bytes, 0xFFFF)
self.encoded_crc = crc_val.to_bytes(ICD.Sizes.ECF_OCTETS.value,
byteorder="big")
else:
self.encoded_crc = bytes()
log.debug((f"TCTransFrame => encode_ecg -> Encoded CRC: "
f"{self.encoded_crc}"))
return self.encoded_crc
Loading