From 9ffab3a16039140039120f404e1b2e02948d2cbb Mon Sep 17 00:00:00 2001 From: mozzy11 Date: Wed, 29 May 2024 12:56:08 +0300 Subject: [PATCH] add ability to switch off/on creation of parquet dwh --- cloudbuild.yaml | 38 ++++++++---- docker/config/application.yaml | 4 +- docker/config_fhir_sink/application.yaml | 59 +++++++++++++++++++ docker/config_fhir_sink/flink-conf.yaml | 31 ++++++++++ .../hapi-postgres-config_local.json | 9 +++ e2e-tests/controller-spark/Dockerfile | 3 +- .../controller_spark_sql_validation.sh | 54 ++++++++++++----- .../fhir/analytics/ConvertResourceFn.java | 2 +- .../fhir/analytics/FetchSearchPageFn.java | 6 +- .../google/fhir/analytics/FhirEtlOptions.java | 6 ++ pipelines/controller/config/application.yaml | 3 + .../google/fhir/analytics/DataProperties.java | 9 +++ 12 files changed, 194 insertions(+), 30 deletions(-) create mode 100644 docker/config_fhir_sink/application.yaml create mode 100644 docker/config_fhir_sink/flink-conf.yaml create mode 100644 docker/config_fhir_sink/hapi-postgres-config_local.json diff --git a/cloudbuild.yaml b/cloudbuild.yaml index 67c2e5079..5448cc896 100644 --- a/cloudbuild.yaml +++ b/cloudbuild.yaml @@ -118,15 +118,6 @@ steps: args: [ '-U', 'admin', '-d', 'postgres', '-h', 'hapi-fhir-db', '-p', '5432', '-c', 'CREATE DATABASE views;'] -# Resetting FHIR sink server -- name: 'docker/compose' - id: 'Turn down FHIR Sink Server' - args: [ '-f', './docker/sink-compose.yml', 'down' ,'-v'] - -- name: 'docker/compose' - id: 'Launch HAPI FHIR Sink Server' - args: [ '-f', './docker/sink-compose.yml', 'up','--force-recreate', '-d' ] - - name: 'docker/compose' id: 'Bring up controller and Spark containers' env: @@ -148,9 +139,36 @@ steps: # - -c # - docker logs pipeline-controller +- name: 'docker/compose' + id: 'Bring down controller and Spark containers for FHIR server to FHIR server sync' + args: [ '-f', './docker/compose-controller-spark-sql-single.yaml', 'down' ,'-v'] + +# Resetting Sink FHIR server +- name: 'docker/compose' + id: 'Turn down HAPI Sink FHIR Server for FHIR server to FHIR server sync' + args: [ '-f', './docker/sink-compose.yml', 'down' ,'-v'] + +- name: 'docker/compose' + id: 'Launch Sink FHIR Server for FHIR server to FHIR server sync' + args: ['-f', './docker/sink-compose.yml', 'up', '--force-recreate', '-d'] + +# Spinning up only the pipeline controller for FHIR server to FHIR server sync +- name: 'docker/compose' + id: 'Bring up only the pipeline controller for FHIR server to FHIR server sync' + env: + - PIPELINE_CONFIG=/workspace/docker/config_fhir_sink + - DWH_ROOT=/workspace/e2e-tests/controller-spark/dwh + args: [ '-f', './docker/compose-controller-spark-sql-single.yaml', 'up', + '--force-recreate', '--no-deps' , '-d' ,'pipeline-controller' ] + +- name: '${_REPOSITORY}/e2e-tests/controller-spark:${_TAG}' + id: 'Run E2E Test for Dockerized Controller for FHIR server to FHIR server sync' + env: + - DWH_TYPE="FHIR" + - name: 'docker/compose' id: 'Bring down controller and Spark containers' - args: [ '-f', './docker/compose-controller-spark-sql-single.yaml', 'down' ] + args: [ '-f', './docker/compose-controller-spark-sql-single.yaml', 'down' ,'-v'] - name: 'docker/compose' id: 'Turn down HAPI Source and Sink Servers' diff --git a/docker/config/application.yaml b/docker/config/application.yaml index 83295dc7c..ee261e582 100644 --- a/docker/config/application.yaml +++ b/docker/config/application.yaml @@ -25,6 +25,8 @@ fhirdata: # fhirServerUrl: "http://hapi-server:8080/fhir" dbConfig: "config/hapi-postgres-config_local.json" dwhRootPrefix: "/dwh/controller_DWH" + #Whether to create a Parquet DWH or not + createParquetDwh: true incrementalSchedule: "0 0 * * * *" purgeSchedule: "0 30 * * * *" numOfDwhSnapshotsToRetain: 2 @@ -43,7 +45,7 @@ fhirdata: rowGroupSizeForParquetFiles: 33554432 # 32mb viewDefinitionsDir: "config/views" sinkDbConfigPath: "config/hapi-postgres-config_local_views.json" - sinkFhirServerUrl: "http://sink-server:8080/fhir" + #sinkFhirServerUrl: "http://sink-server:8080/fhir" #sinkUserName: "hapi" #sinkPassword: "hapi123" recursiveDepth: 1 diff --git a/docker/config_fhir_sink/application.yaml b/docker/config_fhir_sink/application.yaml new file mode 100644 index 000000000..942a6c325 --- /dev/null +++ b/docker/config_fhir_sink/application.yaml @@ -0,0 +1,59 @@ +# +# Copyright 2020-2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# See `pipelines/controller/config/application.yaml` for full documentation +# of these options. +# This config is meant to be used by `compose-controller-spark-sql.yaml`. +fhirdata: + # 172.17.0.1 is an example docker network interface ip address; + # `hapi-server` is another docker example where a container with that name is + # running on the same docker network. + # fhirServerUrl: "http://172.17.0.1:8091/fhir" + # fhirServerUrl: "http://hapi-server:8080/fhir" + dbConfig: "config/hapi-postgres-config_local.json" + dwhRootPrefix: "/dwh/controller_DWH" + #Whether to create a Parquet DWH or not + createParquetDwh: false + incrementalSchedule: "0 0 * * * *" + purgeSchedule: "0 30 * * * *" + numOfDwhSnapshotsToRetain: 2 + # There is no Questionnaire in our test FHIR server, but it is added to + # prevent regression of https://github.com/google/fhir-data-pipes/issues/785. + # TODO: add resource table creation to e2e tests. + resourceList: "Patient,Encounter,Observation,Questionnaire,Condition,Practitioner,Location,Organization,DiagnosticReport,Immunization,MedicationRequest,PractitionerRole,Procedure" + numThreads: 1 + autoGenerateFlinkConfiguration: true + createHiveResourceTables: false + #thriftserverHiveConfig: "config/thriftserver-hive-config_local.json" + #hiveResourceViewsDir: "config/views" + # structureDefinitionsPath: "config/profile-definitions" + structureDefinitionsPath: "classpath:/r4-us-core-definitions" + fhirVersion: "R4" + rowGroupSizeForParquetFiles: 33554432 # 32mb + #viewDefinitionsDir: "config/views" + #sinkDbConfigPath: "config/hapi-postgres-config_local_views.json" + sinkFhirServerUrl: "http://sink-server:8080/fhir" + #sinkUserName: "hapi" + #sinkPassword: "hapi123" + recursiveDepth: 1 + +# Enable spring boot actuator end points, use "*" to expose all endpoints, or a comma-separated +# list to expose selected ones +management: + endpoints: + web: + exposure: + include: health,info,metrics,prometheus,pipeline-metrics diff --git a/docker/config_fhir_sink/flink-conf.yaml b/docker/config_fhir_sink/flink-conf.yaml new file mode 100644 index 000000000..109de5192 --- /dev/null +++ b/docker/config_fhir_sink/flink-conf.yaml @@ -0,0 +1,31 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# To use this config, FLINK_CONF_DIR env. var should be set to the parent dir. + +# This is needed to prevent an "Insufficient number of network buffers" +# exceptions when running the merger on large input with many workers. +taskmanager.memory.network.max: 256mb + +# This is needed to be able to process large resources, otherwise in JDBC +# mode we may get the following exception: +# "The record exceeds the maximum size of a sort buffer ..." +taskmanager.memory.managed.size: 256mb + +# This is to make pipeline.run() non-blocking with FlinkRunner; unfortunately +# this is overwritten in `local` mode: https://stackoverflow.com/a/74416240 +execution.attached: false + +# This is required to track the pipeline metrics when FlinkRunner is used. +execution.job-listeners: com.google.fhir.analytics.metrics.FlinkJobListener diff --git a/docker/config_fhir_sink/hapi-postgres-config_local.json b/docker/config_fhir_sink/hapi-postgres-config_local.json new file mode 100644 index 000000000..743efc6bb --- /dev/null +++ b/docker/config_fhir_sink/hapi-postgres-config_local.json @@ -0,0 +1,9 @@ +{ + "jdbcDriverClass": "org.postgresql.Driver", + "databaseService" : "postgresql", + "databaseHostName" : "hapi-fhir-db", + "databasePort" : "5432", + "databaseUser" : "admin", + "databasePassword" : "admin", + "databaseName" : "hapi" +} diff --git a/e2e-tests/controller-spark/Dockerfile b/e2e-tests/controller-spark/Dockerfile index 2d55aa88f..aada6bd39 100644 --- a/e2e-tests/controller-spark/Dockerfile +++ b/e2e-tests/controller-spark/Dockerfile @@ -23,5 +23,6 @@ COPY parquet-tools-1.11.1.jar parquet-tools-1.11.1.jar ENV PARQUET_SUBDIR="dwh" ENV DOCKER_NETWORK="--use_docker_network" ENV HOME_DIR="/workspace/e2e-tests/controller-spark" +ENV DWH_TYPE="PARQUET" -ENTRYPOINT cd ${HOME_DIR}; ./controller_spark_sql_validation.sh ${HOME_DIR} ${PARQUET_SUBDIR} ${DOCKER_NETWORK} +ENTRYPOINT cd ${HOME_DIR}; ./controller_spark_sql_validation.sh ${HOME_DIR} ${PARQUET_SUBDIR} ${DOCKER_NETWORK} ${DWH_TYPE} diff --git a/e2e-tests/controller-spark/controller_spark_sql_validation.sh b/e2e-tests/controller-spark/controller_spark_sql_validation.sh index 4ed3f86dd..c351d773f 100755 --- a/e2e-tests/controller-spark/controller_spark_sql_validation.sh +++ b/e2e-tests/controller-spark/controller_spark_sql_validation.sh @@ -65,7 +65,13 @@ function validate_args() { # anything that needs printing ################################################# function print_message() { - local print_prefix="E2E TEST FOR CONTROLLER SPARK DEPLOYMENT:" + local print_prefix="" + if [[ "${DWH_TYPE}" == "PARQUET" ]] + then + print_prefix="E2E TEST FOR CONTROLLER SPARK DEPLOYMENT:" + else + print_prefix="E2E TEST FOR CONTROLLER FHIR SERVER TO FHIR SERVER SYNC:" + fi echo "${print_prefix} $*" } @@ -88,6 +94,7 @@ function print_message() { function setup() { HOME_PATH=$1 PARQUET_SUBDIR=$2 + DWH_TYPE=$4 SOURCE_FHIR_SERVER_URL='http://localhost:8091' SINK_FHIR_SERVER_URL='http://localhost:8098' PIPELINE_CONTROLLER_URL='http://localhost:8090' @@ -187,7 +194,7 @@ function run_pipeline() { ####################################################################### function check_parquet() { local isIncremental=$1 - local runtime="15 minute" + local runtime="5 minute" local end_time=$(date -ud "$runtime" +%s) local output="${HOME_PATH}/${PARQUET_SUBDIR}" local timeout=true @@ -224,7 +231,7 @@ function check_parquet() { timeout=false break else - sleep 20 + sleep 10 fi fi done @@ -412,8 +419,14 @@ setup "$@" fhir_source_query sleep 50 run_pipeline "FULL" -check_parquet false -test_fhir_sink "FULL" +if [[ "${DWH_TYPE}" == "PARQUET" ]] +then + check_parquet false +else + # Provide enough Buffer time for FULL pipeline to completely run before testing the sink FHIR server + sleep 900 + test_fhir_sink "FULL" +fi clear @@ -425,16 +438,27 @@ update_resource sleep 60 # Incremental run. run_pipeline "INCREMENTAL" -check_parquet true -fhir_source_query -test_fhir_sink "INCREMENTAL" - -validate_resource_tables -validate_resource_tables_data -validate_updated_resource +if [[ "${DWH_TYPE}" == "PARQUET" ]] +then + check_parquet true +else + fhir_source_query + # Provide enough Buffer time for FULL pipeline to completely run before testing the sink FHIR server + sleep 300 + test_fhir_sink "INCREMENTAL" +fi + +if [[ "${DWH_TYPE}" == "PARQUET" ]] +then + validate_resource_tables + validate_resource_tables_data + validate_updated_resource + + # View recreation run + # TODO add validation for the views as well + run_pipeline "VIEWS" + +fi -# View recreation run -# TODO add validation for the views as well -run_pipeline "VIEWS" print_message "END!!" \ No newline at end of file diff --git a/pipelines/batch/src/main/java/com/google/fhir/analytics/ConvertResourceFn.java b/pipelines/batch/src/main/java/com/google/fhir/analytics/ConvertResourceFn.java index 1a29ef425..5820a50b3 100644 --- a/pipelines/batch/src/main/java/com/google/fhir/analytics/ConvertResourceFn.java +++ b/pipelines/batch/src/main/java/com/google/fhir/analytics/ConvertResourceFn.java @@ -142,7 +142,7 @@ public void writeResource(HapiRowDescriptor element) numFetchedResourcesMap.get(resourceType).inc(1); - if (!parquetFile.isEmpty()) { + if (parquetUtil != null) { startTime = System.currentTimeMillis(); parquetUtil.write(resource); totalGenerateTimeMillisMap.get(resourceType).inc(System.currentTimeMillis() - startTime); diff --git a/pipelines/batch/src/main/java/com/google/fhir/analytics/FetchSearchPageFn.java b/pipelines/batch/src/main/java/com/google/fhir/analytics/FetchSearchPageFn.java index 5ba19f11f..5019052f0 100644 --- a/pipelines/batch/src/main/java/com/google/fhir/analytics/FetchSearchPageFn.java +++ b/pipelines/batch/src/main/java/com/google/fhir/analytics/FetchSearchPageFn.java @@ -21,7 +21,6 @@ import ca.uhn.fhir.parser.IParser; import com.cerner.bunsen.exception.ProfileException; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Strings; import com.google.fhir.analytics.JdbcConnectionPools.DataSourceConfig; import com.google.fhir.analytics.model.DatabaseConfiguration; import com.google.fhir.analytics.view.ViewApplicationException; @@ -88,6 +87,8 @@ abstract class FetchSearchPageFn extends DoFn> { protected final String parquetFile; + protected final Boolean createParquetDwh; + private final int secondsToFlush; private final int rowGroupSize; @@ -130,6 +131,7 @@ abstract class FetchSearchPageFn extends DoFn> { this.oAuthClientSecret = options.getFhirServerOAuthClientSecret(); this.stageIdentifier = stageIdentifier; this.parquetFile = options.getOutputParquetPath(); + this.createParquetDwh = options.isCreateParquetDwh(); this.secondsToFlush = options.getSecondsToFlushParquetFiles(); this.rowGroupSize = options.getRowGroupSizeForParquetFiles(); this.viewDefinitionsDir = options.getViewDefinitionsDir(); @@ -200,7 +202,7 @@ public void setup() throws SQLException, ProfileException { oAuthClientSecret, fhirContext); fhirSearchUtil = new FhirSearchUtil(fetchUtil); - if (!Strings.isNullOrEmpty(parquetFile)) { + if (createParquetDwh) { parquetUtil = new ParquetUtil( fhirContext.getVersion().getVersion(), diff --git a/pipelines/batch/src/main/java/com/google/fhir/analytics/FhirEtlOptions.java b/pipelines/batch/src/main/java/com/google/fhir/analytics/FhirEtlOptions.java index c28e05902..8ff02bc10 100644 --- a/pipelines/batch/src/main/java/com/google/fhir/analytics/FhirEtlOptions.java +++ b/pipelines/batch/src/main/java/com/google/fhir/analytics/FhirEtlOptions.java @@ -260,4 +260,10 @@ public interface FhirEtlOptions extends BasePipelineOptions { String getSourceNDJsonFilePattern(); void setSourceNDJsonFilePattern(String value); + + @Description("Flag to switch off/on creation of a parquet DWH") + @Default.Boolean(true) + Boolean isCreateParquetDwh(); + + void setCreateParquetDwh(Boolean value); } diff --git a/pipelines/controller/config/application.yaml b/pipelines/controller/config/application.yaml index 81c63eae9..15129f56a 100644 --- a/pipelines/controller/config/application.yaml +++ b/pipelines/controller/config/application.yaml @@ -72,6 +72,9 @@ fhirdata: # that directory too, such that files created by the pipelines are readable by # the Thrift Server, e.g., `setfacl -d -m o::rx dwh/`. dwhRootPrefix: "dwh/controller_DEV_DWH" + #Whether to create a Parquet DWH or not.In case of syncying between a FHIR server to FHIR server , + # generation of parquet DWH could be switched off/on + createParquetDwh: true # The schedule for automatic incremental pipeline runs. # Uses the Spring CronExpression format, i.e., diff --git a/pipelines/controller/src/main/java/com/google/fhir/analytics/DataProperties.java b/pipelines/controller/src/main/java/com/google/fhir/analytics/DataProperties.java index ca68f910f..58c20ddee 100644 --- a/pipelines/controller/src/main/java/com/google/fhir/analytics/DataProperties.java +++ b/pipelines/controller/src/main/java/com/google/fhir/analytics/DataProperties.java @@ -118,6 +118,8 @@ public class DataProperties { private int recursiveDepth; + private boolean createParquetDwh; + @PostConstruct void validateProperties() { CronExpression.parse(incrementalSchedule); @@ -127,6 +129,9 @@ void validateProperties() { "At least one of fhirServerUrl or dbConfig should be set!"); Preconditions.checkState(fhirVersion != null, "FhirVersion cannot be empty"); + Preconditions.checkArgument( + !Strings.isNullOrEmpty(dwhRootPrefix), "dwhRootPrefix is required!"); + if (!Strings.isNullOrEmpty(dbConfig)) { if (!Strings.isNullOrEmpty(fhirServerUrl)) { logger.warn("Both fhirServerUrl and dbConfig are set; ignoring fhirServerUrl!"); @@ -138,6 +143,7 @@ void validateProperties() { logger.info("Using FHIR-search mode since dbConfig is not set."); } Preconditions.checkState(!createHiveResourceTables || !thriftserverHiveConfig.isEmpty()); + Preconditions.checkState(!createHiveResourceTables || createParquetDwh); } private PipelineConfig.PipelineConfigBuilder addFlinkOptions(FhirEtlOptions options) { @@ -213,6 +219,8 @@ PipelineConfig createBatchOptions() { Instant.now().toString().replace(":", "-").replace("-", "_").replace(".", "_"); options.setOutputParquetPath(dwhRootPrefix + TIMESTAMP_PREFIX + timestampSuffix); + options.setCreateParquetDwh(createParquetDwh); + PipelineConfig.PipelineConfigBuilder pipelineConfigBuilder = addFlinkOptions(options); // Get hold of thrift server parquet directory from dwhRootPrefix config. @@ -230,6 +238,7 @@ List getConfigParams() { return List.of( new ConfigFields("fhirdata.fhirServerUrl", fhirServerUrl, "", ""), new ConfigFields("fhirdata.dwhRootPrefix", dwhRootPrefix, "", ""), + new ConfigFields("fhirdata.createParquetDwh", String.valueOf(createParquetDwh), "", ""), new ConfigFields("fhirdata.incrementalSchedule", incrementalSchedule, "", ""), new ConfigFields("fhirdata.purgeSchedule", purgeSchedule, "", ""), new ConfigFields(