diff --git a/.gitignore b/.gitignore index e888a89..a863140 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,4 @@ build/ *.egg-info/ __pycache__/ .pyc +.env diff --git a/dist/neonutilities-1.2.2-py3-none-any.whl b/dist/neonutilities-1.2.2-py3-none-any.whl deleted file mode 100644 index ac14149..0000000 Binary files a/dist/neonutilities-1.2.2-py3-none-any.whl and /dev/null differ diff --git a/dist/neonutilities-1.2.2.tar.gz b/dist/neonutilities-1.2.2.tar.gz deleted file mode 100644 index 8b68501..0000000 Binary files a/dist/neonutilities-1.2.2.tar.gz and /dev/null differ diff --git a/dist/neonutilities-2.0.0-py3-none-any.whl b/dist/neonutilities-2.0.0-py3-none-any.whl new file mode 100644 index 0000000..d7ef6af Binary files /dev/null and b/dist/neonutilities-2.0.0-py3-none-any.whl differ diff --git a/dist/neonutilities-2.0.0.tar.gz b/dist/neonutilities-2.0.0.tar.gz new file mode 100644 index 0000000..d1bc98f Binary files /dev/null and b/dist/neonutilities-2.0.0.tar.gz differ diff --git a/pyproject.toml b/pyproject.toml index 26e5b3a..fcc3056 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "neonutilities" -version = "1.2.2" +version = "2.0.0" authors = [ {name="Claire Lunch", email="clunch@battelleecology.org"}, {name="Bridget Hass", email="bhass@battelleecology.org"}, @@ -15,6 +15,7 @@ description="A package for accessing and wrangling data generated and published readme = "README.md" requires-python = ">=3.10" dependencies = [ + "duckdb", "importlib-resources", "pandas", "parameterized", diff --git a/requirements.txt b/requirements.txt index b03ce9a..2f4c4dd 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ +duckdb>=1.5.0 importlib_resources>=6.1.1 pandas>=2.1.4 parameterized>=0.9.0 diff --git a/src/neonutilities/__resources__/shared_aquatic.csv b/src/neonutilities/__resources__/shared_aquatic.csv index 012c8dc..04a742d 100644 --- a/src/neonutilities/__resources__/shared_aquatic.csv +++ b/src/neonutilities/__resources__/shared_aquatic.csv @@ -1,37 +1,61 @@ site,towerSite,product +HOPB,HARV,DP1.00044.001 +HOPB,HARV,DP1.00045.001 HOPB,HARV,DP1.00006.001 HOPB,HARV,DP1.00038.001 HOPB,HARV,DP1.00013.001 +POSE,SCBI,DP1.00044.001 +POSE,SCBI,DP1.00045.001 POSE,SCBI,DP1.00006.001 POSE,SCBI,DP1.00038.001 POSE,SCBI,DP1.00013.001 +LEWI,BLAN,DP1.00044.001 +LEWI,BLAN,DP1.00045.001 LEWI,BLAN,DP1.00006.001 LEWI,BLAN,DP1.00038.001 LEWI,BLAN,DP1.00013.001 +BARC,OSBS,DP1.00044.001 +BARC,OSBS,DP1.00045.001 BARC,OSBS,DP1.00006.001 BARC,OSBS,DP1.00038.001 BARC,OSBS,DP1.00013.001 +SUGG,OSBS,DP1.00044.001 +SUGG,OSBS,DP1.00045.001 SUGG,OSBS,DP1.00006.001 SUGG,OSBS,DP1.00038.001 SUGG,OSBS,DP1.00013.001 +FLNT,JERC,DP1.00044.001 +FLNT,JERC,DP1.00045.001 FLNT,JERC,DP1.00006.001 FLNT,JERC,DP1.00038.001 FLNT,JERC,DP1.00013.001 +CRAM,UNDE,DP1.00044.001 +CRAM,UNDE,DP1.00045.001 CRAM,UNDE,DP1.00006.001 CRAM,UNDE,DP1.00038.001 CRAM,UNDE,DP1.00013.001 +LIRO,UNDE,DP1.00044.001 +LIRO,UNDE,DP1.00045.001 LIRO,UNDE,DP1.00006.001 LIRO,UNDE,DP1.00038.001 LIRO,UNDE,DP1.00013.001 +KING,KONZ,DP1.00044.001 +KING,KONZ,DP1.00045.001 KING,KONZ,DP1.00006.001 KING,KONZ,DP1.00038.001 KING,KONZ,DP1.00013.001 +WALK,ORNL,DP1.00044.001 +WALK,ORNL,DP1.00045.001 WALK,ORNL,DP1.00006.001 WALK,ORNL,DP1.00038.001 WALK,ORNL,DP1.00013.001 +LECO,GRSM,DP1.00044.001 +LECO,GRSM,DP1.00045.001 LECO,GRSM,DP1.00006.001 LECO,GRSM,DP1.00038.001 LECO,GRSM,DP1.00013.001 +MAYF,TALL,DP1.00044.001 +MAYF,TALL,DP1.00045.001 MAYF,TALL,DP1.00006.001 MAYF,TALL,DP1.00038.001 MAYF,TALL,DP1.00013.001 @@ -41,6 +65,8 @@ BLWA,DELA,DP1.00024.001 BLWA,DELA,DP1.00098.001 BLWA,DELA,DP1.00002.001 BLWA,DELA,DP1.00023.001 +BLWA,DELA,DP1.00044.001 +BLWA,DELA,DP1.00045.001 BLWA,DELA,DP1.00006.001 BLWA,DELA,DP1.00038.001 BLWA,DELA,DP1.00013.001 @@ -50,30 +76,48 @@ TOMB,LENO,DP1.00024.001 TOMB,LENO,DP1.00098.001 TOMB,LENO,DP1.00002.001 TOMB,LENO,DP1.00023.001 +TOMB,LENO,DP1.00044.001 +TOMB,LENO,DP1.00045.001 TOMB,LENO,DP1.00006.001 TOMB,LENO,DP1.00038.001 TOMB,LENO,DP1.00013.001 +PRPO,WOOD,DP1.00044.001 +PRPO,WOOD,DP1.00045.001 PRPO,WOOD,DP1.00006.001 PRPO,WOOD,DP1.00038.001 PRPO,WOOD,DP1.00013.001 +PRLA,DCFS,DP1.00044.001 +PRLA,DCFS,DP1.00045.001 PRLA,DCFS,DP1.00006.001 PRLA,DCFS,DP1.00038.001 PRLA,DCFS,DP1.00013.001 +BLDE,YELL,DP1.00044.001 +BLDE,YELL,DP1.00045.001 BLDE,YELL,DP1.00006.001 BLDE,YELL,DP1.00038.001 BLDE,YELL,DP1.00013.001 +COMO,NIWO,DP1.00044.001 +COMO,NIWO,DP1.00045.001 COMO,NIWO,DP1.00006.001 COMO,NIWO,DP1.00038.001 COMO,NIWO,DP1.00013.001 +MART,WREF,DP1.00044.001 +MART,WREF,DP1.00045.001 MART,WREF,DP1.00006.001 MART,WREF,DP1.00038.001 MART,WREF,DP1.00013.001 +TECR,TEAK,DP1.00044.001 +TECR,TEAK,DP1.00045.001 TECR,TEAK,DP1.00006.001 TECR,TEAK,DP1.00038.001 TECR,TEAK,DP1.00013.001 +OKSR,TOOL,DP1.00044.001 +OKSR,TOOL,DP1.00045.001 OKSR,TOOL,DP1.00006.001 OKSR,TOOL,DP1.00038.001 OKSR,TOOL,DP1.00013.001 +CARI,BONA,DP1.00044.001 +CARI,BONA,DP1.00045.001 CARI,BONA,DP1.00006.001 CARI,BONA,DP1.00038.001 CARI,BONA,DP1.00013.001 diff --git a/src/neonutilities/aop_download.py b/src/neonutilities/aop_download.py index fa8283d..ab5d69c 100644 --- a/src/neonutilities/aop_download.py +++ b/src/neonutilities/aop_download.py @@ -31,14 +31,14 @@ import re # import time -from time import sleep from tqdm import tqdm # local imports from . import __resources__ from .helper_mods.api_helpers import get_api -from .helper_mods.api_helpers import token_check +from .helper_mods.api_helpers import token_check, auth_check from .helper_mods.api_helpers import download_file, calculate_crc32c +from .helper_mods.api_helpers import baseurl from .helper_mods.metadata_helpers import convert_byte_size from .get_issue_log import get_issue_log from .citation import get_citation @@ -221,7 +221,7 @@ def get_neon_sites(): def get_data_product_name(dpid): - dpid_api_response = get_api(f"https://data.neonscience.org/api/v0/products/{dpid}") + dpid_api_response = get_api(f"{baseurl}products/{dpid}") product_name = dpid_api_response.json()["data"]["productName"] return product_name @@ -307,7 +307,7 @@ def get_aop_dpids(): >>> active_dpids = get_active_dpids() # This will return a list of all active NEON data product IDs. """ - response = get_api("https://data.neonscience.org/api/v0/products") + response = get_api(baseurl + "products") response_dict = response.json() # all_neon_dpids = [item["productCode"] for item in response_dict["data"]] @@ -522,7 +522,7 @@ def list_available_dates(dpid, site): >>> list_available_dates('DP1.10098.001','HOPB') ValueError: There are no data available for the data product DP1.10098.001 at the site HOPB. """ - product_url = "https://data.neonscience.org/api/v0/products/" + dpid + product_url = baseurl + "products/" + dpid response = get_api(api_url=product_url) # add input for token? # raise value error and print message if dpid isn't formatted as expected @@ -692,7 +692,7 @@ def get_aop_tile_extents(dpid, site, year, token=None): token = None # query the products endpoint for the product requested - response = get_api("https://data.neonscience.org/api/v0/products/" + dpid, token) + response = get_api(baseurl + "products/" + dpid, token) # exit function if response is None (eg. if no internet connection) if response is None: @@ -880,8 +880,13 @@ def by_file_aop( if token is not None: token = token_check(token) + # check for access to the data query endpoint to test whether token is valid + authcheck = auth_check(token=token) + if authcheck is None: + pass + # query the products endpoint for the product requested - response = get_api("https://data.neonscience.org/api/v0/products/" + dpid, token) + response = get_api(baseurl + "products/" + dpid, token) # exit function if response is None (eg. if no internet connection) if response is None: @@ -1170,9 +1175,11 @@ def by_file_aop( logging.info("Skipped overwriting files with mismatched checksums.") else: + tstart = datetime.now() for file in tqdm(files): download_file( - url=file, savepath=download_path, chunk_size=chunk_size, token=token + url=file, savepath=download_path, chunk_size=chunk_size, + token=token, tstart=tstart ) # download issue log table @@ -1404,8 +1411,13 @@ def by_tile_aop( if token is not None: token = token_check(token) + # check for access to the data query endpoint to test whether token is valid + authcheck = auth_check(token=token) + if authcheck is None: + pass + # query the products endpoint for the product requested - response = get_api("https://data.neonscience.org/api/v0/products/" + dpid, token) + response = get_api(baseurl + "products/" + dpid, token) # exit function if response is None (eg. if no internet connection) if response is None: @@ -1685,6 +1697,7 @@ def get_buffer_coords(easting, northing, buffer): logging.info(f" {f}") # Download files that do not exist locally + tstart = datetime.now() for _, row in tqdm( files_to_download.iterrows(), total=len(files_to_download) ): @@ -1693,6 +1706,7 @@ def get_buffer_coords(easting, northing, buffer): savepath=download_path, chunk_size=chunk_size, token=token, + tstart=tstart ) if not files_to_skip.empty: @@ -1750,6 +1764,7 @@ def get_buffer_coords(easting, northing, buffer): if response.lower() == "y": logging.info("Overwriting these files with the latest available data.") + tstart = datetime.now() for idx, row in tqdm( mismatched_files.iterrows(), total=len(mismatched_files) ): @@ -1758,13 +1773,16 @@ def get_buffer_coords(easting, northing, buffer): savepath=download_path, chunk_size=chunk_size, token=token, + tstart=tstart ) else: logging.info("Skipped overwriting files with mismatched checksums.") else: # if skip_if_exists=False (default behavior) + tstart = datetime.now() for file in tqdm(files): download_file( - url=file, savepath=download_path, chunk_size=chunk_size, token=token + url=file, savepath=download_path, chunk_size=chunk_size, token=token, + tstart=tstart ) # download issue log table diff --git a/src/neonutilities/citation.py b/src/neonutilities/citation.py index 080aeaa..ea4e56d 100644 --- a/src/neonutilities/citation.py +++ b/src/neonutilities/citation.py @@ -2,6 +2,7 @@ # -*- coding: utf-8 -*- from datetime import datetime import requests +import logging def get_citation(dpid, release): @@ -55,7 +56,7 @@ def get_citation(dpid, release): relinfo = next((i for i in rels if i["release"] == release), None) if relinfo is None: - print("There are no data with dpid=" + dpid + " and release=" + release) + logging.info("No citation found for dpid=" + dpid + " and release=" + release) return relinfo else: diff --git a/src/neonutilities/get_issue_log.py b/src/neonutilities/get_issue_log.py index 581c4a4..10504ab 100644 --- a/src/neonutilities/get_issue_log.py +++ b/src/neonutilities/get_issue_log.py @@ -17,6 +17,7 @@ import logging import pandas as pd from .helper_mods.api_helpers import get_api +from .helper_mods.api_helpers import baseurl logging.basicConfig(level=logging.INFO, format="%(message)s") @@ -58,7 +59,7 @@ def get_change_log_df(dpid, token=None): 'issue', 'resolution' """ req = get_api( - api_url=f"https://data.neonscience.org/api/v0/products/{dpid}", token=token + api_url=f"{baseurl}products/{dpid}", token=token ) if req is None: logging.info(f"Error in metadata retrieval for {dpid}. Issue log not found.") diff --git a/src/neonutilities/helper_mods/api_helpers.py b/src/neonutilities/helper_mods/api_helpers.py index 0a90f08..86a1f13 100644 --- a/src/neonutilities/helper_mods/api_helpers.py +++ b/src/neonutilities/helper_mods/api_helpers.py @@ -15,6 +15,7 @@ import warnings import pandas as pd from tqdm import tqdm +from datetime import datetime from .metadata_helpers import get_recent logging.basicConfig(level=logging.INFO, format="%(message)s") @@ -26,6 +27,29 @@ usera = f"neonutilities/{vers} Python/{plat} {osplat}" +# Set the base URL for the NEON API +baseurl = os.environ.get("NEON_API_URL") +if baseurl is None: + baseurl = "https://data.neonscience.org/api/v0/" + +def print_base_url(): + """ + Prints the base URL for the NEON API. This is useful for debugging and ensuring the correct API endpoint is being accessed. + + Parameters + -------- + None + + Return + -------- + The currently set base URL for the NEON API. + + Created on June 22 2026 + + @author: Claire Lunch + """ + print(baseurl) + def token_date(token, rval="string"): """ @@ -90,11 +114,50 @@ def token_check(token): return(token) else: if time.time() > expdate: - logging.info("API token has expired. Function will proceed using public access rate. Go to your NEON user account to generate a new token.") + logging.info("API token has expired. Go to your NEON user account to generate a new token.") token = None return(token) +def auth_check(token): + """ + + Check whether the user can connect to NEON data files. + + Parameters + -------- + token: User specific API token (generated within data.neonscience.org user accounts). + + Return + -------- + Silent if authentication succeeds. If authentication fails and no token was provided, prompts the user to generate a token. + + Created on May 18 2026 + + @author: Claire Lunch + """ + + # check for access to the data query endpoint to test whether token is valid + # this query should have data, but would be ok if it didn't - still get status code 200 if auth is good + authcheck = requests.get( + url=baseurl + "data/query?productCode=DP1.10003.001&siteCode=BART&startDateMonth=2023-01&endDateMonth=2023-12&release=RELEASE-2025", + headers={ + "X-API-TOKEN": token, + "accept": "application/json", + "User-Agent": usera, + }) + if authcheck.status_code != 200: + if token is None: + raise ConnectionError( + "API token was not provided, was invalid, or has expired. As of June 2026, NEON requires an API token for data download. To get a token, go to your user account at neonscience.org" + ) + else: + logging.info( + "There was a problem connecting to the NEON API. Code will attempt to proceed but data access may fail." + ) + return None + + def get_api(api_url, token=None): """ @@ -126,7 +189,7 @@ def get_status_code_meaning(status_code): # Check internet connection try: check_connection = requests.get( - "https://data.neonscience.org/api/v0/products/DP1.00001.001", headers={"User-Agent": usera} + baseurl + "products/DP1.00001.001", headers={"User-Agent": usera} ) if check_connection.status_code != 200: status_code = check_connection.status_code @@ -226,7 +289,7 @@ def get_status_code_meaning(status_code): # Check internet connection try: check_connection = requests.head( - "https://data.neonscience.org/api/v0/products/DP1.00001.001", headers={"User-Agent": usera} + baseurl + "products/DP1.00001.001", headers={"User-Agent": usera} ) if check_connection.status_code != 200: status_code = check_connection.status_code @@ -385,7 +448,8 @@ def get_zip_urls( # get file sizes szr = re.compile(package) - flszs = [siz["size"] for siz in m_di["data"]["files"] if szr.search(siz["url"])] + fltmp = [flt for flt in m_di["data"]["files"] if "size" in flt.keys()] + flszs = [siz["size"] for siz in fltmp if szr.search(siz["url"])] flszi = sum(flszs) # return url, file name, file size, and release @@ -632,6 +696,8 @@ def download_urls(url_set, outpath, token=None, progress=True): if progress: logging.info("Downloading files") + failset = [] + for i in tqdm(range(0, len(url_set["z"])), disable=not progress): if len(outpath + url_set["flnm"][i]) > 260 and platform.system() == "Windows": raise OSError( @@ -691,11 +757,12 @@ def download_urls(url_set, outpath, token=None, progress=True): except Exception: logging.info( - f"File {url_set['flnm'][i]} could not be downloaded and was skipped. If this issue persists, check your network connection and check the NEON Data Portal for outage alerts." + f"File {url_set['flnm'][i]} could not be downloaded and was skipped." ) + failset.append(url_set['flnm'][i]) pass - return None + return failset # def calculate_crc32c(filename): @@ -737,7 +804,7 @@ def calculate_crc32c(filename, chunk_size=1024 * 1024): return crc32c.digest().hex().zfill(8) -def download_file(url, savepath, chunk_size=1024, token=None): +def download_file(url, savepath, chunk_size=1024, token=None, tstart=None): """ This function downloads a single file from a Google Cloud Storage URL to a user-specified directory. @@ -755,6 +822,9 @@ def download_file(url, savepath, chunk_size=1024, token=None): token: str, optional User-specific API token generated within neon.datascience user accounts. If provided, it will be used for authentication. + tsart: datetime, optional + The time data download started. + Returns -------- None @@ -845,9 +915,15 @@ def download_file(url, savepath, chunk_size=1024, token=None): logging.info( f"File {os.path.basename(url)} could not be downloaded and was skipped or partially downloaded. If this issue persists, check your network connection and check the NEON Data Portal for outage alerts." ) - # print(e) + if tstart is not None: + tdur = datetime.now() - tstart + if tdur.seconds > 604800: + logging.info( + "File download URLs expire after one week; if your download has been running for more than a week, refresh the URLs for the remaining files. But also consider a different download strategy, and contact NEON if you are regularly downloading datasets that take many days to complete." + ) + # print(e) pass - + return diff --git a/src/neonutilities/read_table_neon.py b/src/neonutilities/read_table_neon.py index f21f734..f727e5f 100644 --- a/src/neonutilities/read_table_neon.py +++ b/src/neonutilities/read_table_neon.py @@ -1,11 +1,12 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- +import logging import pandas as pd import numpy as np import pyarrow as pa +import re from pyarrow import dataset -import logging logging.basicConfig(level=logging.INFO, format="%(message)s") @@ -73,6 +74,94 @@ def get_variables(v): return vschema +def get_variables_duck(v): + """ + + Get correct data types for each field in a data table + + Parameters + -------- + v: A pandas table containing variable definitions + + Return + -------- + A duckdb schema for data types based on the variables file + + Created on Apr 27 2026 + + @author: Claire Lunch + """ + + # function assumes variables are loaded as a pandas data frame. + + # create duckdb schema by translating NEON data types to SQL types + vschema = dict() + timetypes = [] + for i in v.index: + nm = v.fieldName[i] + typ = "VARCHAR" + if v.dataType[i] == "real": + typ = "DOUBLE" + if v.dataType[i] in ["integer", "unsigned integer", "signed integer"]: + typ = "BIGINT" + if v.dataType[i] == "dateTime": + if v.pubFormat[i] in [ + "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", + "yyyy-MM-dd'T'HH:mm:ss'Z'(floor)", + "yyyy-MM-dd'T'HH:mm:ss'Z'", + "yyyy-MM-dd'T'HH:mm:ss'Z'(round)" + ]: + typ = "TIMESTAMPTZ" + timetypes.append(v.pubFormat[i]) + else: + if v.pubFormat[i] in [ + "yyyy-MM-dd'T'HH:mm'Z'(floor)", + "yyyy-MM-dd'T'HH:mm'Z'", + "yyyy-MM-dd'T'HH:mm'Z'(round)", + "yyyy-MM-dd'T'HH'Z'(floor)", + "yyyy-MM-dd'T'HH'Z'", + "yyyy-MM-dd'T'HH'Z'(round)" + ]: + typ = "VARCHAR" + logging.info( + f"Field {nm} is in format {v.pubFormat[i]}, which duckdb currently does not parse correctly. Data will be read as string." + ) + + else: + if v.pubFormat[i] in ["yyyy-MM-dd(floor)", "yyyy-MM-dd"]: + typ = "DATE" + else: + if v.pubFormat[i] in ["yyyy(floor)", "yyyy(round)"]: + typ = "BIGINT" + else: + typ = "VARCHAR" + + vschema[nm] = typ + + # set timestamp format + ttyps = set(timetypes) + tformat = "VARCHAR" + if len(ttyps)==0: + tform = "yyyy-MM-dd'T'HH:mm'Z'" + else: + tset = [re.sub(pattern="[(]floor[)]|[(]round[)]", repl="", string=t) for t in ttyps] + tform = [f for f in tset if len(f)==max([len(t) for t in tset])][0] + tchar = [f for f in tset if len(f)!=max([len(t) for t in tset])] + for i in vschema.keys(): + if vschema[i] in tchar: + vschema[i] = "VARCHAR" + if tform == "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'": + tformat = "'%Y-%m-%dT%H:%M:%S.%gZ'" + if tform == "yyyy-MM-dd'T'HH:mm:ss'Z'": + tformat = "'%Y-%m-%dT%H:%M:%SZ'" + if tform == "yyyy-MM-dd'T'HH:mm'Z'": + tformat = "'%Y-%m-%dT%H:%MZ'" + if tform == "yyyy-MM-dd'T'HH'Z'": + tformat = "'%Y-%m-%dT%HZ'" + + return [vschema, tformat] + + def read_table_neon(data_file, var_file): """ diff --git a/src/neonutilities/tabular_download.py b/src/neonutilities/tabular_download.py index f03da2a..f7967d6 100644 --- a/src/neonutilities/tabular_download.py +++ b/src/neonutilities/tabular_download.py @@ -7,10 +7,11 @@ import pandas as pd import logging from .helper_mods.api_helpers import get_api -from .helper_mods.api_helpers import token_check +from .helper_mods.api_helpers import token_check, auth_check from .helper_mods.api_helpers import get_zip_urls from .helper_mods.api_helpers import get_tab_urls from .helper_mods.api_helpers import download_urls +from .helper_mods.api_helpers import baseurl from .helper_mods.metadata_helpers import convert_byte_size from . import __resources__ @@ -115,7 +116,8 @@ def query_files( # construct full query url and run query qurl = ( - "https://data.neonscience.org/api/v0/data/query?productCode=" + baseurl + + "data/query?productCode=" + dpid + sitesurl + dateurl @@ -141,13 +143,7 @@ def query_files( fdict = packdict[j].get("files") for k in range(0, len(fdict)): flurl.append(fdict[k].get("url")) - releasedict[ - re.sub( - pattern="https://storage.googleapis.com/", - repl="", - string=fdict[k].get("url"), - ) - ] = rdict + releasedict[fdict[k].get("url")] = rdict # if timeindex or tabl are set, subset the list of files if timeindex == "all" and tabl == "all": @@ -276,7 +272,7 @@ def zips_by_product( ) # error message if package is not basic or expanded - if not package in ["basic", "expanded"]: + if package not in ["basic", "expanded"]: raise ValueError( f"{package} is not a valid NEON download package name. Package must be basic or expanded" ) @@ -376,7 +372,7 @@ def zips_by_product( siter = [] indx = 0 for s in site: - if s in shared_aquatic_df.index: + if s in shared_aquatic_df.index and dpid in shared_aquatic_df["product"].values: if s in ["BLWA"] and release not in ["RELEASE-2021","RELEASE-2022","RELEASE-2023","RELEASE-2024","RELEASE-2025"]: indx = indx + 1 if "DELA" not in site: @@ -440,15 +436,20 @@ def zips_by_product( if token is not None: token = token_check(token) + # check for access to the data query endpoint to test whether token is valid + authcheck = auth_check(token=token) + if authcheck is None: + pass + # end of error and exception handling, start the work # query the /products endpoint for the product requested if release == "current" or release == "PROVISIONAL": prodreq = get_api( - api_url="https://data.neonscience.org/api/v0/products/" + dpid, token=token + api_url=baseurl + "products/" + dpid, token=token ) else: prodreq = get_api( - api_url="https://data.neonscience.org/api/v0/products/" + api_url=baseurl + "products/" + dpid + "?release=" + release, @@ -464,7 +465,7 @@ def zips_by_product( else: if release != "current" and release != "PROVISIONAL": rels = get_api( - api_url="https://data.neonscience.org/api/v0/releases/", token=token + api_url=baseurl + "releases/", token=token ) if rels is None: raise ConnectionError( @@ -632,6 +633,42 @@ def zips_by_product( os.makedirs(outpath + f) # download data from each url - download_urls(url_set=durls, outpath=outpath, token=token, progress=progress) + ds = download_urls(url_set=durls, outpath=outpath, token=token, progress=progress) + + # if all downloads failed, re-generate urls once + if len(ds) > 0: + logging.info( + "Some files failed to download. Urls may have expired; re-generating." + ) + + if timeindex == "all" and tabl == "all": + durls = get_zip_urls( + url_set=end_urls, + package=package, + release=release, + include_provisional=include_provisional, + token=token, + progress=progress, + ) + else: + durls = get_tab_urls( + url_set=end_urls, + package=package, + release=release, + include_provisional=include_provisional, + timeindex=timeindex, + tabl=tabl, + token=token, + progress=progress, + ) + + durlsub = [fl for fl in durls if fl["flnm"] in ds] + + ds = download_urls(url_set=durlsub, outpath=outpath, token=token, progress=progress) + if len(ds) > 0: + logging.info( + "Some files failed to download on second attempt, re-generating urls did not fix the issue. Check for connection issues or NEON API outages." + ) + return None return None diff --git a/src/neonutilities/unzip_and_stack.py b/src/neonutilities/unzip_and_stack.py index 8f933f8..cddb4a8 100644 --- a/src/neonutilities/unzip_and_stack.py +++ b/src/neonutilities/unzip_and_stack.py @@ -6,6 +6,7 @@ import pyarrow as pa from pyarrow import dataset from pyarrow import fs +import duckdb import zipfile import platform import glob @@ -18,7 +19,8 @@ from .citation import get_citation from .helper_mods.api_helpers import readme_url from .helper_mods.api_helpers import get_api -from .read_table_neon import get_variables, cast_table_neon +from .helper_mods.api_helpers import baseurl +from .read_table_neon import get_variables, get_variables_duck, cast_table_neon from . import __resources__ import logging @@ -305,8 +307,8 @@ def table_type_formats(flname): @author: Claire Lunch """ - flen = len(flname) - if flen <= 6: + dpr = re.compile("^D[0-9]{2}$") + if not any([f for f in flname if dpr.search(f)]): return "lab" else: ymr = re.compile("[0-9]{4}-[0-9]{2}") @@ -340,7 +342,7 @@ def find_table_types(datatables): for i in range(0, len(splitnames)): for j in range(2, len(splitnames[i])): s = splitnames[i][j] - if not s == "sensor_positions" and not s == "science_review_flags": + if not s == "sensor_positions" and not s == "science_review_flags" and len(s) <= 50: if "_" not in s: continue else: @@ -483,11 +485,14 @@ def align_sp_cols(sptab): "referenceElevation": "locationReferenceElevation", } for k in list(oldcols.keys()): - if all(sptab[k].isna()): - sptab.drop(columns=k, inplace=True) + if k not in sptab.columns: + continue else: - sptab[oldcols[k]] = sptab[oldcols[k]].fillna(sptab[k]) - sptab.drop(columns=k, inplace=True) + if all(sptab[k].isna()): + sptab.drop(columns=k, inplace=True) + else: + sptab[oldcols[k]] = sptab[k].fillna(sptab[k]) + sptab.drop(columns=k, inplace=True) return sptab @@ -742,6 +747,145 @@ def format_readme(readmetab, tables): return rd +def stack_files_duck(dpid, + j, + variables, + progress, + sensor_positions_internal_variables, + package, + filepaths, + table_types, + datasetq + ): + """ + + Join data files by table using duckdb for stacking + + Parameters + -------- + dpid: Data product ID of product to stack. + j: Table name, passed from loop in stack_data_files_parallel() + variables: Variables table for the data product. + progress: Should a progress bar be displayed? + package: basic or expanded data package + sensor_positions_internal_variables: Variable names and types to use for sensor positions files + filepaths: List of paths to files to be stacked + table_types: Type for each table in the set to be stacked + datasetq: Return a duckdb dataset for a single table? + + Return + -------- + A single data table based on the input data and criteria. + + Created on Apr 27 2026 + + @author: Claire Lunch + """ + + stringset = False + + # create schema from variables file, for only this table and package + vtab = variables[variables["table"] == j] + if len(vtab) == 0: + vtab = variables[variables["table"] == j + "_pub"] + if j == "sensor_positions": + vtab = sensor_positions_internal_variables + + if package == "basic": + tablepkgvar = vtab[vtab["downloadPkg"]=="basic"] + else: + tablepkgvar = vtab + + if len(tablepkgvar) == 0: + # set to string if variables file can't be found + tableschema = None + else: + tableschema = get_variables_duck(tablepkgvar) + + # subset the list of files to the relevant table + tabler = re.compile("[.]" + j + "[.]|[.]" + j + "_pub[.]") + tablepaths = [f for f in filepaths if tabler.search(f)] + + # subset the list of files for lab-specific tables: + # get the most recent file from each lab + if table_types[j] == "lab": + labs = find_lab_names(tablepaths) + labrecent = list() + for k in labs: + labr = re.compile(k) + labpaths = [f for f in tablepaths if labr.search(f)] + labrecent.append(get_recent_publication(labpaths)[0]) + tablepaths = labrecent + + # subset the list of files for site-all tables: + # get the most recent file from each site + if table_types[j] == "site-all": + sites = find_sites(tablepaths) + siterecent = list() + for k in sites: + sr = re.compile(k) + sitepaths = [f for f in tablepaths if sr.search(f)] + siterecent.append(get_recent_publication(sitepaths)[0]) + tablepaths = siterecent + + # subset the schema for sensor positions files + # using read_csv() instead of sniff_csv() because the output is better and these files are tiny anyway + if j == "sensor_positions": + sniffsp = duckdb.sql(f"SELECT * FROM read_csv({[tablepaths[0]]}, header=true)") + spcols = sniffsp.columns + spschema = tableschema[0] + spsub = {k: spschema[k] for k in spcols} + tableschema[0] = spsub + + # read data and append file names + # try: stacking with schema, then inferring, then setting all fields to string + duckdb.sql("SET TimeZone = 'UTC'") + try: + dat = duckdb.sql(f"SELECT * FROM read_csv({tablepaths}, columns={tableschema[0]}, header=true, filename=true, timestampformat={tableschema[1]})") + except Exception: + logging.info( + f"Stacking failed using variables file to set schema for table {j}. Data types will be inferred if possible." + ) + try: + dat = duckdb.read_csv(tablepaths, + union_by_name=True, + filename=True) + except Exception: + if datasetq: + logging.info( + f"Inferring data types failed for table {j}. All variable types will be set to string." + ) + else: + logging.info( + f"Inferring data types failed for table {j}. All variable types will be set to string. Data type casting will be attempted after stacking step." + ) + stringset = True + try: + dat = duckdb.read_csv(tablepaths, + all_varchar=True, + filename=True) + except Exception: + logging.info( + f"Failed to stack table {j}. Check input data and variables file." + ) + return None + + # for dataset query, done here + if datasetq: + return dat + else: + pdat = dat.df() + + if stringset: + try: + pdat = cast_table_neon(pdat, tablepkgvar) + except Exception: + logging.info( + f"Data type casting failed for table {j}. Variable types set to string." + ) + + return pdat + def stack_data_files_parallel(folder, package, dpid, @@ -777,7 +921,6 @@ def stack_data_files_parallel(folder, if cloud_mode: filenames = [os.path.basename(f) for f in folder[0]] filepaths = folder[0] - gcs = fs.GcsFileSystem(anonymous=True) else: # Get filenames without full path filenames = find_datatables(folder=folder, f_names=False) @@ -875,14 +1018,8 @@ def stack_data_files_parallel(folder, [path for path in filepaths if "variables.20" in path] )[0] if cloud_mode: - vp = dataset.dataset( - source=re.sub("https://storage.googleapis.com/", "", varpath), - filesystem=gcs, - format="csv", - schema=varschema, - ) - va = vp.to_table() - v = va.to_pandas() + vp = duckdb.read_csv(varpath) + v = vp.df() else: v = pd.read_csv(varpath, sep=",") @@ -899,14 +1036,14 @@ def stack_data_files_parallel(folder, v = pd.concat([v, science_review_variables], ignore_index=True) # if sensor positions are present but missing from variables file, add variables + sensor_positions_map = ( + importlib_resources.files(__resources__) + / "sensor_positions_variables_mapping.csv" + ) + sensor_positions_internal_variables = pd.read_csv( + sensor_positions_map, index_col=None + ) if any("sensor_positions" in path for path in filepaths): - sensor_positions_map = ( - importlib_resources.files(__resources__) - / "sensor_positions_variables_mapping.csv" - ) - sensor_positions_internal_variables = pd.read_csv( - sensor_positions_map, index_col=None - ) if "sensor_positions" not in list(v["table"]): sensor_positions_file = ( importlib_resources.files(__resources__) @@ -927,14 +1064,8 @@ def stack_data_files_parallel(folder, [path for path in filepaths if "validation" in path] )[0] if cloud_mode: - vp = dataset.dataset( - source=re.sub("https://storage.googleapis.com/", "", valpath), - filesystem=gcs, - format="csv", - schema=None, - ) - va = vp.to_table() - val = va.to_pandas() + vp = duckdb.read_csv(valpath) + val = vp.df() else: val = pd.read_csv(valpath, sep=",") stacklist[f"validation_{dpnum}"] = val @@ -945,14 +1076,8 @@ def stack_data_files_parallel(folder, [path for path in filepaths if "categoricalCodes" in path] )[0] if cloud_mode: - cp = dataset.dataset( - source=re.sub("https://storage.googleapis.com/", "", ccpath), - filesystem=gcs, - format="csv", - schema=None, - ) - ca = cp.to_table() - cc = ca.to_pandas() + cp = duckdb.read_csv(ccpath) + cc = cp.df() else: cc = pd.read_csv(ccpath, sep=",") stacklist[f"categoricalCodes_{dpnum}"] = cc @@ -988,214 +1113,212 @@ def stack_data_files_parallel(folder, if progress: logging.info("Stacking data files") arrowvars = pa.Table.from_pandas(stacklist[f"variables_{dpnum}"]) + for j in tqdm(tables, disable=not progress): - # create schema from variables file, for only this table and package - vtab = arrowvars.filter(pa.compute.field("table") == j) - if len(vtab) == 0: - vtab = arrowvars.filter(pa.compute.field("table") == j + "_pub") - if j == "sensor_positions": - vtab = pa.Table.from_pandas(sensor_positions_internal_variables) - - if package == "basic": - vtabpkg = vtab.filter(pa.compute.field("downloadPkg") == "basic") - else: - vtabpkg = vtab - - tablepkgvar = vtabpkg.to_pandas() - if len(tablepkgvar) == 0: - # set to string if variables file can't be found - tableschema = None - logging.info( - f"NEON {dpid} variables file not found for table {j}. Data types will be inferred if possible." - ) - else: - tableschema = get_variables(tablepkgvar) - - # subset the list of files to the relevant table - tabler = re.compile("[.]" + j + "[.]|[.]" + j + "_pub[.]") - tablepaths = [f for f in filepaths if tabler.search(f)] - - # subset the list of files for lab-specific tables: - # get the most recent file from each lab - if table_types[j] == "lab": - labs = find_lab_names(tablepaths) - labrecent = list() - for k in labs: - labr = re.compile(k) - labpaths = [f for f in tablepaths if labr.search(f)] - labrecent.append(get_recent_publication(labpaths)[0]) - tablepaths = labrecent - - # subset the list of files for site-all tables: - # get the most recent file from each site - if table_types[j] == "site-all": - sites = find_sites(tablepaths) - siterecent = list() - for k in sites: - sr = re.compile(k) - sitepaths = [f for f in tablepaths if sr.search(f)] - siterecent.append(get_recent_publication(sitepaths)[0]) - tablepaths = siterecent - - # read data and append file names if cloud_mode: - tablebuckets = [ - re.sub(pattern="https://storage.googleapis.com/", repl="", string=b) - for b in tablepaths - ] - dat = dataset.dataset( - source=tablebuckets, filesystem=gcs, format="csv", schema=tableschema - ) + pdat = stack_files_duck(dpid=dpid, + j=j, + variables=stacklist[f"variables_{dpnum}"], + progress=progress, + sensor_positions_internal_variables=sensor_positions_internal_variables, + package=package, + filepaths=filepaths, + table_types=table_types, + datasetq=datasetq) else: + # create schema from variables file, for only this table and package + vtab = arrowvars.filter(pa.compute.field("table") == j) + if len(vtab) == 0: + vtab = arrowvars.filter(pa.compute.field("table") == j + "_pub") + if j == "sensor_positions": + vtab = pa.Table.from_pandas(sensor_positions_internal_variables) + + if package == "basic": + vtabpkg = vtab.filter(pa.compute.field("downloadPkg") == "basic") + else: + vtabpkg = vtab + + tablepkgvar = vtabpkg.to_pandas() + if len(tablepkgvar) == 0: + # set to string if variables file can't be found + tableschema = None + logging.info( + f"NEON {dpid} variables file not found for table {j}. Data types will be inferred if possible." + ) + else: + tableschema = get_variables(tablepkgvar) + + # subset the list of files to the relevant table + tabler = re.compile("[.]" + j + "[.]|[.]" + j + "_pub[.]") + tablepaths = [f for f in filepaths if tabler.search(f)] + + # subset the list of files for lab-specific tables: + # get the most recent file from each lab + if table_types[j] == "lab": + labs = find_lab_names(tablepaths) + labrecent = list() + for k in labs: + labr = re.compile(k) + labpaths = [f for f in tablepaths if labr.search(f)] + labrecent.append(get_recent_publication(labpaths)[0]) + tablepaths = labrecent + + # subset the list of files for site-all tables: + # get the most recent file from each site + if table_types[j] == "site-all": + sites = find_sites(tablepaths) + siterecent = list() + for k in sites: + sr = re.compile(k) + sitepaths = [f for f in tablepaths if sr.search(f)] + siterecent.append(get_recent_publication(sitepaths)[0]) + tablepaths = siterecent + + # read data and append file names dat = dataset.dataset(source=tablepaths, format="csv", schema=tableschema) - - if tableschema is None: - cols = dat.head(num_rows=0).column_names - else: - cols = tableschema.names - cols.append("__filename") - - # attempt to stack to table. if it fails, stack as all string fields and warn - stringset = False - try: - dattab = dat.to_table(columns=cols) - except Exception: + + if tableschema is None: + cols = dat.head(num_rows=0).column_names + else: + cols = tableschema.names + cols.append("__filename") + + # attempt to stack to table. if it fails, stack as all string fields and warn + stringset = False try: - if tableschema is None: - stringschema = unknown_string_schema( - dat.head(num_rows=0).column_names - ) - else: - stringschema = string_schema(tablepkgvar) - if cloud_mode: - dat = dataset.dataset( - source=tablebuckets, - filesystem=gcs, - format="csv", - schema=stringschema, - ) - else: + dattab = dat.to_table(columns=cols) + except Exception: + try: + if tableschema is None: + stringschema = unknown_string_schema( + dat.head(num_rows=0).column_names + ) + else: + stringschema = string_schema(tablepkgvar) dat = dataset.dataset( source=tablepaths, format="csv", schema=stringschema ) - dattab = dat.to_table(columns=cols) - logging.info( - f"Table {j} schema did not match data; all variable types set to string. Data type casting will be attempted after stacking step." - ) - stringset = True - except Exception: - logging.info( - f"Failed to stack table {j}. Check input data and variables file." - ) - continue - - # for dataset query, done here - if datasetq: - return dat - else: + dattab = dat.to_table(columns=cols) + logging.info( + f"Table {j} schema did not match data; all variable types set to string. Data type casting will be attempted after stacking step." + ) + stringset = True + except Exception: + logging.info( + f"Failed to stack table {j}. Check input data and variables file." + ) + continue + pdat = dattab.to_pandas() + + if stringset: + try: + pdat = cast_table_neon(pdat, tablepkgvar) + except Exception: + logging.info( + f"Data type casting failed for table {j}. Variable types set to string." + ) - if stringset: - try: - pdat = cast_table_neon(pdat, tablepkgvar) - except Exception: - logging.info( - f"Data type casting failed for table {j}. Variable types set to string." - ) - - # append publication date - pubr = re.compile("20[0-9]{6}T[0-9]{6}Z") - pubval = [pubr.search(os.path.basename(p)).group(0) for p in pdat["__filename"]] - pdat = pdat.assign(publicationDate=pubval) - - # append release tag - if cloud_mode: - pdat["release"] = pdat["__filename"].map(folder[1]) - releases.append(list(set(folder[1].values()))) - else: - pubrelr = re.compile("20[0-9]{6}T[0-9]{6}Z\\..*\\/") - pubrelval = [pubrelr.search(p).group(0) for p in pdat["__filename"]] - relval = [re.sub(".*\\.", "", s) for s in pubrelval] - relval = [re.sub("\\/", "", s) for s in relval] - pdat = pdat.assign(release=relval) - releases.append(list(set(relval))) - - # append fields to variables file - if f"variables_{dpnum}" in stacklist.keys(): - added_fields_file = ( - importlib_resources.files(__resources__) / "added_fields.csv" - ) - added_fields = pd.read_csv(added_fields_file, index_col=None) - added_fields_all = added_fields[-2:] - added_fields_all.insert(0, "table", j) - try: - vlist[j] = pd.concat([vlist[j], added_fields_all], ignore_index=True) - except Exception: - pass - - # for IS products, append domainID, siteID, HOR, VER - if "siteID" not in pdat.columns.to_list() and not table_types[j] == "lab": - dr = re.compile("D[0-2]{1}[0-9]{1}") - domval = [dr.search(d).group(0) for d in pdat["__filename"]] - pdat.insert(0, "domainID", domval) - - sr = re.compile("D[0-9]{2}[.][A-Z]{4}[.]") - sitel = [sr.search(s).group(0) for s in pdat["__filename"]] - siteval = [ - re.sub(pattern="D[0-9]{2}[.]|[.]", repl="", string=s) for s in sitel - ] - pdat.insert(1, "siteID", siteval) - - if j != "sensor_positions": - locr = re.compile("[.][0-9]{3}[.][0-9]{3}[.][0-9]{3}[.][0-9]{3}[.]|[.][0-9]{3}[.][0-9]{3}[.][0-9]{3}[.][0-9]{2}[A-Z]{1}[.]") - indtemp = [locr.search(l) for l in pdat["__filename"]] - if None in indtemp: - pdat = sort_dat(pdat) - else: - indxs = [lt.group(0) for lt in indtemp] - hor = [indx[5:8] for indx in indxs] - ver = [indx[9:12] for indx in indxs] - pdat.insert(2, "horizontalPosition", hor) - pdat.insert(3, "verticalPosition", ver) - - # sort table rows - pdat = sort_dat(pdat) - - # append fields to variables file - if f"variables_{dpnum}" in stacklist.keys(): - added_fields_IS = added_fields[0:4] - added_fields_IS.insert(0, "table", j) - try: - vlist[j] = pd.concat( - [added_fields_IS, vlist[j]], ignore_index=True - ) - except Exception: - pass - - else: - # for OS tables, sort by site and date - pdat = sort_dat(pdat) - - # for SRF files, remove duplicates and modified records - if j == "science_review_flags": - pdat = remove_srf_dups(pdat) - - # for sensor position files, align column names - if j == "sensor_positions": - pdat = align_sp_cols(pdat) - - # remove filename column - pdat = pdat.drop(columns=["__filename"]) - - # add table to list - if j == "science_review_flags" or j == "sensor_positions": - stacklist[f"{j}_{dpnum}"] = pdat + if datasetq: + return pdat else: - stacklist[j] = pdat - + + if not cloud_mode: + pdat.rename(columns={"__filename": "filename"}, inplace=True) + + # append publication date + pubr = re.compile("20[0-9]{6}T[0-9]{6}Z") + pubval = [pubr.search(os.path.basename(p)).group(0) for p in pdat["filename"]] + pdat = pdat.assign(publicationDate=pubval) + + # append release tag + if cloud_mode: + pdat["release"] = pdat["filename"].map(folder[1]) + releases.append(list(set(folder[1].values()))) + else: + pubrelr = re.compile("20[0-9]{6}T[0-9]{6}Z\\..*\\/") + pubrelval = [pubrelr.search(p).group(0) for p in pdat["filename"]] + relval = [re.sub(".*\\.", "", s) for s in pubrelval] + relval = [re.sub("\\/", "", s) for s in relval] + pdat = pdat.assign(release=relval) + releases.append(list(set(relval))) + + # append fields to variables file + if f"variables_{dpnum}" in stacklist.keys(): + added_fields_file = ( + importlib_resources.files(__resources__) / "added_fields.csv" + ) + added_fields = pd.read_csv(added_fields_file, index_col=None) + added_fields_all = added_fields[-2:] + added_fields_all.insert(0, "table", j) + try: + vlist[j] = pd.concat([vlist[j], added_fields_all], ignore_index=True) + except Exception: + pass + + # for IS products, append domainID, siteID, HOR, VER + if "siteID" not in pdat.columns.to_list() and not table_types[j] == "lab": + dr = re.compile("D[0-2]{1}[0-9]{1}") + domval = [dr.search(d).group(0) for d in pdat["filename"]] + pdat.insert(0, "domainID", domval) + + sr = re.compile("D[0-9]{2}[.][A-Z]{4}[.]") + sitel = [sr.search(s).group(0) for s in pdat["filename"]] + siteval = [ + re.sub(pattern="D[0-9]{2}[.]|[.]", repl="", string=s) for s in sitel + ] + pdat.insert(1, "siteID", siteval) + + if j != "sensor_positions": + locr = re.compile("[.][0-9]{3}[.][0-9]{3}[.][0-9]{3}[.][0-9]{3}[.]|[.][0-9]{3}[.][0-9]{3}[.][0-9]{3}[.][0-9]{2}[A-Z]{1}[.]") + indtemp = [locr.search(lz) for lz in pdat["filename"]] + if None in indtemp: + pdat = sort_dat(pdat) + else: + indxs = [lt.group(0) for lt in indtemp] + hor = [indx[5:8] for indx in indxs] + ver = [indx[9:12] for indx in indxs] + pdat.insert(2, "horizontalPosition", hor) + pdat.insert(3, "verticalPosition", ver) + + # sort table rows + pdat = sort_dat(pdat) + + # append fields to variables file + if f"variables_{dpnum}" in stacklist.keys(): + added_fields_IS = added_fields[0:4] + added_fields_IS.insert(0, "table", j) + try: + vlist[j] = pd.concat( + [added_fields_IS, vlist[j]], ignore_index=True + ) + except Exception: + pass + + else: + # for OS tables, sort by site and date + pdat = sort_dat(pdat) + + # for SRF files, remove duplicates and modified records + if j == "science_review_flags": + pdat = remove_srf_dups(pdat) + + # for sensor position files, align column names + if j == "sensor_positions": + pdat = align_sp_cols(pdat) + + # remove filename column + pdat = pdat.drop(columns=["filename"]) + + # add table to list + if j == "science_review_flags" or j == "sensor_positions": + stacklist[f"{j}_{dpnum}"] = pdat + else: + stacklist[j] = pdat + # final variables file stacklist[f"variables_{dpnum}"] = pd.concat(vlist, ignore_index=True) - + # get issue log table # token omitted here since it's not otherwise used in stacking functions # consider a runLocal option, like in R stackEddy() @@ -1203,7 +1326,7 @@ def stack_data_files_parallel(folder, stacklist[f"issueLog_{dpnum}"] = get_issue_log(dpid=dpid, token=None) except Exception: pass - + # get relevant citation(s) try: releases = sum(releases, []) @@ -1227,7 +1350,7 @@ def stack_data_files_parallel(folder, ) except Exception: pass - + return stacklist @@ -1641,7 +1764,7 @@ def dataset_query( """ This function uses the query endpoint of the NEON API to find the full list of files for a given data product, release, site(s), and date range, - then turns them into an arrow dataset. + then turns them into a duckdb dataset. Parameters ---------------- @@ -1684,7 +1807,7 @@ def dataset_query( Return --------------- - An arrow dataset for the data requested. + A duckdb lazy dataset for the data requested. Example --------------- @@ -1706,11 +1829,11 @@ def dataset_query( # query the /products endpoint for the product requested if release == "current" or release == "PROVISIONAL": prodreq = get_api( - api_url="https://data.neonscience.org/api/v0/products/" + dpid, token=token + api_url=baseurl + "products/" + dpid, token=token ) else: prodreq = get_api( - api_url="https://data.neonscience.org/api/v0/products/" + api_url=baseurl + "products/" + dpid + "?release=" + release, @@ -1743,9 +1866,10 @@ def dataset_query( ) if pubtype in ["TOS Data Product Type","TOS-structured TIS Data Product Type"]: - raise ValueError( - f"{dpid} is an observational data product. hor and ver are not valid inputs for this data product type." - ) + if hor is not None or ver is not None: + raise ValueError( + f"{dpid} is an observational data product. hor and ver are not valid inputs for this data product type." + ) if pubtype in ["TIS Data Product Type","AIS Data Product Type"]: if dpid in ["DP4.00200.001",