Skip to content

Commit

Permalink
ACM-PCA: Ensure CertificateAuthority is serialisable
Browse files Browse the repository at this point in the history
  • Loading branch information
viren-nadkarni committed Aug 20, 2024
1 parent c86d1a2 commit 27575d3
Showing 1 changed file with 36 additions and 32 deletions.
68 changes: 36 additions & 32 deletions moto/acmpca/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,36 +60,7 @@ def __init__(
self.certificate_chain: Optional[bytes] = None
self.issued_certificates: Dict[str, bytes] = dict()

subject = self.certificate_authority_configuration.get("Subject", {})
name_attributes = []
if "Country" in subject:
name_attributes.append(
x509.NameAttribute(x509.NameOID.COUNTRY_NAME, subject["Country"])
)
if "State" in subject:
name_attributes.append(
x509.NameAttribute(
x509.NameOID.STATE_OR_PROVINCE_NAME, subject["State"]
)
)
if "Organization" in subject:
name_attributes.append(
x509.NameAttribute(
x509.NameOID.ORGANIZATION_NAME, subject["Organization"]
)
)
if "OrganizationalUnit" in subject:
name_attributes.append(
x509.NameAttribute(
x509.NameOID.ORGANIZATIONAL_UNIT_NAME, subject["OrganizationalUnit"]
)
)
if "CommonName" in subject:
name_attributes.append(
x509.NameAttribute(x509.NameOID.COMMON_NAME, subject["CommonName"])
)
self.issuer = x509.Name(name_attributes)
self.csr = self._ca_csr(self.issuer)
self.subject = self.certificate_authority_configuration.get("Subject", {})

def generate_cert(
self,
Expand All @@ -114,10 +85,43 @@ def generate_cert(

return cert.public_bytes(serialization.Encoding.PEM)

def _ca_csr(self, issuer: x509.Name) -> bytes:
@property
def issuer(self) -> x509.Name:
name_attributes = []
if "Country" in self.subject:
name_attributes.append(
x509.NameAttribute(x509.NameOID.COUNTRY_NAME, self.subject["Country"])
)
if "State" in self.subject:
name_attributes.append(
x509.NameAttribute(
x509.NameOID.STATE_OR_PROVINCE_NAME, self.subject["State"]
)
)
if "Organization" in self.subject:
name_attributes.append(
x509.NameAttribute(
x509.NameOID.ORGANIZATION_NAME, self.subject["Organization"]
)
)
if "OrganizationalUnit" in self.subject:
name_attributes.append(
x509.NameAttribute(
x509.NameOID.ORGANIZATIONAL_UNIT_NAME,
self.subject["OrganizationalUnit"],
)
)
if "CommonName" in self.subject:
name_attributes.append(
x509.NameAttribute(x509.NameOID.COMMON_NAME, self.subject["CommonName"])
)
return x509.Name(name_attributes)

@property
def csr(self) -> bytes:
csr = (
x509.CertificateSigningRequestBuilder()
.subject_name(issuer)
.subject_name(self.issuer)
.add_extension(
x509.BasicConstraints(ca=True, path_length=None),
critical=True,
Expand Down

0 comments on commit 27575d3

Please sign in to comment.