Skip to content
Open
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ Contributions are welcome! Please see [CONTRIBUTING.md](CONTRIBUTING.md) and [MA

The package currently supports

- AWS Athena :white_check_mark:
- Databricks :white_check_mark:
- Spark :white_check_mark:
- Snowflake :white_check_mark:
Expand Down
6 changes: 3 additions & 3 deletions dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ clean-targets: # folders to be removed by `dbt clean`
models:
dbt_artifacts:
+materialized: view
+file_format: '{{ "delta" if target.name == "databricks" else "" }}'
+file_format: '{{ "delta" if target.name == "databricks" else "iceberg" if target.type == "athena" else "" }}'
sources:
+materialized: incremental
+full_refresh: false
+persist_docs:
# Databricks and SQL Server don't offer column-level support for persisting docs
columns: '{{ target.name != "databricks" and target.type != "sqlserver" }}'
relation: '{{ target.type != "sqlserver" }}'
columns: '{{ target.name != "databricks" and target.type != "sqlserver" and target.type != "athena" }}'
relation: '{{ target.type != "sqlserver" and target.type != "athena" }}'
+as_columnstore: False
4 changes: 4 additions & 0 deletions integration_test_project/example-env.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ export DBT_ENV_SECRET_DATABRICKS_TOKEN=
export DBT_ENV_SECRET_GCP_PROJECT=
export DBT_ENV_SPARK_DRIVER_PATH= # /Library/simba/spark/lib/libsparkodbc_sbu.dylib on a Mac
export DBT_ENV_SPARK_ENDPOINT= # The endpoint ID from the Databricks HTTP path
export DBT_ENV_SECRET_ATHENA_S3_STAGING_DIR=
export DBT_ENV_SECRET_ATHENA_S3_DATA_DIR=
export DBT_ENV_SECRET_ATHENA_REGION=
export DBT_ENV_SECRET_ATHENA_DATABASE=

# dbt environment variables, change these
export DBT_VERSION="1_5_0"
Expand Down
4 changes: 2 additions & 2 deletions integration_test_project/models/incremental.sql
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ select
{% if is_incremental() %}

1 as id,
'banana' as fruit
cast('banana' as {{ dbt.type_string() }}) as fruit

{% else %}

2 as id,
'apple' as fruit
cast('apple' as {{ dbt.type_string() }}) as fruit

{% endif %}
3 changes: 2 additions & 1 deletion integration_test_project/models/microbatch.sql
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
'field': 'transaction_ts',
'data_type': 'datetime',
'granularity': 'day'
}
},
table_type = 'iceberg' if target.type == 'athena' else none,
)
}}

Expand Down
9 changes: 9 additions & 0 deletions integration_test_project/profiles.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,12 @@ dbt_artifacts:
schema: default
port: 8080
threads: 8
athena:
type: athena
s3_staging_dir: "{{ env_var('DBT_ENV_SECRET_ATHENA_S3_STAGING_DIR') }}"
s3_data_dir: "{{ env_var('DBT_ENV_SECRET_ATHENA_S3_DATA_DIR') }}"
region_name: "{{ env_var('DBT_ENV_SECRET_ATHENA_REGION', 'us-east-1') }}"
database: "{{ env_var('DBT_ENV_SECRET_ATHENA_DATABASE') }}"
schema: dbt_artifacts_test_commit_{{ env_var('DBT_VERSION', '') }}_{{ env_var('GITHUB_SHA_OVERRIDE', '') if env_var('GITHUB_SHA_OVERRIDE', '') else env_var('GITHUB_SHA') }}
work_group: dbt
threads: 8
4 changes: 4 additions & 0 deletions macros/database_specific_helpers/string_functions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,7 @@
{% macro trino__str_left(col, length) %}
substring({{ col }}, 1, {{ length }})
{% endmacro %}

{% macro athena__str_left(col, length) %}
substring({{ col }}, 1, {{ length }})
{% endmacro %}
8 changes: 8 additions & 0 deletions macros/database_specific_helpers/type_helpers.sql
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,11 @@
{% macro trino__type_numeric() %}
double
{% endmacro %}

{% macro athena__type_array() %}
array(varchar)
{% endmacro %}

{% macro athena__type_numeric() %}
double
{% endmacro %}
12 changes: 8 additions & 4 deletions macros/integration_tests/safe_cast.sql
Original file line number Diff line number Diff line change
Expand Up @@ -8,31 +8,35 @@
'bigquery': 'STRING',
'snowflake': 'VARCHAR',
'databricks': 'STRING',
'trino': 'VARCHAR'
'trino': 'VARCHAR',
'athena': 'VARCHAR'
},
'integer': {
'postgres': 'NUMERIC',
'sqlserver': 'FLOAT',
'bigquery': 'FLOAT64',
'snowflake': 'FLOAT',
'databricks': 'DOUBLE',
'trino': 'DOUBLE'
'trino': 'DOUBLE',
'athena': 'DOUBLE'
},
'date': {
'postgres': 'DATE',
'sqlserver': 'DATE',
'bigquery': 'DATE',
'snowflake': 'DATE',
'databricks': 'DATE',
'trino': 'DATE'
'trino': 'DATE',
'athena': 'DATE'
},
'timestamp': {
'postgres': 'TIMESTAMP',
'sqlserver': 'DATETIME2',
'bigquery': 'TIMESTAMP',
'snowflake': 'TIMESTAMP',
'databricks': 'TIMESTAMP',
'trino': 'TIMESTAMP'
'trino': 'TIMESTAMP',
'athena': 'TIMESTAMP'
}
} %}

Expand Down
34 changes: 34 additions & 0 deletions macros/upload_individual_datasets/upload_exposures.sql
Original file line number Diff line number Diff line change
Expand Up @@ -189,3 +189,37 @@
{{ return("") }}
{% endif %}
{%- endmacro %}

{% macro athena__get_exposures_dml_sql(exposures) -%}
{% if exposures != [] %}

{% set exposure_values %}
{% for exposure in exposures -%}
(
'{{ invocation_id }}', {# command_invocation_id #}
'{{ exposure.unique_id | replace("'","''") }}', {# node_id #}
TIMESTAMP '{{ run_started_at }}', {# run_started_at #}
'{{ exposure.name | replace("'","''") }}', {# name #}
'{{ exposure.type }}', {# type #}
'{{ tojson(exposure.owner) | replace("'","''") }}', {# owner #}
'{{ exposure.maturity }}', {# maturity #}
'{{ exposure.original_file_path }}', {# path #}
'{{ exposure.description | replace("'","''") }}', {# description #}
'{{ exposure.url }}', {# url #}
'{{ exposure.package_name }}', {# package_name #}
ARRAY {{ exposure.depends_on.nodes}}, {# depends_on_nodes #}
ARRAY {{ exposure.tags}}, {# tags #}
{% if var('dbt_artifacts_exclude_all_results', false) %}
null
{% else %}
'{{ tojson(exposure) | replace("'","''") }}' {# all_results #}
{% endif %}
)
{%- if not loop.last %},{%- endif %}
{%- endfor %}
{% endset %}
{{ exposure_values }}
{% else %}
{{ return("") }}
{% endif %}
{%- endmacro %}
61 changes: 61 additions & 0 deletions macros/upload_individual_datasets/upload_invocations.sql
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,67 @@

{% endmacro -%}

{% macro athena__get_invocations_dml_sql(invocation_args=invocation_args_dict) -%}
{% set invocation_values %}
(
'{{ invocation_id }}', {# command_invocation_id #}
'{{ dbt_version }}', {# dbt_version #}
'{{ project_name }}', {# project_name #}
TIMESTAMP '{{ run_started_at }}', {# run_started_at #}
'{{ flags.WHICH }}', {# dbt_command #}
{{ flags.FULL_REFRESH }}, {# full_refresh_flag #}
'{{ target.profile_name }}', {# target_profile_name #}
'{{ target.name }}', {# target_name #}
'{{ target.schema }}', {# target_schema #}
{{ target.threads }}, {# target_threads #}

'{{ env_var("DBT_CLOUD_PROJECT_ID", "") }}', {# dbt_cloud_project_id #}
'{{ env_var("DBT_CLOUD_JOB_ID", "") }}', {# dbt_cloud_job_id #}
'{{ env_var("DBT_CLOUD_RUN_ID", "") }}', {# dbt_cloud_run_id #}
'{{ env_var("DBT_CLOUD_RUN_REASON_CATEGORY", "") }}', {# dbt_cloud_run_reason_category #}
'{{ env_var('DBT_CLOUD_RUN_REASON', '') | replace("'","''") }}', {# dbt_cloud_run_reason #}

{% if var('env_vars', none) %}
{% set env_vars_dict = {} %}
{% for env_variable in var('env_vars') %}
{% do env_vars_dict.update({env_variable: (env_var(env_variable, ''))}) %}
{% endfor %}
'{{ tojson(env_vars_dict) | replace("'","''") }}', {# env_vars #}
{% else %}
null, {# env_vars #}
{% endif %}

{% if var('dbt_vars', none) %}
{% set dbt_vars_dict = {} %}
{% for dbt_var in var('dbt_vars') %}
{% do dbt_vars_dict.update({dbt_var: (var(dbt_var, ''))}) %}
{% endfor %}
'{{ tojson(dbt_vars_dict) | replace("'","''") }}', {# dbt_vars #}
{% else %}
null, {# dbt_vars #}
{% endif %}

{% if invocation_args_dict.vars %}
{# vars - different format for pre v1.5 (yaml vs list) #}
{% if invocation_args_dict.vars is string %}
{% set parsed_inv_args_vars = fromyaml(invocation_args_dict.vars) %}
{% do invocation_args_dict.update({'vars': parsed_inv_args_vars}) %}
{% endif %}
{% endif %}

'{{ invocation_args_dict | replace("'","''") }}', {# invocation_args #}

{% set metadata_env = {} %}
{% for key, value in dbt_metadata_envs.items() %}
{% do metadata_env.update({key: value}) %}
{% endfor %}
'{{ tojson(metadata_env) | replace("'","''") }}' {# dbt_custom_envs #}
)
{% endset %}
{{ invocation_values }}

{% endmacro -%}


{% macro sqlserver__get_invocations_dml_sql(invocation_args=invocation_args_dict) -%}
{% set invocation_values %}
Expand Down
49 changes: 49 additions & 0 deletions macros/upload_individual_datasets/upload_model_executions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -295,3 +295,52 @@
{{ return("") }}
{% endif %}
{%- endmacro %}

{% macro athena__get_model_executions_dml_sql(models) -%}
{% if models != [] %}
{% set model_execution_values %}
{% for model in models -%}
(
'{{ invocation_id }}', {# command_invocation_id #}
'{{ model.node.unique_id }}', {# node_id #}
TIMESTAMP '{{ run_started_at }}', {# run_started_at #}

{% set config_full_refresh = model.node.config.full_refresh %}
{% if config_full_refresh is none %}
{% set config_full_refresh = flags.FULL_REFRESH %}
{% endif %}

{{ config_full_refresh }}, {# was_full_refresh #}
'{{ model.thread_id }}', {# thread_id #}
'{{ model.status }}', {# status #}

{% set compile_started_at = (model.timing | selectattr("name", "eq", "compile") | first | default({}))["started_at"] %}
{% if compile_started_at %}TIMESTAMP '{{ compile_started_at }}'{% else %}null{% endif %}, {# compile_started_at #}
{% set query_completed_at = (model.timing | selectattr("name", "eq", "execute") | first | default({}))["completed_at"] %}
{% if query_completed_at %}TIMESTAMP '{{ query_completed_at }}'{% else %}null{% endif %}, {# query_completed_at #}

{{ model.execution_time }}, {# total_node_runtime #}

{% if model.adapter_response.rows_affected is none or model.adapter_response.rows_affected is not defined %}

null
{% else %}
{{ model.adapter_response.rows_affected }}
{% endif %}
, {# rows_affected #}

'{{ model.node.config.materialized }}', {# materialization #}
'{{ model.node.schema }}', {# schema #}
'{{ model.node.name }}', {# name #}
'{{ model.node.alias }}', {# alias #}
'{{ model.message | replace("'", "''") }}', {# message #}
'{{ tojson(model.adapter_response) | replace("'", "''") }}' {# adapter_response #}
)
{%- if not loop.last %},{%- endif %}
{%- endfor %}
{% endset %}
{{ model_execution_values }}
{% else %}
{{ return("") }}
{% endif %}
{%- endmacro %}
35 changes: 35 additions & 0 deletions macros/upload_individual_datasets/upload_models.sql
Original file line number Diff line number Diff line change
Expand Up @@ -203,3 +203,38 @@
{{ return("") }}
{% endif %}
{%- endmacro %}

{% macro athena__get_models_dml_sql(models) -%}
{% if models != [] %}
{% set model_values %}
{% for model in models -%}
{% do model.pop('raw_code', None) %}
(
'{{ invocation_id }}', {# command_invocation_id #}
'{{ model.unique_id }}', {# node_id #}
TIMESTAMP '{{ run_started_at }}', {# run_started_at #}
'{{ model.database }}', {# database #}
'{{ model.schema }}', {# schema #}
'{{ model.name }}', {# name #}
ARRAY {{ model.depends_on.nodes }}, {# depends_on_nodes #}
'{{ model.package_name }}', {# package_name #}
'{{ model.original_file_path }}', {# path #}
'{{ model.checksum.checksum }}', {# checksum #}
'{{ model.config.materialized }}', {# materialization #}
ARRAY {{ model.tags }}, {# tags #}
'{{ tojson(model.config.meta) | replace("'", "''") }}', {# meta #}
'{{ model.alias }}', {# alias #}
{% if var('dbt_artifacts_exclude_all_results', false) %}
null
{% else %}
'{{ tojson(model) | replace("'", "''") }}' {# all_results #}
{% endif %}
)
{%- if not loop.last %},{%- endif %}
{%- endfor %}
{% endset %}
{{ model_values }}
{% else %}
{{ return("") }}
{% endif %}
{%- endmacro %}
48 changes: 48 additions & 0 deletions macros/upload_individual_datasets/upload_seed_executions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -308,3 +308,51 @@
{{ return("") }}
{% endif %}
{% endmacro -%}

{% macro athena__get_seed_executions_dml_sql(seeds) -%}
{% if seeds != [] %}
{% set seed_execution_values %}
{% for model in seeds -%}
(
'{{ invocation_id }}', {# command_invocation_id #}
'{{ model.node.unique_id }}', {# node_id #}
TIMESTAMP '{{ run_started_at }}', {# run_started_at #}

{% set config_full_refresh = model.node.config.full_refresh %}
{% if config_full_refresh is none %}
{% set config_full_refresh = flags.FULL_REFRESH %}
{% endif %}
{{ config_full_refresh }}, {# was_full_refresh #}

'{{ model.thread_id }}', {# thread_id #}
'{{ model.status }}', {# status #}

{% set compile_started_at = (model.timing | selectattr("name", "eq", "compile") | first | default({}))["started_at"] %}
{% if compile_started_at %}TIMESTAMP '{{ compile_started_at }}'{% else %}null{% endif %}, {# compile_started_at #}
{% set query_completed_at = (model.timing | selectattr("name", "eq", "execute") | first | default({}))["completed_at"] %}
{% if query_completed_at %}TIMESTAMP '{{ query_completed_at }}'{% else %}null{% endif %}, {# query_completed_at #}

{{ model.execution_time }}, {# total_node_runtime #}

{% if model.adapter_response.rows_affected is none or model.adapter_response.rows_affected is not defined %}
null
{% else %}
{{ model.adapter_response.rows_affected }}
{% endif %}
, {# rows_affected #}

'{{ model.node.config.materialized }}', {# materialization #}
'{{ model.node.schema }}', {# schema #}
'{{ model.node.name }}', {# name #}
'{{ model.node.alias }}', {# alias #}
'{{ model.message | replace("'", "''") }}', {# message #}
'{{ tojson(model.adapter_response) | replace("'", "''") }}' {# adapter_response #}
)
{%- if not loop.last %},{%- endif %}
{%- endfor %}
{% endset %}
{{ seed_execution_values }}
{% else %}
{{ return("") }}
{% endif %}
{% endmacro -%}
Loading