diff --git a/integration_tests/dbt_project.yml b/integration_tests/dbt_project.yml
index 1ddbdde9..33adebdf 100644
--- a/integration_tests/dbt_project.yml
+++ b/integration_tests/dbt_project.yml
@@ -43,7 +43,7 @@ seeds:
     destination:
       +column_types:
         created_at: timestamp
-        id: "{{ 'string' if target.name == 'bigquery' else 'varchar' }}"
+        id: "{{ 'string' if target.type == 'bigquery' else 'varchar' }}"
     destination_membership:
       +column_types:
         activated_at: timestamp
@@ -51,17 +51,18 @@ seeds:
     log:
       +column_types:
         time_stamp: timestamp
-        transformation_id: "{{ 'string' if target.name == 'bigquery' else 'varchar' }}"
+        transformation_id: "{{ 'string' if target.type == 'bigquery' else 'varchar' }}"
+        message_event: "{{ 'string' if target.type == 'bigquery' else 'varchar' }}"
     transformation:
       +column_types:
         created_at: timestamp
-        destination_id: "{{ 'string' if target.name == 'bigquery' else 'varchar' }}"
-        id: "{{ 'string' if target.name == 'bigquery' else 'varchar' }}"
+        destination_id: "{{ 'string' if target.type == 'bigquery' else 'varchar' }}"
+        id: "{{ 'string' if target.type == 'bigquery' else 'varchar' }}"
     trigger_table:
       +quote_columns: "{{ true if target.type in ('redshift', 'postgres') else false }}"
       +enabled: "{{ true if target.type != 'snowflake' else false }}"
       +column_types:
-        transformation_id: "{{ 'string' if target.name == 'bigquery' else 'varchar' }}"
+        transformation_id: "{{ 'string' if target.type == 'bigquery' else 'varchar' }}"
     trigger_table_snowflake:
       +enabled: "{{ true if target.type == 'snowflake' else false }}"
     user:
diff --git a/integration_tests/seeds/log.csv b/integration_tests/seeds/log.csv
index e5bd6d96..1e97d244 100644
--- a/integration_tests/seeds/log.csv
+++ b/integration_tests/seeds/log.csv
@@ -22,4 +22,8 @@ intrinsic_departed,2021-12-09 14:26:29.860,2021-12-09 20:30:53.904,intrinsic_dep
 intrinsic_departed,2021-12-09 14:26:29.814,2021-12-09 20:30:53.903,intrinsic_departed,INFO,"{""table"":""user_insights""}",write_to_table_start,
 intrinsic_departed,2021-12-09 14:26:29.744,2021-12-09 20:30:53.901,intrinsic_departed,INFO,"{""table"":""user_history""}",write_to_table_start,
 intrinsic_departed,2021-12-09 14:26:29.719,2021-12-09 20:30:53.878,intrinsic_departed,INFO,"{""table"":""media_insights""}",write_to_table_start,
-intrinsic_departed,2021-12-09 14:26:05.907,2021-12-09 20:30:53.778,intrinsic_departed,INFO,,sync_start,
\ No newline at end of file
+intrinsic_departed,2021-12-09 14:26:05.907,2021-12-09 20:30:53.778,intrinsic_departed,INFO,,sync_start,
+intrinsic_departed,2021-12-10 14:26:29.744,2021-12-09 20:30:53.901,intrinsic_departed,INFO,"{""schema"":""instagram_business"",""name"":""activity_add_to_fivetran_user""}",create_table,
+intrinsic_departed,2021-12-19 14:26:29.719,2021-12-09 20:30:53.878,intrinsic_departed,INFO,"{""type"":""ADD_COLUMN"",""table"":""lead""}",alter_table,
+intrinsic_departed,2021-12-11 14:26:05.907,2021-12-09 20:30:53.778,intrinsic_departed,INFO,"{""connectorId"":""pg"",""properties"":{""REMOVAL"":[{""schema"":""public"",""tables"":[""active_rows_estimate""]}]}}",change_schema_config,
+intrinsic_departed,2021-12-10 14:26:05.907,2021-12-09 20:30:53.778,intrinsic_departed,INFO,"{""schema"":""fivetran_log_2""}",create_schema,
\ No newline at end of file
diff --git a/models/fivetran_log__schema_changelog.sql b/models/fivetran_log__schema_changelog.sql
index f00f5745..34be2395 100644
--- a/models/fivetran_log__schema_changelog.sql
+++ b/models/fivetran_log__schema_changelog.sql
@@ -1,9 +1,39 @@
+{{ config(
+    materialized='incremental',
+    unique_key='unique_schema_change_key',
+    partition_by={
+        'field': 'created_at',
+        'data_type': 'timestamp',
+        'granularity': 'day'
+    } if target.type == 'bigquery' else none,
+    incremental_strategy = 'merge',
+    file_format = 'delta'
+) }}
+
 with schema_changes as (
 
     select *
     from {{ ref('stg_fivetran_log__log') }}
     where event_subtype in ('create_table', 'alter_table', 'create_schema', 'change_schema_config')
+
+    {% if is_incremental() %}
+
+    -- Capture the latest timestamp in a call statement instead of a subquery for optimizing BQ costs on incremental runs
+    {%- call statement('max_schema_change', fetch_result=True) -%}
+        select date(max(created_at)) from {{ this }}
+    {%- endcall -%}
+
+    -- load the result from the above query into a new variable
+    {%- set query_result = load_result('max_schema_change') -%}
+
+    -- load_result returns an agate table (not a dataframe); pull the single scalar out of its 'data' rows
+    {%- set max_schema_change = query_result['data'][0][0] -%}
+
+    -- compare the new batch of data to the latest sync already stored in this model
+    and date(created_at) >= '{{ max_schema_change }}'
+
+    {% endif %}
 
 ),
 
 connector as (
@@ -20,7 +50,8 @@ add_connector_info as (
         connector.destination_id,
         connector.destination_name
 
-    from schema_changes join connector using(connector_id)
+    from schema_changes join
+        connector on schema_changes.connector_id = connector.connector_id
 ),
 
 final as (
@@ -41,10 +72,12 @@ final as (
 
         case when event_subtype = 'create_schema' or event_subtype = 'create_table' then
            {{ fivetran_utils.json_extract(string='message_data', string_path='schema') }}
-        else null end as schema_name
+        else null end as schema_name,
+
+        {{ dbt_utils.surrogate_key(['connector_id', 'destination_id', 'created_at']) }} as unique_schema_change_key
+
 
     from add_connector_info
 )
 
-select * from final
-order by created_at desc, connector_id
\ No newline at end of file
+select * from final
\ No newline at end of file