diff --git a/.gitignore b/.gitignore index 82f9275..fa27993 100644 --- a/.gitignore +++ b/.gitignore @@ -160,3 +160,8 @@ cython_debug/ # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. #.idea/ + +# Data Load Hub +.sources +.config +.secrets \ No newline at end of file diff --git a/cz.yaml b/cz.yaml new file mode 100644 index 0000000..3207b2e --- /dev/null +++ b/cz.yaml @@ -0,0 +1,8 @@ +--- +commitizen: + major_version_zero: true + name: cz_conventional_commits + tag_format: $version + update_changelog_on_bump: true + version_provider: scm + version_scheme: semver diff --git a/dlt_hub/.dlt/.sources b/dlt_hub/.dlt/.sources new file mode 100644 index 0000000..b5ed48c --- /dev/null +++ b/dlt_hub/.dlt/.sources @@ -0,0 +1,28 @@ +engine_version: 1 +sources: + shopify_dlt: + is_dirty: false + last_commit_sha: e7c6683aeab6db7ebe1ce7d16bf5f6a4cff57325 + last_commit_timestamp: '2024-10-02T18:46:27+02:00' + files: + shopify_dlt/__init__.py: + commit_sha: e7c6683aeab6db7ebe1ce7d16bf5f6a4cff57325 + git_sha: 788d8fdd227d87bf4cdad3e258e4de1f390efc10 + sha3_256: 66fa8ce2165da8d01aac2dd3e4b76a908d8c0aba5849755a6eab0ccbf86b61fd + shopify_dlt/README.md: + commit_sha: e7c6683aeab6db7ebe1ce7d16bf5f6a4cff57325 + git_sha: 67e95e660cf9018cc3cac5defcc3a6d24590c8ab + sha3_256: 4b6c4ebe1bc0e7bca9506d7b2d5bc68bdfcc68f969d8371631e5db490ebb3abf + shopify_dlt/settings.py: + commit_sha: e7c6683aeab6db7ebe1ce7d16bf5f6a4cff57325 + git_sha: 0d4cef7e5723f7515c90098358f980d5b4ea4877 + sha3_256: a1b47666183b7d03050ee67103affb3f93584d02ba21b4a4f3bd6a61176b9dc9 + shopify_dlt/exceptions.py: + commit_sha: e7c6683aeab6db7ebe1ce7d16bf5f6a4cff57325 + git_sha: c3acfb9c6f4100d7b7a5cf34f1308a4b6d3efe0f + sha3_256: 2956c1577e5bcebb8f4ebed710e699571e0cad76b9b91f47d1dc7a07df1b5496 + shopify_dlt/helpers.py: + commit_sha: e7c6683aeab6db7ebe1ce7d16bf5f6a4cff57325 + git_sha: 
f4775eee97b448c81d1db739d54853ad4d07ff39 + sha3_256: b679034734eeb71651f849ad61938fd0d9351a0251305a03b9094c401a5517f6 + dlt_version_constraint: '>=0.5.1' diff --git a/dlt_hub/.dlt/config.toml b/dlt_hub/.dlt/config.toml new file mode 100644 index 0000000..e7d0bca --- /dev/null +++ b/dlt_hub/.dlt/config.toml @@ -0,0 +1,6 @@ +[sources.shopify_dlt] +shop_url = "https://sam-cook-ltd.myshopify.com/" # please set me up! +organization_id = 3920015 # please set me up! + +[runtime] +dlthub_telemetry = true diff --git a/dlt_hub/.gitignore b/dlt_hub/.gitignore new file mode 100644 index 0000000..3b28aa3 --- /dev/null +++ b/dlt_hub/.gitignore @@ -0,0 +1,10 @@ +# ignore secrets, virtual environments and typical python compilation artifacts +secrets.toml +# ignore basic python artifacts +.env +**/__pycache__/ +**/*.py[cod] +**/*$py.class +# ignore duckdb +*.duckdb +*.wal \ No newline at end of file diff --git a/dlt_hub/requirements.txt b/dlt_hub/requirements.txt new file mode 100644 index 0000000..6e0256c --- /dev/null +++ b/dlt_hub/requirements.txt @@ -0,0 +1 @@ +dlt[duckdb]>=0.5.1 \ No newline at end of file diff --git a/dlt_hub/shopify_dlt/README.md b/dlt_hub/shopify_dlt/README.md new file mode 100644 index 0000000..33d69a5 --- /dev/null +++ b/dlt_hub/shopify_dlt/README.md @@ -0,0 +1,61 @@ +# Shopify + +Shopify is an easy-to-use e-commerce solution that allows anyone to create and manage their own online store. This `dlt` verified source is designed to efficiently load data from multiple endpoints that are: + +| Endpoint | Description | +| --- | --- | +| customers | individuals or entities who have created accounts on a Shopify-powered online store | +| orders | transactions made by customers on an online store | +| products | the individual items or goods that are available for sale | + +## Initialize the pipeline + +```bash +dlt init shopify_dlt duckdb +``` + +Here, we chose Duckdb as the destination. 
Alternatively, you can also choose redshift, bigquery, or any of the other [destinations](https://dlthub.com/docs/dlt-ecosystem/destinations/). + +## Grab credentials + +To grab your Shopify credentials and initialize your verified source, see the [full documentation here](https://dlthub.com/docs/dlt-ecosystem/verified-sources/shopify). + +## Add credentials + +1. Open `.dlt/secrets.toml`. +2. Enter the API access token: + + ```toml + [sources.shopify] + private_app_password=" Please set me up !" #Admin API access token + ``` + +3. Enter credentials for your chosen destination as per the [docs.](https://dlthub.com/docs/dlt-ecosystem/destinations/) +4. Inside the `.dlt` folder, you'll find another file called `config.toml`, where you can store your Shopify URL. The `config.toml` looks like this: + ```toml + [sources.shopify_dlt] + shop_url = "Please set me up!" + ``` + +5. Replace `shop_url` with the URL of your Shopify store. For example "https://shop-123.myshopify.com/". + +## Running the pipeline + +1. Before running the pipeline, ensure that you have installed all the necessary dependencies by running the command: + ```bash + pip install -r requirements.txt + ``` + +2. Now the pipeline can be run by using the command: + ```bash + python3 shopify_dlt_pipeline.py + ``` +3. To make sure that everything is loaded as expected, use the command: + ```bash + dlt pipeline show + ``` + + For example, the pipeline_name for the above pipeline is `shopify`, you may also use any custom name instead. + +💡 To explore additional customizations for this pipeline, we recommend referring to the official DLT Shopify verified documentation. It provides comprehensive information and guidance on how to further customize and tailor the pipeline to suit your specific needs. You can find the DLT Shopify documentation in [Setup Guide: Shopify](https://dlthub.com/docs/dlt-ecosystem/verified-sources/shopify). 
# ============================================================================
# dlt_hub/shopify_dlt/__init__.py
# ============================================================================
"""Fetches Shopify Orders and Products."""

from typing import Any, Dict, Optional, Iterable

import dlt

from dlt.sources import DltResource
from dlt.common.typing import TDataItem, TAnyDateTime
from dlt.common.time import ensure_pendulum_datetime
from dlt.common import pendulum
from dlt.common import jsonpath as jp

from .settings import (
    DEFAULT_API_VERSION,
    FIRST_DAY_OF_MILLENNIUM,
    DEFAULT_ITEMS_PER_PAGE,
    DEFAULT_PARTNER_API_VERSION,
)
from .helpers import ShopifyApi, TOrderStatus, ShopifyPartnerApi


@dlt.source(name="shopify")
def shopify_source(
    private_app_password: str = dlt.secrets.value,
    api_version: str = DEFAULT_API_VERSION,
    shop_url: str = dlt.config.value,
    start_date: TAnyDateTime = FIRST_DAY_OF_MILLENNIUM,
    end_date: Optional[TAnyDateTime] = None,
    created_at_min: TAnyDateTime = FIRST_DAY_OF_MILLENNIUM,
    items_per_page: int = DEFAULT_ITEMS_PER_PAGE,
    order_status: TOrderStatus = "any",
) -> Iterable[DltResource]:
    """
    The source for the Shopify pipeline. Available resources are products, orders, and customers.

    `start_date` can be used on its own or together with `end_date`. When both are provided,
    data is limited to items updated in that time range.
    The range is "half-open": elements equal to or newer than `start_date` and elements older
    than `end_date` are included.
    All resources opt in to the Airflow scheduler when run as an Airflow task.

    Args:
        private_app_password: The app password to the app on your shop.
        api_version: The API version to use (e.g. 2023-01).
        shop_url: The URL of your shop (e.g. https://my-shop.myshopify.com).
        start_date: Items updated on or after this date are imported. Defaults to 2000-01-01.
            If `end_date` is not provided, this is used as the initial value for incremental
            loading and after the initial run only new data will be retrieved.
            Accepts any `date`/`datetime` object or a date/datetime string in ISO 8601 format.
        end_date: The end of the range for which to load data.
            Should be used together with `start_date` to limit the data to items updated in
            that time range. If not provided, incremental loading is enabled and after the
            initial run only new data will be retrieved.
        created_at_min: The minimum creation date of items to import. Items created on or
            after this date are loaded. Defaults to 2000-01-01.
        items_per_page: The max number of items to fetch per page. Defaults to 250.
        order_status: The order status to filter by. Can be 'open', 'closed', 'cancelled',
            or 'any'. Defaults to 'any'.

    Returns:
        Iterable[DltResource]: A list of DltResource objects representing the data resources.
    """
    # Build the REST client once; all three resources share it.
    client = ShopifyApi(shop_url, private_app_password, api_version)

    start_date_obj = ensure_pendulum_datetime(start_date)
    end_date_obj = ensure_pendulum_datetime(end_date) if end_date else None
    created_at_min_obj = ensure_pendulum_datetime(created_at_min)

    def _page_params(
        updated_at: dlt.sources.incremental[pendulum.DateTime],
        created_at_min: pendulum.DateTime,
        items_per_page: int,
    ) -> Dict[str, Any]:
        """Build the shared query params for an incremental endpoint page fetch."""
        params = dict(
            updated_at_min=updated_at.last_value.isoformat(),
            limit=items_per_page,
            order="updated_at asc",
            created_at_min=created_at_min.isoformat(),
        )
        # Only constrain the upper bound when an explicit end of range was requested.
        if updated_at.end_value is not None:
            params["updated_at_max"] = updated_at.end_value.isoformat()
        return params

    @dlt.resource(primary_key="id", write_disposition="merge")
    def products(
        updated_at: dlt.sources.incremental[
            pendulum.DateTime
        ] = dlt.sources.incremental(
            "updated_at",
            initial_value=start_date_obj,
            end_value=end_date_obj,
            allow_external_schedulers=True,
        ),
        created_at_min: pendulum.DateTime = created_at_min_obj,
        items_per_page: int = items_per_page,
    ) -> Iterable[TDataItem]:
        """
        The resource for products on your shop, supports incremental loading and pagination.

        Args:
            updated_at: The saved state of the last 'updated_at' value.

        Returns:
            Iterable[TDataItem]: A generator of products.
        """
        params = _page_params(updated_at, created_at_min, items_per_page)
        yield from client.get_pages("products", params)

    @dlt.resource(primary_key="id", write_disposition="merge")
    def orders(
        updated_at: dlt.sources.incremental[
            pendulum.DateTime
        ] = dlt.sources.incremental(
            "updated_at",
            initial_value=start_date_obj,
            end_value=end_date_obj,
            allow_external_schedulers=True,
        ),
        created_at_min: pendulum.DateTime = created_at_min_obj,
        items_per_page: int = items_per_page,
        status: TOrderStatus = order_status,
    ) -> Iterable[TDataItem]:
        """
        The resource for orders on your shop, supports incremental loading and pagination.

        Args:
            updated_at: The saved state of the last 'updated_at' value.

        Returns:
            Iterable[TDataItem]: A generator of orders.
        """
        params = _page_params(updated_at, created_at_min, items_per_page)
        # Orders additionally filter on fulfillment status.
        params["status"] = status
        yield from client.get_pages("orders", params)

    @dlt.resource(primary_key="id", write_disposition="merge")
    def customers(
        updated_at: dlt.sources.incremental[
            pendulum.DateTime
        ] = dlt.sources.incremental(
            "updated_at",
            initial_value=start_date_obj,
            end_value=end_date_obj,
            allow_external_schedulers=True,
        ),
        created_at_min: pendulum.DateTime = created_at_min_obj,
        items_per_page: int = items_per_page,
    ) -> Iterable[TDataItem]:
        """
        The resource for customers on your shop, supports incremental loading and pagination.

        Args:
            updated_at: The saved state of the last 'updated_at' value.

        Returns:
            Iterable[TDataItem]: A generator of customers.
        """
        params = _page_params(updated_at, created_at_min, items_per_page)
        yield from client.get_pages("customers", params)

    return (products, orders, customers)


@dlt.resource
def shopify_partner_query(
    query: str,
    data_items_path: jp.TJsonPath,
    pagination_cursor_path: jp.TJsonPath,
    pagination_variable_name: str = "after",
    variables: Optional[Dict[str, Any]] = None,
    access_token: str = dlt.secrets.value,
    organization_id: str = dlt.config.value,
    api_version: str = DEFAULT_PARTNER_API_VERSION,
) -> Iterable[TDataItem]:
    """
    Resource for getting paginated results from the Shopify Partner GraphQL API.

    This resource will run the given GraphQL query and extract a list of data items from
    the result. It will then run the query again with a pagination cursor to get the next
    page of results.

    Example:
        query = '''query Transactions($after: String) {
            transactions(after: $after, first: 100) {
                edges {
                    cursor
                    node {
                        id
                    }
                }
            }
        }'''

        shopify_partner_query(
            query,
            data_items_path="data.transactions.edges[*].node",
            pagination_cursor_path="data.transactions.edges[-1].cursor",
            pagination_variable_name="after",
        )

    Args:
        query: The GraphQL query to run.
        data_items_path: The JSONPath to the data items in the query result. Should resolve
            to array items.
        pagination_cursor_path: The JSONPath to the pagination cursor in the query result,
            will be piped to the next query via variables.
        pagination_variable_name: The name of the variable to pass the pagination cursor to.
        variables: Mapping of extra variables used in the query.
        access_token: The Partner API Client access token, created in the Partner Dashboard.
        organization_id: Your Organization ID, found in the Partner Dashboard.
        api_version: The API version to use (e.g. 2024-01). Use `unstable` for the latest
            version.

    Returns:
        Iterable[TDataItem]: A generator of the query results.
    """
    client = ShopifyPartnerApi(
        access_token=access_token,
        organization_id=organization_id,
        api_version=api_version,
    )

    yield from client.get_graphql_pages(
        query,
        data_items_path=data_items_path,
        pagination_cursor_path=pagination_cursor_path,
        pagination_variable_name=pagination_variable_name,
        variables=variables,
    )


# ============================================================================
# dlt_hub/shopify_dlt/exceptions.py
# ============================================================================
class ShopifyPartnerApiError(Exception):
    """Raised when a Shopify Partner GraphQL response contains an `errors` payload."""


# ============================================================================
# dlt_hub/shopify_dlt/helpers.py
# ============================================================================
"""Shopify source helpers"""
from urllib.parse import urljoin

from dlt.common.time import ensure_pendulum_datetime
from dlt.sources.helpers import requests
from dlt.common.typing import TDataItem, TDataItems, Dict, DictStrAny
from dlt.common import jsonpath
from typing import Any, Iterable, Optional, Literal

from .settings import DEFAULT_API_VERSION, DEFAULT_PARTNER_API_VERSION
from .exceptions import ShopifyPartnerApiError

# Order statuses accepted by the Shopify Admin API `status` filter.
TOrderStatus = Literal["open", "closed", "cancelled", "any"]


class ShopifyApi:
    """
    A Shopify API client that can be used to get pages of data from Shopify.
    """

    def __init__(
        self,
        shop_url: str,
        private_app_password: str,
        api_version: str = DEFAULT_API_VERSION,
    ) -> None:
        """
        Args:
            shop_url: The URL of your shop (e.g. https://my-shop.myshopify.com).
            private_app_password: The private app password to the app on your shop.
            api_version: The API version to use (e.g. 2023-01)
        """
        self.shop_url = shop_url
        self.private_app_password = private_app_password
        self.api_version = api_version

    def get_pages(
        self, resource: str, params: Optional[Dict[str, Any]] = None
    ) -> Iterable[TDataItems]:
        """Get all pages from shopify using requests.
        Iterates through all pages and yields each page's items.

        Args:
            resource: The resource to get pages for (e.g. products, orders, customers).
            params: Query params to include in the request.

        Yields:
            List of data items from the page
        """
        url = urljoin(self.shop_url, f"/admin/api/{self.api_version}/{resource}.json")

        headers = {"X-Shopify-Access-Token": self.private_app_password}
        while url:
            response = requests.get(url, params=params, headers=headers)
            response.raise_for_status()
            payload = response.json()
            # Get item list from the page
            yield [self._convert_datetime_fields(item) for item in payload[resource]]
            # Shopify signals the next page via the RFC 5988 `Link` header.
            url = response.links.get("next", {}).get("url")
            # Query params are already encoded into subsequent page URLs.
            params = None

    def _convert_datetime_fields(self, item: Dict[str, Any]) -> Dict[str, Any]:
        """Convert timestamp fields in the item to pendulum datetime objects

        The item is modified in place.

        Args:
            item: The item to convert

        Returns:
            The same data item (for convenience)
        """
        for field in ("created_at", "updated_at"):
            if field in item:
                item[field] = ensure_pendulum_datetime(item[field])
        return item


class ShopifyPartnerApi:
    """Client for the Shopify Partner GraphQL API"""

    def __init__(
        self,
        access_token: str,
        organization_id: str,
        api_version: str = DEFAULT_PARTNER_API_VERSION,
    ) -> None:
        """
        Args:
            access_token: The access token to use
            organization_id: The organization id to query
            api_version: The API version to use (e.g. 2023-01)
        """
        self.access_token = access_token
        self.organization_id = organization_id
        self.api_version = api_version

    @property
    def graphql_url(self) -> str:
        """Endpoint URL for this organization and API version."""
        return f"https://partners.shopify.com/{self.organization_id}/api/{self.api_version}/graphql.json"

    def run_graphql_query(
        self, query: str, variables: Optional[DictStrAny] = None
    ) -> DictStrAny:
        """Run a graphql query against the Shopify Partner API

        Args:
            query: The query to run
            variables: The variables to include in the query

        Returns:
            The response JSON

        Raises:
            ShopifyPartnerApiError: If the response body contains GraphQL errors.
        """
        headers = {"X-Shopify-Access-Token": self.access_token}
        response = requests.post(
            self.graphql_url,
            json={"query": query, "variables": variables},
            headers=headers,
        )
        data = response.json()
        if data.get("errors"):
            raise ShopifyPartnerApiError(response.text)
        return data  # type: ignore[no-any-return]

    def get_graphql_pages(
        self,
        query: str,
        data_items_path: jsonpath.TJsonPath,
        pagination_cursor_path: jsonpath.TJsonPath,
        pagination_variable_name: str,
        variables: Optional[DictStrAny] = None,
    ) -> Iterable[TDataItems]:
        """Yield pages of data items by repeatedly running `query`,
        feeding the last page's cursor back in via `pagination_variable_name`.
        Stops when a page yields no data items or no cursor.
        """
        variables = dict(variables or {})
        while True:
            data = self.run_graphql_query(query, variables)
            data_items = jsonpath.find_values(data_items_path, data)
            if not data_items:
                break
            yield data_items
            cursors = jsonpath.find_values(pagination_cursor_path, data)
            if not cursors:
                break
            variables[pagination_variable_name] = cursors[-1]


# ============================================================================
# dlt_hub/shopify_dlt/settings.py
# ============================================================================
# Default lower bound for incremental loads.
FIRST_DAY_OF_MILLENNIUM = "2000-01-01"
# Shopify Admin REST API version.
DEFAULT_API_VERSION = "2023-10"
# Max page size accepted by the Admin API.
DEFAULT_ITEMS_PER_PAGE = 250

# Shopify Partner GraphQL API version.
DEFAULT_PARTNER_API_VERSION = "2024-01"
"""Pipeline to load Shopify data into DuckDB."""

import dlt
from dlt.common import pendulum
from typing import List, Tuple
from shopify_dlt import shopify_source, TAnyDateTime, shopify_partner_query


def load_all_resources(resources: List[str], start_date: TAnyDateTime) -> None:
    """Execute a pipeline that will load the given Shopify resources incrementally
    beginning at the given start date.
    Subsequent runs will load only items updated since the previous run.
    """

    pipeline = dlt.pipeline(
        pipeline_name="shopify", destination="duckdb", dataset_name="shopify_data"
    )
    load_info = pipeline.run(
        shopify_source(start_date=start_date).with_resources(*resources),
    )
    print(load_info)


def incremental_load_with_backloading() -> None:
    """Load past orders from Shopify in chunks of 1 week each using the start_date
    and end_date parameters.
    This can be useful to reduce the potential failure window when loading large
    amounts of historic data.
    Chunks and incremental load can also be run in parallel to speed up the initial load.
    """

    pipeline = dlt.pipeline(
        pipeline_name="shopify", destination="duckdb", dataset_name="shopify_data"
    )

    # Load all orders from 2023-01-01 to now
    min_start_date = current_start_date = pendulum.datetime(2023, 1, 1)
    max_end_date = pendulum.now()

    # Create a list of time ranges of 1 week each, we'll use this to load the data in chunks
    ranges: List[Tuple[pendulum.DateTime, pendulum.DateTime]] = []
    while current_start_date < max_end_date:
        end_date = min(current_start_date.add(weeks=1), max_end_date)
        ranges.append((current_start_date, end_date))
        current_start_date = end_date

    # Run the pipeline for each time range created above
    for start_date, end_date in ranges:
        print(f"Load orders between {start_date} and {end_date}")
        # Create the source with start and end date set according to the current time
        # range to filter.
        # created_at_min lets us set a cutoff to exclude orders created before the
        # initial date of (2023-01-01) even if they were updated after that date.
        data = shopify_source(
            start_date=start_date, end_date=end_date, created_at_min=min_start_date
        ).with_resources("orders")

        load_info = pipeline.run(data)
        print(load_info)

    # Continue loading new data incrementally starting at the end of the last range
    # created_at_min still filters out items created before 2023-01-01
    load_info = pipeline.run(
        shopify_source(
            start_date=max_end_date, created_at_min=min_start_date
        ).with_resources("orders")
    )
    print(load_info)


def load_partner_api_transactions() -> None:
    """Load transactions from the Shopify Partner API.
    The partner API uses GraphQL and this example loads all transactions from the
    beginning, paginated.

    The `shopify_partner_query` resource can be used to run custom GraphQL queries to
    load paginated data.
    """

    pipeline = dlt.pipeline(
        pipeline_name="shopify_partner",
        destination="duckdb",
        dataset_name="shopify_partner_data",
    )

    # Construct query to load transactions 100 per page, the `$after` variable is used
    # to paginate. NOTE: `first: 100` is a field argument on `transactions`, not a
    # variable declaration — declaring it in the operation signature is invalid GraphQL.
    query = """query Transactions($after: String) {
        transactions(after: $after, first: 100) {
            edges {
                cursor
                node {
                    id
                }
            }
        }
    }
    """

    # Configure the resource with the query and json paths to extract the data and
    # pagination cursor
    resource = shopify_partner_query(
        query,
        # JSON path pointing to the data item in the results
        data_items_path="data.transactions.edges[*].node",
        # JSON path pointing to the highest page cursor in the results
        pagination_cursor_path="data.transactions.edges[-1].cursor",
        # The variable name used for pagination
        pagination_variable_name="after",
    )

    load_info = pipeline.run(resource)
    print(load_info)


if __name__ == "__main__":
    # Add your desired resources to the list...
    resources = ["products", "orders", "customers"]
    load_all_resources(resources, start_date="2000-01-01")

    # incremental_load_with_backloading()

    # load_partner_api_transactions()