Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .buildkite/scripts/run_models.sh
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@ dbt test --target "$db"
dbt run --vars '{amazon_ads__portfolio_history_enabled: false}' --target "$db" --full-refresh
dbt test --vars '{amazon_ads__portfolio_history_enabled: false}' --target "$db"

dbt run-operation fivetran_utils.drop_schemas_automation --target "$db"
dbt run-operation fivetran_utils.drop_schemas_automation --target "$db"
17 changes: 16 additions & 1 deletion integration_tests/dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,29 @@ vars:
amazon_ads_ad_group_history_identifier: "ad_group_history_data"
amazon_ads_ad_group_level_report_identifier: "ad_group_level_report_data"
amazon_ads_advertised_product_report_identifier: "advertised_product_report_data"
amazon_ads_campaign_history_identifier: "campaign_history_data"
amazon_ads_campaign_history_identifier: "campaign_history"
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for testing

amazon_ads_campaign_level_report_identifier: "campaign_level_report_data"
amazon_ads_portfolio_history_identifier: "portfolio_history_data"
amazon_ads_product_ad_history_identifier: "product_ad_history_data"
amazon_ads_profile_identifier: "profile_data"
amazon_ads_keyword_history_identifier: "keyword_history_data"
amazon_ads_targeting_keyword_report_identifier: "targeting_keyword_report_data"
amazon_ads_search_term_ad_keyword_report_identifier: "search_term_ad_keyword_report_data"
# amazon_ads_union_schemas: ['zz_dbt_catherine', 'zz_dbt_catherine_02']

amazon_ads_using_custom_names: true
amazon_ads_custom_column_names:
amazon_ads_integration_tests_03:
ad_group_history:
creation_date: creationDate
default_bid: defaultBid
ad_group_level_report:
campaign_bidding_strategy: campaignBiddingStrategy
campaign_history:
name: campaignName
amazon_ads_integration_tests_04:
campaign_history:
name: campaignName02

amazon_ads__campaign_passthrough_metrics:
- name: sales_7_d
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/seeds/ad_group_history_data.csv
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
id,last_updated_date,_fivetran_synced,campaign_id,creation_date,default_bid,name,serving_status,state
id,last_updated_date,_fivetran_synced,campaign_id,creationDate,defaultBid,name,serving_status,state
421,2022-07-11 22:38:16.551000,2022-12-13 17:10:16.297000,2187,2022-07-11 22:38:16.551000,1.75,Red 7,CAMPAIGN_PAUSED,enabled
501,2022-11-09 14:37:05.332000,2022-12-13 17:11:08.594000,5555,2022-11-09 14:37:05.332000,1.45,All,AD_GROUP_STATUS_ENABLED,enabled
2 changes: 1 addition & 1 deletion integration_tests/seeds/ad_group_level_report_data.csv
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
ad_group_id,date,_fivetran_synced,campaign_bidding_strategy,clicks,cost,impressions
ad_group_id,date,_fivetran_synced,campaignBiddingStrategy,clicks,cost,impressions
501,2022-12-03,2022-12-13 11:35:56.905000,legacy,0,0.0,88
501,2022-12-04,2022-12-13 11:35:56.908000,legacy,0,0.0,111
501,2022-12-01,2022-12-13 11:35:56.899000,legacy,0,0.0,252
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
id,last_updated_date,_fivetran_synced,bidding_strategy,creation_date,budget,end_date,name,portfolio_id,profile_id,serving_status,start_date,state,targeting_type,budget_type,effective_budget
id,last_updated_date,_fivetran_synced,bidding_strategy,creation_date,budget,end_date,campaignName02,portfolio_id,profile_id,serving_status,start_date,state,targeting_type,budget_type,effective_budget
2187,2022-08-25 15:36:51.687000,2022-12-13 17:09:00.337000,autoForSales,2022-07-11 22:38:16.327000,2000,,Gold Leader,1138,66,CAMPAIGN_PAUSED,2022-07-11,paused,manual,daily,
2187,2022-07-14 11:58:58.857000,2022-08-09 11:09:43.874000,autoForSales,2022-07-11 22:38:16.327000,2000,,Gold Leader,1138,66,CAMPAIGN_PAUSED,2022-07-11,paused,manual,daily,
2187,2022-07-11 22:38:16.327000,2022-07-12 11:07:55.188000,autoForSales,2022-07-11 22:38:16.327000,300,,Red Leader,1138,66,CAMPAIGN_STATUS_ENABLED,2022-07-11,enabled,manual,daily,
Expand Down
2 changes: 1 addition & 1 deletion macros/staging/get_campaign_history_columns.sql
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
{"name": "end_date", "datatype": "date"},
{"name": "id", "datatype": dbt.type_string()},
{"name": "last_updated_date", "datatype": dbt.type_timestamp()},
{"name": "name", "datatype": dbt.type_string()},
{"name": "name", "datatype": dbt.type_string(), "alias": "campaign_name"},
{"name": "portfolio_id", "datatype": dbt.type_int()},
{"name": "profile_id", "datatype": dbt.type_int()},
{"name": "serving_status", "datatype": dbt.type_string()},
Expand Down
55 changes: 55 additions & 0 deletions macros/staging/resolve_column_names.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
{% macro resolve_column_names(_fivetran_table_name, columns, package_name='amazon_ads') %}
    {#-
        Entry point that collects, for a single source table, every column-name
        variant that should be treated as equivalent to each standard column.

        The work is delegated through adapter.dispatch, so an adapter-specific
        implementation can replace the default one. The dispatch namespace is
        fixed to 'amazon_ads'; package_name is only forwarded to the
        implementation, which uses it to build variable names.

        Args:
            _fivetran_table_name: Base table name used as the lookup key in the
                custom-name configuration (e.g. 'ad_group_level_report').
            columns: List of column objects; each must expose a 'name'.
            package_name: Prefix for the variables that are read
                (default: 'amazon_ads').

        Returns:
            Dict keyed by standard column name. Each value is the list of names
            to look for, custom names first and the standard name last.
            The default implementation returns an empty dict when the
            '<package_name>_using_custom_names' variable is not truthy.

        Example (custom name configured for campaign_bidding_strategy only):
            {
                "campaign_bidding_strategy": ["campaignBiddingStrategy", "campaign_bidding_strategy"],
                "ad_group_id": ["ad_group_id"],
                "clicks": ["clicks"]
            }
    -#}
    {%- set resolver = adapter.dispatch('resolve_column_names', 'amazon_ads') -%}
    {{ return(resolver(_fivetran_table_name, columns, package_name)) }}
{% endmacro %}

{% macro default__resolve_column_names(_fivetran_table_name, columns, package_name) %}
    {#-
        Default implementation of resolve_column_names.

        Reads two variables, both prefixed with package_name:
            <package_name>_using_custom_names   - feature switch (default false)
            <package_name>_custom_column_names  - nested dict shaped as
                schema name -> table name -> standard column name -> custom column name

        Args:
            _fivetran_table_name: Table key to look up under each schema entry.
            columns: List of column objects; only 'name' is read here.
            package_name: Prefix used to build the two variable names above.

        Returns:
            When the switch is on: dict of standard name -> list of candidate
            names (distinct custom names in configuration order, then the
            standard name). When the switch is off: an empty dict.
    -#}

    {% if var(package_name ~ '_using_custom_names', false) %}
        {# `or {}` guards against the variable being declared but left empty in YAML (None). #}
        {% set custom_column_names = var(package_name ~ '_custom_column_names', {}) or {} %}
        {% set column_name_mappings = {} %}

        {% for column in columns %}
            {% set standard_name = column.name %}
            {% set candidate_names = [] %}

            {# Collect the custom name for this column from every configured schema. #}
            {% for schema_name, schema_config in custom_column_names.items() %}
                {# A schema or table key with no children is parsed as None; treat it as "nothing configured". #}
                {% set table_config = (schema_config or {}).get(_fivetran_table_name) or {} %}
                {% set custom_name = table_config.get(standard_name) %}
                {% if custom_name and custom_name not in candidate_names %}
                    {% do candidate_names.append(custom_name) %}
                {% endif %}
            {% endfor %}

            {# The standard name is always the last fallback; skip it if a custom entry already spelled it identically. #}
            {% if standard_name not in candidate_names %}
                {% do candidate_names.append(standard_name) %}
            {% endif %}

            {% do column_name_mappings.update({standard_name: candidate_names}) %}
        {% endfor %}

        {{ return(column_name_mappings) }}
    {% else %}
        {# Custom names not enabled - callers receive an empty mapping. #}
        {{ return({}) }}
    {% endif %}

{% endmacro %}
71 changes: 71 additions & 0 deletions macros/staging/standardize_column_names.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
{% macro standardize_column_names(tmp_relation, table_name, base_columns, package_name='amazon_ads') %}
    {#-
        Entry point that renders the column list for a staging model whose
        source may use custom column names.

        For every expected column the rendered SQL either selects the source
        column, coalesces several name variants that are all present, or
        produces a typed NULL when none of the variants exist in the source.
        The work is delegated through adapter.dispatch (namespace 'amazon_ads').

        Args:
            tmp_relation: Relation whose columns are inspected (typically the
                tmp staging model).
            table_name: Base table name used for the custom-name lookup.
            base_columns: List of expected column objects with 'name' and
                'datatype', and optionally 'alias'.
            package_name: Prefix for the variables that are read
                (default: 'amazon_ads').

        Returns:
            A comma-separated list of column expressions, one per entry in
            base_columns, with no trailing comma. The calling model supplies
            the surrounding `select` and `from`. Identifiers are rendered as
            plain names (not quoted).

        Example output:
            coalesce(campaignBiddingStrategy, campaign_bidding_strategy) as campaign_bidding_strategy,
            ad_group_id as ad_group_id,
            cast(null as integer) as missing_column
    -#}
    {%- set standardizer = adapter.dispatch('standardize_column_names', 'amazon_ads') -%}
    {{ return(standardizer(tmp_relation, table_name, base_columns, package_name)) }}
{% endmacro %}

{% macro default__standardize_column_names(tmp_relation, table_name, base_columns, package_name) %}
{#-
    Default implementation of standardize_column_names.

    Renders one expression per entry in base_columns, separated by commas:
        - several candidate names exist in tmp_relation -> coalesce(<names>) as <alias>
        - exactly one candidate exists                   -> <name> as <alias>
        - no candidate exists                            -> cast(null as <datatype>) as <alias>
    The alias is column.alias when set, otherwise the standard column name.

    Candidate names come from amazon_ads.resolve_column_names when the
    '<package_name>_using_custom_names' variable is truthy; otherwise the only
    candidate is the standard name.

    Existence checks are case-insensitive in every branch, so a warehouse that
    reports column names in a different case than the one declared in
    base_columns does not cause real columns to be replaced with NULLs.

    NOTE(review): identifiers are rendered unquoted, so mixed-case custom names
    rely on the warehouse's identifier case folding - confirm this is intended
    for every supported adapter.
-#}

{#- Step 1: Inspect the source relation; lower-case the names once for all membership checks. -#}
{%- set actual_columns = adapter.get_columns_in_relation(tmp_relation) %}
{%- set actual_column_names_lower = actual_columns | map(attribute='name') | map('lower') | list %}

{#- Step 2: Get custom name mappings for this table if custom names are enabled. -#}
{%- set custom_mappings = amazon_ads.resolve_column_names(table_name, base_columns, package_name) if var(package_name ~ '_using_custom_names', false) else {} %}

{#- Step 3: Render one select expression per expected column. -#}
{%- for column in base_columns %}
    {%- set standard_name = column.name %}
    {%- set column_datatype = column.datatype %}
    {%- set column_alias = column.alias if column.alias else standard_name %}

    {#- Step 3a: Candidate names - configured variants when present, else just the standard name. -#}
    {%- set candidate_names = custom_mappings[standard_name] if standard_name in custom_mappings else [standard_name] %}

    {#- Step 3b: Keep only the candidates that actually exist in the source relation. -#}
    {%- set available_variants = [] %}
    {%- for variant in candidate_names if variant | lower in actual_column_names_lower %}
        {%- do available_variants.append(variant) %}
    {%- endfor %}

    {#- Step 3c: Emit the expression followed by its alias. #}
    {% if available_variants | length > 1 -%}
    coalesce({{ available_variants | join(', ') }})
    {%- elif available_variants | length == 1 -%}
    {{ available_variants[0] }}
    {%- else -%}
    cast(null as {{ column_datatype }})
    {%- endif %} as {{ column_alias }}{{ ',' if not loop.last }}

{% endfor %}

{% endmacro %}
30 changes: 15 additions & 15 deletions models/staging/stg_amazon_ads__ad_group_history.sql
Original file line number Diff line number Diff line change
@@ -1,33 +1,33 @@
{{ config(enabled=var('ad_reporting__amazon_ads_enabled', True)) }}

{% set stg_tmp_relation = ref('stg_amazon_ads__ad_group_history_tmp') %}

with base as (

select *
from {{ ref('stg_amazon_ads__ad_group_history_tmp') }}
select *
from {{ stg_tmp_relation }}
),

fields as (

select
{{
fivetran_utils.fill_staging_columns(
source_columns=adapter.get_columns_in_relation(ref('stg_amazon_ads__ad_group_history_tmp')),
staging_columns=get_ad_group_history_columns()
)
}}

{{ fivetran_utils.source_relation(
union_schema_variable='amazon_ads_union_schemas',
union_database_variable='amazon_ads_union_databases')
select
{{ amazon_ads.standardize_column_names(
tmp_relation=stg_tmp_relation,
table_name='ad_group_history',
base_columns=get_ad_group_history_columns())
}}

from base
{{ fivetran_utils.source_relation(
union_schema_variable='amazon_ads_union_schemas',
union_database_variable='amazon_ads_union_databases')
}}
from base
),

final as (

select
source_relation,
source_relation,
cast(id as {{ dbt.type_string() }}) as ad_group_id,
cast(campaign_id as {{ dbt.type_string() }}) as campaign_id,
creation_date,
Expand Down
33 changes: 17 additions & 16 deletions models/staging/stg_amazon_ads__ad_group_level_report.sql
Original file line number Diff line number Diff line change
@@ -1,32 +1,33 @@
{{ config(enabled=var('ad_reporting__amazon_ads_enabled', True)) }}

{% set stg_tmp_relation = ref('stg_amazon_ads__ad_group_level_report_tmp') %}

with base as (

select *
from {{ ref('stg_amazon_ads__ad_group_level_report_tmp') }}
from {{ stg_tmp_relation }}
),

fields as (

select
{{
fivetran_utils.fill_staging_columns(
source_columns=adapter.get_columns_in_relation(ref('stg_amazon_ads__ad_group_level_report_tmp')),
staging_columns=get_ad_group_level_report_columns()
)
}}

{{ fivetran_utils.source_relation(
union_schema_variable='amazon_ads_union_schemas',
union_database_variable='amazon_ads_union_databases')
select
{{ amazon_ads.standardize_column_names(
tmp_relation=stg_tmp_relation,
table_name='ad_group_level_report',
base_columns=get_ad_group_level_report_columns())
}}

from base
{{ fivetran_utils.source_relation(
union_schema_variable='amazon_ads_union_schemas',
union_database_variable='amazon_ads_union_databases')
}}
from base
),

final as (

select
source_relation,
source_relation,
cast(ad_group_id as {{ dbt.type_string() }}) as ad_group_id,
campaign_bidding_strategy,
clicks,
Expand All @@ -35,8 +36,8 @@ final as (
impressions,
purchases_30_d,
sales_30_d
{{ amazon_ads_fill_pass_through_columns(pass_through_fields=var('amazon_ads__ad_group_passthrough_metrics'), except=['purchases_30_d', 'sales_30_d']) }}

{{ amazon_ads_fill_pass_through_columns(pass_through_fields=var('amazon_ads__ad_group_passthrough_metrics'), except=['purchases_30_d', 'sales_30_d']) -}}

from fields
)
Expand Down
32 changes: 16 additions & 16 deletions models/staging/stg_amazon_ads__advertised_product_report.sql
Original file line number Diff line number Diff line change
@@ -1,33 +1,33 @@
{{ config(enabled=var('ad_reporting__amazon_ads_enabled', True)) }}

{% set stg_tmp_relation = ref('stg_amazon_ads__advertised_product_report_tmp') %}

with base as (

select *
from {{ ref('stg_amazon_ads__advertised_product_report_tmp') }}
select *
from {{ stg_tmp_relation }}
),

fields as (

select
{{
fivetran_utils.fill_staging_columns(
source_columns=adapter.get_columns_in_relation(ref('stg_amazon_ads__advertised_product_report_tmp')),
staging_columns=get_advertised_product_report_columns()
)
}}

{{ fivetran_utils.source_relation(
union_schema_variable='amazon_ads_union_schemas',
union_database_variable='amazon_ads_union_databases')
select
{{ amazon_ads.standardize_column_names(
tmp_relation=stg_tmp_relation,
table_name='advertised_product_report',
base_columns=get_advertised_product_report_columns())
}}

from base
{{ fivetran_utils.source_relation(
union_schema_variable='amazon_ads_union_schemas',
union_database_variable='amazon_ads_union_databases')
}}
from base
),

final as (

select
source_relation,
source_relation,
cast(ad_id as {{ dbt.type_string() }}) as ad_id,
cast(ad_group_id as {{ dbt.type_string() }}) as ad_group_id,
advertised_asin,
Expand All @@ -42,7 +42,7 @@ final as (
impressions,
purchases_30_d,
sales_30_d

{{ amazon_ads_fill_pass_through_columns(pass_through_fields=var('amazon_ads__advertised_product_passthrough_metrics'), except=['purchases_30_d', 'sales_30_d']) }}

from fields
Expand Down
Loading