diff --git a/CHANGES.rst b/CHANGES.rst index 1ea3105b..85b6bbd5 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -22,6 +22,8 @@ Revision 0.5.0, released XX-03-2020 `StreamingDecoder` class. Previously published API is implemented as a thin wrapper on top of that ensuring backward compatibility. +- Added support for previously missing `RELATIVE-OID` construct. + Revision 0.4.9, released XX-03-2020 ----------------------------------- diff --git a/THANKS.txt b/THANKS.txt index 890ff725..48556ba6 100644 --- a/THANKS.txt +++ b/THANKS.txt @@ -8,3 +8,4 @@ Alex Gaynor Geoffrey Thomas Daniel Bratell Kim Gräsman +Russ Housley diff --git a/pyasn1/codec/ber/decoder.py b/pyasn1/codec/ber/decoder.py index f5605c29..b61a9f51 100644 --- a/pyasn1/codec/ber/decoder.py +++ b/pyasn1/codec/ber/decoder.py @@ -456,6 +456,56 @@ def valueDecoder(self, substrate, asn1Spec, yield self._createComponent(asn1Spec, tagSet, oid, **options) +class RelativeOIDPayloadDecoder(AbstractSimplePayloadDecoder): + protoComponent = univ.RelativeOID(()) + + def valueDecoder(self, substrate, asn1Spec, + tagSet=None, length=None, state=None, + decodeFun=None, substrateFun=None, + **options): + if tagSet[0].tagFormat != tag.tagFormatSimple: + raise error.PyAsn1Error('Simple tag format expected') + + for chunk in readFromStream(substrate, length, options): + if isinstance(chunk, SubstrateUnderrunError): + yield chunk + + if not chunk: + raise error.PyAsn1Error('Empty substrate') + + chunk = octs2ints(chunk) + + reloid = () + index = 0 + substrateLen = len(chunk) + while index < substrateLen: + subId = chunk[index] + index += 1 + if subId < 128: + reloid += (subId,) + elif subId > 128: + # Construct subid from a number of octets + nextSubId = subId + subId = 0 + while nextSubId >= 128: + subId = (subId << 7) + (nextSubId & 0x7F) + if index >= substrateLen: + raise error.SubstrateUnderrunError( + 'Short substrate for sub-OID past %s' % (reloid,) + ) + nextSubId = chunk[index] + index += 1 + reloid += ((subId << 7) + nextSubId,) + elif subId == 128: + # ASN.1 spec forbids leading zeros (0x80) in OID + # encoding, tolerating it opens a vulnerability. See + # https://www.esat.kuleuven.be/cosic/publications/article-1432.pdf + # page 7 + raise error.PyAsn1Error('Invalid octet 0x80 in RELATIVE-OID encoding') + + yield self._createComponent(asn1Spec, tagSet, reloid, **options) + + class RealPayloadDecoder(AbstractSimplePayloadDecoder): protoComponent = univ.Real() @@ -1417,6 +1467,7 @@ class UTCTimePayloadDecoder(OctetStringPayloadDecoder): univ.OctetString.tagSet: OctetStringPayloadDecoder(), univ.Null.tagSet: NullPayloadDecoder(), univ.ObjectIdentifier.tagSet: ObjectIdentifierPayloadDecoder(), + univ.RelativeOID.tagSet: RelativeOIDPayloadDecoder(), univ.Enumerated.tagSet: IntegerPayloadDecoder(), univ.Real.tagSet: RealPayloadDecoder(), univ.Sequence.tagSet: SequenceOrSequenceOfPayloadDecoder(), # conflicts with SequenceOf diff --git a/pyasn1/codec/ber/encoder.py b/pyasn1/codec/ber/encoder.py index 6cce134a..dca017d0 100644 --- a/pyasn1/codec/ber/encoder.py +++ b/pyasn1/codec/ber/encoder.py @@ -352,6 +352,39 @@ def encodeValue(self, value, asn1Spec, encodeFun, **options): return octets, False, False +class RelativeOIDEncoder(AbstractItemEncoder): + supportIndefLenMode = False + + def encodeValue(self, value, asn1Spec, encodeFun, **options): + if asn1Spec is not None: + value = asn1Spec.clone(value) + + octets = () + + # Cycle through subIds + for subOid in value.asTuple(): + if 0 <= subOid <= 127: + # Optimize for the common case + octets += (subOid,) + + elif subOid > 127: + # Pack large Sub-Object IDs + res = (subOid & 0x7f,) + subOid >>= 7 + + while subOid: + res = (0x80 | (subOid & 0x7f),) + res + subOid >>= 7 + + # Add packed Sub-Object ID to resulted RELATIVE-OID + octets += res + + else: + raise error.PyAsn1Error('Negative RELATIVE-OID arc %s at %s' % (subOid, value)) + + return octets, False, False + + class RealEncoder(AbstractItemEncoder): supportIndefLenMode = 0 binEncBase = 2 # set to None to choose encoding base automatically @@ -714,6 +747,7 @@ def encodeValue(self, value, asn1Spec, encodeFun, **options): univ.OctetString.tagSet: OctetStringEncoder(), univ.Null.tagSet: NullEncoder(), univ.ObjectIdentifier.tagSet: ObjectIdentifierEncoder(), + univ.RelativeOID.tagSet: RelativeOIDEncoder(), univ.Enumerated.tagSet: IntegerEncoder(), univ.Real.tagSet: RealEncoder(), # Sequence & Set have same tags as SequenceOf & SetOf @@ -746,6 +780,7 @@ def encodeValue(self, value, asn1Spec, encodeFun, **options): univ.OctetString.typeId: OctetStringEncoder(), univ.Null.typeId: NullEncoder(), univ.ObjectIdentifier.typeId: ObjectIdentifierEncoder(), + univ.RelativeOID.typeId: RelativeOIDEncoder(), univ.Enumerated.typeId: IntegerEncoder(), univ.Real.typeId: RealEncoder(), # Sequence & Set have same tags as SequenceOf & SetOf diff --git a/pyasn1/codec/native/decoder.py b/pyasn1/codec/native/decoder.py index 4dca0d04..d73b112d 100644 --- a/pyasn1/codec/native/decoder.py +++ b/pyasn1/codec/native/decoder.py @@ -71,6 +71,7 @@ def __call__(self, pyObject, asn1Spec, decodeFun=None, **options): univ.OctetString.tagSet: AbstractScalarPayloadDecoder(), univ.Null.tagSet: AbstractScalarPayloadDecoder(), univ.ObjectIdentifier.tagSet: AbstractScalarPayloadDecoder(), + univ.RelativeOID.tagSet: AbstractScalarPayloadDecoder(), univ.Enumerated.tagSet: AbstractScalarPayloadDecoder(), univ.Real.tagSet: AbstractScalarPayloadDecoder(), univ.Sequence.tagSet: SequenceOrSetPayloadDecoder(), # conflicts with SequenceOf @@ -102,6 +103,7 @@ def __call__(self, pyObject, asn1Spec, decodeFun=None, **options): univ.OctetString.typeId: AbstractScalarPayloadDecoder(), univ.Null.typeId: AbstractScalarPayloadDecoder(), univ.ObjectIdentifier.typeId: AbstractScalarPayloadDecoder(), + univ.RelativeOID.typeId: AbstractScalarPayloadDecoder(), univ.Enumerated.typeId: AbstractScalarPayloadDecoder(), univ.Real.typeId: AbstractScalarPayloadDecoder(), # ambiguous base types diff --git a/pyasn1/codec/native/encoder.py b/pyasn1/codec/native/encoder.py index 6fbedb6d..f4262d54 100644 --- a/pyasn1/codec/native/encoder.py +++ b/pyasn1/codec/native/encoder.py @@ -59,6 +59,11 @@ def encode(self, value, encodeFun, **options): return str(value) +class RelativeOIDEncoder(AbstractItemEncoder): + def encode(self, value, encodeFun, **options): + return str(value) + + class RealEncoder(AbstractItemEncoder): def encode(self, value, encodeFun, **options): return float(value) @@ -110,6 +115,7 @@ def encode(self, value, encodeFun, **options): univ.OctetString.tagSet: OctetStringEncoder(), univ.Null.tagSet: NullEncoder(), univ.ObjectIdentifier.tagSet: ObjectIdentifierEncoder(), + univ.RelativeOID.tagSet: RelativeOIDEncoder(), univ.Enumerated.tagSet: IntegerEncoder(), univ.Real.tagSet: RealEncoder(), # Sequence & Set have same tags as SequenceOf & SetOf @@ -143,6 +149,7 @@ def encode(self, value, encodeFun, **options): univ.OctetString.typeId: OctetStringEncoder(), univ.Null.typeId: NullEncoder(), univ.ObjectIdentifier.typeId: ObjectIdentifierEncoder(), + univ.RelativeOID.typeId: RelativeOIDEncoder(), univ.Enumerated.typeId: IntegerEncoder(), univ.Real.typeId: RealEncoder(), # Sequence & Set have same tags as SequenceOf & SetOf diff --git a/pyasn1/type/univ.py b/pyasn1/type/univ.py index 488241fd..01daad2c 100644 --- a/pyasn1/type/univ.py +++ b/pyasn1/type/univ.py @@ -1244,6 +1244,144 @@ def prettyOut(self, value): return '.'.join([str(x) for x in value]) +class RelativeOID(base.SimpleAsn1Type): + """Create |ASN.1| schema or value object. + + |ASN.1| class is based on :class:`~pyasn1.type.base.SimpleAsn1Type`, its + objects are immutable and duck-type Python :class:`tuple` objects + (tuple of non-negative integers). + + Keyword Args + ------------ + value: :class:`tuple`, :class:`str` or |ASN.1| object + Python sequence of :class:`int` or :class:`str` literal or |ASN.1| object. + If `value` is not given, schema object will be created. + + tagSet: :py:class:`~pyasn1.type.tag.TagSet` + Object representing non-default ASN.1 tag(s) + + subtypeSpec: :py:class:`~pyasn1.type.constraint.ConstraintsIntersection` + Object representing non-default ASN.1 subtype constraint(s). Constraints + verification for |ASN.1| type occurs automatically on object + instantiation. + + Raises + ------ + ~pyasn1.error.ValueConstraintError, ~pyasn1.error.PyAsn1Error + On constraint violation or bad initializer. + + Examples + -------- + .. code-block:: python + + class RelOID(RelativeOID): + ''' + ASN.1 specification: + + id-pad-null RELATIVE-OID ::= { 0 } + id-pad-once RELATIVE-OID ::= { 5 6 } + id-pad-twice RELATIVE-OID ::= { 5 6 7 } + ''' + id_pad_null = RelOID('0') + id_pad_once = RelOID('5.6') + id_pad_twice = id_pad_once + (7,) + """ + #: Set (on class, not on instance) or return a + #: :py:class:`~pyasn1.type.tag.TagSet` object representing ASN.1 tag(s) + #: associated with |ASN.1| type. + tagSet = tag.initTagSet( + tag.Tag(tag.tagClassUniversal, tag.tagFormatSimple, 0x0d) + ) + + #: Set (on class, not on instance) or return a + #: :py:class:`~pyasn1.type.constraint.ConstraintsIntersection` object + #: imposing constraints on |ASN.1| type initialization values. + subtypeSpec = constraint.ConstraintsIntersection() + + # Optimization for faster codec lookup + typeId = base.SimpleAsn1Type.getTypeId() + + def __add__(self, other): + return self.clone(self._value + other) + + def __radd__(self, other): + return self.clone(other + self._value) + + def asTuple(self): + return self._value + + # Sequence object protocol + + def __len__(self): + return len(self._value) + + def __getitem__(self, i): + if i.__class__ is slice: + return self.clone(self._value[i]) + else: + return self._value[i] + + def __iter__(self): + return iter(self._value) + + def __contains__(self, value): + return value in self._value + + def index(self, suboid): + return self._value.index(suboid) + + def isPrefixOf(self, other): + """Indicate if this |ASN.1| object is a prefix of other |ASN.1| object. + + Parameters + ---------- + other: |ASN.1| object + |ASN.1| object + + Returns + ------- + : :class:`bool` + :obj:`True` if this |ASN.1| object is a parent (e.g. prefix) of the other |ASN.1| object + or :obj:`False` otherwise. + """ + l = len(self) + if l <= len(other): + if self._value[:l] == other[:l]: + return True + return False + + def prettyIn(self, value): + if isinstance(value, RelativeOID): + return tuple(value) + elif octets.isStringType(value): + if '-' in value: + raise error.PyAsn1Error( + 'Malformed RELATIVE-OID %s at %s: %s' % (value, self.__class__.__name__, sys.exc_info()[1]) + ) + try: + return tuple([int(subOid) for subOid in value.split('.') if subOid]) + except ValueError: + raise error.PyAsn1Error( + 'Malformed RELATIVE-OID %s at %s: %s' % (value, self.__class__.__name__, sys.exc_info()[1]) + ) + + try: + tupleOfInts = tuple([int(subOid) for subOid in value if subOid >= 0]) + + except (ValueError, TypeError): + raise error.PyAsn1Error( + 'Malformed RELATIVE-OID %s at %s: %s' % (value, self.__class__.__name__, sys.exc_info()[1]) + ) + + if len(tupleOfInts) == len(value): + return tupleOfInts + + raise error.PyAsn1Error('Malformed RELATIVE-OID %s at %s' % (value, self.__class__.__name__)) + + def prettyOut(self, value): + return '.'.join([str(x) for x in value]) + + class Real(base.SimpleAsn1Type): """Create |ASN.1| schema or value object. diff --git a/tests/codec/ber/test_decoder.py b/tests/codec/ber/test_decoder.py index b6b1ae92..8fe5bc16 100644 --- a/tests/codec/ber/test_decoder.py +++ b/tests/codec/ber/test_decoder.py @@ -409,6 +409,75 @@ def testLarge2(self): ) == ((2, 999, 18446744073709551535184467440737095), null) +class RelativeOIDDecoderTestCase(BaseTestCase): + def testThree(self): + assert decoder.decode( + ints2octs((13, 3, 5, 6, 7)) + ) == ((5, 6, 7), null) + + def testTwo(self): + assert decoder.decode( + ints2octs((13, 2, 5, 6)) + ) == ((5, 6), null) + + def testOne(self): + obj, rest = decoder.decode(ints2octs((13, 1, 39))) + assert str(obj) == '39' + assert rest == null + + def testNonLeading0x80(self): + assert decoder.decode( + ints2octs((13, 5, 85, 4, 129, 128, 0)), + ) == ((85, 4, 16384), null) + + def testLeading0x80(self): + try: + decoder.decode( + ints2octs((13, 5, 85, 4, 128, 129, 0)) + ) + except error.PyAsn1Error: + pass + else: + assert 0, 'Leading 0x80 tolerated' + + def testTagFormat(self): + try: + decoder.decode(ints2octs((38, 1, 239))) + except error.PyAsn1Error: + pass + else: + assert 0, 'wrong tagFormat worked out' + + def testZeroLength(self): + try: + decoder.decode(ints2octs((13, 0, 0))) + except error.PyAsn1Error: + pass + else: + assert 0, 'zero length tolerated' + + def testIndefiniteLength(self): + try: + decoder.decode(ints2octs((13, 128, 0))) + except error.PyAsn1Error: + pass + else: + assert 0, 'indefinite length tolerated' + + def testReservedLength(self): + try: + decoder.decode(ints2octs((13, 255, 0))) + except error.PyAsn1Error: + pass + else: + assert 0, 'reserved length tolerated' + + def testLarge(self): + assert decoder.decode( + ints2octs((0x0D, 0x13, 0x88, 0x37, 0x83, 0xC6, 0xDF, 0xD4, 0xCC, 0xB3, 0xFF, 0xFF, 0xFE, 0xF0, 0xB8, 0xD6, 0xB8, 0xCB, 0xE2, 0xB6, 0x47)) + ) == ((1079, 18446744073709551535184467440737095), null) + + class RealDecoderTestCase(BaseTestCase): def testChar(self): assert decoder.decode( diff --git a/tests/codec/ber/test_encoder.py b/tests/codec/ber/test_encoder.py index bd4a8e23..3249cdaf 100644 --- a/tests/codec/ber/test_encoder.py +++ b/tests/codec/ber/test_encoder.py @@ -350,6 +350,30 @@ def testLarge2(self): 0xB8, 0xCB, 0xE2, 0xB6, 0x47)) +class RelativeOIDEncoderTestCase(BaseTestCase): + def testOne(self): + assert encoder.encode( + univ.RelativeOID((39,)) + ) == ints2octs((13, 1, 39)) + + def testTwo(self): + assert encoder.encode( + univ.RelativeOID((5, 6)) + ) == ints2octs((13, 2, 5, 6)) + + def testThree(self): + assert encoder.encode( + univ.RelativeOID((5, 6, 7)) + ) == ints2octs((13, 3, 5, 6, 7)) + + def testLarge(self): + assert encoder.encode( + univ.RelativeOID((1079, 18446744073709551535184467440737095)) + ) == ints2octs((0x0D, 0x13, 0x88, 0x37, 0x83, 0xC6, 0xDF, 0xD4, 0xCC, + 0xB3, 0xFF, 0xFF, 0xFE, 0xF0, 0xB8, 0xD6, 0xB8, 0xCB, + 0xE2, 0xB6, 0x47)) + + class ObjectIdentifierWithSchemaEncoderTestCase(BaseTestCase): def testOne(self): assert encoder.encode( diff --git a/tests/type/test_univ.py b/tests/type/test_univ.py index 6d3330a0..f666acc7 100644 --- a/tests/type/test_univ.py +++ b/tests/type/test_univ.py @@ -944,6 +944,83 @@ def testValuePickling(self): assert new_asn1 == (2, 3, 1, 1, 2) +class RelativeOID(BaseTestCase): + def testStr(self): + assert str(univ.RelativeOID((1, 3, 6))) == '1.3.6', 'str() fails' + + def testRepr(self): + assert '1.3.6' in repr(univ.RelativeOID('1.3.6')) + + def testEq(self): + assert univ.RelativeOID((1, 3, 6)) == (1, 3, 6), '__cmp__() fails' + + def testAdd(self): + assert univ.RelativeOID((1, 3)) + (6,) == (1, 3, 6), '__add__() fails' + + def testRadd(self): + assert (1,) + univ.RelativeOID((3, 6)) == (1, 3, 6), '__radd__() fails' + + def testLen(self): + assert len(univ.RelativeOID((1, 3))) == 2, '__len__() fails' + + def testPrefix(self): + o = univ.RelativeOID('1.3.6') + assert o.isPrefixOf((1, 3, 6)), 'isPrefixOf() fails' + assert o.isPrefixOf((1, 3, 6, 1)), 'isPrefixOf() fails' + assert not o.isPrefixOf((1, 3)), 'isPrefixOf() fails' + + def testInput1(self): + assert univ.RelativeOID('1.3.6') == (1, 3, 6), 'prettyIn() fails' + + def testInput2(self): + assert univ.RelativeOID((1, 3, 6)) == (1, 3, 6), 'prettyIn() fails' + + def testInput3(self): + assert univ.RelativeOID(univ.RelativeOID('1.3') + (6,)) == (1, 3, 6), 'prettyIn() fails' + + def testUnicode(self): + s = '1.3.6' + if sys.version_info[0] < 3: + s = s.decode() + assert univ.RelativeOID(s) == (1, 3, 6), 'unicode init fails' + + def testTag(self): + assert univ.RelativeOID().tagSet == tag.TagSet( + (), + tag.Tag(tag.tagClassUniversal, tag.tagFormatSimple, 0x0d) + ) + + def testContains(self): + s = univ.RelativeOID('1.3.6.1234.99999') + assert 1234 in s + assert 4321 not in s + + def testStaticDef(self): + + class RelOID(univ.ObjectIdentifier): + pass + + assert str(RelOID((1, 3, 6))) == '1.3.6' + + +class RelativeOIDPicklingTestCase(unittest.TestCase): + + def testSchemaPickling(self): + old_asn1 = univ.RelativeOID() + serialised = pickle.dumps(old_asn1) + assert serialised + new_asn1 = pickle.loads(serialised) + assert type(new_asn1) == univ.RelativeOID + assert old_asn1.isSameTypeWith(new_asn1) + + def testValuePickling(self): + old_asn1 = univ.RelativeOID('2.3.1.1.2') + serialised = pickle.dumps(old_asn1) + assert serialised + new_asn1 = pickle.loads(serialised) + assert new_asn1 == (2, 3, 1, 1, 2) + + class SequenceOf(BaseTestCase): def setUp(self): BaseTestCase.setUp(self)