Skip to content
Open
2 changes: 2 additions & 0 deletions .coveragerc
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ omit =
# Schema tested by `make check-black`
samtranslator/schema/*
samtranslator/internal/schema_source/*
# Deprecated validator module
samtranslator/validator/validator.py
[report]
exclude_lines =
pragma: no cover
210 changes: 190 additions & 20 deletions samtranslator/model/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
""" CloudFormation Resource serialization, deserialization, and validation """
"""CloudFormation Resource serialization, deserialization, and validation"""

import inspect
import re
from abc import ABC, ABCMeta, abstractmethod
from contextlib import suppress
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, TypeVar

from samtranslator.compat import pydantic
Expand Down Expand Up @@ -163,7 +164,7 @@ def __init__(
self.set_resource_attribute(attr, value)

@classmethod
def get_supported_resource_attributes(cls): # type: ignore[no-untyped-def]
def get_supported_resource_attributes(cls) -> Tuple[str, ...]:
"""
A getter method for the supported resource attributes
returns: a tuple that contains the name of all supported resource attributes
Expand Down Expand Up @@ -205,7 +206,7 @@ def from_dict(cls, logical_id: str, resource_dict: Dict[str, Any], relative_id:

resource = cls(logical_id, relative_id=relative_id)

resource._validate_resource_dict(logical_id, resource_dict) # type: ignore[no-untyped-call]
resource._validate_resource_dict(logical_id, resource_dict)

# Default to empty properties dictionary. If customers skip the Properties section, an empty dictionary
# accurately captures the intent.
Expand Down Expand Up @@ -247,7 +248,7 @@ def _validate_logical_id(logical_id: Optional[Any]) -> str:
raise InvalidResourceException(str(logical_id), "Logical ids must be alphanumeric.")

@classmethod
def _validate_resource_dict(cls, logical_id, resource_dict): # type: ignore[no-untyped-def]
def _validate_resource_dict(cls, logical_id: str, resource_dict: Dict[str, Any]) -> None:
"""Validates that the provided resource dict contains the correct Type string, and the required Properties dict.

:param dict resource_dict: the resource dict to validate
Expand Down Expand Up @@ -335,22 +336,82 @@ def __setattr__(self, name, value): # type: ignore[no-untyped-def]
)

# Note: For compabitliy issue, we should ONLY use this with new abstraction/resources.
def validate_properties_and_return_model(self, cls: Type[RT]) -> RT:
def validate_properties_and_return_model(self, cls: Type[RT], collect_all_errors: bool = False) -> RT:
"""
Given a resource properties, return a typed object from the definitions of SAM schema model

param:
resource_properties: properties from input template
Args:
cls: schema models
collect_all_errors: If True, collect all validation errors. If False (default), only first error.
"""
try:
return cls.parse_obj(self._generate_resource_dict()["Properties"])
except pydantic.error_wrappers.ValidationError as e:
if collect_all_errors:
# Comprehensive error collection with union type consolidation
error_messages = self._format_all_errors(e.errors()) # type: ignore[arg-type]
raise InvalidResourceException(self.logical_id, " ".join(error_messages)) from e
error_properties: str = ""
with suppress(KeyError):
error_properties = ".".join(str(x) for x in e.errors()[0]["loc"])
raise InvalidResourceException(self.logical_id, f"Property '{error_properties}' is invalid.") from e

def _format_all_errors(self, errors: List[Dict[str, Any]]) -> List[str]:
"""Format all validation errors, consolidating union type errors in single pass."""
type_mapping = {
"not a valid dict": "dictionary",
"not a valid int": "integer",
"not a valid float": "number",
"not a valid list": "list",
"not a valid str": "string",
}

# Group errors by path in a single pass
path_to_errors: Dict[str, Dict[str, Any]] = {}

for error in errors:
property_path = ".".join(str(x) for x in error["loc"])
raw_message = error.get("msg", "")

# Extract type for union consolidation
extracted_type = None
for pattern, type_name in type_mapping.items():
if pattern in raw_message:
extracted_type = type_name
break

if property_path not in path_to_errors:
path_to_errors[property_path] = {"types": [], "error": error}

if extracted_type:
path_to_errors[property_path]["types"].append(extracted_type)

# Format messages based on collected data
result = []
for path, data in path_to_errors.items():
unique_types = list(dict.fromkeys(data["types"])) # Remove duplicates, preserve order

if len(unique_types) > 1:
# Multiple types - consolidate with union
type_text = " or ".join(unique_types)
result.append(f"Property '{path}' value must be {type_text}.")
else:
# Single or no types - format normally
result.append(self._format_single_error(data["error"]))

return result

def _format_single_error(self, error: Dict[str, Any]) -> str:
"""Format a single Pydantic error into user-friendly message."""
property_path = ".".join(str(x) for x in error["loc"])
raw_message = error["msg"]

if error["type"] == "value_error.missing":
return f"Property '{property_path}' is required."
if "extra fields not permitted" in raw_message:
return f"Property '{property_path}' is an invalid property."
return f"Property '{property_path}' {raw_message.lower()}."

def validate_properties(self) -> None:
"""Validates that the required properties for this Resource have been populated, and that all properties have
valid values.
Expand Down Expand Up @@ -481,6 +542,16 @@ def to_cloudformation(self, **kwargs: Any) -> List[Any]:
"""


class ValidationRule(Enum):
MUTUALLY_EXCLUSIVE = "mutually_exclusive"
MUTUALLY_INCLUSIVE = "mutually_inclusive"
CONDITIONAL_REQUIREMENT = "conditional_requirement"


# Simple tuple-based rules: (rule_type, [property_names])
PropertyRule = Tuple[ValidationRule, List[str]]


class SamResourceMacro(ResourceMacro, metaclass=ABCMeta):
"""ResourceMacro that specifically refers to SAM (AWS::Serverless::*) resources."""

Expand Down Expand Up @@ -536,14 +607,14 @@ def get_resource_references(self, generated_cfn_resources, supported_resource_re
def _construct_tag_list(
self, tags: Optional[Dict[str, Any]], additional_tags: Optional[Dict[str, Any]] = None
) -> List[Dict[str, Any]]:
if not bool(tags):
tags = {}
tags_dict: Dict[str, Any] = tags or {}

if additional_tags is None:
additional_tags = {}

# At this point tags is guaranteed to be a Dict[str, Any] since we set it to {} if it was falsy
for tag in self._RESERVED_TAGS:
self._check_tag(tag, tags) # type: ignore[no-untyped-call]
self._check_tag(tag, tags_dict)

sam_tag = {self._SAM_KEY: self._SAM_VALUE}

Expand All @@ -553,6 +624,40 @@ def _construct_tag_list(
# customer's knowledge.
return get_tag_list(sam_tag) + get_tag_list(additional_tags) + get_tag_list(tags)

@staticmethod
def propagate_tags_combine(
resources: List[Resource], tags: Optional[Dict[str, Any]], propagate_tags: Optional[bool] = False
) -> None:
"""
Propagates tags to the resources
Similar to propagate_tags() method but this method will combine provided tags with existing resource tags.

Note:
- This method created after propagate_tags() to combine propagate tags with resource tags create during to_cloudformation()
- Create this method because updating propagate_tags() will cause regression issue;
- Use this method for new resource if you want to assign combined tags, not replace.

:param propagate_tags: Whether we should pass the tags to generated resources.
:param resources: List of generated resources
:param tags: dictionary of tags to propagate to the resources.

:return: None
"""
if not propagate_tags or not tags:
return

for resource in resources:
if hasattr(resource, "Tags"):
if resource.Tags:
propagated_tags = get_tag_list(tags)
combined_tags = [
{"Key": k, "Value": v}
for k, v in {tag["Key"]: tag["Value"] for tag in resource.Tags + propagated_tags}.items()
]
resource.Tags = combined_tags
else:
resource.assign_tags(tags)

@staticmethod
def propagate_tags(
resources: List[Resource], tags: Optional[Dict[str, Any]], propagate_tags: Optional[bool] = False
Expand All @@ -572,7 +677,7 @@ def propagate_tags(
for resource in resources:
resource.assign_tags(tags)

def _check_tag(self, reserved_tag_name, tags): # type: ignore[no-untyped-def]
def _check_tag(self, reserved_tag_name: str, tags: Dict[str, Any]) -> None:
if reserved_tag_name in tags:
raise InvalidResourceException(
self.logical_id,
Expand All @@ -582,6 +687,71 @@ def _check_tag(self, reserved_tag_name, tags): # type: ignore[no-untyped-def]
"input.",
)

def validate_before_transform(self, schema_class: Optional[Type[RT]], collect_all_errors: bool = False) -> None:
if not hasattr(self, "__validation_rules__"):
return

rules = self.__validation_rules__
validated_model = (
self.validate_properties_and_return_model(schema_class, collect_all_errors) if schema_class else None
)

error_messages = []

for rule_type, properties in rules:
present = [prop for prop in properties if self._get_property_value(prop, validated_model) is not None]
if rule_type == ValidationRule.MUTUALLY_EXCLUSIVE:
# Check if more than one property exists
if len(present) > 1:
prop_names = [f"'{p}'" for p in present]
error_messages.append(f"Cannot specify {self._combine_string(prop_names)} together.")

elif rule_type == ValidationRule.MUTUALLY_INCLUSIVE:
# If any property in the group is present, then all properties in the group must be present
# Check if some but not all properties from the group are present
missing_some_properties = 0 < len(present) < len(properties)
if missing_some_properties:
error_messages.append(f"Properties must be used together: {self._combine_string(properties)}.")

elif (
rule_type == ValidationRule.CONDITIONAL_REQUIREMENT
and self._get_property_value(properties[0], validated_model) is not None
and self._get_property_value(properties[1], validated_model) is None
):
# First property requires second property
error_messages.append(f"'{properties[0]}' requires '{properties[1]}'.")

# If there are any validation errors, raise a single exception with all error messages
if error_messages:
raise InvalidResourceException(self.logical_id, "\n".join(error_messages))

def _combine_string(self, words: List[str]) -> str:
return ", ".join(words[:-1]) + (" and " + words[-1] if len(words) > 1 else words[0] if words else "")

def _get_property_value(self, prop: str, validated_model: Any = None) -> Any:
"""Original property value getter. Supports nested properties with dot notation."""
if "." not in prop:
# Simple property - use existing logic for direct attributes
return getattr(self, prop, None)

# Nested property - use validated model
if validated_model is None:
return None

try:
# Navigate through nested properties using dot notation
value = validated_model
for part in prop.split("."):
if hasattr(value, part):
value = getattr(value, part)
if value is None:
return None
else:
return None
return value
except Exception:
return None


class ResourceTypeResolver:
"""ResourceTypeResolver maps Resource Types to Resource classes, e.g. AWS::Serverless::Function to
Expand Down Expand Up @@ -643,7 +813,7 @@ def get_all_resources(self) -> Dict[str, Any]:
"""Return a dictionary of all resources from the SAM template."""
return self.resources

def get_resource_by_logical_id(self, _input: str) -> Dict[str, Any]:
def get_resource_by_logical_id(self, _input: str) -> Optional[Dict[str, Any]]:
"""
Recursively find resource with matching Logical ID that are present in the template and returns the value.
If it is not in template, this method simply returns the input unchanged.
Expand All @@ -661,16 +831,16 @@ def get_resource_by_logical_id(self, _input: str) -> Dict[str, Any]:
__all__: List[str] = [
"IS_DICT",
"IS_STR",
"Validator",
"any_type",
"is_type",
"PropertyType",
"Property",
"PassThroughProperty",
"MutatedPassThroughProperty",
"PassThroughProperty",
"Property",
"PropertyType",
"Resource",
"ResourceMacro",
"SamResourceMacro",
"ResourceTypeResolver",
"ResourceResolver",
"ResourceTypeResolver",
"SamResourceMacro",
"Validator",
"any_type",
"is_type",
]
18 changes: 17 additions & 1 deletion samtranslator/model/sam_resources.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
""" SAM macro definitions """
"""SAM macro definitions"""

import copy
import re
Expand Down Expand Up @@ -139,6 +139,7 @@ class SamFunction(SamResourceMacro):
"""SAM function macro."""

resource_type = "AWS::Serverless::Function"

property_types = {
"FunctionName": PropertyType(False, one_of(IS_STR, IS_DICT)),
"Handler": PassThroughProperty(False),
Expand Down Expand Up @@ -252,6 +253,15 @@ class SamFunction(SamResourceMacro):
"DestinationQueue": SQSQueue.resource_type,
}

# Validation rules
# TODO: To enable these rules, we need to update translator test input/output files to property configure template
# to avoid fail-fast. eg: test with DeploymentPreference without AutoPublishAlias would fail fast before reaching testing state
# __validation_rules__ = [
# (ValidationRule.MUTUALLY_EXCLUSIVE, ["ImageUri", "InlineCode", "CodeUri"]),
# (ValidationRule.CONDITIONAL_REQUIREMENT, ["DeploymentPreference", "AutoPublishAlias"]),
# (ValidationRule.CONDITIONAL_REQUIREMENT, ["ProvisionedConcurrencyConfig", "AutoPublishAlias"]),
# ]

def resources_to_link(self, resources: Dict[str, Any]) -> Dict[str, Any]:
try:
return {"event_resources": self._event_resources_to_link(resources)}
Expand All @@ -274,6 +284,11 @@ def to_cloudformation(self, **kwargs): # type: ignore[no-untyped-def] # noqa: P
conditions = kwargs.get("conditions", {})
feature_toggle = kwargs.get("feature_toggle")

# TODO: Skip pass schema_class=aws_serverless_function.Properties to skip schema validation for now.
# - adding this now would required update error message in error error_function_*_test.py
# - add this when we can verify that changing error message would not break customers
# self.validate_before_transform(schema_class=None, collect_all_errors=False)

if self.DeadLetterQueue:
self._validate_dlq(self.DeadLetterQueue)

Expand Down Expand Up @@ -1809,6 +1824,7 @@ def _validate_architectures(self, lambda_layer: LambdaLayerVersion) -> None:
# Intrinsics are not validated
if is_intrinsic(architectures):
return

for arq in architectures:
# We validate the values only if we they're not intrinsics
if not is_intrinsic(arq) and arq not in [ARM64, X86_64]:
Expand Down
Loading