Source code for commandment.cms.decorators
from typing import List, Tuple
from asn1crypto.cms import CMSAttribute
from cryptography.exceptions import InvalidSignature
from flask import request, g, current_app, abort
from functools import wraps
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from asn1crypto import cms
from base64 import b64decode, b64encode
from . import _certificate_by_signer_identifier, _cryptography_hash_function, _cryptography_pad_function
def _signed_message_digest(signed_attrs):
    """Return the value of the ``message_digest`` signed attribute, or None when it is absent."""
    for signed_attr in signed_attrs:
        if signed_attr['type'].native == "message_digest":
            return signed_attr['values'][0].native
    return None


def _verify_cms_signers(signed_data: bytes, detached: bool = False) -> Tuple[List[x509.Certificate], bytes]:
    """Verify the signature of every SignerInfo in a DER encoded CMS SignedData structure.

    Each signer certificate is looked up among the certificates embedded in the CMS message itself.
    This function only checks the signatures; nothing here validates that a signer certificate is
    trusted, so callers must make that decision from the returned certificates.

    Args:
          signed_data (bytes): DER encoded CMS ContentInfo holding a SignedData structure.
          detached (bool): When True the signed payload is taken from the body of the current flask
            request (``request.data``) instead of the content encapsulated in the CMS structure.

    Returns:
          Tuple[List[x509.Certificate], bytes]: The certificate of every verified signer and the payload
          that was signed. The list is empty when the message carries no SignerInfo at all, in which case
          nothing has been authenticated.

    Raises:
          - ValueError if the structure is not SignedData, the encapsulated content type is not ``data``,
            a signer certificate is not included in the message, or an algorithm is unsupported.
          - InvalidSignature if a signature does not verify, or if the signed ``message_digest`` attribute
            is missing or does not match the digest of the payload.
    """
    ci = cms.ContentInfo.load(signed_data)
    # Explicit checks instead of ``assert``: assertions are stripped when python runs with -O, and this
    # input comes straight from the client.
    if ci['content_type'].native != 'signed_data':
        raise ValueError('Expected CMS content type signed_data, got: {}'.format(ci['content_type'].native))

    signed: cms.SignedData = ci['content']
    current_app.logger.debug("CMS request contains %d certificate(s)", len(signed['certificates']))

    encap_content_info = signed['encap_content_info']
    # TODO: Don't assume that content is OctetString
    data = request.data if detached else encap_content_info['content'].native

    signers = []
    for signer in signed['signer_infos']:
        asn_certificate = _certificate_by_signer_identifier(signed['certificates'], signer['sid'])
        if asn_certificate is None:
            raise ValueError('CMS message does not contain a certificate matching the signer identifier')
        certificate = x509.load_der_x509_certificate(asn_certificate.dump(), default_backend())

        digest_algorithm = signer['digest_algorithm']
        signature_algorithm = signer['signature_algorithm']
        hash_function = _cryptography_hash_function(digest_algorithm)
        pad_function = _cryptography_pad_function(signature_algorithm)

        if hash_function is None or pad_function is None:
            raise ValueError('Unsupported signature algorithm: {}'.format(signature_algorithm))
        current_app.logger.debug("Using signature algorithm: %s", signature_algorithm.native)

        if encap_content_info['content_type'].native != 'data':
            raise ValueError('Unsupported encapsulated content type: {}'.format(
                encap_content_info['content_type'].native))

        signed_attrs = signer['signed_attrs']
        if 'signed_attrs' in signer and len(signed_attrs) > 0:
            # With signed attributes the signature covers the attributes, not the payload. The payload is
            # only bound to the signature through the message_digest attribute, so it has to be compared
            # with a digest computed over the payload actually received (RFC 5652 section 5.4).
            expected_digest = _signed_message_digest(signed_attrs)
            if expected_digest is None:
                raise InvalidSignature('Signed attributes do not contain a message_digest attribute')
            current_app.logger.debug("SignerInfo digest: %s", b64encode(expected_digest))

            if data is None:
                raise InvalidSignature('There is no content to compare against the signed message digest')
            hasher = hashes.Hash(hash_function(), default_backend())
            hasher.update(data)
            if hasher.finalize() != expected_digest:
                raise InvalidSignature('Content digest does not match the signed message_digest attribute')

            certificate.public_key().verify(
                signer['signature'].native,
                # The signature is calculated over the SET OF encoding of the attributes, not over the
                # IMPLICIT [0] tagged form they have inside SignerInfo, hence untag().
                signed_attrs.untag().dump(),
                pad_function(),
                hash_function()
            )
        else:  # No signed attributes means the signature is calculated over the content itself
            certificate.public_key().verify(
                signer['signature'].native,
                data,
                pad_function(),
                hash_function()
            )

        signers.append(certificate)

    return signers, data
def verify_cms_signers(f):
    """Verify the signers of a request containing a CMS/PKCS#7, DER encoded body.

    The certificate of each signer is placed on the global **g** variable as **g.signers** and the signed data is
    set as **g.signed_data**.

    In unit tests, this decorator is completely disabled by the presence of testing = True

    Raises:
          - TypeError if *Content-Type* header is not "application/pkcs7-signature"
          - InvalidSignature if the signature of any signer on the CMS content does not verify.
          - ValueError if a signer uses an unsupported signature algorithm.
    """
    @wraps(f)
    def decorator(*args, **kwargs):
        # Verification is skipped entirely under test so views can be exercised with unsigned bodies.
        if current_app.testing:
            return f(*args, **kwargs)

        current_app.logger.debug('Verifying CMS Request Data for request to %s', request.url)

        content_type = request.headers['Content-Type']
        if content_type != "application/pkcs7-signature":
            raise TypeError("verify_cms_signers expects application/pkcs7-signature, got: {}".format(
                content_type))

        # The request body is the complete CMS structure, with the signed content encapsulated in it.
        g.signers, g.signed_data = _verify_cms_signers(request.data)
        return f(*args, **kwargs)

    return decorator
def verify_mdm_signature(f):
    """Verify the signature supplied by the client in the request using the ``Mdm-Signature`` header.

    If the authenticity of the message has been verified, then the signer certificates are attached to the
    **g** object as **g.signers** and the signed request body as **g.signed_data**.

    In unit tests, this decorator is completely disabled by the presence of app.testing = True.
    You can also disable enforcement in dev by setting the flask setting DEBUG to true. In that case a request
    with an invalid signature still reaches the view, but **g.signers** and **g.signed_data** are not set by
    this decorator.

    A request with an invalid signature is aborted with HTTP 403 when DEBUG is not enabled.

    Raises:
          - TypeError if the request has no ``Mdm-Signature`` header.

    :reqheader Mdm-Signature: BASE64-encoded CMS Detached Signature of the message. (if `SignMessage` was true)
    """
    @wraps(f)
    def decorator(*args, **kwargs):
        if current_app.testing:
            return f(*args, **kwargs)

        if 'Mdm-Signature' not in request.headers:
            raise TypeError('Client did not supply an Mdm-Signature header but signature is required.')

        detached_signature = b64decode(request.headers['Mdm-Signature'])

        try:
            # Detached: the header only carries the signature, the signed content is the request body.
            signers, signed_data = _verify_cms_signers(detached_signature, detached=True)
        except InvalidSignature:
            current_app.logger.warning("Invalid Signature in Mdm-Signature header")
            if not current_app.config.get('DEBUG', False):
                return abort(403)
        else:
            g.signers = signers
            g.signed_data = signed_data

        return f(*args, **kwargs)

    return decorator