From 35fc49e26a5767c1fbc488cf470b7f429721be54 Mon Sep 17 00:00:00 2001 From: Hisanobu Okuda Date: Mon, 18 Oct 2021 23:04:31 +0900 Subject: [PATCH] Add support for RFC 4511 --- CHANGES.txt | 1 + pyasn1_modules/rfc4511.py | 662 ++++++++++++++++++++++++++++++++++++++ tests/__main__.py | 1 + tests/test_rfc4511.py | 303 +++++++++++++++++ 4 files changed, 967 insertions(+) create mode 100644 pyasn1_modules/rfc4511.py create mode 100644 tests/test_rfc4511.py diff --git a/CHANGES.txt b/CHANGES.txt index b9535c0..aae5135 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -41,6 +41,7 @@ Revision 0.3.0, released XX-03-2020 - Add RFC8708 providing HSS/LMS Hash-based Signature Algorithm for CMS - Advance copyright statement to year 2020 - Add RFC8769 providing CBOR and CBOR Sequence content types for CMS +- Add RFC4511 providing Lightweight Directory Access Protocol (LDAP) Revision 0.2.8, released 16-11-2019 ----------------------------------- diff --git a/pyasn1_modules/rfc4511.py b/pyasn1_modules/rfc4511.py new file mode 100644 index 0000000..903f9ee --- /dev/null +++ b/pyasn1_modules/rfc4511.py @@ -0,0 +1,662 @@ +# This file is being contributed to pyasn1-modules software. +# +# Created by Hisanobu Okuda with assistance from asn1ate v.0.6.0. +# +# Copyright (c) 2021, Hisanobu Okuda +# License: http://snmplabs.com/pyasn1/license.html +# +# ASN.1 source from: +# https://www.rfc-editor.org/rfc/rfc4511.txt + + +from pyasn1.type import univ, char, namedtype, namedval, tag, constraint + + +maxInt = univ.Integer(2147483647) +MAX = float('inf') + + +class MessageID(univ.Integer): + pass + + +MessageID.subtypeSpec = constraint.ValueRangeConstraint(0, maxInt) + + +class AbandonRequest(MessageID): + pass + + +AbandonRequest.tagSet = MessageID.tagSet.tagImplicitly( + tag.Tag(tag.tagClassApplication, tag.tagFormatSimple, 16)) + + +class AttributeValue(univ.OctetString): + pass + + +class LDAPString(univ.OctetString): + pass + + +class AttributeDescription(LDAPString): + pass + + +class PartialAttribute(univ.Sequence): + pass + + +PartialAttribute.componentType = namedtype.NamedTypes( + namedtype.NamedType('type', AttributeDescription()), + namedtype.NamedType('vals', univ.SetOf(componentType=AttributeValue()))) + + +class Attribute(PartialAttribute): + pass + + +class AttributeList(univ.SequenceOf): + pass + + +AttributeList.componentType = Attribute() + + +class LDAPDN(LDAPString): + pass + + +class AddRequest(univ.Sequence): + pass + + +AddRequest.tagSet = univ.Sequence.tagSet.tagImplicitly( + tag.Tag(tag.tagClassApplication, tag.tagFormatConstructed, 8)) +AddRequest.componentType = namedtype.NamedTypes( + namedtype.NamedType('entry', LDAPDN()), + namedtype.NamedType('attributes', AttributeList())) + + +class URI(LDAPString): + pass + + +class Referral(univ.SequenceOf): + pass + + +Referral.componentType = URI() +Referral.subtypeSpec = constraint.ValueSizeConstraint(1, MAX) + + +class LDAPResult(univ.Sequence): + pass + + +LDAPResult.componentType = namedtype.NamedTypes( + namedtype.NamedType('resultCode', univ.Enumerated(namedValues=namedval.NamedValues( + ('success', 0), + ('operationsError', 1), + ('protocolError', 2), + ('timeLimitExceeded', 3), + ('sizeLimitExceeded', 4), + ('compareFalse', 5), + ('compareTrue', 6), + ('authMethodNotSupported', 7), + ('strongerAuthRequired', 8), + ('referral', 10), + ('adminLimitExceeded', 11), + ('unavailableCriticalExtension', 12), + ('confidentialityRequired', 13), + ('saslBindInProgress', 14), + ('noSuchAttribute', 16), + ('undefinedAttributeType', 17), + ('inappropriateMatching', 18), + ('constraintViolation', 19), + ('attributeOrValueExists', 20), + ('invalidAttributeSyntax', 21), + ('noSuchObject', 32), + ('aliasProblem', 33), + ('invalidDNSyntax', 34), + ('aliasDereferencingProblem', 36), + ('inappropriateAuthentication', 48), + ('invalidCredentials', 49), + ('insufficientAccessRights', 50), + ('busy', 51), + ('unavailable', 52), + ('unwillingToPerform', 53), + ('loopDetect', 54), + ('namingViolation', 64), + ('objectClassViolation', 65), + ('notAllowedOnNonLeaf', 66), + ('notAllowedOnRDN', 67), + ('entryAlreadyExists', 68), + ('objectClassModsProhibited', 69), + ('affectsMultipleDSAs', 71), + ('other', 80)))), + namedtype.NamedType('matchedDN', LDAPDN()), + namedtype.NamedType('diagnosticMessage', LDAPString()), + namedtype.OptionalNamedType('referral', Referral().subtype( + implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 3)))) + + +class AddResponse(LDAPResult): + pass + + +AddResponse.tagSet = LDAPResult.tagSet.tagImplicitly( + tag.Tag(tag.tagClassApplication, tag.tagFormatConstructed, 9)) + + +class AssertionValue(univ.OctetString): + pass + + +class AttributeSelection(univ.SequenceOf): + class Selector(univ.Sequence): + pass + + +AttributeSelection.componentType = LDAPString() + + +class AttributeValueAssertion(univ.Sequence): + pass + + +AttributeValueAssertion.componentType = namedtype.NamedTypes( + namedtype.NamedType('attributeDesc', AttributeDescription()), + namedtype.NamedType('assertionValue', AssertionValue())) + + +class SaslCredentials(univ.Sequence): + pass + + +SaslCredentials.componentType = namedtype.NamedTypes( + namedtype.NamedType('mechanism', LDAPString()), + namedtype.OptionalNamedType('credentials', univ.OctetString())) + + +class AuthenticationChoice(univ.Choice): + pass + + +AuthenticationChoice.componentType = namedtype.NamedTypes( + namedtype.NamedType('simple', univ.OctetString().subtype( + implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0))), + namedtype.NamedType('sasl', SaslCredentials().subtype( + implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 3)))) + + +class BindRequest(univ.Sequence): + pass + + +BindRequest.tagSet = univ.Sequence.tagSet.tagImplicitly( + tag.Tag(tag.tagClassApplication, tag.tagFormatConstructed, 0)) +BindRequest.componentType = namedtype.NamedTypes( + namedtype.NamedType('version', univ.Integer().subtype( + subtypeSpec=constraint.ValueRangeConstraint(1, 127))), + namedtype.NamedType('name', LDAPDN()), + namedtype.NamedType('authentication', AuthenticationChoice())) + + +class BindResponse(univ.Sequence): + pass + + +BindResponse.tagSet = univ.Sequence.tagSet.tagImplicitly( + tag.Tag(tag.tagClassApplication, tag.tagFormatConstructed, 1)) +BindResponse.componentType = namedtype.NamedTypes( + namedtype.NamedType('resultCode', univ.Enumerated(namedValues=namedval.NamedValues( + ('success', 0), + ('operationsError', 1), + ('protocolError', 2), + ('timeLimitExceeded', 3), + ('sizeLimitExceeded', 4), + ('compareFalse', 5), + ('compareTrue', 6), + ('authMethodNotSupported', 7), + ('strongerAuthRequired', 8), + ('referral', 10), + ('adminLimitExceeded', 11), + ('unavailableCriticalExtension', 12), + ('confidentialityRequired', 13), + ('saslBindInProgress', 14), + ('noSuchAttribute', 16), + ('undefinedAttributeType', 17), + ('inappropriateMatching', 18), + ('constraintViolation', 19), + ('attributeOrValueExists', 20), + ('invalidAttributeSyntax', 21), + ('noSuchObject', 32), + ('aliasProblem', 33), + ('invalidDNSyntax', 34), + ('aliasDereferencingProblem', 36), + ('inappropriateAuthentication', 48), + ('invalidCredentials', 49), + ('insufficientAccessRights', 50), + ('busy', 51), + ('unavailable', 52), + ('unwillingToPerform', 53), + ('loopDetect', 54), + ('namingViolation', 64), + ('objectClassViolation', 65), + ('notAllowedOnNonLeaf', 66), + ('notAllowedOnRDN', 67), + ('entryAlreadyExists', 68), + ('objectClassModsProhibited', 69), + ('affectsMultipleDSAs', 71), + ('other', 80)))), + namedtype.NamedType('matchedDN', LDAPDN()), + namedtype.NamedType('diagnosticMessage', LDAPString()), + namedtype.OptionalNamedType('referral', Referral().subtype( + implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 3))), + namedtype.OptionalNamedType('serverSaslCreds', univ.OctetString().subtype( + implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 7)))) + + +class CompareRequest(univ.Sequence): + pass + + +CompareRequest.tagSet = univ.Sequence.tagSet.tagImplicitly( + tag.Tag(tag.tagClassApplication, tag.tagFormatConstructed, 14)) +CompareRequest.componentType = namedtype.NamedTypes( + namedtype.NamedType('entry', LDAPDN()), + namedtype.NamedType('ava', AttributeValueAssertion())) + + +class CompareResponse(LDAPResult): + pass + + +CompareResponse.tagSet = LDAPResult.tagSet.tagImplicitly( + tag.Tag(tag.tagClassApplication, tag.tagFormatConstructed, 15)) + + +class LDAPOID(univ.OctetString): + pass + + +class Control(univ.Sequence): + pass + + +Control.componentType = namedtype.NamedTypes( + namedtype.NamedType('controlType', LDAPOID()), + namedtype.DefaultedNamedType('criticality', univ.Boolean().subtype(value=0)), + namedtype.OptionalNamedType('controlValue', univ.OctetString())) + + +class Controls(univ.SequenceOf): + pass + + +Controls.componentType = Control() + + +class DelRequest(LDAPDN): + pass + + +DelRequest.tagSet = LDAPDN.tagSet.tagImplicitly( + tag.Tag(tag.tagClassApplication, tag.tagFormatSimple, 10)) + + +class DelResponse(LDAPResult): + pass + + +DelResponse.tagSet = LDAPResult.tagSet.tagImplicitly( + tag.Tag(tag.tagClassApplication, tag.tagFormatConstructed, 11)) + + +class ExtendedRequest(univ.Sequence): + pass + + +ExtendedRequest.tagSet = univ.Sequence.tagSet.tagImplicitly( + tag.Tag(tag.tagClassApplication, tag.tagFormatConstructed, 23)) +ExtendedRequest.componentType = namedtype.NamedTypes( + namedtype.NamedType('requestName', LDAPOID().subtype( + implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0))), + namedtype.OptionalNamedType('requestValue', univ.OctetString().subtype( + implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 1)))) + + +class ExtendedResponse(univ.Sequence): + pass + + +ExtendedResponse.tagSet = univ.Sequence.tagSet.tagImplicitly( + tag.Tag(tag.tagClassApplication, tag.tagFormatConstructed, 24)) +ExtendedResponse.componentType = namedtype.NamedTypes( + namedtype.NamedType('resultCode', univ.Enumerated(namedValues=namedval.NamedValues( + ('success', 0), + ('operationsError', 1), + ('protocolError', 2), + ('timeLimitExceeded', 3), + ('sizeLimitExceeded', 4), + ('compareFalse', 5), + ('compareTrue', 6), + ('authMethodNotSupported', 7), + ('strongerAuthRequired', 8), + ('referral', 10), + ('adminLimitExceeded', 11), + ('unavailableCriticalExtension', 12), + ('confidentialityRequired', 13), + ('saslBindInProgress', 14), + ('noSuchAttribute', 16), + ('undefinedAttributeType', 17), + ('inappropriateMatching', 18), + ('constraintViolation', 19), + ('attributeOrValueExists', 20), + ('invalidAttributeSyntax', 21), + ('noSuchObject', 32), + ('aliasProblem', 33), + ('invalidDNSyntax', 34), + ('aliasDereferencingProblem', 36), + ('inappropriateAuthentication', 48), + ('invalidCredentials', 49), + ('insufficientAccessRights', 50), + ('busy', 51), + ('unavailable', 52), + ('unwillingToPerform', 53), + ('loopDetect', 54), + ('namingViolation', 64), + ('objectClassViolation', 65), + ('notAllowedOnNonLeaf', 66), + ('notAllowedOnRDN', 67), + ('entryAlreadyExists', 68), + ('objectClassModsProhibited', 69), + ('affectsMultipleDSAs', 71), + ('other', 80)))), + namedtype.NamedType('matchedDN', LDAPDN()), + namedtype.NamedType('diagnosticMessage', LDAPString()), + namedtype.OptionalNamedType('referral', Referral().subtype( + implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 3))), + namedtype.OptionalNamedType('responseName', LDAPOID().subtype( + implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 10))), + namedtype.OptionalNamedType('responseValue', univ.OctetString().subtype( + implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 11))) +) + + +class MatchingRuleId(LDAPString): + pass + + +class MatchingRuleAssertion(univ.Sequence): + pass + + +MatchingRuleAssertion.componentType = namedtype.NamedTypes( + namedtype.OptionalNamedType('matchingRule', MatchingRuleId().subtype( + implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 1))), + namedtype.OptionalNamedType('type', AttributeDescription().subtype( + implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 2))), + namedtype.NamedType('matchValue', AssertionValue().subtype( + implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 3))), + namedtype.DefaultedNamedType('dnAttributes', univ.Boolean().subtype( + implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 4)).subtype(value=0)) +) + + +class SubstringFilter(univ.Sequence): + pass + + +SubstringFilter.componentType = namedtype.NamedTypes( + namedtype.NamedType('type', AttributeDescription()), + namedtype.NamedType('substrings', univ.SequenceOf( + componentType=univ.Choice(componentType=namedtype.NamedTypes( + namedtype.NamedType('initial', AssertionValue().subtype( + implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0))), + namedtype.NamedType('any', AssertionValue().subtype( + implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 1))), + namedtype.NamedType('final', AssertionValue().subtype( + implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 2)))))).subtype( + subtypeSpec=constraint.ValueSizeConstraint(1, MAX)))) + + +# Tricky hack to handle recursive filter definitions: +class MetaFilter(type): + _recursion = 0 + _max_recursion = 4 # up to 5 levels + + def __new__(self, name, bases, dictionary): + return MetaFilter.get_class() + + @classmethod + def get_class(cls): + recursion = cls._recursion + cls_name = "Filter" + str(recursion) + cls._recursion += 1 + + if recursion < cls._max_recursion: + sub_filter_class = MetaFilter.get_class() + cls = type(cls_name, (univ.Choice,), { + "componentType": namedtype.NamedTypes( + namedtype.NamedType('and', univ.SetOf(componentType=sub_filter_class()).subtype( + subtypeSpec=constraint.ValueSizeConstraint(1, MAX) + ).subtype( + implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0))), + namedtype.NamedType('or', univ.SetOf(componentType=sub_filter_class()).subtype( + subtypeSpec=constraint.ValueSizeConstraint(1, MAX) + ).subtype( + implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 1))), + namedtype.NamedType('not', sub_filter_class().subtype( + implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 2))), + namedtype.NamedType('equalityMatch', AttributeValueAssertion().subtype( + implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 3))), + namedtype.NamedType('substrings', SubstringFilter().subtype( + implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 4))), + namedtype.NamedType('greaterOrEqual', AttributeValueAssertion().subtype( + implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 5))), + namedtype.NamedType('lessOrEqual', AttributeValueAssertion().subtype( + implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 6))), + namedtype.NamedType('present', AttributeDescription().subtype( + implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 7))), + namedtype.NamedType('approxMatch', AttributeValueAssertion().subtype( + implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 8))), + namedtype.NamedType('extensibleMatch', MatchingRuleAssertion().subtype( + implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 9))) + ), + }) + else: + cls = type(cls_name + "last", (univ.Choice,), { + "componentType": namedtype.NamedTypes( + namedtype.NamedType('equalityMatch', AttributeValueAssertion().subtype( + implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 3))), + namedtype.NamedType('substrings', SubstringFilter().subtype( + implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 4))), + namedtype.NamedType('greaterOrEqual', AttributeValueAssertion().subtype( + implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 5))), + namedtype.NamedType('lessOrEqual', AttributeValueAssertion().subtype( + implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 6))), + namedtype.NamedType('present', AttributeDescription().subtype( + implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 7))), + namedtype.NamedType('approxMatch', AttributeValueAssertion().subtype( + implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 8))), + namedtype.NamedType('extensibleMatch', MatchingRuleAssertion().subtype( + implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 9))) + ), + }) + return cls + + +# Tricky hack to support both of python2 and 3 +FilterBase = MetaFilter("Filter", (object, ), {"__doc__": MetaFilter.__doc__}) + + +class Filter(FilterBase): + pass + + +class IntermediateResponse(univ.Sequence): + pass + + +IntermediateResponse.tagSet = univ.Sequence.tagSet.tagImplicitly( + tag.Tag(tag.tagClassApplication, tag.tagFormatConstructed, 25)) +IntermediateResponse.componentType = namedtype.NamedTypes( + namedtype.OptionalNamedType('responseName', LDAPOID().subtype( + implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0))), + namedtype.OptionalNamedType('responseValue', univ.OctetString().subtype( + implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 1)))) + + +class SearchRequest(univ.Sequence): + pass + + +SearchRequest.tagSet = univ.Sequence.tagSet.tagImplicitly( + tag.Tag(tag.tagClassApplication, tag.tagFormatConstructed, 3)) +SearchRequest.componentType = namedtype.NamedTypes( + namedtype.NamedType('baseObject', LDAPDN()), + namedtype.NamedType('scope', univ.Enumerated(namedValues=namedval.NamedValues( + ('baseObject', 0), + ('singleLevel', 1), + ('wholeSubtree', 2)))), + namedtype.NamedType('derefAliases', univ.Enumerated(namedValues=namedval.NamedValues( + ('neverDerefAliases', 0), + ('derefInSearching', 1), + ('derefFindingBaseObj', 2), + ('derefAlways', 3)))), + namedtype.NamedType('sizeLimit', univ.Integer().subtype( + subtypeSpec=constraint.ValueRangeConstraint(0, maxInt))), + namedtype.NamedType('timeLimit', univ.Integer().subtype( + subtypeSpec=constraint.ValueRangeConstraint(0, maxInt))), + namedtype.NamedType('typesOnly', univ.Boolean()), + namedtype.NamedType('filter', Filter()), + namedtype.NamedType('attributes', AttributeSelection())) + + +class RelativeLDAPDN(LDAPString): + pass + + +class ModifyDNRequest(univ.Sequence): + pass + + +ModifyDNRequest.tagSet = univ.Sequence.tagSet.tagImplicitly( + tag.Tag(tag.tagClassApplication, tag.tagFormatConstructed, 12)) +ModifyDNRequest.componentType = namedtype.NamedTypes( + namedtype.NamedType('entry', LDAPDN()), + namedtype.NamedType('newrdn', RelativeLDAPDN()), + namedtype.NamedType('deleteoldrdn', univ.Boolean()), + namedtype.OptionalNamedType('newSuperior', LDAPDN().subtype( + implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0)))) + + +class PartialAttributeList(univ.SequenceOf): + pass + + +PartialAttributeList.componentType = PartialAttribute() + + +class SearchResultEntry(univ.Sequence): + pass + + +SearchResultEntry.tagSet = univ.Sequence.tagSet.tagImplicitly( + tag.Tag(tag.tagClassApplication, tag.tagFormatConstructed, 4)) +SearchResultEntry.componentType = namedtype.NamedTypes( + namedtype.NamedType('objectName', LDAPDN()), + namedtype.NamedType('attributes', PartialAttributeList())) + + +class ModifyDNResponse(LDAPResult): + pass + + +ModifyDNResponse.tagSet = LDAPResult.tagSet.tagImplicitly( + tag.Tag(tag.tagClassApplication, tag.tagFormatConstructed, 13)) + + +class SearchResultReference(univ.SequenceOf): + pass + + +SearchResultReference.tagSet = univ.SequenceOf.tagSet.tagImplicitly( + tag.Tag(tag.tagClassApplication, tag.tagFormatSimple, 19)) +SearchResultReference.componentType = URI() +SearchResultReference.subtypeSpec = constraint.ValueSizeConstraint(1, MAX) + + +class SearchResultDone(LDAPResult): + pass + + +SearchResultDone.tagSet = LDAPResult.tagSet.tagImplicitly( + tag.Tag(tag.tagClassApplication, tag.tagFormatConstructed, 5)) + + +class ModifyResponse(LDAPResult): + pass + + +ModifyResponse.tagSet = LDAPResult.tagSet.tagImplicitly( + tag.Tag(tag.tagClassApplication, tag.tagFormatConstructed, 7)) + + +class ModifyRequest(univ.Sequence): + pass + + +ModifyRequest.tagSet = univ.Sequence.tagSet.tagImplicitly( + tag.Tag(tag.tagClassApplication, tag.tagFormatConstructed, 6)) +ModifyRequest.componentType = namedtype.NamedTypes( + namedtype.NamedType('object', LDAPDN()), + namedtype.NamedType('changes', univ.SequenceOf(componentType=univ.Sequence( + componentType=namedtype.NamedTypes( + namedtype.NamedType('operation', univ.Enumerated(namedValues=namedval.NamedValues( + ('add', 0), + ('delete', 1), + ('replace', 2)))), + namedtype.NamedType('modification', PartialAttribute())))))) + + +class UnbindRequest(univ.Null): + pass + + +UnbindRequest.tagSet = univ.Null.tagSet.tagImplicitly( + tag.Tag(tag.tagClassApplication, tag.tagFormatSimple, 2)) + + +class LDAPMessage(univ.Sequence): + pass + + +LDAPMessage.componentType = namedtype.NamedTypes( + namedtype.NamedType('messageID', MessageID()), + namedtype.NamedType('protocolOp', univ.Choice(componentType=namedtype.NamedTypes( + namedtype.NamedType('bindRequest', BindRequest()), + namedtype.NamedType('bindResponse', BindResponse()), + namedtype.NamedType('unbindRequest', UnbindRequest()), + namedtype.NamedType('searchRequest', SearchRequest()), + namedtype.NamedType('searchResEntry', SearchResultEntry()), + namedtype.NamedType('searchResDone', SearchResultDone()), + namedtype.NamedType('searchResRef', SearchResultReference()), + namedtype.NamedType('modifyRequest', ModifyRequest()), + namedtype.NamedType('modifyResponse', ModifyResponse()), + namedtype.NamedType('addRequest', AddRequest()), + namedtype.NamedType('addResponse', AddResponse()), + namedtype.NamedType('delRequest', DelRequest()), + namedtype.NamedType('delResponse', DelResponse()), + namedtype.NamedType('modDNRequest', ModifyDNRequest()), + namedtype.NamedType('modDNResponse', ModifyDNResponse()), + namedtype.NamedType('compareRequest', CompareRequest()), + namedtype.NamedType('compareResponse', CompareResponse()), + namedtype.NamedType('abandonRequest', AbandonRequest()), + namedtype.NamedType('extendedReq', ExtendedRequest()), + namedtype.NamedType('extendedResp', ExtendedResponse()), + namedtype.NamedType('intermediateResponse', IntermediateResponse())))), + namedtype.OptionalNamedType('controls', Controls().subtype( + implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0)))) diff --git a/tests/__main__.py b/tests/__main__.py index 4e10bc8..620dbe5 100644 --- a/tests/__main__.py +++ b/tests/__main__.py @@ -52,6 +52,7 @@ 'tests.test_rfc4476.suite', 'tests.test_rfc4490.suite', 'tests.test_rfc4491.suite', + 'tests.test_rfc4511.suite', 'tests.test_rfc4683.suite', 'tests.test_rfc4985.suite', 'tests.test_rfc5035.suite', diff --git a/tests/test_rfc4511.py b/tests/test_rfc4511.py new file mode 100644 index 0000000..a6eafad --- /dev/null +++ b/tests/test_rfc4511.py @@ -0,0 +1,303 @@ +# +# This file is part of pyasn1-modules software. +# +# Copyright (c) 2021, Hisanobu Okuda +# License: http://snmplabs.com/pyasn1/license.html +# +import sys +import unittest +import base64 + +from pyasn1.codec.ber.decoder import decode as ber_decoder +from pyasn1.codec.ber.encoder import encode as ber_encoder + +from pyasn1_modules import pem +from pyasn1_modules import rfc4511 + + +class LDAPTestCaseBase(): + b64_text = None + + def testBerCodec(self): + assert(self.__class__.b64_text is not None) + substrate = pem.readBase64fromText(self.__class__.b64_text) + + asn1Object, rest = ber_decoder( + substrate, + asn1Spec=rfc4511.LDAPMessage() + ) + + self.assertFalse(rest) + self.assertTrue(asn1Object.prettyPrint()) + self.assertEqual(substrate, ber_encoder(asn1Object)) + + def testProtocolOpType(self): + assert(self.b64_text is not None) + substrate = pem.readBase64fromText(self.b64_text) + + asn1Object, rest = ber_decoder( + substrate, + asn1Spec=rfc4511.LDAPMessage() + ) + + protocolOp = next(asn1Object['protocolOp'].values()) + + self.assertTrue( + isinstance(protocolOp, self.__class__.expected_protocolop_class) + ) + + +class BindRequestSimpleTestCase(unittest.TestCase, LDAPTestCaseBase): + b64_text = """\ +MC8CAQFgKgIBAwQaQWRtaW5pc3RyYXRvckBFWEFNUExFMi5DT02ACVBhc3N3MHJkLg== +""" + + expected_protocolop_class = rfc4511.BindRequest + + +class BindRequestSaslTestCase(unittest.TestCase, LDAPTestCaseBase): + b64_text = """\ +MIIBOgIBAmCCATMCAQMEAKOCASoECkRJR0VTVC1NRDUEggEadXNlcm5hbWU9ImNuPURpcmVj +dG9yeSBNYW5hZ2VyIixyZWFsbT0iNzA3NjU3ZTQyZTUxIixub25jZT0iZUYrTEpIYWF2ak1k +Vk1vb2YvSkphaEY1R3N2WWxCeDJrYTVJOWtaZFg0bz0iLGNub25jZT0iRUo2R0FETm1ITGR6 +RzBmMit6cFB5QkJzVEFuQm9SVXNCOE1jbGMwM1M5QT0iLG5jPTAwMDAwMDAxLHFvcD1hdXRo +LWNvbmYsY2lwaGVyPXJjNCxtYXhidWY9MTY3NzcyMTUsZGlnZXN0LXVyaT0ibGRhcC9sb2Nh +bGhvc3QiLHJlc3BvbnNlPTQ4OGE5YjUzMDE1YjE0MWUwMGI2MjUwOTZiODA2MjY0 +""" + + expected_protocolop_class = rfc4511.BindRequest + + +class BindResponseSuccessTestCase(unittest.TestCase, LDAPTestCaseBase): + b64_text = """\ +MAwCAQFhBwoBAAQABAA= +""" + + expected_protocolop_class = rfc4511.BindResponse + + +class BindResponseSaslBindInProgressTestCase( + unittest.TestCase, LDAPTestCaseBase): + b64_text = """\ +MIHLAgEBYYHFCgEOBAAEAIeBu25vbmNlPSJlRitMSkhhYXZqTWRWTW9vZi9KSmFoRjVHc3ZZ +bEJ4MmthNUk5a1pkWDRvPSIscmVhbG09IjcwNzY1N2U0MmU1MSIscW9wPSJhdXRoLGF1dGgt +aW50LGF1dGgtY29uZiIsY2lwaGVyPSJyYzQtNDAscmM0LTU2LHJjNCxkZXMsM2RlcyIsbWF4 +YnVmPTIwOTcxNTIsY2hhcnNldD11dGYtOCxhbGdvcml0aG09bWQ1LXNlc3M= +""" + + expected_protocolop_class = rfc4511.BindResponse + + +class UnbindRequestTestCase(unittest.TestCase, LDAPTestCaseBase): + b64_text = """\ +MAUCAQNCAA== +""" + + expected_protocolop_class = rfc4511.UnbindRequest + + +class SearchRequestComplicatedFilterTestCase( + unittest.TestCase, LDAPTestCaseBase): + b64_text = """\ +MIIBeQIBA2OCAXIEEGRjPXJlZGhhdCxkYz1jb20KAQIKAQMCAQACAQABAQCggc2HA3VpZKGB +xaIToxEEA3VpZAQKbm9zdWNodXNlcqksgRsyLjE2Ljg0MC4xLjExMzczMC4zLjMuMi43LjGC +AnNugwZwYXNzaW6EAQGlIgQPY3JlYXRlVGltZXN0YW1wBA8yMDA3MDEwMTAwMDAwMFqmIgQP +Y3JlYXRlVGltZXN0YW1wBA8yMTAwMDEwMTAwMDAwMFqkCQQCY24wA4ABdaQJBAJjbjADggF1 +pAkEAmNuMAOBAXWkDAQCY24wBoABdYEBc6gJBAJjbgQDc2VyMH8EB2VudHJ5ZG4EA3VpZAQL +b2JqZWN0Q2xhc3MECWdpdmVuTmFtZQQCc24ECG1lbWJlck9mBAV0aXRsZQQPY3JlYXRldGlt +ZXN0YW1wBAl1aWROdW1iZXIEAmNuBA5zYW1hY2NvdW50bmFtZQQGbWVtYmVyBApwd2RMYXN0 +U2V0 +""" + + expected_protocolop_class = rfc4511.SearchRequest + + +class SearchRequestSimpleFilterTestCase(unittest.TestCase, LDAPTestCaseBase): + b64_text = """\ +MC0CAQJjKAQQZGM9cmVkaGF0LGRjPWNvbQoBAgoBAAIBAAIBAAEBAIcDdWlkMAA= +""" + + expected_protocolop_class = rfc4511.SearchRequest + + +class SearchRequestWithControlTestCase(unittest.TestCase, LDAPTestCaseBase): + b64_text = """\ +MEACAQJjOwQQZGM9cmVkaGF0LGRjPWNvbQoBAgoBAAIBAAIBAAEBAIcDdWlkMBMEAmRuBAN1 +aWQEAmNuBARtYWls +""" + + expected_protocolop_class = rfc4511.SearchRequest + + +class SearchResultEntryTestCase(unittest.TestCase, LDAPTestCaseBase): + b64_text = """\ +MFkCAQJkVAQpdWlkPWRlbW9fdXNlcixvdT1wZW9wbGUsZGM9ZXhhbXBsZSxkYz1jb20wJzAS +BAN1aWQxCwQJZGVtb191c2VyMBEEAmNuMQsECURlbW8gVXNlcg== +""" + + expected_protocolop_class = rfc4511.SearchResultEntry + + +class SearchResultDoneTestCase(unittest.TestCase, LDAPTestCaseBase): + b64_text = """\ +MAwCAQJlBwoBAAQABAA= +""" + + expected_protocolop_class = rfc4511.SearchResultDone + + +class SearchResultReferenceTestCase(unittest.TestCase, LDAPTestCaseBase): + b64_text = """\ +MEECAQJzPAQ6bGRhcDovL2xkYXAyLmV4YW1wbGUuY29tL289Z3JvbW1ldHMsZGM9ZXhhbXBs +ZSxkYz1uZXQ/P3N1Yg== +""" + + expected_protocolop_class = rfc4511.SearchResultReference + + +class ModifyRequestTestCase(unittest.TestCase, LDAPTestCaseBase): + b64_text = """\ +MEoCAQZmRQQnY249dGVtcHVzZXIsb3U9cGVvcGxlLGRjPWV4YW1wbGUsZGM9Y29tMBowGAoB +AjATBAJzbjENBAtzdl9yZXBsYWNlZA== +""" + + expected_protocolop_class = rfc4511.ModifyRequest + + +class ModifyResponseTestCase(unittest.TestCase, LDAPTestCaseBase): + b64_text = """\ +MEwCAQdnRwoBQQQABEBtaXNzaW5nIGF0dHJpYnV0ZSAic24iIHJlcXVpcmVkIGJ5IG9iamVj +dCBjbGFzcyAiaW5ldE9yZ1BlcnNvbiIK +""" + + expected_protocolop_class = rfc4511.ModifyResponse + + +class AddRequestTestCase(unittest.TestCase, LDAPTestCaseBase): + b64_text = """\ +MIGIAgECaIGCBCdjbj10ZW1wdXNlcixvdT1wZW9wbGUsZGM9ZXhhbXBsZSxkYz1jb20wVzAe +BAtvYmplY3RDbGFzczEPBA1pbmV0T3JnUGVyc29uMBEEA3VpZDEKBAh0ZW1wdXNlcjAQBAJz +bjEKBAh0ZW1wdXNlcjAQBAJjbjEKBAh0ZW1wdXNlcg== +""" + + expected_protocolop_class = rfc4511.AddRequest + + +class AddResponseTestCase(unittest.TestCase, LDAPTestCaseBase): + b64_text = """\ +MAwCAQJpBwoBAAQABAA= +""" + + expected_protocolop_class = rfc4511.AddResponse + + +class DelRequestTestCase(unittest.TestCase, LDAPTestCaseBase): + b64_text = """\ +MCwCAQpKJ2NuPXRlbXB1c2VyLG91PXBlb3BsZSxkYz1leGFtcGxlLGRjPWNvbQ== +""" + + expected_protocolop_class = rfc4511.DelRequest + + +class DelResponseTestCase(unittest.TestCase, LDAPTestCaseBase): + b64_text = """\ +MCcCAQprIgoBIAQbb3U9cGVvcGxlLGRjPWV4YW1wbGUsZGM9Y29tBAA= +""" + + expected_protocolop_class = rfc4511.DelResponse + + +class ModifyDNRequestTestCase(unittest.TestCase, LDAPTestCaseBase): + b64_text = """\ +MF0CAQhsWAQnY249dGVtcHVzZXIsb3U9cGVvcGxlLGRjPWV4YW1wbGUsZGM9Y29tBA9jbj1u +ZXdfdGVtcHVzZXIBAf+AGW91PXBlb3BsZSxkYz1XUk9ORyxkYz1jb20= +""" + + expected_protocolop_class = rfc4511.ModifyDNRequest + + def setUp(self): + """ + According to the chapter 8.2 in + https://www.itu.int/ITU-T/studygroups/com17/languages/X.690-0207.pdf, + the boolean value true is encoded to any non-zero value. + Since some of LDAP client like openldap ldapsearch command encode True + to 0xff, and pyasn1 encodes True to 0x01, it is needed to replace FF to + 01 for testing purpose. + """ + if sys.version_info[0] <= 2: + substrate_ff = base64.b64decode(self.__class__.b64_text) + substrate_01 = str( + bytearray(substrate_ff).replace(b'\xff', b'\x01') + ) + self.__class__.b64_text = base64.b64encode(substrate_01) + else: + substrate_ff = base64.b64decode(self.__class__.b64_text.encode()) + substrate_01 = substrate_ff.replace(b'\xff', b'\x01') + self.__class__.b64_text = base64.b64encode(substrate_01).decode() + + +class ModifyDNResponseTestCase(unittest.TestCase, LDAPTestCaseBase): + b64_text = """\ +MDACAQhtKwoBRwQABCRDYW5ub3QgbW92ZSBlbnRyaWVzIGFjcm9zcyBiYWNrZW5kcwo= +""" + + expected_protocolop_class = rfc4511.ModifyDNResponse + + +class CompareRequestTestCase(unittest.TestCase, LDAPTestCaseBase): + b64_text = """\ +MEICAQJuPQQpdWlkPWRlbW9fdXNlcixvdT1wZW9wbGUsZGM9ZXhhbXBsZSxkYz1jb20wEAQC +Y24ECiBEZW1vIFVzZXI= +""" + + expected_protocolop_class = rfc4511.CompareRequest + + +class CompareResponseTestCase(unittest.TestCase, LDAPTestCaseBase): + b64_text = """\ +MAwCAQJvBwoBBgQABAA= +""" + + expected_protocolop_class = rfc4511.CompareResponse + + +class AbandonRequestTestCase(unittest.TestCase, LDAPTestCaseBase): + b64_text = """\ +MAYCAQNQAQM= +""" + + expected_protocolop_class = rfc4511.AbandonRequest + + +class ExtendedRequestTestCase(unittest.TestCase, LDAPTestCaseBase): + b64_text = """\ +MB0CAQF3GIAWMS4zLjYuMS40LjEuMTQ2Ni4yMDAzNw== +""" + + expected_protocolop_class = rfc4511.ExtendedRequest + + +class ExtendedResponseTestCase(unittest.TestCase, LDAPTestCaseBase): + b64_text = """\ +MF8CAQF4WgoBAAQABDtTdGFydCBUTFMgcmVxdWVzdCBhY2NlcHRlZC5TZXJ2ZXIgd2lsbGlu +ZyB0byBuZWdvdGlhdGUgU1NMLooWMS4zLjYuMS40LjEuMTQ2Ni4yMDAzNw== +""" + + expected_protocolop_class = rfc4511.ExtendedResponse + + +class IntermediateResponseTestCase(unittest.TestCase, LDAPTestCaseBase): + b64_text = """\ +MIGDAgECeXyAGDEuMy42LjEuNC4xLjQyMDMuMS45LjEuNIFgoV4EXGxvY2FsaG9zdC5sb2Nh +bGRvbWFpbjozODkjY249ZGlyZWN0b3J5IG1hbmFnZXI6b3U9cGVvcGxlLGRjPWV4YW1wbGUs +ZGM9Y29tOihvYmplY3RDbGFzcz0qKSMxoAA= +""" + + expected_protocolop_class = rfc4511.IntermediateResponse + + +suite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) + +if __name__ == '__main__': + result = unittest.TextTestRunner(verbosity=2).run(suite) + sys.exit(not result.wasSuccessful())