Skip to content

Commit

Permalink
Merge pull request #153 from Mejiro-McQueen/issue-115
Browse files Browse the repository at this point in the history
Issue-#115 Add support for TC transfer frames
  • Loading branch information
nttoole authored Feb 15, 2022
2 parents 8cfc0bd + 15f4b4f commit ac358cd
Show file tree
Hide file tree
Showing 7 changed files with 547 additions and 8 deletions.
66 changes: 66 additions & 0 deletions ait/dsn/plugins/TCTF_Manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import ait.core
import ait.dsn.sle.tctf as tctf
from ait.core.server.plugins import Plugin


class TCTF_Manager(Plugin):
"""
Data Processing Pipeline that encodes payloads in TCTF protocol as described by CCSDS standards.
https://public.ccsds.org/Pubs/232x0b4.pdf
The managed parameters are loaded from the AIT config.yaml
See CCSDS Blue Book for information regarding managed parameters.
Sample Configuration Part I:
----------------------------
server:
plugins:
- plugin:
name: ait.dsn.plugins.TCTF_Manager.TCTF_Manager
inputs:
- command_stream
Sample Configuration Part II:
-----------------------------
default:
dsn:
tctf:
transfer_frame_version_number: 0
bypass_flag: 0
control_command_flag: 0
reserved: 0
uplink_spacecraft_id: 123
virtual_channel_id: 0
frame_sequence_number: 0
apply_error_correction_field: True
"""
def __init__(self, inputs=None, outputs=None, zmq_args=None,
command_subscriber=None, managed_parameters=None):
super().__init__(inputs, outputs, zmq_args)

config_prefix = 'dsn.sle.tctf.'
self.tf_version_num = ait.config.get(config_prefix+'transfer_frame_version_number', None)
self.bypass = ait.config.get(config_prefix+'bypass_flag', None)
self.cc = ait.config.get(config_prefix+'control_command_flag', None)
self.rsvd = ait.config.get(config_prefix+'reserved', None)
self.scID = ait.config.get(config_prefix+'uplink_spacecraft_id', None)
self.vcID = ait.config.get(config_prefix+'virtual_channel_id', None)
self.frame_seq_num = ait.config.get(config_prefix+'frame_sequence_number', None)
self.apply_ecf = ait.config.get(config_prefix+'apply_error_correction_field', None)

def process(self, data_field_byte_array, topic=None):
frame = tctf.TCTransFrame(tf_version_num=self.tf_version_num,
bypass=self.bypass, cc=self.cc,
rsvd=self.rsvd, scID=self.scID,
vcID=self.vcID,
frame_seq_num=self.frame_seq_num,
data_field=data_field_byte_array,
apply_ecf=self.apply_ecf)

encoded_frame = frame.encode()
ait.core.log.debug(f"TCTF_Manager: {encoded_frame}")

self.publish(encoded_frame)
self.frame_seq_num = (self.frame_seq_num + 1) % 255
return encoded_frame
1 change: 1 addition & 0 deletions ait/dsn/plugins/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

7 changes: 0 additions & 7 deletions ait/dsn/sle/frames.py
Original file line number Diff line number Diff line change
Expand Up @@ -585,10 +585,3 @@ def decode_dataField_Idle(self, datafield):

def encode(self):
pass


class TCTransFrame(object):
''''''
# TODO: Implement
# See C Space Data Link Protocol pg. 4-1 for further information
pass
245 changes: 245 additions & 0 deletions ait/dsn/sle/tctf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
from enum import Enum, auto
from collections import OrderedDict, namedtuple
from ait.core import log
from bitstring import BitArray
import binascii


class HeaderKeys(Enum):
"""HeaderKeys is an Enum used as a key in TCTF hashmaps"""
TRANSFER_FRAME_VERSION_NUM = auto()
BYPASS_FLAG = auto()
CONTROL_COMMAND_FLAG = auto()
RESERVED = auto()
SPACECRAFT_ID = auto()
VIRTUAL_CHANNEL_ID = auto()
FRAME_LENGTH = auto()
FRAME_SEQ_NUM = auto()


class ICD:
"""
Contains definitions and other useful values as defined in the CCSDS ICD, 4.1.2.1
https://public.ccsds.org/Pubs/232x0b4.pdf
"""
class Sizes(Enum):
"""
Defines various useful size constants derrived from the CCSDS ICD.
TCTFS in the SDLS Protocol may define values smaller than the ones described here.
"""
MAX_FRAME_OCTETS = 1024
MAX_FRAME_BIN = MAX_FRAME_OCTETS * 8

ECF_OCTETS = 2
ECF_BIN = ECF_OCTETS * 8

PRIMARY_HEADER_OCTETS = 5
PRIMARY_HEADER_BIN = PRIMARY_HEADER_OCTETS * 8

MAX_DATA_FIELD_NO_ECF_OCTETS = 1019
MAX_FRAME_NO_ECF_OCTETS = PRIMARY_HEADER_OCTETS + MAX_DATA_FIELD_NO_ECF_OCTETS

MAX_DATA_FIELD_ECF_OCTETS = MAX_DATA_FIELD_NO_ECF_OCTETS - ECF_OCTETS
MAX_FRAME_ECF_OCTETS = PRIMARY_HEADER_OCTETS + MAX_DATA_FIELD_ECF_OCTETS + ECF_OCTETS

MAX_DATA_FIELD_NO_ECF_BIN = MAX_DATA_FIELD_NO_ECF_OCTETS * 8
MAX_DATA_FIELD_ECF_BIN = MAX_DATA_FIELD_ECF_OCTETS * 8

class Header():
"""
INFO contains a map associating a HeaderKey to a tuple of bit_size and mandatory,
which defines the Header Field per CCSDS.
"""
Field = namedtuple('Field', ['bit_size', 'mandatory'])
INFO = OrderedDict()
INFO[HeaderKeys.TRANSFER_FRAME_VERSION_NUM] = Field(2, True)
INFO[HeaderKeys.BYPASS_FLAG] = Field(1, True)
INFO[HeaderKeys.CONTROL_COMMAND_FLAG] = Field(1, True)
INFO[HeaderKeys.RESERVED] = Field(2, True)
INFO[HeaderKeys.SPACECRAFT_ID] = Field(10, True)
INFO[HeaderKeys.VIRTUAL_CHANNEL_ID] = Field(6, True)
INFO[HeaderKeys.FRAME_LENGTH] = Field(10, True)
INFO[HeaderKeys.FRAME_SEQ_NUM] = Field(8, True)

class CRC:
"""
Contains crc_func.
The ICD describes some variation of CRC-16-CCITT, where
Name Poly Reverse Remainder Final XOR Check
crc-ccitt-false 0x11021 False 0xFFFF 0x0000 0x29B1
Which is used for generating the ECF section of the TCTF.
"""
crc_func = binascii.crc_hqx

class HeaderSlices:
"""
HeaderSlices contains SLICES, which maps a HeaderKey to a python slice.
The slices are used to extract sections of the frame binary.
See the TCTransFrame decode function
"""
SLICES = OrderedDict()
SLICES[HeaderKeys.TRANSFER_FRAME_VERSION_NUM] = slice(0, 2)
SLICES[HeaderKeys.BYPASS_FLAG] = slice(2, 3)
SLICES[HeaderKeys.CONTROL_COMMAND_FLAG] = slice(3, 4)
SLICES[HeaderKeys.RESERVED] = slice(4, 6)
SLICES[HeaderKeys.SPACECRAFT_ID] = slice(6, 16)
SLICES[HeaderKeys.VIRTUAL_CHANNEL_ID] = slice(16, 22)
SLICES[HeaderKeys.FRAME_LENGTH] = slice(22, 32)
SLICES[HeaderKeys.FRAME_SEQ_NUM] = slice(32, 41)


class TCTransFrame():
"""
An instance of TCTransFrame fully defines a TCTF.
Encoding is deferred until the encode method is called.
The static decode method will translate a TCTF binary into a named tuple.
apply_ecf defines whether the ECF field should be calculated and attached to the TCTF.
Parameters are expected to comply with CCSDS standard, and no validation is done on them.
"""
DecodedTCTF = namedtuple('DecodedTCTF', ["header_map", "payload", "ecf"])

def decode(data, has_ecf=None):
if has_ecf:
payload = data[5:-2]
ecf = data[-2:]
else:
payload = data[5:]
ecf = None

header = BitArray(data[0:5]).bin

decoded_header = OrderedDict()
for key in HeaderKeys:
slice = ICD.HeaderSlices.SLICES[key]
val_bin = header[slice]
val_eng = int(val_bin, 2)
decoded_header[key] = val_eng
log.debug(f"TCTransFrame => decode -> {key}, {slice}, Val_Bin:{val_bin}, bin_len={len(val_bin)}, decode={val_eng}")

return TCTransFrame.DecodedTCTF(decoded_header, payload, ecf)

def __init__(self, tf_version_num, bypass, cc, rsvd, scID, vcID,
frame_seq_num, data_field, apply_ecf=False):

"""
Initialize a TCTF.
Encoding the value is deferred until the encode method is called.
Parameters
----------
tf_version_num : TeleCommand Transfer Version Number*
bypass : Bypass Flag*
cc : Control Command Flag*
rsvd : Reserved Space*
scID : Spacecraft Identifier*
vcID : Virtual Channel Identifier*
frame_seq_num : Frame Sequence Number*
data_field : Transfer Frame Data Field*, a bytearray representing the TCTF payload.
apply_ecf : Flag used to apply ECF frames.
* See CCSDS ICD for more information regarding parameter meaning and usage.
"""
# Header Data
self.primary_header = OrderedDict()

# Data Field
self.encoded_data_field = data_field
self.size_data_field_bin = len(self.encoded_data_field) * 8

# ECF Data
self.apply_ecf = apply_ecf
self.encoded_crc = None
# Finalize ecf size
if self.apply_ecf:
self.size_ecf_bin = ICD.Sizes.ECF_BIN.value
else:
self.size_ecf_bin = 0

# Final Frame Data
self.encoded_frame = None
self.size_frame_bin = sum([ICD.Sizes.PRIMARY_HEADER_BIN.value,
self.size_data_field_bin,
self.size_ecf_bin])
# Set Primary Header and defer encoding
self.set_primary_header(tf_version_num, bypass, cc, rsvd, scID, vcID,
frame_seq_num)

def encode(self):
"""
When called, returns bytes representing the TCTF.
If called a second time, returns the previously computed TCTF.
"""
# Exit early if we have previously encoded
if self.encoded_frame:
return self.encoded_frame

# Finalize primaryheader per ICD
self.encode_primary_header()

# Attach Payload
frame_no_crc_bytes = self.encoded_primary_header + self.encoded_data_field

# Attach CRC as ECF
self.encode_ecf(frame_no_crc_bytes)
self.encoded_frame = frame_no_crc_bytes + self.encoded_crc

return self.encoded_frame

def set_primary_header(self, tf_version_num, bypass, cc, rsvd, scID, vcID,
frame_seq_num):
"""
Prepares the TCTF instance for generating the TCTF.
"""
# Finalize frame size
frame_len = int(self.size_frame_bin / 8)-1
log.debug((f"TCTransFrame => set_primary_header -> "
f"FRAMELENGTH: {frame_len}"))

# Insertion order is critical. Must match ICD order.
self.primary_header[HeaderKeys.TRANSFER_FRAME_VERSION_NUM] = tf_version_num
self.primary_header[HeaderKeys.BYPASS_FLAG] = bypass
self.primary_header[HeaderKeys.CONTROL_COMMAND_FLAG] = cc
self.primary_header[HeaderKeys.RESERVED] = rsvd
self.primary_header[HeaderKeys.SPACECRAFT_ID] = scID
self.primary_header[HeaderKeys.VIRTUAL_CHANNEL_ID] = vcID
self.primary_header[HeaderKeys.FRAME_LENGTH] = frame_len
self.primary_header[HeaderKeys.FRAME_SEQ_NUM] = frame_seq_num

def encode_primary_header(self):
"""
Returns bytes representing the primary header.
"""
new_header = BitArray()
for header_field, header_data in self.primary_header.items():
size = ICD.Header.INFO[header_field].bit_size
padded_segment = format(header_data, f'0{size}b')
segment = BitArray(bin=padded_segment)
new_header.append(segment)
log.debug((f"TCTransFrame => encode_primary_header -> "
f"{header_field} ::=> "
f"Encoding: {segment}, "
f"Length: {len(segment)}"))
self.encoded_primary_header = new_header.bytes
log.debug((f"TCTransFrame => encode_primary_header -> "
f"Header= {new_header}, "
f"Size= {len(new_header)}"))
return self.primary_header

def encode_ecf(self, frame_no_crc_bytes):
"""
Returns bytes representing the ECF segment of the TCTF.
If the apply_ecf flag is not set, an empty bytearray is returned.
"""
if self.apply_ecf:
crc_val = ICD.CRC.crc_func(frame_no_crc_bytes, 0xFFFF)
self.encoded_crc = crc_val.to_bytes(ICD.Sizes.ECF_OCTETS.value,
byteorder="big")
else:
self.encoded_crc = bytes()
log.debug((f"TCTransFrame => encode_ecg -> Encoded CRC: "
f"{self.encoded_crc}"))
return self.encoded_crc
Loading

0 comments on commit ac358cd

Please sign in to comment.