Skip to content

Commit

Permalink
ps2: Added support for host->device communication
Browse files Browse the repository at this point in the history
Detects if host or device is initiaing communication
Outputs byte for stacked decoder
Outputs human-readable "binary" for logging
  • Loading branch information
kamocat committed Apr 28, 2023
1 parent 73cb546 commit 7d8409c
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 65 deletions.
7 changes: 5 additions & 2 deletions decoders/ps2/__init__.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
## This file is part of the libsigrokdecode project.
##
## Copyright (C) 2016 Daniel Schulte <[email protected]>
## Copyright (C) 2023 Marshal Horn <[email protected]>
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
Expand All @@ -18,9 +19,11 @@
##

'''
This protocol decoder can decode PS/2 device -> host communication.
This protocol decoder can decode PS/2 device -> host communication \
and host -> device communication.
Host -> device communication is currently not supported.
To interpret the data, please stack the appropriate keyboard or mouse \
decoder
'''

from .pd import Decoder
167 changes: 104 additions & 63 deletions decoders/ps2/pd.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
## This file is part of the libsigrokdecode project.
##
## Copyright (C) 2016 Daniel Schulte <[email protected]>
## Copyright (C) 2023 Marshal Horn <[email protected]>
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
Expand All @@ -18,22 +19,33 @@
##

import sigrokdecode as srd
from collections import namedtuple

class Ann:
BIT, START, STOP, PARITY_OK, PARITY_ERR, DATA, WORD = range(7)
BIT, START, WORD, PARITY_OK, PARITY_ERR, STOP, ACK, NACK = range(8)

class Bit:
def __init__(self, val, ss, es):
self.val = val
self.ss = ss
self.es = es

class Ps2Packet:
def __init__(self, val, host=False, pok=False, ack=False):
self.val = val #byte value
self.host = host #Host transmissions
self.pok = pok #Parity ok
self.ack = ack #Acknowlege ok for host transmission.

Bit = namedtuple('Bit', 'val ss es')

class Decoder(srd.Decoder):
api_version = 3
id = 'ps2'
name = 'PS/2'
longname = 'PS/2'
desc = 'PS/2 keyboard/mouse interface.'
desc = 'PS/2 packet interface used by PC keyboards and mice'
license = 'gplv2+'
inputs = ['logic']
outputs = []
outputs = ['ps2_packet']
tags = ['PC']
channels = (
{'id': 'clk', 'name': 'Clock', 'desc': 'Clock line'},
Expand All @@ -42,15 +54,22 @@ class Decoder(srd.Decoder):
annotations = (
('bit', 'Bit'),
('start-bit', 'Start bit'),
('stop-bit', 'Stop bit'),
('word', 'Word'),
('parity-ok', 'Parity OK bit'),
('parity-err', 'Parity error bit'),
('data-bit', 'Data bit'),
('stop-bit', 'Stop bit'),
('ack', 'Acknowledge'),
('nack', 'Not Acknowledge'),
('start-bit', 'Start bit'),
('word', 'Word'),
('parity-ok', 'Parity OK bit'),
('parity-err', 'Parity error bit'),
('stop-bit', 'Stop bit'),
)
annotation_rows = (
('bits', 'Bits', (0,)),
('fields', 'Fields', (1, 2, 3, 4, 5, 6)),
('fields', 'Device', (1,2,3,4,5,6,7,)),
('host', 'Host', (8,9,10,11,12)),
)

def __init__(self):
Expand All @@ -62,65 +81,87 @@ def reset(self):

def start(self):
self.out_ann = self.register(srd.OUTPUT_ANN)

def putb(self, bit, ann_idx):
b = self.bits[bit]
self.put(b.ss, b.es, self.out_ann, [ann_idx, [str(b.val)]])

def putx(self, bit, ann):
self.out_py = self.register(srd.OUTPUT_PYTHON)

def metadata(self,key,value):
if key == srd.SRD_CONF_SAMPLERATE:
self.samplerate = value

def get_bits(self, n, edge:'r', timeout=100e-6):
max_period = int(timeout * self.samplerate) + 1
for i in range(n):
_, dat = self.wait([{0:edge},{'skip':max_period}])
if not self.matched[0]:
break #Timed out
self.bits.append(Bit(dat, self.samplenum, self.samplenum+max_period))
if i>0: #Fix the ending period
self.bits[i-1].es = self.samplenum
if len(self.bits) == n:
self.wait([{0:'r'},{'skip':max_period}])
self.bits[-1].es = self.samplenum
self.bitcount = len(self.bits)

def putx(self, bit, ann, host=False):
if host:
ann[0] += 7 #host annotation offset
self.put(self.bits[bit].ss, self.bits[bit].es, self.out_ann, ann)

def handle_bits(self, datapin):
# Ignore non start condition bits (useful during keyboard init).
if self.bitcount == 0 and datapin == 1:
return

# Store individual bits and their start/end samplenumbers.
self.bits.append(Bit(datapin, self.samplenum, self.samplenum))

# Fix up end sample numbers of the bits.
if self.bitcount > 0:
b = self.bits[self.bitcount - 1]
self.bits[self.bitcount - 1] = Bit(b.val, b.ss, self.samplenum)
if self.bitcount == 11:
self.bitwidth = self.bits[1].es - self.bits[2].es
b = self.bits[-1]
self.bits[-1] = Bit(b.val, b.ss, b.es + self.bitwidth)

# Find all 11 bits. Start + 8 data + odd parity + stop.
if self.bitcount < 11:
self.bitcount += 1
return

# Extract data word.
word = 0
for i in range(8):
word |= (self.bits[i + 1].val << i)
def handle_bits(self, host=False):
# Annotate individual bits
for b in self.bits:
self.put(b.ss, b.es, self.out_ann, [Ann.BIT, [str(b.val)]])

packet = None
# Annotate start bit
if self.bitcount > 8:
self.putx(0, [Ann.START, ['Start bit', 'Start', 'S']], host)
# Annotate the data word
word = 0
for i in range(8):
word |= (self.bits[i + 1].val << i)
self.put(self.bits[1].ss, self.bits[8].es, self.out_ann,
[Ann.WORD+7 if host else Ann.WORD,
['Data: %02x' % word, 'D: %02x' % word, '%02x' % word]])
packet = Ps2Packet(val = word, host = host)

# Calculate parity.
parity_ok = (bin(word).count('1') + self.bits[9].val) % 2 == 1

# Emit annotations.
for i in range(11):
self.putb(i, Ann.BIT)
self.putx(0, [Ann.START, ['Start bit', 'Start', 'S']])
self.put(self.bits[1].ss, self.bits[8].es, self.out_ann, [Ann.WORD,
['Data: %02x' % word, 'D: %02x' % word, '%02x' % word]])
if parity_ok:
self.putx(9, [Ann.PARITY_OK, ['Parity OK', 'Par OK', 'P']])
else:
self.putx(9, [Ann.PARITY_ERR, ['Parity error', 'Par err', 'PE']])
self.putx(10, [Ann.STOP, ['Stop bit', 'Stop', 'St', 'T']])

self.bits, self.bitcount = [], 0
if self.bitcount > 9:
parity_ok = 0
for bit in self.bits[1:10]:
parity_ok ^= bit.val
if bool(parity_ok):
self.putx(9, [Ann.PARITY_OK, ['Parity OK', 'Par OK', 'P']], host)
packet.pok = True #Defaults to false in case packet was interrupted
else:
self.putx(9, [Ann.PARITY_ERR, ['Parity error', 'Par err', 'PE']], host)

# Annotate stop bit
if self.bitcount > 10:
self.putx(10, [Ann.STOP+7 if host else Ann.STOP, ['Stop bit', 'Stop', 'St', 'T']])
# Annotate ACK
if host and self.bitcount > 11:
if self.bits[11].val == 0:
self.putx(11, [Ann.ACK, ['Acknowledge', 'Ack', 'A']])
else:
self.putx(11, [Ann.NACK, ['Not Acknowledge', 'Nack', 'N']])
packet.ack = not bool(self.bits[11].val)

if(packet):
self.put(self.bits[0].ss, self.bits[-1].ss, self.out_py,packet)
self.reset()


def decode(self):
if not self.samplerate:
raise SamplerateError("Cannot decode without samplerate")
while True:
# Sample data bits on the falling clock edge (assume the device
# is the transmitter). Expect the data byte transmission to end
# at the rising clock edge. Cope with the absence of host activity.
_, data_pin = self.wait({0: 'f'})
self.handle_bits(data_pin)
if self.bitcount == 1 + 8 + 1 + 1:
_, data_pin = self.wait({0: 'r'})
self.handle_bits(data_pin)
# Falling edge of data indicates start condition
clk, dat = self.wait({1: 'l'})
host = not bool(clk)
if host:
# Host emits bits on rising clk edge
self.get_bits(12, 'r')
else:
# Client emits data on falling edge
self.get_bits(11, 'f')
self.handle_bits(host=host)

0 comments on commit 7d8409c

Please sign in to comment.