Skip to content

Commit

Permalink
Combine functionality ES/OpenSearch (#8491)
Browse files Browse the repository at this point in the history
  • Loading branch information
bblommers authored Jan 15, 2025
1 parent 67b915f commit 0bd01b2
Show file tree
Hide file tree
Showing 8 changed files with 137 additions and 278 deletions.
147 changes: 1 addition & 146 deletions moto/es/models.py
Original file line number Diff line number Diff line change
@@ -1,155 +1,10 @@
from typing import Any, Dict, List

from moto.core.base_backend import BackendDict, BaseBackend
from moto.core.common_models import BaseModel
from moto.moto_api._internal import mock_random
from moto.utilities.utils import get_partition

from .exceptions import DomainNotFound


class Domain(BaseModel):
def __init__(
self,
region_name: str,
domain_name: str,
es_version: str,
elasticsearch_cluster_config: Dict[str, Any],
ebs_options: Dict[str, Any],
access_policies: Dict[str, Any],
snapshot_options: Dict[str, Any],
vpc_options: Dict[str, Any],
cognito_options: Dict[str, Any],
encryption_at_rest_options: Dict[str, Any],
node_to_node_encryption_options: Dict[str, Any],
advanced_options: Dict[str, Any],
log_publishing_options: Dict[str, Any],
domain_endpoint_options: Dict[str, Any],
advanced_security_options: Dict[str, Any],
auto_tune_options: Dict[str, Any],
):
self.domain_id = mock_random.get_random_hex(8)
self.region_name = region_name
self.domain_name = domain_name
self.es_version = es_version
self.elasticsearch_cluster_config = elasticsearch_cluster_config
self.ebs_options = ebs_options
self.access_policies = access_policies
self.snapshot_options = snapshot_options
self.vpc_options = vpc_options
self.cognito_options = cognito_options
self.encryption_at_rest_options = encryption_at_rest_options
self.node_to_node_encryption_options = node_to_node_encryption_options
self.advanced_options = advanced_options
self.log_publishing_options = log_publishing_options
self.domain_endpoint_options = domain_endpoint_options
self.advanced_security_options = advanced_security_options
self.auto_tune_options = auto_tune_options
if self.auto_tune_options:
self.auto_tune_options["State"] = "ENABLED"

@property
def arn(self) -> str:
return f"arn:{get_partition(self.region_name)}:es:{self.region_name}:domain/{self.domain_id}"

def to_json(self) -> Dict[str, Any]:
return {
"DomainId": self.domain_id,
"DomainName": self.domain_name,
"ARN": self.arn,
"Created": True,
"Deleted": False,
"Processing": False,
"UpgradeProcessing": False,
"ElasticsearchVersion": self.es_version,
"ElasticsearchClusterConfig": self.elasticsearch_cluster_config,
"EBSOptions": self.ebs_options,
"AccessPolicies": self.access_policies,
"SnapshotOptions": self.snapshot_options,
"VPCOptions": self.vpc_options,
"CognitoOptions": self.cognito_options,
"EncryptionAtRestOptions": self.encryption_at_rest_options,
"NodeToNodeEncryptionOptions": self.node_to_node_encryption_options,
"AdvancedOptions": self.advanced_options,
"LogPublishingOptions": self.log_publishing_options,
"DomainEndpointOptions": self.domain_endpoint_options,
"AdvancedSecurityOptions": self.advanced_security_options,
"AutoTuneOptions": self.auto_tune_options,
}


class ElasticsearchServiceBackend(BaseBackend):
"""Implementation of ElasticsearchService APIs."""

def __init__(self, region_name: str, account_id: str):
# Functionality is part of OpenSearch, as that includes all of ES functionality + more
super().__init__(region_name, account_id)
self.domains: Dict[str, Domain] = dict()

def create_elasticsearch_domain(
self,
domain_name: str,
elasticsearch_version: str,
elasticsearch_cluster_config: Dict[str, Any],
ebs_options: Dict[str, Any],
access_policies: Dict[str, Any],
snapshot_options: Dict[str, Any],
vpc_options: Dict[str, Any],
cognito_options: Dict[str, Any],
encryption_at_rest_options: Dict[str, Any],
node_to_node_encryption_options: Dict[str, Any],
advanced_options: Dict[str, Any],
log_publishing_options: Dict[str, Any],
domain_endpoint_options: Dict[str, Any],
advanced_security_options: Dict[str, Any],
auto_tune_options: Dict[str, Any],
) -> Dict[str, Any]:
# TODO: Persist/Return other attributes
new_domain = Domain(
region_name=self.region_name,
domain_name=domain_name,
es_version=elasticsearch_version,
elasticsearch_cluster_config=elasticsearch_cluster_config,
ebs_options=ebs_options,
access_policies=access_policies,
snapshot_options=snapshot_options,
vpc_options=vpc_options,
cognito_options=cognito_options,
encryption_at_rest_options=encryption_at_rest_options,
node_to_node_encryption_options=node_to_node_encryption_options,
advanced_options=advanced_options,
log_publishing_options=log_publishing_options,
domain_endpoint_options=domain_endpoint_options,
advanced_security_options=advanced_security_options,
auto_tune_options=auto_tune_options,
)
self.domains[domain_name] = new_domain
return new_domain.to_json()

def delete_elasticsearch_domain(self, domain_name: str) -> None:
if domain_name not in self.domains:
raise DomainNotFound(domain_name)
del self.domains[domain_name]

def describe_elasticsearch_domain(self, domain_name: str) -> Dict[str, Any]:
if domain_name not in self.domains:
raise DomainNotFound(domain_name)
return self.domains[domain_name].to_json()

def list_domain_names(self) -> List[Dict[str, str]]:
"""
The engine-type parameter is not yet supported.
Pagination is not yet implemented.
"""
return [{"DomainName": domain.domain_name} for domain in self.domains.values()]

def describe_elasticsearch_domains(
self, domain_names: List[str]
) -> List[Dict[str, Any]]:
queried_domains = []
for domain_name in domain_names:
if domain_name in self.domains:
queried_domains.append(self.domains[domain_name].to_json())
return queried_domains


es_backends = BackendDict(ElasticsearchServiceBackend, "es")
107 changes: 0 additions & 107 deletions moto/es/responses.py

This file was deleted.

10 changes: 4 additions & 6 deletions moto/es/urls.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
from moto.opensearch.responses import OpenSearchServiceResponse

from .responses import ElasticsearchServiceResponse

url_bases = [
r"https?://es\.(.+)\.amazonaws\.com",
]


url_paths = {
"{0}/2015-01-01/domain$": ElasticsearchServiceResponse.list_domains,
"{0}/2015-01-01/es/domain$": ElasticsearchServiceResponse.domains,
"{0}/2015-01-01/es/domain/(?P<domainname>[^/]+)": ElasticsearchServiceResponse.domain,
"{0}/2015-01-01/es/domain-info$": ElasticsearchServiceResponse.dispatch,
"{0}/2015-01-01/domain$": OpenSearchServiceResponse.list_domains,
"{0}/2015-01-01/es/domain$": OpenSearchServiceResponse.domains,
"{0}/2015-01-01/es/domain/(?P<domainname>[^/]+)": OpenSearchServiceResponse.domain,
"{0}/2015-01-01/es/domain-info$": OpenSearchServiceResponse.list_domains,
"{0}/2021-01-01/domain$": OpenSearchServiceResponse.dispatch,
"{0}/2021-01-01/opensearch/compatibleVersions": OpenSearchServiceResponse.dispatch,
"{0}/2021-01-01/opensearch/domain": OpenSearchServiceResponse.dispatch,
Expand Down
2 changes: 2 additions & 0 deletions moto/opensearch/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@


class ResourceNotFoundException(JsonRESTError):
code = 409

def __init__(self, name: str):
super().__init__("ResourceNotFoundException", f"Domain not found: {name}")

Expand Down
33 changes: 22 additions & 11 deletions moto/opensearch/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ def __init__(
auto_tune_options: Dict[str, Any],
off_peak_window_options: Dict[str, Any],
software_update_options: Dict[str, bool],
is_es: bool,
elasticsearch_version: Optional[str],
elasticsearch_cluster_config: Optional[str],
):
self.domain_id = f"{account_id}/{domain_name}"
self.domain_name = domain_name
Expand Down Expand Up @@ -113,10 +116,16 @@ def __init__(
advanced_security_options or default_advanced_security_options
)
self.auto_tune_options = auto_tune_options or {"State": "ENABLE_IN_PROGRESS"}
if not self.auto_tune_options.get("State"):
self.auto_tune_options["State"] = "ENABLED"
self.off_peak_windows_options = off_peak_window_options
self.software_update_options = (
software_update_options or default_software_update_options
)
self.engine_type = "Elasticsearch" if is_es else "OpenSearch"
self.is_es = is_es
self.elasticsearch_version = elasticsearch_version
self.elasticsearch_cluster_config = elasticsearch_cluster_config

self.deleted = False
self.processing = False
Expand Down Expand Up @@ -157,6 +166,8 @@ def dct_options(self) -> Dict[str, Any]:
"AutoTuneOptions": self.auto_tune_options,
"OffPeakWindowsOptions": self.off_peak_windows_options,
"SoftwareUpdateOptions": self.software_update_options,
"ElasticsearchVersion": self.elasticsearch_version,
"ElasticsearchClusterConfig": self.elasticsearch_cluster_config,
}

def to_dict(self) -> Dict[str, Any]:
Expand Down Expand Up @@ -259,6 +270,9 @@ def create_domain(
auto_tune_options: Dict[str, Any],
off_peak_window_options: Dict[str, Any],
software_update_options: Dict[str, Any],
is_es: bool,
elasticsearch_version: Optional[str],
elasticsearch_cluster_config: Optional[str],
) -> OpenSearchDomain:
domain = OpenSearchDomain(
account_id=self.account_id,
Expand All @@ -280,6 +294,9 @@ def create_domain(
auto_tune_options=auto_tune_options,
off_peak_window_options=off_peak_window_options,
software_update_options=software_update_options,
is_es=is_es,
elasticsearch_version=elasticsearch_version,
elasticsearch_cluster_config=elasticsearch_cluster_config,
)
self.domains[domain_name] = domain
if tag_list:
Expand Down Expand Up @@ -355,23 +372,17 @@ def remove_tags(self, arn: str, tag_keys: List[str]) -> None:

def list_domain_names(self, engine_type: str) -> List[Dict[str, str]]:
domains = []
if engine_type and engine_type not in ["Elasticsearch", "OpenSearch"]:
raise EngineTypeNotFoundException(engine_type)
for domain in self.domains.values():
if engine_type:
if engine_type in domain.engine_version.options:
if engine_type == domain.engine_type:
domains.append(
{
"DomainName": domain.domain_name,
"EngineType": engine_type.split("_")[0],
}
{"DomainName": domain.domain_name, "EngineType": engine_type}
)
else:
raise EngineTypeNotFoundException(domain.domain_name)
else:
domains.append(
{
"DomainName": domain.domain_name,
"EngineType": domain.engine_version.options.split("_")[0],
}
{"DomainName": domain.domain_name, "EngineType": domain.engine_type}
)
return domains

Expand Down
Loading

0 comments on commit 0bd01b2

Please sign in to comment.