Skip to content

Commit

Permalink
Merged etingof#196 from etingof/pyasn1
Browse files Browse the repository at this point in the history
  • Loading branch information
Lostboi committed Dec 21, 2020
1 parent d07dd2e commit 64ccb34
Show file tree
Hide file tree
Showing 10 changed files with 394 additions and 0 deletions.
2 changes: 2 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ Revision 0.5.0, released XX-03-2020
`StreamingDecoder` class. Previously published API is implemented
as a thin wrapper on top of that ensuring backward compatibility.

- Added support for previously missing `RELATIVE-OID` construct.

Revision 0.4.9, released XX-03-2020
-----------------------------------

Expand Down
1 change: 1 addition & 0 deletions THANKS.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ Alex Gaynor
Geoffrey Thomas
Daniel Bratell
Kim Gräsman
Russ Housley
49 changes: 49 additions & 0 deletions pyasn1/codec/ber/decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,54 @@ def valueDecoder(self, substrate, asn1Spec,

yield self._createComponent(asn1Spec, tagSet, value, **options)

class RelativeOIDPayloadDecoder(AbstractSimplePayloadDecoder):
protoComponent = univ.RelativeOID(())

def valueDecoder(self, substrate, asn1Spec,
tagSet=None, length=None, state=None,
decodeFun=None, substrateFun=None,
**options):
if tagSet[0].tagFormat != tag.tagFormatSimple:
raise error.PyAsn1Error('Simple tag format expected')

for chunk in readFromStream(substrate, length, options):
if isinstance(chunk, SubstrateUnderrunError):
yield chunk

if not chunk:
raise error.PyAsn1Error('Empty substrate')

chunk = octs2ints(chunk)

reloid = ()
index = 0
substrateLen = len(chunk)
while index < substrateLen:
subId = chunk[index]
index += 1
if subId < 128:
reloid += (subId,)
elif subId > 128:
# Construct subid from a number of octets
nextSubId = subId
subId = 0
while nextSubId >= 128:
subId = (subId << 7) + (nextSubId & 0x7F)
if index >= substrateLen:
raise error.SubstrateUnderrunError(
'Short substrate for sub-OID past %s' % (reloid,)
)
nextSubId = chunk[index]
index += 1
reloid += ((subId << 7) + nextSubId,)
elif subId == 128:
# ASN.1 spec forbids leading zeros (0x80) in OID
# encoding, tolerating it opens a vulnerability. See
# https://www.esat.kuleuven.be/cosic/publications/article-1432.pdf
# page 7
raise error.PyAsn1Error('Invalid octet 0x80 in RELATIVE-OID encoding')

yield self._createComponent(asn1Spec, tagSet, reloid, **options)

class BooleanPayloadDecoder(IntegerPayloadDecoder):
protoComponent = univ.Boolean(0)
Expand Down Expand Up @@ -1417,6 +1465,7 @@ class UTCTimePayloadDecoder(OctetStringPayloadDecoder):
univ.OctetString.tagSet: OctetStringPayloadDecoder(),
univ.Null.tagSet: NullPayloadDecoder(),
univ.ObjectIdentifier.tagSet: ObjectIdentifierPayloadDecoder(),
univ.RelativeOID.tagSet: RelativeOIDPayloadDecoder(),
univ.Enumerated.tagSet: IntegerPayloadDecoder(),
univ.Real.tagSet: RealPayloadDecoder(),
univ.Sequence.tagSet: SequenceOrSequenceOfPayloadDecoder(), # conflicts with SequenceOf
Expand Down
35 changes: 35 additions & 0 deletions pyasn1/codec/ber/encoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,39 @@ def encodeValue(self, value, asn1Spec, encodeFun, **options):
return octets, False, False


class RelativeOIDEncoder(AbstractItemEncoder):
supportIndefLenMode = False

def encodeValue(self, value, asn1Spec, encodeFun, **options):
if asn1Spec is not None:
value = asn1Spec.clone(value)

octets = ()

# Cycle through subIds
for subOid in value.asTuple():
if 0 <= subOid <= 127:
# Optimize for the common case
octets += (subOid,)

elif subOid > 127:
# Pack large Sub-Object IDs
res = (subOid & 0x7f,)
subOid >>= 7

while subOid:
res = (0x80 | (subOid & 0x7f),) + res
subOid >>= 7

# Add packed Sub-Object ID to resulted RELATIVE-OID
octets += res

else:
raise error.PyAsn1Error('Negative RELATIVE-OID arc %s at %s' % (subOid, value))

return octets, False, False


class RealEncoder(AbstractItemEncoder):
supportIndefLenMode = 0
binEncBase = 2 # set to None to choose encoding base automatically
Expand Down Expand Up @@ -714,6 +747,7 @@ def encodeValue(self, value, asn1Spec, encodeFun, **options):
univ.OctetString.tagSet: OctetStringEncoder(),
univ.Null.tagSet: NullEncoder(),
univ.ObjectIdentifier.tagSet: ObjectIdentifierEncoder(),
univ.RelativeOID.tagSet: RelativeOIDEncoder(),
univ.Enumerated.tagSet: IntegerEncoder(),
univ.Real.tagSet: RealEncoder(),
# Sequence & Set have same tags as SequenceOf & SetOf
Expand Down Expand Up @@ -746,6 +780,7 @@ def encodeValue(self, value, asn1Spec, encodeFun, **options):
univ.OctetString.typeId: OctetStringEncoder(),
univ.Null.typeId: NullEncoder(),
univ.ObjectIdentifier.typeId: ObjectIdentifierEncoder(),
univ.RelativeOID.typeId: RelativeOIDEncoder(),
univ.Enumerated.typeId: IntegerEncoder(),
univ.Real.typeId: RealEncoder(),
# Sequence & Set have same tags as SequenceOf & SetOf
Expand Down
2 changes: 2 additions & 0 deletions pyasn1/codec/native/decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def __call__(self, pyObject, asn1Spec, decodeFun=None, **options):
univ.Boolean.tagSet: AbstractScalarPayloadDecoder(),
univ.BitString.tagSet: BitStringPayloadDecoder(),
univ.OctetString.tagSet: AbstractScalarPayloadDecoder(),
univ.RelativeOID.tagSet: AbstractScalarPayloadDecoder(),
univ.Null.tagSet: AbstractScalarPayloadDecoder(),
univ.ObjectIdentifier.tagSet: AbstractScalarPayloadDecoder(),
univ.Enumerated.tagSet: AbstractScalarPayloadDecoder(),
Expand Down Expand Up @@ -102,6 +103,7 @@ def __call__(self, pyObject, asn1Spec, decodeFun=None, **options):
univ.OctetString.typeId: AbstractScalarPayloadDecoder(),
univ.Null.typeId: AbstractScalarPayloadDecoder(),
univ.ObjectIdentifier.typeId: AbstractScalarPayloadDecoder(),
univ.RelativeOID.typeId: AbstractScalarPayloadDecoder(),
univ.Enumerated.typeId: AbstractScalarPayloadDecoder(),
univ.Real.typeId: AbstractScalarPayloadDecoder(),
# ambiguous base types
Expand Down
6 changes: 6 additions & 0 deletions pyasn1/codec/native/encoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ def encode(self, value, encodeFun, **options):
return str(value)


class RelativeOIDEncoder(AbstractItemEncoder):
def encode(self, value, encodeFun, **options):
return str(value)


class RealEncoder(AbstractItemEncoder):
def encode(self, value, encodeFun, **options):
return float(value)
Expand Down Expand Up @@ -108,6 +113,7 @@ def encode(self, value, encodeFun, **options):
univ.Integer.tagSet: IntegerEncoder(),
univ.BitString.tagSet: BitStringEncoder(),
univ.OctetString.tagSet: OctetStringEncoder(),
univ.RelativeOID.tagSet: RelativeOIDEncoder(),
univ.Null.tagSet: NullEncoder(),
univ.ObjectIdentifier.tagSet: ObjectIdentifierEncoder(),
univ.Enumerated.tagSet: IntegerEncoder(),
Expand Down
128 changes: 128 additions & 0 deletions pyasn1/type/univ.py
Original file line number Diff line number Diff line change
Expand Up @@ -1244,6 +1244,134 @@ def prettyOut(self, value):
return '.'.join([str(x) for x in value])


class RelativeOID(base.SimpleAsn1Type):
"""Create |ASN.1| schema or value object.
|ASN.1| class is based on :class:`~pyasn1.type.base.SimpleAsn1Type`, its
objects are immutable and duck-type Python :class:`tuple` objects
(tuple of non-negative integers).
Keyword Args
------------
value: :class:`tuple`, :class:`str` or |ASN.1| object
Python sequence of :class:`int` or :class:`str` literal or |ASN.1| object.
If `value` is not given, schema object will be created.
tagSet: :py:class:`~pyasn1.type.tag.TagSet`
Object representing non-default ASN.1 tag(s)
subtypeSpec: :py:class:`~pyasn1.type.constraint.ConstraintsIntersection`
Object representing non-default ASN.1 subtype constraint(s). Constraints
verification for |ASN.1| type occurs automatically on object
instantiation.
Raises
------
~pyasn1.error.ValueConstraintError, ~pyasn1.error.PyAsn1Error
On constraint violation or bad initializer.
Examples
--------
.. code-block:: python
class RelOID(RelativeOID):
'''
ASN.1 specification:
id-pad-null RELATIVE-OID ::= { 0 }
id-pad-once RELATIVE-OID ::= { 5 6 }
id-pad-twice RELATIVE-OID ::= { 5 6 7 }
'''
id_pad_null = RelOID('0')
id_pad_once = RelOID('5.6')
id_pad_twice = id_pad_once + (7,)
"""
#: Set (on class, not on instance) or return a
#: :py:class:`~pyasn1.type.tag.TagSet` object representing ASN.1 tag(s)
#: associated with |ASN.1| type.
tagSet = tag.initTagSet(
tag.Tag(tag.tagClassUniversal, tag.tagFormatSimple, 0x0d)
)

#: Set (on class, not on instance) or return a
#: :py:class:`~pyasn1.type.constraint.ConstraintsIntersection` object
#: imposing constraints on |ASN.1| type initialization values.
subtypeSpec = constraint.ConstraintsIntersection()

# Optimization for faster codec lookup
typeId = base.SimpleAsn1Type.getTypeId()

def __add__(self, other):
return self.clone(self._value + other)

def __radd__(self, other):
return self.clone(other + self._value)

def asTuple(self):
return self._value

# Sequence object protocol

def __len__(self):
return len(self._value)

def __getitem__(self, i):
if i.__class__ is slice:
return self.clone(self._value[i])
else:
return self._value[i]

def __iter__(self):
return iter(self._value)

def __contains__(self, value):
return value in self._value

def index(self, suboid):
return self._value.index(suboid)

def isPrefixOf(self, other):
"""Indicate if this |ASN.1| object is a prefix of other |ASN.1| object.
Parameters
----------
other: |ASN.1| object
|ASN.1| object
Returns
-------
: :class:`bool`
:obj:`True` if this |ASN.1| object is a parent (e.g. prefix) of the other |ASN.1| object
or :obj:`False` otherwise.
"""
l = len(self)
if l <= len(other):
if self._value[:l] == other[:l]:
return True
return False

def prettyIn(self, value):
if isinstance(value, RelativeOID):
return tuple(value)
elif octets.isStringType(value):
if '-' in value:
raise error.PyAsn1Error(
'Malformed RELATIVE-OID %s at %s: %s' % (value, self.__class__.__name__, sys.exc_info()[1])
)
try:
return tuple([int(subOid) for subOid in value.split('.') if subOid])
except ValueError:
raise error.PyAsn1Error(
'Malformed RELATIVE-OID %s at %s: %s' % (value, self.__class__.__name__, sys.exc_info()[1])
)

try:
tupleOfInts = tuple([int(subOid) for subOid in value if subOid >= 0])

except (ValueError, TypeError):
raise error.PyAsn1Error(
'Malformed RELATIVE-OID %s at %s: %s' % (value, self.__class__.__name__, sys.exc_info()[1])
)

if len(tupleOfInts) == len(value):
return tupleOfInts

raise error.PyAsn1Error('Malformed RELATIVE-OID %s at %s' % (value, self.__class__.__name__))

def prettyOut(self, value):
return '.'.join([str(x) for x in value])


class Real(base.SimpleAsn1Type):
"""Create |ASN.1| schema or value object.
Expand Down
70 changes: 70 additions & 0 deletions tests/codec/ber/test_decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,76 @@ def testLarge2(self):
) == ((2, 999, 18446744073709551535184467440737095), null)


class RelativeOIDDecoderTestCase(BaseTestCase):
def testThree(self):
assert decoder.decode(
ints2octs((13, 3, 5, 6, 7))
) == ((5, 6, 7), null)

def testTwo(self):
assert decoder.decode(
ints2octs((13, 2, 5, 6))
) == ((5, 6), null)

def testOne(self):
obj, rest = decoder.decode(ints2octs((13, 1, 39)))
assert str(obj) == '39'
assert rest == null

def testNonLeading0x80(self):
assert decoder.decode(
ints2octs((13, 5, 85, 4, 129, 128, 0)),
) == ((85, 4, 16384), null)

def testLeading0x80(self):
try:
decoder.decode(
ints2octs((13, 5, 85, 4, 128, 129, 0))
)
except error.PyAsn1Error:
pass
else:
assert 0, 'Leading 0x80 tolerated'

def testTagFormat(self):
try:
decoder.decode(ints2octs((38, 1, 239)))
except error.PyAsn1Error:
pass
else:
assert 0, 'wrong tagFormat worked out'

def testZeroLength(self):
try:
decoder.decode(ints2octs((13, 0, 0)))
except error.PyAsn1Error:
pass
else:
assert 0, 'zero length tolerated'

def testIndefiniteLength(self):
try:
decoder.decode(ints2octs((13, 128, 0)))
except error.PyAsn1Error:
pass
else:
assert 0, 'indefinite length tolerated'

def testReservedLength(self):
try:
decoder.decode(ints2octs((13, 255, 0)))
except error.PyAsn1Error:
pass
else:
assert 0, 'reserved length tolerated'

def testLarge(self):
assert decoder.decode(
ints2octs((0x0D, 0x13, 0x88, 0x37, 0x83, 0xC6, 0xDF, 0xD4, 0xCC, 0xB3, 0xFF, 0xFF, 0xFE, 0xF0, 0xB8, 0xD6, 0xB8, 0xCB, 0xE2, 0xB6, 0x47))
) == ((1079, 18446744073709551535184467440737095), null)



class RealDecoderTestCase(BaseTestCase):
def testChar(self):
assert decoder.decode(
Expand Down
Loading

0 comments on commit 64ccb34

Please sign in to comment.