Skip to content

Commit

Permalink
Update PR and adress review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
mozzy11 committed Jan 6, 2025
1 parent 104990a commit f23ffee
Show file tree
Hide file tree
Showing 12 changed files with 87 additions and 212 deletions.
49 changes: 27 additions & 22 deletions cloudbuild.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -189,23 +189,20 @@ steps:
'-c', 'CREATE DATABASE views;']
waitFor: ['Turn down FHIR Sink Server Search']

- name: 'docker/compose'
id: 'Launch HAPI FHIR Sink Server Controller'
args: [ '-f', './docker/sink-compose.yml', '-p', 'sink-server-controller', 'up','--force-recreate', '-d' ]
env:
- SINK_SERVER_NAME=sink-server-controller
- SINK_SERVER_PORT=9001
waitFor: ['Create views database']

- name: 'docker/compose'
id: 'Bring up controller and Spark containers'
env:
- PIPELINE_CONFIG=/workspace/docker/config
- DWH_ROOT=/workspace/e2e-tests/controller-spark/dwh
- FHIRDATA_SINKFHIRSERVERURL=http://sink-server-controller:8080/fhir
- FHIRDATA_SINKFHIRSERVERURL=
- FHIRDATA_GENERATEPARQUETFILES=true
- FHIRDATA_NUMTHREADS=-1
- FHIRDATA_CREATEHIVERESOURCETABLES=true
- FHIRDATA_CREATEPARQUETVIEWS=true
- FHIRDATA_SINKDBCONFIGPATH=config/hapi-postgres-config_local_views.json
args: [ '-f', './docker/compose-controller-spark-sql-single.yaml', 'up',
'--force-recreate', '-d' ]
waitFor: ['Launch HAPI FHIR Sink Server Controller']
waitFor: ['Create views database']

- name: '${_REPOSITORY}/e2e-tests/controller-spark:${_TAG}'
id: 'Run E2E Test for Dockerized Controller and Spark Thriftserver'
Expand All @@ -224,39 +221,47 @@ steps:
- name: 'docker/compose'
id: 'Bring down controller and Spark containers for FHIR server to FHIR server sync'
args: [ '-f', './docker/compose-controller-spark-sql-single.yaml', 'down' ,'-v']

# Resetting Sink FHIR server
- name: 'docker/compose'
id: 'Turn down HAPI Sink FHIR Server for FHIR server to FHIR server sync'
args: [ '-f', './docker/sink-compose.yml', 'down' ,'-v']
waitFor: ['Run E2E Test for Dockerized Controller and Spark Thriftserver']

- name: 'docker/compose'
id: 'Launch Sink FHIR Server for FHIR server to FHIR server sync'
args: ['-f', './docker/sink-compose.yml', 'up', '--force-recreate', '-d']
id: 'Launch HAPI FHIR Sink Server Controller'
args: [ '-f', './docker/sink-compose.yml', '-p', 'sink-server-controller', 'up','--force-recreate', '-d' ]
env:
- SINK_SERVER_NAME=sink-server-controller
- SINK_SERVER_PORT=9001
waitFor: ['Bring down controller and Spark containers for FHIR server to FHIR server sync']

# Spinning up only the pipeline controller for FHIR server to FHIR server sync
- name: 'docker/compose'
id: 'Bring up only the pipeline controller for FHIR server to FHIR server sync'
id: 'Bring up the pipeline controller'
env:
- PIPELINE_CONFIG=/workspace/docker/config_fhir_sink
- PIPELINE_CONFIG=/workspace/docker/config
- DWH_ROOT=/workspace/e2e-tests/controller-spark/dwh
- FHIRDATA_SINKFHIRSERVERURL=http://sink-server-controller:8080/fhir
- FHIRDATA_GENERATEPARQUETFILES=false
- FHIRDATA_NUMTHREADS=1
- FHIRDATA_CREATEHIVERESOURCETABLES=false
- FHIRDATA_CREATEPARQUETVIEWS=false
- FHIRDATA_SINKDBCONFIGPATH=
args: [ '-f', './docker/compose-controller-spark-sql-single.yaml', 'up',
'--force-recreate', '--no-deps' , '-d' ,'pipeline-controller' ]
waitFor: ['Launch HAPI FHIR Sink Server Controller']

- name: '${_REPOSITORY}/e2e-tests/controller-spark:${_TAG}'
id: 'Run E2E Test for Dockerized Controller for FHIR server to FHIR server sync'
waitFor: ['Bring up the pipeline controller']
env:
- DWH_TYPE="FHIR"

- name: 'docker/compose'
id: 'Bring down controller and Spark containers'
id: 'Bring down controller'
args: [ '-f', './docker/compose-controller-spark-sql-single.yaml', 'down' ,'-v']
waitFor: ['Run E2E Test for Dockerized Controller and Spark Thriftserver']
waitFor: ['Run E2E Test for Dockerized Controller for FHIR server to FHIR server sync']

- name: 'docker/compose'
id: 'Turn down HAPI Source Server'
args: [ '-f', './docker/hapi-compose.yml', 'down' ]
waitFor: ['Bring down controller and Spark containers']
waitFor: ['Bring down controller']

- name: 'docker/compose'
id: 'Turn down FHIR Sink Server Controller for e2e tests'
Expand Down
7 changes: 7 additions & 0 deletions docker/compose-controller-spark-sql-single.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ services:
- ${DWH_ROOT}:/dwh
environment:
- JAVA_OPTS=$JAVA_OPTS
# This is to turn this on in e2e but leave it off in the default config.
- FHIRDATA_SINKFHIRSERVERURL=$FHIRDATA_SINKFHIRSERVERURL
- FHIRDATA_GENERATEPARQUETFILES=$FHIRDATA_GENERATEPARQUETFILES
- FHIRDATA_NUMTHREADS=$FHIRDATA_NUMTHREADS
- FHIRDATA_CREATEHIVERESOURCETABLES=$FHIRDATA_CREATEHIVERESOURCETABLES
- FHIRDATA_CREATEPARQUETVIEWS=$FHIRDATA_CREATEPARQUETVIEWS
- FHIRDATA_SINKDBCONFIGPATH=$FHIRDATA_SINKDBCONFIGPATH
ports:
- '8090:8080'
networks:
Expand Down
3 changes: 1 addition & 2 deletions docker/config/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ fhirdata:
# fhirServerUrl: "http://hapi-server:8080/fhir"
dbConfig: "config/hapi-postgres-config_local.json"
dwhRootPrefix: "/dwh/controller_DWH"
#Whether to create a Parquet DWH or not
createParquetDwh: true
generateParquetFiles: true
incrementalSchedule: "0 0 * * * *"
purgeSchedule: "0 30 * * * *"
numOfDwhSnapshotsToRetain: 2
Expand Down
59 changes: 0 additions & 59 deletions docker/config_fhir_sink/application.yaml

This file was deleted.

31 changes: 0 additions & 31 deletions docker/config_fhir_sink/flink-conf.yaml

This file was deleted.

9 changes: 0 additions & 9 deletions docker/config_fhir_sink/hapi-postgres-config_local.json

This file was deleted.

91 changes: 30 additions & 61 deletions e2e-tests/controller-spark/controller_spark_sql_validation.sh
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,8 @@ function run_pipeline() {
}

function wait_for_completion() {
local runtime="15 minute"
local time=$1
local runtime="${time} minute"
local end_time=$(date -ud "$runtime" +%s)

while [[ $(date -u +%s) -le $end_time ]]
Expand Down Expand Up @@ -222,8 +223,6 @@ function wait_for_completion() {
#######################################################################
function check_parquet() {
local isIncremental=$1
local runtime="5 minute"
local end_time=$(date -ud "$runtime" +%s)
local output="${HOME_PATH}/${PARQUET_SUBDIR}"
TOTAL_VIEW_PATIENTS=106

Expand All @@ -237,40 +236,8 @@ function check_parquet() {
TOTAL_TEST_OBS=$((2*TOTAL_TEST_OBS))
fi


while [[ $(date -u +%s) -le $end_time ]]
do
# check whether output directory has started receiving parquet files.
if [[ "$(ls -A $output)" ]]
then
local total_patients=$(java -Xms16g -Xmx16g -jar ./parquet-tools-1.11.1.jar rowcount \
"${output}/*/Patient/" | awk '{print $3}')
local total_encounters=$(java -Xms16g -Xmx16g -jar ./parquet-tools-1.11.1.jar rowcount \
"${output}/*/Encounter/" | awk '{print $3}')
local total_observations=$(java -Xms16g -Xmx16g -jar ./parquet-tools-1.11.1.jar rowcount \
"${output}/*/Observation/" | awk '{print $3}')

print_message "Total patients: $total_patients"
print_message "Total encounters: $total_encounters"
print_message "Total observations: $total_observations"

if [[ "${total_patients}" == "${TOTAL_TEST_PATIENTS}" && "${total_encounters}" \
== "${TOTAL_TEST_ENCOUNTERS}" && "${total_observations}" == "${TOTAL_TEST_OBS}" ]] \
; then
print_message "Pipeline transformation successfully completed."
timeout=false
break
else
sleep 10
fi
fi
done

if [[ "${timeout}" == "true" ]]

# check whether output directory has received parquet files.
if [[ "$(ls -A $output)" ]]

then
local total_patients=$(java -Xms16g -Xmx16g -jar ./parquet-tools-1.11.1.jar rowcount \
"${output}/*/Patient/" | awk '{print $3}')
Expand Down Expand Up @@ -451,6 +418,26 @@ function validate_updated_resource() {
}


function validate_updated_resource_in_fhir_sink() {
local fhir_username="hapi"
local fhir_password="hapi"
local fhir_url_extension="/fhir"

# Fetch the patient resource using the Patient ID
local updated_family_name=$(curl -X GET -H "Content-Type: application/json; charset=utf-8" -u $fhir_username:$fhir_password \
--connect-timeout 5 --max-time 20 "${SINK_FHIR_SERVER_URL}${fhir_url_extension}/Patient/${PATIENT_ID}" \
| jq -r '.name[0].family')

if [[ "${updated_family_name}" == "Anderson" ]]
then
print_message "Updated Patient data for ${PATIENT_ID} in FHIR sink verified successfully."
else
print_message "Updated Patient data verification for ${PATIENT_ID} in FHIR sink failed."
exit 6
fi
}


#################################################
# Function that counts resources in FHIR server and compares output to what is
# in the source FHIR server
Expand Down Expand Up @@ -493,7 +480,7 @@ setup "$@"
fhir_source_query
sleep 30
run_pipeline "FULL"

wait_for_completion 10
if [[ "${DWH_TYPE}" == "PARQUET" ]]
then
check_parquet false
Expand All @@ -503,49 +490,31 @@ else
test_fhir_sink "FULL"
fi

wait_for_completion
check_parquet false
test_fhir_sink "FULL"


clear

add_resource
update_resource
# Incremental run.
run_pipeline "INCREMENTAL"

wait_for_completion 3
if [[ "${DWH_TYPE}" == "PARQUET" ]]
then
check_parquet true
else
fhir_source_query
# Provide enough Buffer time for FULL pipeline to completely run before testing the sink FHIR server
sleep 300
test_fhir_sink "INCREMENTAL"
fi

if [[ "${DWH_TYPE}" == "PARQUET" ]]
then
validate_resource_tables
validate_resource_tables_data
validate_updated_resource

# View recreation run
# TODO add validation for the views as well
run_pipeline "VIEWS"
else
# Provide enough Buffer time for INCREMENTAL pipeline to completely run before testing the sink FHIR server
sleep 200

fhir_source_query
test_fhir_sink "INCREMENTAL"
validate_updated_resource_in_fhir_sink
fi

wait_for_completion
check_parquet true
fhir_source_query
test_fhir_sink "INCREMENTAL"

validate_resource_tables
validate_resource_tables_data
validate_updated_resource



print_message "END!!"
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2024 Google LLC
* Copyright 2020-2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Loading

0 comments on commit f23ffee

Please sign in to comment.