Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ jobs:
- run: pdm install -G:all --lockfile pylock.toml

# test
- run: pdm run test
- run: pdm run test -k 'not test_srs'

test-minimal-deps:
runs-on: ubuntu-latest
Expand All @@ -41,7 +41,7 @@ jobs:
- run: pdm install -G:all --lockfile pylock.minimal.toml

# test
- run: pdm run test
- run: pdm run test -k 'not test_srs'

test-maximal-deps:
runs-on: ubuntu-latest
Expand All @@ -57,4 +57,4 @@ jobs:
- run: pdm install -G:all --lockfile pylock.maximal.toml

# test
- run: pdm run test
- run: pdm run test -k 'not test_srs'
292 changes: 146 additions & 146 deletions pylock.lint.toml

Large diffs are not rendered by default.

88 changes: 44 additions & 44 deletions pylock.maximal.toml

Large diffs are not rendered by default.

62 changes: 29 additions & 33 deletions pylock.minimal.toml

Large diffs are not rendered by default.

292 changes: 146 additions & 146 deletions pylock.toml

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ authors = [
{ name = "Nassib Nassar", email = "nassib@indexdata.com" },
]
dependencies = [
# 1.3 introduces json each
"duckdb>=1.3",
# 1.4 introduces WITH ORDINALITY
# 1.4.1 fixes https://github.com/duckdb/duckdb-python/issues/78
"duckdb>=1.4.1",
# pytz is required for timestamptz columns in duckdb
# 2021.3 was released with 3.10
"pytz>=2021.3",
Expand Down
46 changes: 11 additions & 35 deletions src/ldlite/database/_duckdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,17 @@ class DuckDbDatabase(TypedDatabase[duckdb.DuckDBPyConnection]):
def __init__(self, db: duckdb.DuckDBPyConnection) -> None:
# See the notes below for why we're monkey patching DuckDB
super().__init__(
lambda: cast(
lambda _: cast(
"duckdb.DuckDBPyConnection",
_MonkeyDBPyConnection(db.cursor()),
),
)

@staticmethod
def _setup_jfuncs(conn: duckdb.DuckDBPyConnection) -> None:
with conn.cursor() as cur:
with self._conn_factory(True) as cur:
cur.execute(
r"""
CREATE OR REPLACE FUNCTION ldlite_system.jtype_of(j) AS
-- These are shims to be able to use the postgres native operations.
CREATE OR REPLACE FUNCTION jsonb_typeof(j) AS
CASE coalesce(main.json_type(j), 'NULL')
WHEN 'VARCHAR' THEN 'string'
WHEN 'BIGINT' THEN 'number'
Expand All @@ -42,38 +41,14 @@ def _setup_jfuncs(conn: duckdb.DuckDBPyConnection) -> None:
END
;

CREATE OR REPLACE FUNCTION ldlite_system.jobject_keys(j) AS TABLE
SELECT je.key as ld_key FROM json_each(j) je ORDER BY je.id
CREATE OR REPLACE FUNCTION jsonb_object_keys(j) AS TABLE
SELECT je.key as ld_key, id as "ordinality" FROM json_each(j) je ORDER BY je.id
;

CREATE OR REPLACE FUNCTION ldlite_system.jis_uuid(j) AS
regexp_full_match(main.json_extract_string(j, '$'), '(?i)^[a-f0-9]{8}-[a-f0-9]{4}-[1-5][a-f0-9]{3}-[89abAB][a-f0-9]{3}-[a-f0-9]{12}$')
;

CREATE OR REPLACE FUNCTION ldlite_system.jis_datetime(j) AS
regexp_full_match(main.json_extract_string(j, '$'), '^\d{4}-[01]\d-[0123]\dT[012]\d:[012345]\d:[012345]\d\.\d{3}(\+\d{2}:\d{2})?$')
;

CREATE OR REPLACE FUNCTION ldlite_system.jis_float(j) AS
main.json_type(j) == 'DOUBLE'
;

CREATE OR REPLACE FUNCTION ldlite_system.jis_bigint(j) AS
COALESCE(TRY_CAST(j AS NUMERIC), 0) > 2147483647
;

CREATE OR REPLACE FUNCTION ldlite_system.jis_null(j) AS
j IS NULL OR j == 'null'::JSON OR main.json_extract_string(j, '$') IN ('NULL', 'null', '', '{}', '[]')
;

CREATE OR REPLACE FUNCTION ldlite_system.jexplode(j) AS TABLE (
SELECT value as ld_value FROM main.json_each(j)
CREATE OR REPLACE FUNCTION jsonb_array_elements(j) AS TABLE (
SELECT value as ld_value, rowid + 1 AS "ordinality" FROM main.json_each(j)
);

CREATE OR REPLACE FUNCTION ldlite_system.jself_string(j) AS
main.json_extract_string(j, '$')
;
""", # noqa: E501
""",
)

@property
Expand All @@ -92,7 +67,7 @@ def ingest_records(
pfx = Prefix(prefix)
download_started = datetime.now(timezone.utc)
pkey = count(1)
with self._conn_factory() as conn:
with self._conn_factory(False) as conn:
self._prepare_raw_table(conn, pfx)

insert_sql = (
Expand Down Expand Up @@ -132,6 +107,7 @@ def close(self) -> None:
def execute(self, *args, **kwargs) -> duckdb.DuckDBPyConnection:
print(args[0])
return self._cur.execute(*args, **kwargs)

"""

def __enter__(self) -> "Self":
Expand Down
85 changes: 49 additions & 36 deletions src/ldlite/database/_expansion/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,20 @@ def expand_nonmarc(
root_name: str,
root_values: list[str],
ctx: ExpandContext,
) -> list[str]:
(_, created_tables) = _expand_nonmarc(
) -> list[tuple[str, sql.Composed]]:
(_, tables_to_create) = _expand_nonmarc(
ObjectNode(root_name, "", None, root_values),
0,
ctx,
)
return created_tables
return tables_to_create


def _expand_nonmarc(
def _expand_nonmarc( # noqa: PLR0915
root: ObjectNode,
count: int,
ctx: ExpandContext,
) -> tuple[int, list[str]]:
) -> tuple[int, list[tuple[str, sql.Composed]]]:
ctx.scan_progress.total = (ctx.scan_progress.total or 0) + 1
ctx.scan_progress.refresh()
ctx.transform_progress.total = (ctx.transform_progress.total or 0) + 1
Expand All @@ -45,6 +45,13 @@ def _expand_nonmarc(
if not has_rows:
return (0, [])

with ctx.conn.cursor() as cur:
cur.execute(
sql.SQL("DROP TABLE {previous_table}")
.format(previous_table=ctx.source_table)
.as_string(),
)

expand_children_of = deque([root])
while expand_children_of:
on = expand_children_of.popleft()
Expand All @@ -63,11 +70,17 @@ def _expand_nonmarc(
ctx.get_transform_table(count + 1),
ctx.source_cte(False),
)
with ctx.conn.cursor() as cur:
cur.execute(
sql.SQL("DROP TABLE {previous_table}")
.format(previous_table=ctx.get_transform_table(count))
.as_string(),
)
expand_children_of.append(c)
count += 1
ctx.transform_progress.update(1)

created_tables = []
tables_to_create = []

new_source_table = ctx.get_transform_table(count)
arrays = root.descendents_oftype(ArrayNode)
Expand All @@ -86,7 +99,7 @@ def _expand_nonmarc(
count += 1
ctx.transform_progress.update(1)

if an.meta.is_object:
if an.is_object:
(sub_index, array_tables) = _expand_nonmarc(
ObjectNode(
an.name,
Expand All @@ -101,29 +114,29 @@ def _expand_nonmarc(
),
)
count += sub_index
created_tables.extend(array_tables)
tables_to_create.extend(array_tables)
else:
with ctx.conn.cursor() as cur:
(tname, tid) = ctx.get_output_table(an.name)
created_tables.append(tname)
cur.execute(
sql.SQL(
"""
tables_to_create.append(
(
tname,
sql.SQL(
"""
CREATE TABLE {dest_table} AS
"""
+ ctx.source_cte(False)
+ """
+ ctx.source_cte(False)
+ """
SELECT {cols} FROM ld_source
""",
)
.format(
dest_table=tid,
source_table=ctx.get_transform_table(count),
cols=sql.SQL("\n ,").join(
[sql.Identifier(v) for v in [*values, an.name]],
).format(
dest_table=tid,
source_table=ctx.get_transform_table(count),
cols=sql.SQL("\n ,").join(
[sql.Identifier(v) for v in [*values, an.name]],
),
),
)
.as_string(),
),
)

stamped_values = [
Expand All @@ -132,23 +145,23 @@ def _expand_nonmarc(

with ctx.conn.cursor() as cur:
(tname, tid) = ctx.get_output_table(root.path)
created_tables.append(tname)
cur.execute(
sql.SQL(
"""
tables_to_create.append(
(
tname,
sql.SQL(
"""
CREATE TABLE {dest_table} AS
"""
+ ctx.source_cte(False)
+ """
+ ctx.source_cte(False)
+ """
SELECT {cols} FROM ld_source
""",
)
.format(
dest_table=tid,
source_table=new_source_table,
cols=sql.SQL("\n ,").join(stamped_values),
)
.as_string(),
).format(
dest_table=tid,
source_table=new_source_table,
cols=sql.SQL("\n ,").join(stamped_values),
),
),
)

return (count + 1 - initial_count, created_tables)
return (count + 1 - initial_count, tables_to_create)
Loading
Loading