5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -160,3 +160,8 @@ cython_debug/
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/

# Data Load Hub
.sources
.config
.secrets
8 changes: 8 additions & 0 deletions cz.yaml
@@ -0,0 +1,8 @@
---
commitizen:
  major_version_zero: true
  name: cz_conventional_commits
  tag_format: $version
  update_changelog_on_bump: true
  version_provider: scm
  version_scheme: semver
28 changes: 28 additions & 0 deletions dlt_hub/.dlt/.sources
@@ -0,0 +1,28 @@
engine_version: 1
sources:
  shopify_dlt:
    is_dirty: false
    last_commit_sha: e7c6683aeab6db7ebe1ce7d16bf5f6a4cff57325
    last_commit_timestamp: '2024-10-02T18:46:27+02:00'
    files:
      shopify_dlt/__init__.py:
        commit_sha: e7c6683aeab6db7ebe1ce7d16bf5f6a4cff57325
        git_sha: 788d8fdd227d87bf4cdad3e258e4de1f390efc10
        sha3_256: 66fa8ce2165da8d01aac2dd3e4b76a908d8c0aba5849755a6eab0ccbf86b61fd
      shopify_dlt/README.md:
        commit_sha: e7c6683aeab6db7ebe1ce7d16bf5f6a4cff57325
        git_sha: 67e95e660cf9018cc3cac5defcc3a6d24590c8ab
        sha3_256: 4b6c4ebe1bc0e7bca9506d7b2d5bc68bdfcc68f969d8371631e5db490ebb3abf
      shopify_dlt/settings.py:
        commit_sha: e7c6683aeab6db7ebe1ce7d16bf5f6a4cff57325
        git_sha: 0d4cef7e5723f7515c90098358f980d5b4ea4877
        sha3_256: a1b47666183b7d03050ee67103affb3f93584d02ba21b4a4f3bd6a61176b9dc9
      shopify_dlt/exceptions.py:
        commit_sha: e7c6683aeab6db7ebe1ce7d16bf5f6a4cff57325
        git_sha: c3acfb9c6f4100d7b7a5cf34f1308a4b6d3efe0f
        sha3_256: 2956c1577e5bcebb8f4ebed710e699571e0cad76b9b91f47d1dc7a07df1b5496
      shopify_dlt/helpers.py:
        commit_sha: e7c6683aeab6db7ebe1ce7d16bf5f6a4cff57325
        git_sha: f4775eee97b448c81d1db739d54853ad4d07ff39
        sha3_256: b679034734eeb71651f849ad61938fd0d9351a0251305a03b9094c401a5517f6
    dlt_version_constraint: '>=0.5.1'
6 changes: 6 additions & 0 deletions dlt_hub/.dlt/config.toml
@@ -0,0 +1,6 @@
[sources.shopify_dlt]
shop_url = "https://sam-cook-ltd.myshopify.com/" # please set me up!
organization_id = 3920015 # please set me up!

[runtime]
dlthub_telemetry = true
10 changes: 10 additions & 0 deletions dlt_hub/.gitignore
@@ -0,0 +1,10 @@
# ignore secrets, virtual environments and typical python compilation artifacts
secrets.toml
# ignore basic python artifacts
.env
**/__pycache__/
**/*.py[cod]
**/*$py.class
# ignore duckdb
*.duckdb
*.wal
1 change: 1 addition & 0 deletions dlt_hub/requirements.txt
@@ -0,0 +1 @@
dlt[duckdb]>=0.5.1
61 changes: 61 additions & 0 deletions dlt_hub/shopify_dlt/README.md
@@ -0,0 +1,61 @@
# Shopify

Shopify is an easy-to-use e-commerce solution that allows anyone to create and manage their own online store. This `dlt` verified source loads data from the following endpoints:

| Endpoint | Description |
| --- | --- |
| customers | individuals or entities who have created accounts on a Shopify-powered online store |
| orders | transactions made by customers on an online store |
| products | the individual items or goods that are available for sale |

## Initialize the pipeline

```bash
dlt init shopify_dlt duckdb
```

Here, we chose DuckDB as the destination. Alternatively, you can choose Redshift, BigQuery, or any of the other [destinations](https://dlthub.com/docs/dlt-ecosystem/destinations/).

## Grab credentials

To grab your Shopify credentials and initialise your verified source, see the [full documentation here](https://dlthub.com/docs/dlt-ecosystem/verified-sources/shopify).

## Add credentials

1. Open `.dlt/secrets.toml`.
2. Enter the API access token:

```toml
[sources.shopify]
private_app_password = "Please set me up!" # Admin API access token
```

3. Enter credentials for your chosen destination as per the [docs](https://dlthub.com/docs/dlt-ecosystem/destinations/).
4. Inside the `.dlt` folder, you'll find another file called `config.toml`, where you can store your Shopify URL. The `config.toml` looks like this:
```toml
[sources.shopify_dlt]
shop_url = "Please set me up!"
```

5. Replace the placeholder value of `shop_url` with the URL of your Shopify store, for example `"https://shop-123.myshopify.com/"`.
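
As an alternative to editing the TOML files, `dlt` can also read configuration and secrets from environment variables, where the lookup key is the TOML path in capital letters with sections joined by double underscores. The sketch below only illustrates that naming rule; `env_var_name` is a hypothetical helper written for this example and is not part of `dlt`.

```python
def env_var_name(section: str, key: str) -> str:
    """Build the environment variable name for a TOML section and key.

    Hypothetical helper: it mirrors dlt's documented naming rule
    (upper case, sections joined by a double underscore).
    """
    parts = [part for part in section.split(".") if part] + [key]
    return "__".join(part.upper() for part in parts)


# Names matching the two settings configured in the steps above.
print(env_var_name("sources.shopify_dlt", "shop_url"))
print(env_var_name("sources.shopify", "private_app_password"))
```

Under these assumptions the two calls print `SOURCES__SHOPIFY_DLT__SHOP_URL` and `SOURCES__SHOPIFY__PRIVATE_APP_PASSWORD`, which can be exported in the shell instead of storing the values in `.dlt/`.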

## Running the pipeline

1. Before running the pipeline, ensure that you have installed all the necessary dependencies by running the command:
```bash
pip install -r requirements.txt
```

2. Now the pipeline can be run by using the command:
```bash
python3 shopify_dlt_pipeline.py
```
3. To make sure that everything is loaded as expected, use the command:
```bash
dlt pipeline <pipeline_name> show
```

In this example, the pipeline name is `shopify`; you may use any custom name instead.
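
As a rough illustration of what the pipeline script in step 2 does, a script along the following lines could load the three resources into DuckDB. This is a minimal sketch, not the contents of `shopify_dlt_pipeline.py`: the helper names `select_resources` and `load_shopify` and the dataset name `shopify_data` are assumptions made for this example, and it presumes that `dlt[duckdb]` is installed and that the credentials above are filled in.

```python
from typing import Any, Optional, Sequence, Tuple

# Resource names defined by `shopify_source` in `shopify_dlt/__init__.py`.
AVAILABLE_RESOURCES: Tuple[str, ...] = ("products", "orders", "customers")


def select_resources(requested: Optional[Sequence[str]] = None) -> Tuple[str, ...]:
    """Return the resource names to load, rejecting names the source does not define."""
    if requested is None:
        return AVAILABLE_RESOURCES
    unknown = [name for name in requested if name not in AVAILABLE_RESOURCES]
    if unknown:
        raise ValueError(f"Unknown Shopify resources: {unknown}")
    return tuple(requested)


def load_shopify(resources: Optional[Sequence[str]] = None) -> Any:
    """Run the pipeline into DuckDB and return dlt's load info."""
    # Imported lazily so that select_resources can be used without dlt installed.
    import dlt
    from shopify_dlt import shopify_source

    pipeline = dlt.pipeline(
        pipeline_name="shopify",
        destination="duckdb",
        dataset_name="shopify_data",  # assumed dataset name
    )
    # dlt resolves private_app_password and shop_url from the files in .dlt/.
    source = shopify_source().with_resources(*select_resources(resources))
    return pipeline.run(source)
```

Calling `print(load_shopify(["orders"]))` from the folder that contains `.dlt/` would load only orders; calling `load_shopify()` with no arguments loads all three resources.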

💡 To explore additional customizations for this pipeline, see the official `dlt` Shopify verified source documentation: [Setup Guide: Shopify](https://dlthub.com/docs/dlt-ecosystem/verified-sources/shopify).

228 changes: 228 additions & 0 deletions dlt_hub/shopify_dlt/__init__.py
@@ -0,0 +1,228 @@
"""Fetches Shopify Orders, Products, and Customers."""

from typing import Any, Dict, Iterable, Iterator, Optional

import dlt

from dlt.sources import DltResource
from dlt.common.typing import TDataItem, TAnyDateTime
from dlt.common.time import ensure_pendulum_datetime
from dlt.common import pendulum
from dlt.common import jsonpath as jp

from .settings import (
    DEFAULT_API_VERSION,
    FIRST_DAY_OF_MILLENNIUM,
    DEFAULT_ITEMS_PER_PAGE,
    DEFAULT_PARTNER_API_VERSION,
)
from .helpers import ShopifyApi, TOrderStatus, ShopifyPartnerApi


@dlt.source(name="shopify")
def shopify_source(
    private_app_password: str = dlt.secrets.value,
    api_version: str = DEFAULT_API_VERSION,
    shop_url: str = dlt.config.value,
    start_date: TAnyDateTime = FIRST_DAY_OF_MILLENNIUM,
    end_date: Optional[TAnyDateTime] = None,
    created_at_min: TAnyDateTime = FIRST_DAY_OF_MILLENNIUM,
    items_per_page: int = DEFAULT_ITEMS_PER_PAGE,
    order_status: TOrderStatus = "any",
) -> Iterable[DltResource]:
    """
    The source for the Shopify pipeline. Available resources are products, orders, and customers.

    The `start_date` argument can be used on its own or together with `end_date`. When both are provided,
    data is limited to items updated in that time range.
    The range is half-open: items updated at or after `start_date` and before `end_date` are included.
    All resources opt in to the Airflow scheduler when run as an Airflow task.

    Args:
        private_app_password: The app password to the app on your shop.
        api_version: The API version to use (e.g. 2023-01).
        shop_url: The URL of your shop (e.g. https://my-shop.myshopify.com).
        items_per_page: The max number of items to fetch per page. Defaults to 250.
        start_date: Items updated on or after this date are imported. Defaults to 2000-01-01.
            If `end_date` is not provided, this is used as the initial value for incremental loading, and after the initial run only new data is retrieved.
            Accepts any `date`/`datetime` object or a date/datetime string in ISO 8601 format.
        end_date: The end of the range for which to load data.
            Should be used together with `start_date` to limit the data to items updated in that time range.
            If `end_date` is not provided, incremental loading is enabled, and after the initial run only new data is retrieved.
        created_at_min: The minimum creation date of items to import. Items created on or after this date are loaded. Defaults to 2000-01-01.
        order_status: The order status to filter by. Can be 'open', 'closed', 'cancelled', or 'any'. Defaults to 'any'.

    Returns:
        Iterable[DltResource]: The `products`, `orders`, and `customers` resources.
    """

    # build client
    client = ShopifyApi(shop_url, private_app_password, api_version)

    start_date_obj = ensure_pendulum_datetime(start_date)
    end_date_obj = ensure_pendulum_datetime(end_date) if end_date else None
    created_at_min_obj = ensure_pendulum_datetime(created_at_min)

    # define resources
    @dlt.resource(primary_key="id", write_disposition="merge")
    def products(
        updated_at: dlt.sources.incremental[
            pendulum.DateTime
        ] = dlt.sources.incremental(
            "updated_at",
            initial_value=start_date_obj,
            end_value=end_date_obj,
            allow_external_schedulers=True,
        ),
        created_at_min: pendulum.DateTime = created_at_min_obj,
        items_per_page: int = items_per_page,
    ) -> Iterable[TDataItem]:
        """
        The resource for products on your shop, supports incremental loading and pagination.

        Args:
            updated_at: The saved state of the last 'updated_at' value.

        Returns:
            Iterable[TDataItem]: A generator of products.
        """
        params = dict(
            updated_at_min=updated_at.last_value.isoformat(),
            limit=items_per_page,
            order="updated_at asc",
            created_at_min=created_at_min.isoformat(),
        )
        if updated_at.end_value is not None:
            params["updated_at_max"] = updated_at.end_value.isoformat()
        yield from client.get_pages("products", params)

    @dlt.resource(primary_key="id", write_disposition="merge")
    def orders(
        updated_at: dlt.sources.incremental[
            pendulum.DateTime
        ] = dlt.sources.incremental(
            "updated_at",
            initial_value=start_date_obj,
            end_value=end_date_obj,
            allow_external_schedulers=True,
        ),
        created_at_min: pendulum.DateTime = created_at_min_obj,
        items_per_page: int = items_per_page,
        status: TOrderStatus = order_status,
    ) -> Iterable[TDataItem]:
        """
        The resource for orders on your shop, supports incremental loading and pagination.

        Args:
            updated_at: The saved state of the last 'updated_at' value.

        Returns:
            Iterable[TDataItem]: A generator of orders.
        """
        params = dict(
            updated_at_min=updated_at.last_value.isoformat(),
            limit=items_per_page,
            status=status,
            order="updated_at asc",
            created_at_min=created_at_min.isoformat(),
        )
        if updated_at.end_value is not None:
            params["updated_at_max"] = updated_at.end_value.isoformat()
        yield from client.get_pages("orders", params)

    @dlt.resource(primary_key="id", write_disposition="merge")
    def customers(
        updated_at: dlt.sources.incremental[
            pendulum.DateTime
        ] = dlt.sources.incremental(
            "updated_at",
            initial_value=start_date_obj,
            end_value=end_date_obj,
            allow_external_schedulers=True,
        ),
        created_at_min: pendulum.DateTime = created_at_min_obj,
        items_per_page: int = items_per_page,
    ) -> Iterable[TDataItem]:
        """
        The resource for customers on your shop, supports incremental loading and pagination.

        Args:
            updated_at: The saved state of the last 'updated_at' value.

        Returns:
            Iterable[TDataItem]: A generator of customers.
        """
        params = dict(
            updated_at_min=updated_at.last_value.isoformat(),
            limit=items_per_page,
            order="updated_at asc",
            created_at_min=created_at_min.isoformat(),
        )
        if updated_at.end_value is not None:
            params["updated_at_max"] = updated_at.end_value.isoformat()
        yield from client.get_pages("customers", params)

    return (products, orders, customers)


@dlt.resource
def shopify_partner_query(
    query: str,
    data_items_path: jp.TJsonPath,
    pagination_cursor_path: jp.TJsonPath,
    pagination_variable_name: str = "after",
    variables: Optional[Dict[str, Any]] = None,
    access_token: str = dlt.secrets.value,
    organization_id: str = dlt.config.value,
    api_version: str = DEFAULT_PARTNER_API_VERSION,
) -> Iterable[TDataItem]:
    """
    Resource for getting paginated results from the Shopify Partner GraphQL API.

    This resource will run the given GraphQL query and extract a list of data items from the result.
    It will then run the query again with a pagination cursor to get the next page of results.

    Example:
        query = '''query Transactions($after: String) {
            transactions(after: $after, first: 100) {
                edges {
                    cursor
                    node {
                        id
                    }
                }
            }
        }'''

        shopify_partner_query(
            query,
            data_items_path="data.transactions.edges[*].node",
            pagination_cursor_path="data.transactions.edges[-1].cursor",
            pagination_variable_name="after",
        )

    Args:
        query: The GraphQL query to run.
        data_items_path: The JSONPath to the data items in the query result. Should resolve to array items.
        pagination_cursor_path: The JSONPath to the pagination cursor in the query result; its value is passed to the next query via variables.
        pagination_variable_name: The name of the variable to pass the pagination cursor to.
        variables: Mapping of extra variables used in the query.
        access_token: The Partner API Client access token, created in the Partner Dashboard.
        organization_id: Your Organization ID, found in the Partner Dashboard.
        api_version: The API version to use (e.g. 2024-01). Use `unstable` for the latest version.

    Returns:
        Iterable[TDataItem]: A generator of the query results.
    """
    client = ShopifyPartnerApi(
        access_token=access_token,
        organization_id=organization_id,
        api_version=api_version,
    )

    yield from client.get_graphql_pages(
        query,
        data_items_path=data_items_path,
        pagination_cursor_path=pagination_cursor_path,
        pagination_variable_name=pagination_variable_name,
        variables=variables,
    )
2 changes: 2 additions & 0 deletions dlt_hub/shopify_dlt/exceptions.py
@@ -0,0 +1,2 @@
class ShopifyPartnerApiError(Exception):
    pass