Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
d2de294
Begin the rewrite with new Node class and implement the fixed value
brycekbargar Mar 23, 2026
d8dcbda
Implement getting the keys and types for objects
brycekbargar Mar 23, 2026
25ae3e5
Remove the identity node type
brycekbargar Mar 23, 2026
1c460d6
Implement staging the array as a temp file and getting the type
brycekbargar Mar 24, 2026
c1c2356
Implement the new simplified transform algorithm
brycekbargar Mar 24, 2026
755d631
Track scan progress
brycekbargar Mar 24, 2026
0427c60
Refactor sql statement generation location
brycekbargar Mar 24, 2026
a016de6
Share more of the json object traversal code
brycekbargar Mar 24, 2026
dd40621
Skeleton out the create table sql statements
brycekbargar Mar 24, 2026
154644f
Build sql with output tables and columns
brycekbargar Mar 24, 2026
8e00886
Clear the indexing time when the transform finishes
brycekbargar Mar 24, 2026
b3bed23
Refactor to use the rewrite expansion
brycekbargar Mar 24, 2026
04b41c9
Use postgres sql format properly
brycekbargar Mar 24, 2026
3cf5217
Add a duckdb jsonb_each shim
brycekbargar Mar 24, 2026
60d158e
Various syntax fixes
brycekbargar Mar 24, 2026
df0f2f8
Fix aliasing for basic datatypes column test
brycekbargar Mar 24, 2026
72449e9
Fix snake case naming
brycekbargar Mar 24, 2026
bc7c490
Fix basic object expansion
brycekbargar Mar 24, 2026
5d55799
WIP: Making array expansion work
brycekbargar Mar 25, 2026
1f521f2
Infer NodeContext from Node structure
brycekbargar Mar 25, 2026
a7dc2f2
Implement json_depth
brycekbargar Mar 25, 2026
2552364
Replace legacy version with the rewrite
brycekbargar Mar 25, 2026
7d25e53
Remove delete from returning construct
brycekbargar Mar 26, 2026
351c9fa
Inline the ANALYZE statements necessary for the transform
brycekbargar Mar 26, 2026
7955d21
Invert json_depth check and simplify unparenting empty arrays
brycekbargar Mar 26, 2026
870dc1c
Analyze important column in raw table for postgres
brycekbargar Mar 26, 2026
e2bec99
Re-enable flaky source records CI test
brycekbargar Mar 26, 2026
82a7ffd
Cleanup transform_progress
brycekbargar Mar 26, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ jobs:
- run: pdm install -G:all --lockfile pylock.toml

# test
- run: pdm run test -k 'not test_srs'
#- run: pdm run test -k 'not test_srs'
- run: pdm run test

test-minimal-deps:
runs-on: ubuntu-latest
Expand All @@ -41,7 +42,8 @@ jobs:
- run: pdm install -G:all --lockfile pylock.minimal.toml

# test
- run: pdm run test -k 'not test_srs'
#- run: pdm run test -k 'not test_srs'
- run: pdm run test

test-maximal-deps:
runs-on: ubuntu-latest
Expand All @@ -57,4 +59,5 @@ jobs:
- run: pdm install -G:all --lockfile pylock.maximal.toml

# test
- run: pdm run test -k 'not test_srs'
#- run: pdm run test -k 'not test_srs'
- run: pdm run test
11 changes: 4 additions & 7 deletions src/ldlite/database/_duckdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ def __init__(self, db: duckdb.DuckDBPyConnection) -> None:
END
;

CREATE OR REPLACE FUNCTION jsonb_object_keys(j) AS TABLE
SELECT je.key as ld_key, id as "ordinality" FROM json_each(j) je ORDER BY je.id
;

CREATE OR REPLACE FUNCTION jsonb_array_elements(j) AS TABLE (
SELECT value as ld_value, rowid + 1 AS "ordinality" FROM main.json_each(j)
);

CREATE OR REPLACE FUNCTION jsonb_each(j) AS TABLE (
SELECT key, value, rowid AS "ordinality" FROM main.json_each(j)
)
""",
)

Expand Down Expand Up @@ -86,9 +86,6 @@ def ingest_records(

return total

def source_table_cte_stmt(self, keep_source: bool) -> str: # noqa: ARG002
return "WITH ld_source AS (SELECT * FROM {source_table})"


# DuckDB has some strong opinions about cursors that are different than postgres
# https://github.com/duckdb/duckdb/issues/11018
Expand Down
241 changes: 85 additions & 156 deletions src/ldlite/database/_expansion/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,165 +3,94 @@
from collections import deque
from typing import TYPE_CHECKING

from psycopg import sql
if TYPE_CHECKING:
from collections.abc import Callable, Iterator
from typing import NoReturn

from psycopg import sql
from tqdm import tqdm


from .node import Conn, Node
from .recursive_nodes import ArrayNode, ObjectNode, RootNode


def _non_srs_statements(
conn: Conn,
source_table: sql.Identifier,
output_table: Callable[[str | None], tuple[str, sql.Identifier]],
json_depth: int,
scan_progress: tqdm[NoReturn],
) -> Iterator[tuple[str, sql.Composed]]:
# Here be dragons! The nodes have inner state manipulations
# that violate the space/time continuum:
# * o.load_columns
# * a.make_temp
# * t.specify_type
# These all are expected to be called before generating the sql
# as they load/prepare database information.
# Because building up to the transformation statements takes a long time
# we're doing all that work up front to keep the time that
# a transaction is opened to a minimum (which is a leaky abstraction).
scan_progress.total = scan_progress.total if scan_progress.total is not None else 1

root = RootNode(source_table, output_table)
onodes: deque[ObjectNode] = deque([root])
while onodes:
o = onodes.popleft()

if o.depth >= json_depth:
o.make_jsonb()
scan_progress.update(1)
continue

from .nodes import ArrayNode, ObjectNode
o.load_columns(conn)
scan_progress.total += len(o.direct(Node))
scan_progress.update(1)

if TYPE_CHECKING:
from .context import ExpandContext
onodes.extend(o.direct(ObjectNode))
anodes = deque(o.direct(ArrayNode))
while anodes:
a = anodes.popleft()

if a.depth >= json_depth:
a.make_jsonb()
scan_progress.update(1)
continue

if n := a.make_temp(conn):
if isinstance(n, ObjectNode):
onodes.append(n)
if isinstance(n, ArrayNode):
anodes.append(n)
scan_progress.total += 1
else:
a.unparent()

scan_progress.update(1)

def expand_nonmarc(
root_name: str,
root_values: list[str],
ctx: ExpandContext,
for t in root.typed_nodes():
t.specify_type(conn)
scan_progress.update(1)

yield root.create_statement
for a in root.descendents(ArrayNode):
yield a.create_statement


def non_srs_statements(
conn: Conn,
source_table: sql.Identifier,
output_table: Callable[[str | None], tuple[str, sql.Identifier]],
json_depth: int,
scan_progress: tqdm[NoReturn],
) -> list[tuple[str, sql.Composed]]:
(_, tables_to_create) = _expand_nonmarc(
ObjectNode(root_name, "", None, root_values),
0,
ctx,
return list(
_non_srs_statements(
conn,
source_table,
output_table,
json_depth,
scan_progress,
),
)
return tables_to_create


def _expand_nonmarc( # noqa: PLR0915
root: ObjectNode,
count: int,
ctx: ExpandContext,
) -> tuple[int, list[tuple[str, sql.Composed]]]:
ctx.scan_progress.total = (ctx.scan_progress.total or 0) + 1
ctx.scan_progress.refresh()
ctx.transform_progress.total = (ctx.transform_progress.total or 0) + 1
ctx.transform_progress.refresh()
initial_count = count
ctx.preprocess(ctx.conn, ctx.source_table, [root.identifier])
has_rows = root.unnest(
ctx,
ctx.source_table,
ctx.get_transform_table(count),
ctx.source_cte(False),
)
ctx.transform_progress.update(1)
if not has_rows:
return (0, [])

with ctx.conn.cursor() as cur:
cur.execute(
sql.SQL("DROP TABLE {previous_table}")
.format(previous_table=ctx.source_table)
.as_string(),
)

expand_children_of = deque([root])
while expand_children_of:
on = expand_children_of.popleft()
if ctx.transform_progress:
ctx.transform_progress.total += len(on.object_children)
ctx.transform_progress.refresh()
for c in on.object_children:
if len(c.parents) >= ctx.json_depth:
if c.parent is not None:
c.parent.values.append(c.name)
continue
ctx.preprocess(ctx.conn, ctx.get_transform_table(count), [c.identifier])
c.unnest(
ctx,
ctx.get_transform_table(count),
ctx.get_transform_table(count + 1),
ctx.source_cte(False),
)
with ctx.conn.cursor() as cur:
cur.execute(
sql.SQL("DROP TABLE {previous_table}")
.format(previous_table=ctx.get_transform_table(count))
.as_string(),
)
expand_children_of.append(c)
count += 1
ctx.transform_progress.update(1)

tables_to_create = []

new_source_table = ctx.get_transform_table(count)
arrays = root.descendents_oftype(ArrayNode)
ctx.transform_progress.total += len(arrays)
ctx.transform_progress.refresh()
ctx.preprocess(ctx.conn, new_source_table, [a.identifier for a in arrays])
for an in arrays:
if len(an.parents) >= ctx.json_depth:
continue
values = an.explode(
ctx.conn,
new_source_table,
ctx.get_transform_table(count + 1),
ctx.source_cte(True),
)
count += 1
ctx.transform_progress.update(1)

if an.is_object:
(sub_index, array_tables) = _expand_nonmarc(
ObjectNode(
an.name,
an.name,
None,
values,
),
count + 1,
ctx.array_context(
ctx.get_transform_table(count),
ctx.json_depth - len(an.parents),
),
)
count += sub_index
tables_to_create.extend(array_tables)
else:
with ctx.conn.cursor() as cur:
(tname, tid) = ctx.get_output_table(an.name)
tables_to_create.append(
(
tname,
sql.SQL(
"""
CREATE TABLE {dest_table} AS
"""
+ ctx.source_cte(False)
+ """
SELECT {cols} FROM ld_source
""",
).format(
dest_table=tid,
source_table=ctx.get_transform_table(count),
cols=sql.SQL("\n ,").join(
[sql.Identifier(v) for v in [*values, an.name]],
),
),
),
)

stamped_values = [
sql.Identifier(v) for n in root.descendents if n not in arrays for v in n.values
]

with ctx.conn.cursor() as cur:
(tname, tid) = ctx.get_output_table(root.path)
tables_to_create.append(
(
tname,
sql.SQL(
"""
CREATE TABLE {dest_table} AS
"""
+ ctx.source_cte(False)
+ """
SELECT {cols} FROM ld_source
""",
).format(
dest_table=tid,
source_table=new_source_table,
cols=sql.SQL("\n ,").join(stamped_values),
),
),
)

return (count + 1 - initial_count, tables_to_create)
52 changes: 0 additions & 52 deletions src/ldlite/database/_expansion/context.py

This file was deleted.

Loading
Loading