diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index 410cdf6..dbbe015 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -25,7 +25,8 @@ jobs: - run: pdm install -G:all --lockfile pylock.toml # test - - run: pdm run test -k 'not test_srs' + #- run: pdm run test -k 'not test_srs' + - run: pdm run test test-minimal-deps: runs-on: ubuntu-latest @@ -41,7 +42,8 @@ jobs: - run: pdm install -G:all --lockfile pylock.minimal.toml # test - - run: pdm run test -k 'not test_srs' + #- run: pdm run test -k 'not test_srs' + - run: pdm run test test-maximal-deps: runs-on: ubuntu-latest @@ -57,4 +59,5 @@ jobs: - run: pdm install -G:all --lockfile pylock.maximal.toml # test - - run: pdm run test -k 'not test_srs' + #- run: pdm run test -k 'not test_srs' + - run: pdm run test diff --git a/src/ldlite/database/_duckdb.py b/src/ldlite/database/_duckdb.py index de0324d..f4268ea 100644 --- a/src/ldlite/database/_duckdb.py +++ b/src/ldlite/database/_duckdb.py @@ -41,13 +41,13 @@ def __init__(self, db: duckdb.DuckDBPyConnection) -> None: END ; -CREATE OR REPLACE FUNCTION jsonb_object_keys(j) AS TABLE - SELECT je.key as ld_key, id as "ordinality" FROM json_each(j) je ORDER BY je.id -; - CREATE OR REPLACE FUNCTION jsonb_array_elements(j) AS TABLE ( SELECT value as ld_value, rowid + 1 AS "ordinality" FROM main.json_each(j) ); + +CREATE OR REPLACE FUNCTION jsonb_each(j) AS TABLE ( + SELECT key, value, rowid AS "ordinality" FROM main.json_each(j) +) """, ) @@ -86,9 +86,6 @@ def ingest_records( return total - def source_table_cte_stmt(self, keep_source: bool) -> str: # noqa: ARG002 - return "WITH ld_source AS (SELECT * FROM {source_table})" - # DuckDB has some strong opinions about cursors that are different than postgres # https://github.com/duckdb/duckdb/issues/11018 diff --git a/src/ldlite/database/_expansion/__init__.py b/src/ldlite/database/_expansion/__init__.py index 0d1fe41..bb33ded 100644 --- a/src/ldlite/database/_expansion/__init__.py 
+++ b/src/ldlite/database/_expansion/__init__.py @@ -3,165 +3,94 @@ from collections import deque from typing import TYPE_CHECKING -from psycopg import sql +if TYPE_CHECKING: + from collections.abc import Callable, Iterator + from typing import NoReturn + + from psycopg import sql + from tqdm import tqdm + + +from .node import Conn, Node +from .recursive_nodes import ArrayNode, ObjectNode, RootNode + + +def _non_srs_statements( + conn: Conn, + source_table: sql.Identifier, + output_table: Callable[[str | None], tuple[str, sql.Identifier]], + json_depth: int, + scan_progress: tqdm[NoReturn], +) -> Iterator[tuple[str, sql.Composed]]: + # Here be dragons! The nodes have inner state manipulations + # that violate the space/time continuum: + # * o.load_columns + # * a.make_temp + # * t.specify_type + # These all are expected to be called before generating the sql + # as they load/prepare database information. + # Because building up to the transformation statements takes a long time + # we're doing all that work up front to keep the time that + # a transaction is opened to a minimum (which is a leaky abstraction). 
+ scan_progress.total = scan_progress.total if scan_progress.total is not None else 1 + + root = RootNode(source_table, output_table) + onodes: deque[ObjectNode] = deque([root]) + while onodes: + o = onodes.popleft() + + if o.depth >= json_depth: + o.make_jsonb() + scan_progress.update(1) + continue -from .nodes import ArrayNode, ObjectNode + o.load_columns(conn) + scan_progress.total += len(o.direct(Node)) + scan_progress.update(1) -if TYPE_CHECKING: - from .context import ExpandContext + onodes.extend(o.direct(ObjectNode)) + anodes = deque(o.direct(ArrayNode)) + while anodes: + a = anodes.popleft() + + if a.depth >= json_depth: + a.make_jsonb() + scan_progress.update(1) + continue + + if n := a.make_temp(conn): + if isinstance(n, ObjectNode): + onodes.append(n) + if isinstance(n, ArrayNode): + anodes.append(n) + scan_progress.total += 1 + else: + a.unparent() + scan_progress.update(1) -def expand_nonmarc( - root_name: str, - root_values: list[str], - ctx: ExpandContext, + for t in root.typed_nodes(): + t.specify_type(conn) + scan_progress.update(1) + + yield root.create_statement + for a in root.descendents(ArrayNode): + yield a.create_statement + + +def non_srs_statements( + conn: Conn, + source_table: sql.Identifier, + output_table: Callable[[str | None], tuple[str, sql.Identifier]], + json_depth: int, + scan_progress: tqdm[NoReturn], ) -> list[tuple[str, sql.Composed]]: - (_, tables_to_create) = _expand_nonmarc( - ObjectNode(root_name, "", None, root_values), - 0, - ctx, + return list( + _non_srs_statements( + conn, + source_table, + output_table, + json_depth, + scan_progress, + ), ) - return tables_to_create - - -def _expand_nonmarc( # noqa: PLR0915 - root: ObjectNode, - count: int, - ctx: ExpandContext, -) -> tuple[int, list[tuple[str, sql.Composed]]]: - ctx.scan_progress.total = (ctx.scan_progress.total or 0) + 1 - ctx.scan_progress.refresh() - ctx.transform_progress.total = (ctx.transform_progress.total or 0) + 1 - ctx.transform_progress.refresh() - 
initial_count = count - ctx.preprocess(ctx.conn, ctx.source_table, [root.identifier]) - has_rows = root.unnest( - ctx, - ctx.source_table, - ctx.get_transform_table(count), - ctx.source_cte(False), - ) - ctx.transform_progress.update(1) - if not has_rows: - return (0, []) - - with ctx.conn.cursor() as cur: - cur.execute( - sql.SQL("DROP TABLE {previous_table}") - .format(previous_table=ctx.source_table) - .as_string(), - ) - - expand_children_of = deque([root]) - while expand_children_of: - on = expand_children_of.popleft() - if ctx.transform_progress: - ctx.transform_progress.total += len(on.object_children) - ctx.transform_progress.refresh() - for c in on.object_children: - if len(c.parents) >= ctx.json_depth: - if c.parent is not None: - c.parent.values.append(c.name) - continue - ctx.preprocess(ctx.conn, ctx.get_transform_table(count), [c.identifier]) - c.unnest( - ctx, - ctx.get_transform_table(count), - ctx.get_transform_table(count + 1), - ctx.source_cte(False), - ) - with ctx.conn.cursor() as cur: - cur.execute( - sql.SQL("DROP TABLE {previous_table}") - .format(previous_table=ctx.get_transform_table(count)) - .as_string(), - ) - expand_children_of.append(c) - count += 1 - ctx.transform_progress.update(1) - - tables_to_create = [] - - new_source_table = ctx.get_transform_table(count) - arrays = root.descendents_oftype(ArrayNode) - ctx.transform_progress.total += len(arrays) - ctx.transform_progress.refresh() - ctx.preprocess(ctx.conn, new_source_table, [a.identifier for a in arrays]) - for an in arrays: - if len(an.parents) >= ctx.json_depth: - continue - values = an.explode( - ctx.conn, - new_source_table, - ctx.get_transform_table(count + 1), - ctx.source_cte(True), - ) - count += 1 - ctx.transform_progress.update(1) - - if an.is_object: - (sub_index, array_tables) = _expand_nonmarc( - ObjectNode( - an.name, - an.name, - None, - values, - ), - count + 1, - ctx.array_context( - ctx.get_transform_table(count), - ctx.json_depth - len(an.parents), - ), - ) - 
count += sub_index - tables_to_create.extend(array_tables) - else: - with ctx.conn.cursor() as cur: - (tname, tid) = ctx.get_output_table(an.name) - tables_to_create.append( - ( - tname, - sql.SQL( - """ -CREATE TABLE {dest_table} AS -""" - + ctx.source_cte(False) - + """ -SELECT {cols} FROM ld_source -""", - ).format( - dest_table=tid, - source_table=ctx.get_transform_table(count), - cols=sql.SQL("\n ,").join( - [sql.Identifier(v) for v in [*values, an.name]], - ), - ), - ), - ) - - stamped_values = [ - sql.Identifier(v) for n in root.descendents if n not in arrays for v in n.values - ] - - with ctx.conn.cursor() as cur: - (tname, tid) = ctx.get_output_table(root.path) - tables_to_create.append( - ( - tname, - sql.SQL( - """ -CREATE TABLE {dest_table} AS -""" - + ctx.source_cte(False) - + """ -SELECT {cols} FROM ld_source -""", - ).format( - dest_table=tid, - source_table=new_source_table, - cols=sql.SQL("\n ,").join(stamped_values), - ), - ), - ) - - return (count + 1 - initial_count, tables_to_create) diff --git a/src/ldlite/database/_expansion/context.py b/src/ldlite/database/_expansion/context.py deleted file mode 100644 index 7b9e29a..0000000 --- a/src/ldlite/database/_expansion/context.py +++ /dev/null @@ -1,52 +0,0 @@ -from __future__ import annotations - -from dataclasses import dataclass -from typing import TYPE_CHECKING, NoReturn - -if TYPE_CHECKING: - from collections.abc import Callable - - import duckdb - import psycopg - from psycopg import sql - from tqdm import tqdm - - -@dataclass -class ExpandContext: - conn: duckdb.DuckDBPyConnection | psycopg.Connection - source_table: sql.Identifier - json_depth: int - get_transform_table: Callable[[int], sql.Identifier] - get_output_table: Callable[[str], tuple[str, sql.Identifier]] - # This is necessary for Analyzing the table in pg before querying it - # I don't love how this is implemented - preprocess: Callable[ - [ - duckdb.DuckDBPyConnection | psycopg.Connection, - sql.Identifier, - 
list[sql.Identifier], - ], - None, - ] - # source_cte will go away when DuckDB implements CTAS RETURNING - source_cte: Callable[[bool], str] - scan_progress: tqdm[NoReturn] - transform_progress: tqdm[NoReturn] - - def array_context( - self, - new_source_table: sql.Identifier, - new_json_depth: int, - ) -> ExpandContext: - return ExpandContext( - self.conn, - new_source_table, - new_json_depth, - self.get_transform_table, - self.get_output_table, - self.preprocess, - self.source_cte, - self.scan_progress, - self.transform_progress, - ) diff --git a/src/ldlite/database/_expansion/fixed_nodes.py b/src/ldlite/database/_expansion/fixed_nodes.py new file mode 100644 index 0000000..3dbb505 --- /dev/null +++ b/src/ldlite/database/_expansion/fixed_nodes.py @@ -0,0 +1,181 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Literal + +from psycopg import sql + +if TYPE_CHECKING: + from typing import TypeAlias + + +from .node import Conn, Node + +JsonType: TypeAlias = Literal["array", "object", "string", "number", "boolean", "jsonb"] + + +class FixedValueNode(Node): + def __init__( + self, + source: sql.Identifier, + prop: str | None, + path: sql.Composable, + prefix: str, + ): + super().__init__(source, prop) + + self.path = path + self.prefix = prefix + + @property + def alias(self) -> str: + if len(self.prefix) == 0: + return self.snake_prop or "" + return self.prefix + ( + ("__" + self.snake_prop) if self.snake_prop is not None else "" + ) + + @property + def stmt(self) -> sql.Composable: + # this should be abstract but Python can't use ABCs as a generic + return sql.SQL("") + + +class TypedNode(FixedValueNode): + def __init__( + self, + source: sql.Identifier, + prop: str | None, + path: sql.Composable, + prefix: str, + json_types: tuple[JsonType, JsonType], + ): + super().__init__(source, prop, path, prefix) + + self.is_mixed = json_types[0] != json_types[1] + self.json_type: JsonType = "string" if self.is_mixed else json_types[0] + self.is_uuid = 
False + self.is_datetime = False + self.is_float = False + self.is_bigint = False + + @property + def json_string(self) -> sql.Composable: + if self.prop is None: + str_extract = ( + sql.SQL("""TRIM(BOTH '"' FROM (""") + self.path + sql.SQL(")::text)") + ) + else: + str_extract = self.path + sql.SQL("->>") + sql.Literal(self.prop) + + return sql.SQL("NULLIF(NULLIF(") + str_extract + sql.SQL(", ''), 'null')") + + @property + def stmt(self) -> sql.Composable: + type_extract: sql.Composable + if self.json_type == "jsonb": + type_extract = self.path + elif self.json_type == "number" and self.is_float: + type_extract = self.json_string + sql.SQL("::numeric") + elif self.json_type == "number" and self.is_bigint: + type_extract = self.json_string + sql.SQL("::bigint") + elif self.json_type == "number": + type_extract = self.json_string + sql.SQL("::integer") + elif self.json_type == "boolean": + type_extract = self.json_string + sql.SQL("::bool") + elif self.json_type == "string" and self.is_uuid: + type_extract = self.json_string + sql.SQL("::uuid") + elif self.json_type == "string" and self.is_datetime: + type_extract = self.json_string + sql.SQL("::timestamptz") + else: + type_extract = self.json_string + + return type_extract + sql.SQL(" AS ") + sql.Identifier(self.alias) + + def specify_type(self, conn: Conn) -> None: + if self.is_mixed or self.json_type not in ["string", "number"]: + return + + cte = ( + sql.SQL(""" +WITH string_values AS MATERIALIZED ( + SELECT """) + + self.json_string + + sql.SQL(""" AS string_value + FROM {source} +)""").format(source=self.source) + ) + + if self.json_type == "string": + with conn.cursor() as cur: + specify = cte + sql.SQL(""" +SELECT + NOT EXISTS( + SELECT 1 FROM string_values + WHERE + string_value IS NOT NULL AND + string_value NOT LIKE '________-____-____-____-____________' + ) AS is_uuid + ,NOT EXISTS( + SELECT 1 FROM string_values + WHERE + string_value IS NOT NULL AND + ( + string_value NOT LIKE '____-__-__T__:__:__.___' 
AND + string_value NOT LIKE '____-__-__T__:__:__.___+__:__' + ) + ) AS is_datetime;""") + + cur.execute(specify.as_string()) + if row := cur.fetchone(): + (self.is_uuid, self.is_datetime) = row + + if self.json_type == "number": + with conn.cursor() as cur: + specify = cte + sql.SQL(""" +SELECT + EXISTS( + SELECT 1 FROM string_values + WHERE + string_value IS NOT NULL AND + string_value::numeric % 1 <> 0 + ) AS is_float + ,EXISTS( + SELECT 1 FROM string_values + WHERE + string_value IS NOT NULL AND + string_value::numeric > 2147483647 + ) AS is_bigint;""") + + cur.execute(specify.as_string()) + if row := cur.fetchone(): + (self.is_float, self.is_bigint) = row + + +class OrdinalNode(FixedValueNode): + def __init__( + self, + source: sql.Identifier, + path: sql.Composable, + prefix: str, + ): + super().__init__(source, None, path, prefix) + + @property + def alias(self) -> str: + return self.prefix + "__o" + + @property + def stmt(self) -> sql.Composable: + return sql.Identifier("a", "__o") + sql.SQL(" AS {alias}").format( + alias=sql.Identifier(self.alias), + ) + + +class JsonbNode(TypedNode): + def __init__( + self, + source: sql.Identifier, + path: sql.Composable, + prefix: str, + ): + super().__init__(source, None, path, prefix, ("jsonb", "jsonb")) diff --git a/src/ldlite/database/_expansion/metadata.py b/src/ldlite/database/_expansion/metadata.py deleted file mode 100644 index 7fbc2ee..0000000 --- a/src/ldlite/database/_expansion/metadata.py +++ /dev/null @@ -1,141 +0,0 @@ -from __future__ import annotations - -from abc import ABC, abstractmethod -from typing import Literal - -from psycopg import sql - - -class Metadata(ABC): - def __init__(self, prop: str | None): - self.prop = prop - - @property - def snake(self) -> str: - if self.prop is None: - # this doesn't really come up in practice - return "$" - - snake = "".join("_" + c.lower() if c.isupper() else c for c in self.prop) - - # there's also sorts of weird edge cases here that don't come up in practice - if 
(naked := self.prop.lstrip("_")) and len(naked) > 0 and naked[0].isupper(): - snake = snake.removeprefix("_") - - return snake - - @property - @abstractmethod - def select_stmt(self) -> str: ... - - def select_column( - self, - json_col: sql.Identifier, - alias: str, - ) -> sql.Composed: - return sql.SQL(self.select_stmt + " AS {alias}").format( - json_col=json_col, - prop=self.prop, - alias=sql.Identifier(alias), - ) - - -class ObjectMeta(Metadata): - @property - def select_stmt(self) -> str: - return "{json_col}" if self.prop is None else "{json_col}->{prop}" - - -class ArrayMeta(Metadata): - @property - def select_stmt(self) -> str: - return "{json_col}" if self.prop is None else "{json_col}->{prop}" - - @abstractmethod - def unwrap(self) -> ObjectMeta | TypedMeta: ... - - -class TypedMeta(Metadata): - def __init__( # noqa: PLR0913 - self, - prop: str | None, - json_type: Literal["string", "number", "boolean"], - other_json_type: Literal["string", "number", "boolean"], - is_uuid: bool, - is_datetime: bool, - is_float: bool, - is_bigint: bool, - ): - super().__init__(prop) - - mixed_type = json_type != other_json_type - self.json_type: Literal["string", "number", "boolean"] = ( - json_type if not mixed_type else "string" - ) - self.is_uuid = is_uuid and not mixed_type - self.is_datetime = is_datetime and not mixed_type - self.is_float = is_float and not mixed_type - self.is_bigint = is_bigint and not mixed_type - - @property - def select_stmt(self) -> str: # noqa: PLR0911 - str_extract = ( - "{json_col}->>{prop}" - if self.prop is not None - else """TRIM(BOTH '"' FROM ({json_col})::text)""" - ) - str_extract = f"NULLIF(NULLIF({str_extract}, ''), 'null')" - - if self.json_type == "number" and self.is_float: - return f"{str_extract}::numeric" - if self.json_type == "number" and self.is_bigint: - return f"{str_extract}::bigint" - if self.json_type == "number": - return f"{str_extract}::integer" - if self.json_type == "boolean": - return f"{str_extract}::bool" - if 
self.json_type == "string" and self.is_uuid: - return f"{str_extract}::uuid" - if self.json_type == "string" and self.is_datetime: - return f"{str_extract}::timestamptz" - - return str_extract - - -class MixedMeta(TypedMeta): - def __init__( - self, - prop: str | None, - ): - super().__init__(prop, "string", "string", False, False, False, False) - - -class ObjectArrayMeta(ObjectMeta, ArrayMeta): - def unwrap(self) -> ObjectMeta: - return ObjectMeta(None) - - -class MixedArrayMeta(MixedMeta, ArrayMeta): - @property - def select_stmt(self) -> str: - return "{json_col}" if self.prop is None else "{json_col}->{prop}" - - def unwrap(self) -> MixedMeta: - return MixedMeta(None) - - -class TypedArrayMeta(TypedMeta, ArrayMeta): - @property - def select_stmt(self) -> str: - return "{json_col}" if self.prop is None else "{json_col}->{prop}" - - def unwrap(self) -> TypedMeta: - return TypedMeta( - None, - self.json_type, - self.json_type, - self.is_uuid, - self.is_datetime, - self.is_float, - self.is_bigint, - ) diff --git a/src/ldlite/database/_expansion/node.py b/src/ldlite/database/_expansion/node.py new file mode 100644 index 0000000..eb7d6be --- /dev/null +++ b/src/ldlite/database/_expansion/node.py @@ -0,0 +1,33 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +import duckdb +import psycopg + +if TYPE_CHECKING: + from typing import TypeAlias + + from psycopg import sql + +Conn: TypeAlias = duckdb.DuckDBPyConnection | psycopg.Connection + + +class Node: + def __init__(self, source: sql.Identifier, prop: str | None): + self.source = source + self.prop = prop + self.snake_prop: str | None = None + + if self.prop is not None: + self.snake_prop = "".join( + "_" + c.lower() if c.isupper() else c for c in self.prop + ) + + # there's also sorts of weird edge cases here that don't come up in practice + if ( + (naked := self.prop.lstrip("_")) + and len(naked) > 0 + and naked[0].isupper() + ): + self.snake_prop = self.snake_prop.removeprefix("_") diff 
--git a/src/ldlite/database/_expansion/nodes.py b/src/ldlite/database/_expansion/nodes.py deleted file mode 100644 index 32626b5..0000000 --- a/src/ldlite/database/_expansion/nodes.py +++ /dev/null @@ -1,435 +0,0 @@ -from __future__ import annotations - -from collections import deque -from typing import TYPE_CHECKING, TypeVar, cast - -from psycopg import sql - -if TYPE_CHECKING: - from collections.abc import Iterator - - import duckdb - import psycopg - - from .context import ExpandContext - -from .metadata import ( - ArrayMeta, - Metadata, - MixedArrayMeta, - MixedMeta, - ObjectArrayMeta, - ObjectMeta, - TypedArrayMeta, - TypedMeta, -) - -TNode = TypeVar("TNode", bound="ExpansionNode") - - -class ExpansionNode: - def __init__( - self, - name: str, - path: str, - parent: ExpansionNode | None, - values: list[str] | None = None, - ): - self.name = name - self.path = path - self.identifier = sql.Identifier(name) - self.parent = parent - self.values: list[str] = values or [] - self.children: list[ExpansionNode] = [] - - def add(self, meta: Metadata) -> str: - snake = meta.snake - prefixed_name = self.prefix + snake - - if isinstance(meta, ArrayMeta): - self.children.append(ArrayNode(prefixed_name, snake, self, meta)) - elif isinstance(meta, ObjectMeta): - self.children.append(ObjectNode(prefixed_name, snake, self)) - else: - prefixed_name = self.prefix + snake - self.values.append(prefixed_name) - - return prefixed_name - - def _parents(self) -> Iterator[ExpansionNode]: - n = self - while n.parent is not None: - yield n.parent - n = n.parent - - @property - def parents(self) -> list[ExpansionNode]: - return list(self._parents()) - - @property - def prefix(self) -> str: - if len(self.parents) == 0 and len(self.path) == 0: - return "" - - return ( - "__".join( - [*[p.path for p in self.parents if len(p.path) != 0], self.path], - ) - + "__" - ) - - @property - def root(self) -> ExpansionNode: - if self.parent is None: - return self - - root = [p for p in self.parents if 
p.parent is None] - return root[0] - - def _descendents(self, cls: type[TNode]) -> Iterator[TNode]: - to_visit = deque([self]) - while to_visit: - n = to_visit.pop() - if isinstance(n, cls): - yield n - - to_visit.extend(n.children) - - @property - def descendents(self) -> list[ExpansionNode]: - return list(self._descendents(ExpansionNode)) - - def descendents_oftype(self, cls: type[TNode]) -> list[TNode]: - return list(self._descendents(cls)) - - def __str__(self) -> str: - return "->".join([n.name for n in reversed([self, *self.parents])]) - - -class ObjectNode(ExpansionNode): - def __init__( - self, - name: str, - path: str, - parent: ExpansionNode | None, - values: list[str] | None = None, - ): - super().__init__(name, path, parent, values) - self.unnested = False - - def _object_children(self) -> Iterator[ObjectNode]: - for c in self.children: - if isinstance(c, ObjectNode): - yield c - - @property - def object_children(self) -> list[ObjectNode]: - return list(self._object_children()) - - def unnest( - self, - ctx: ExpandContext, - source_table: sql.Identifier, - dest_table: sql.Identifier, - source_cte: str, - ) -> bool: - self.unnested = True - create_columns: list[sql.Composable] = [ - sql.Identifier(v) for v in self.carryover - ] - - with ctx.conn.cursor() as cur: - cur.execute( - sql.SQL("SELECT 1 FROM {table} LIMIT 1;") - .format(table=source_table) - .as_string(), - ) - if not cur.fetchone(): - return False - - with ctx.conn.cursor() as cur: - cur.execute( - sql.SQL( - """ -SELECT k.ld_key -FROM - {source_table} t - ,jsonb_object_keys(t.{json_col}) WITH ORDINALITY k(ld_key, "ordinality") -WHERE t.{json_col} IS NOT NULL AND jsonb_typeof(t.{json_col}) = 'object' -GROUP BY k.ld_key -ORDER BY MAX(k.ordinality), COUNT(k.ordinality) -""", - ) - .format(source_table=source_table, json_col=self.identifier) - .as_string(), - ) - props = [prop[0] for prop in cur.fetchall()] - - ctx.scan_progress.total += len(props) * 3 - ctx.scan_progress.refresh() - 
ctx.scan_progress.update(1) - - metadata: list[Metadata] = [] - for prop in props: - with ctx.conn.cursor() as cur: - cur.execute( - sql.SQL( - """ -SELECT - BOOL_AND(json_type = 'array') AS only_array - ,BOOL_OR(json_type = 'array') AS some_array - ,BOOL_AND(json_type = 'object') AS only_object - ,BOOL_OR(json_type = 'object') AS some_object -FROM -( - SELECT jsonb_typeof(t.{json_col}->$1) AS json_type - FROM {table} t -) j -WHERE json_type <> 'null' -""", - ) - .format( - table=source_table, - json_col=self.identifier, - ) - .as_string(), - (prop,), - ) - (only_array, some_array, only_object, some_object) = cast( - "tuple[bool, bool, bool, bool]", - cur.fetchone(), - ) - - if (some_array and not only_array) or (some_object and not only_object): - metadata.append(MixedMeta(prop)) - ctx.scan_progress.update(3) - continue - - if only_object: - metadata.append(ObjectMeta(prop)) - ctx.scan_progress.total += 1 - ctx.scan_progress.update(3) - continue - - if only_array: - ctx.scan_progress.update(1) - cur.execute( - sql.SQL( - """ -SELECT - -- Technically arrays could be nested but I haven't seen any - BOOL_AND(json_type = 'object') AS only_object - ,BOOL_OR(json_type = 'object') AS some_object -FROM -( - SELECT a.json_type - FROM {table} t - CROSS JOIN LATERAL - ( - SELECT jsonb_typeof(ld_value) AS json_type - FROM jsonb_array_elements(t.{json_col}->$1) a(ld_value) - WHERE jsonb_typeof(t.{json_col}->$1) = 'array' - ) a -) j -WHERE json_type <> 'null' -""", - ) - .format( - table=source_table, - json_col=self.identifier, - ) - .as_string(), - (prop,), - ) - (inner_only_object, inner_some_object) = cast( - "tuple[bool, bool]", - cur.fetchone(), - ) - - if inner_some_object and not inner_only_object: - metadata.append(MixedArrayMeta(prop)) - ctx.scan_progress.update(2) - continue - - if inner_only_object: - metadata.append(ObjectArrayMeta(prop)) - ctx.scan_progress.total += 1 - ctx.scan_progress.update(2) - continue - - ctx.scan_progress.update(1) - typed_from_sql = """ 
-FROM {table} t -CROSS JOIN LATERAL -( - SELECT * - FROM jsonb_array_elements(t.{json_col}->$1) a(ld_value) - WHERE jsonb_typeof(t.{json_col}->$1) = 'array' - LIMIT 3 -) j""" - else: - ctx.scan_progress.update(2) - typed_from_sql = """ -FROM (SELECT t.{json_col}->$1 AS ld_value FROM {table} t) j -""" - with ctx.conn.cursor() as cur: - cur.execute( - sql.SQL( - """ -SELECT - MIN(json_type) AS json_type - ,MAX(json_type) AS other_json_type - ,BOOL_AND(CASE WHEN json_type = 'string' THEN (ld_value)::text LIKE '"________-____-____-____-____________"' ELSE FALSE END) AS is_uuid - ,BOOL_AND(CASE WHEN json_type = 'string' THEN (ld_value)::text LIKE '"____-__-__T__:__:__.___%"' ELSE FALSE END) AS is_datetime - ,BOOL_OR(CASE WHEN json_type = 'number' THEN (ld_value)::numeric % 1 <> 0 ELSE FALSE END) AS is_float - ,BOOL_OR(CASE WHEN json_type = 'number' THEN (ld_value)::numeric > 2147483647 ELSE FALSE END) AS is_bigint -FROM -( - SELECT - ld_value - ,jsonb_typeof(ld_value) json_type """ # noqa: E501 - + typed_from_sql - + """ - WHERE ld_value IS NOT NULL -) i -WHERE - ld_value IS NOT NULL AND - json_type <> 'null' AND - ( - json_type <> 'string' OR - (json_type = 'string' AND ld_value::text NOT IN ('"null"', '""')) - ) -""", - ) - .format( - table=source_table, - json_col=self.identifier, - ) - .as_string(), - (prop,), - ) - if (row := cur.fetchone()) is not None and all( - c is not None for c in row - ): - metadata.append( - TypedArrayMeta(prop, *row) - if only_array - else TypedMeta(prop, *row), - ) - ctx.scan_progress.update(1) - - create_columns.extend( - [meta.select_column(self.identifier, self.add(meta)) for meta in metadata], - ) - - with ctx.conn.cursor() as cur: - cur.execute( - sql.SQL( - """ -CREATE TEMP TABLE {dest_table} AS -""" - + source_cte - + """ -SELECT - {cols} -FROM ld_source -""", - ) - .format( - source_table=source_table, - dest_table=dest_table, - json_col=self.identifier, - cols=sql.SQL("\n ,").join(create_columns), - ) - .as_string(), - ) - - 
return True - - def _carryover(self) -> Iterator[str]: - for n in self.root.descendents: - if isinstance(n, ObjectNode) and not n.unnested and n.name != "jsonb": - yield n.name - if isinstance(n, ArrayNode): - yield n.name - yield from n.values - - @property - def carryover(self) -> list[str]: - return list(self._carryover()) - - -class ArrayNode(ExpansionNode): - def __init__( - self, - name: str, - path: str, - parent: ExpansionNode | None, - meta: ArrayMeta, - values: list[str] | None = None, - ): - super().__init__(name, path, parent, values) - self.meta = meta.unwrap() - - @property - def is_object(self) -> bool: - return isinstance(self.meta, ObjectMeta) - - def explode( - self, - conn: duckdb.DuckDBPyConnection | psycopg.Connection, - source_table: sql.Identifier, - dest_table: sql.Identifier, - source_cte: str, - ) -> list[str]: - with conn.cursor() as cur: - o_col = self.name + "__o" - create_columns: list[sql.Composable] = [ - sql.SQL( - "(ROW_NUMBER() OVER (ORDER BY (SELECT NULL)))::integer AS __id", - ), - *[sql.Identifier(v) for v in self.carryover], - sql.SQL( - """a."ordinality"::smallint AS {id_alias}""", - ).format( - id_alias=sql.Identifier(o_col), - ), - self.meta.select_column( - sql.Identifier("a", "ld_value"), - self.name, - ), - ] - - cur.execute( - sql.SQL( - """ -CREATE TEMP TABLE {dest_table} AS -""" - + source_cte - + """ -SELECT - {cols} -FROM - ld_source s - ,jsonb_array_elements(s.{json_col}) WITH ORDINALITY a(ld_value, "ordinality") -WHERE jsonb_typeof(s.{json_col}) = 'array' -""", - ) - .format( - source_table=source_table, - dest_table=dest_table, - cols=sql.SQL("\n ,").join(create_columns), - json_col=sql.Identifier(self.name), - ) - .as_string(), - ) - - return ["__id", *self.carryover, o_col] - - def _carryover(self) -> Iterator[str]: - for n in reversed(self.parents): - yield from [v for v in n.values if v not in ("__id", "jsonb")] - - @property - def carryover(self) -> list[str]: - return list(self._carryover()) diff --git 
a/src/ldlite/database/_expansion/recursive_nodes.py b/src/ldlite/database/_expansion/recursive_nodes.py new file mode 100644 index 0000000..c5faf26 --- /dev/null +++ b/src/ldlite/database/_expansion/recursive_nodes.py @@ -0,0 +1,387 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod +from collections import deque +from typing import TYPE_CHECKING, TypeVar, cast +from uuid import uuid4 + +from psycopg import sql + +if TYPE_CHECKING: + from collections.abc import Callable, Iterator + + +from .fixed_nodes import FixedValueNode, JsonbNode, JsonType, OrdinalNode, TypedNode +from .node import Conn, Node + +TNode = TypeVar("TNode", bound="Node") +TRode = TypeVar("TRode", bound="RecursiveNode") + + +class RecursiveNode(Node): + def __init__( + self, + source: sql.Identifier, + prop: str | None, + column: sql.Identifier, + parent: RecursiveNode | None, + ): + super().__init__(source, prop) + + self.parent = parent + self.column = column + self._children: list[Node] = [] + + def _parents(self) -> Iterator[RecursiveNode]: + p = self.parent + while p is not None: + yield p + p = p.parent + + @property + def parents(self) -> list[RecursiveNode]: + return list(self._parents()) + + @property + def table_parent(self) -> RecursiveNode: + for p in self.parents: + if isinstance(p, (ArrayNode, RootNode)): + return p + + # There's "always" a root node + return None # type: ignore[return-value] + + @property + def depth(self) -> int: + depth = 0 + prev = None + for p in self.parents: + # arrays of objects only count for a single level of depth + if not (isinstance(p, ObjectNode) and isinstance(prev, ArrayNode)): + depth += 1 + prev = p + + return depth + + def replace(self, original: Node, replacement: Node | None) -> None: + if replacement is None: + self._children.remove(original) + return + + self._children = [(replacement if n == original else n) for n in self._children] + + def unparent(self) -> None: + cast("RecursiveNode", self.parent).replace(self, 
None) + + def make_jsonb(self) -> None: + cast("RecursiveNode", self.parent).replace( + self, + JsonbNode(self.source, self.path, self.prefix), + ) + + @property + def path(self) -> sql.Composable: + prop_accessor: sql.Composable + if self.prop is None: + prop_accessor = sql.SQL("") + else: + prop_accessor = sql.SQL("->") + sql.Literal(self.prop) + + path: list[str] = [] + for p in self.parents: + if isinstance(p, (ArrayNode, RootNode)): + break + if p.prop is not None: + path.append(p.prop) + + if len(path) == 0: + return self.column + prop_accessor + + return ( + self.column + + sql.SQL("->") + + sql.SQL("->").join([sql.Literal(p) for p in reversed(path)]) + + prop_accessor + ) + + @property + def prefix(self) -> str: + if len(self.parents) == 0 or isinstance(self.parents[0], RootNode): + return self.snake_prop or "" + + return "__".join( + [p.snake_prop for p in reversed(self.parents) if p.snake_prop is not None], + ) + (("__" + self.snake_prop) if self.snake_prop is not None else "") + + def _direct(self, cls: type[TNode]) -> Iterator[TNode]: + yield from [n for n in self._children if isinstance(n, cls)] + + def direct(self, cls: type[TNode]) -> list[TNode]: + return list(self._direct(cls)) + + def _descendents( + self, + cls: type[TRode], + to_cls: type[TRode] | None = None, + ) -> Iterator[TRode]: + to_visit = deque(self.direct(RecursiveNode)) + while to_visit: + n = to_visit.pop() + if isinstance(n, cls): + yield n + + if to_cls is not None and isinstance(n, to_cls): + continue + + to_visit.extend(n.direct(RecursiveNode)) + + def descendents( + self, + cls: type[TRode], + to_cls: type[TRode] | None = None, + ) -> list[TRode]: + return list(self._descendents(cls, to_cls)) + + def _typed_nodes(self) -> Iterator[TypedNode]: + yield from self.direct(TypedNode) + for n in self._descendents(RecursiveNode): + yield from n.direct(TypedNode) + + def typed_nodes(self) -> list[TypedNode]: + return list(self._typed_nodes()) + + +class ObjectNode(RecursiveNode): + def 
load_columns(self, conn: Conn) -> None: + with conn.cursor() as cur: + key_discovery = ( + sql.SQL(""" +SELECT + json_key + ,MIN(json_type) AS json_type + ,MAX(json_type) AS other_json_type +FROM +( + SELECT + k."key" AS json_key + ,jsonb_typeof(k."value") AS json_type + ,k.ord + FROM + ( + SELECT """) + + self.path + + sql.SQL(""" AS ld_value FROM {source_table} + ) j + CROSS JOIN LATERAL jsonb_each(j.ld_value) WITH ORDINALITY k("key", "value", ord) + WHERE jsonb_typeof(j.ld_value) = 'object' +) key_discovery +WHERE json_type <> 'null' +GROUP BY json_key +ORDER BY MAX(ord), COUNT(*); +""").format(source_table=self.source) + ) + + cur.execute(key_discovery.as_string()) + for row in cur.fetchall(): + (key, jt, ojt) = cast("tuple[str, JsonType, JsonType]", row) + if jt == "array" and ojt == "array": + anode = ArrayNode(self.source, key, self.column, self) + self._children.append(anode) + elif jt == "object" and ojt == "object": + onode = ObjectNode(self.source, key, self.column, self) + self._children.append(onode) + else: + tnode = TypedNode( + self.source, + key, + self.path, + self.prefix, + (jt, ojt), + ) + self._children.append(tnode) + + +class StampableTable(ABC): + @property + @abstractmethod + def create_statement(self) -> tuple[str, sql.Composed]: ... 
+ + +class RootNode(ObjectNode, StampableTable): + def __init__( + self, + source: sql.Identifier, + get_output_table: Callable[[str | None], tuple[str, sql.Identifier]], + ): + super().__init__( + source, + None, + sql.Identifier("jsonb"), + None, + ) + self.get_output_table = get_output_table + + @property + def create_statement(self) -> tuple[str, sql.Composed]: + (output_table_name, output_table) = self.get_output_table(None) + + return ( + output_table_name, + sql.SQL(""" +CREATE TABLE {output_table} AS +SELECT + """).format(output_table=output_table) + + sql.SQL("\n ,").join( + [ + sql.Identifier("__id"), + *[t.stmt for t in self.direct(TypedNode)], + *[ + t.stmt + for o in self.descendents(ObjectNode, ArrayNode) + for t in o.direct(TypedNode) + ], + ], + ) + + sql.SQL(""" +FROM {source_table}; +ANALYZE {output_table} (__id);""").format( + source_table=self.source, + output_table=output_table, + ), + ) + + +class ArrayNode(RecursiveNode, StampableTable): + def __init__( + self, + source: sql.Identifier, + prop: str | None, + column: sql.Identifier, + parent: RecursiveNode | None, + ): + super().__init__(source, prop, column, parent) + self.temp = sql.Identifier(str(uuid4()).split("-")[0]) + + def make_temp(self, conn: Conn) -> Node | None: + with conn.cursor() as cur: + expansion = ( + sql.SQL(""" +CREATE TEMPORARY TABLE {temp} AS +SELECT + __id AS p__id + ,(ROW_NUMBER() OVER (ORDER BY (SELECT NULL)))::integer AS __id + ,ord::smallint AS __o + ,array_jsonb + ,json_type +FROM ( + SELECT + j.__id + ,a."value" AS array_jsonb + ,jsonb_typeof(a."value") AS json_type + ,a.ord + FROM + ( + SELECT """).format(temp=self.temp) + + self.path + + sql.SQL(""" AS ld_value, __id FROM {source} + ) j + CROSS JOIN LATERAL jsonb_array_elements(j.ld_value) WITH ORDINALITY a("value", ord) + WHERE jsonb_typeof(j.ld_value) = 'array' +) expansion +WHERE json_type <> 'null'; +ANALYZE {temp} (p__id, array_jsonb, json_type); +""").format(source=self.source, temp=self.temp) + ) + 
cur.execute(expansion.as_string()) + + type_discovery = sql.SQL(""" +SELECT + MIN(json_type) AS json_type + ,MAX(json_type) AS other_json_type +FROM {temp}""").format(temp=self.temp) + + cur.execute(type_discovery.as_string()) + self._children.append(OrdinalNode(self.temp, self.path, self.prefix)) + if row := cur.fetchone(): + (jt, ojt) = cast("tuple[JsonType, JsonType]", row) + node: Node + if jt == "array" and ojt == "array": + node = ArrayNode( + self.temp, + None, + sql.Identifier("array_jsonb"), + self, + ) + elif jt == "object" and ojt == "object": + node = ObjectNode( + self.temp, + None, + sql.Identifier("array_jsonb"), + self, + ) + else: + node = TypedNode( + self.temp, + None, + sql.Identifier("array_jsonb"), + self.prefix, + (jt, ojt), + ) + self._children.append(node) + return node + + return None + + @property + def create_statement(self) -> tuple[str, sql.Composed]: + table_parent: RecursiveNode = self + parents: list[RecursiveNode] = [] + for p in self.parents: + if table_parent == self and isinstance(p, (RootNode, ArrayNode)): + table_parent = p + parents.append(p) + + root = cast("RootNode", parents[-1]) + (output_table_name, output_table) = root.get_output_table(self.prefix) + if not isinstance(table_parent, ArrayNode): + (_, parent_table) = root.get_output_table(None) + else: + (_, parent_table) = root.get_output_table(table_parent.prefix) + + return ( + output_table_name, + ( + sql.SQL( + """ +CREATE TABLE {output_table} AS +SELECT + """, + ).format(output_table=output_table) + + sql.SQL("\n ,").join( + [ + sql.Identifier("a", "__id"), + *[ + sql.Identifier("p", t.alias) + for p in reversed(parents) + for t in p.direct(FixedValueNode) + ], + *[t.stmt for t in self.direct(FixedValueNode)], + *[ + t.stmt + for o in self.descendents(ObjectNode, ArrayNode) + for t in o.direct(TypedNode) + ], + ], + ) + + sql.SQL(""" +FROM {source_table} a +JOIN {parent_table} p ON + a.p__id = p.__id; +ANALYZE {output_table} (__id); +""").format( + 
source_table=self.temp, + parent_table=parent_table, + output_table=output_table, + ) + ), + ) diff --git a/src/ldlite/database/_postgres.py b/src/ldlite/database/_postgres.py index 88cbc42..93ca50c 100644 --- a/src/ldlite/database/_postgres.py +++ b/src/ldlite/database/_postgres.py @@ -80,30 +80,13 @@ def ingest_records( rb.extend(r) copy.write_row((next(pkey).to_bytes(4, "big"), rb)) + with conn.cursor() as cur: + cur.execute( + sql.SQL("ANALYZE {table} (jsonb);").format(table=pfx.raw_table.id), + ) + total = next(pkey) - 1 self._download_complete(conn, pfx, total, download_started) conn.commit() return next(pkey) - 1 - - def preprocess_source_table( - self, - conn: psycopg.Connection, - table_name: sql.Identifier, - column_names: list[sql.Identifier], - ) -> None: - if len(column_names) == 0: - return - - with conn.cursor() as cur: - cur.execute( - sql.SQL("ANALYZE {table_name} ({column_name})").format( - table_name=table_name, - column_name=sql.SQL(",").join(column_names), - ), - ) - - def source_table_cte_stmt(self, keep_source: bool) -> str: - if keep_source: - return "WITH ld_source AS (SELECT * FROM {source_table})" - return "WITH ld_source AS (DELETE FROM {source_table} RETURNING *)" diff --git a/src/ldlite/database/_prefix.py b/src/ldlite/database/_prefix.py index a93dd81..456083e 100644 --- a/src/ldlite/database/_prefix.py +++ b/src/ldlite/database/_prefix.py @@ -34,9 +34,9 @@ def raw_table(self) -> PrefixedTable: def _output_table(self) -> str: return self._prefix + "__t" - def output_table(self, prefix: str) -> PrefixedTable: + def output_table(self, prefix: str | None) -> PrefixedTable: return self._prefixed_table( - self._output_table + ("" if len(prefix) == 0 else "__" + prefix), + self._output_table + ("" if prefix is None else "__" + prefix), ) @property diff --git a/src/ldlite/database/_typed_database.py b/src/ldlite/database/_typed_database.py index 6560c1b..41b10e6 100644 --- a/src/ldlite/database/_typed_database.py +++ 
b/src/ldlite/database/_typed_database.py @@ -1,4 +1,3 @@ -# pyright: reportArgumentType=false from __future__ import annotations from abc import abstractmethod @@ -12,8 +11,7 @@ from tqdm import tqdm from . import Database -from ._expansion import expand_nonmarc -from ._expansion.context import ExpandContext +from ._expansion import non_srs_statements from ._prefix import Prefix if TYPE_CHECKING: @@ -182,19 +180,6 @@ def _prepare_raw_table( ).as_string(), ) - def preprocess_source_table( - self, - conn: DB, - table_name: sql.Identifier, - column_names: list[sql.Identifier], - ) -> None: ... - - # TODO: Refactor this to use DELETE RETURNING when DuckDb resolves - # https://github.com/duckdb/duckdb/issues/3417 - # Only postgres supports it which is why we have an abstraction here - @abstractmethod - def source_table_cte_stmt(self, keep_source: bool) -> str: ... - def expand_prefix( self, prefix: str, @@ -216,83 +201,67 @@ def expand_prefix( return [] with closing(self._conn_factory(False)) as conn: - with conn.cursor() as cur: - cur.execute( - sql.SQL( - """ -CREATE TEMP TABLE {dest_table} AS -""" - + self.source_table_cte_stmt(keep_source=keep_raw) - + """ -SELECT * from ld_source; -""", - ) - .format( - dest_table=pfx.origin_table, - source_table=pfx.raw_table.id, - ) - .as_string(), - ) + tables_to_create = non_srs_statements( + conn, + pfx.raw_table[1], + pfx.output_table, + json_depth, + scan_progress + if scan_progress is not None + else tqdm(disable=True, total=0), + ) - tables_to_create = expand_nonmarc( - "jsonb", - ["__id"], - ExpandContext( - conn, - pfx.origin_table, - json_depth, - pfx.transform_table, - pfx.output_table, - self.preprocess_source_table, # type: ignore [arg-type] - self.source_table_cte_stmt, - scan_progress if scan_progress is not None else tqdm(disable=True), - transform_progress - if transform_progress is not None - else tqdm(disable=True), - ), + transform_progress = ( + transform_progress + if transform_progress is not None + else 
tqdm(disable=True, total=0) + ) + transform_progress.total = ( + ( + transform_progress.total + if transform_progress.total is not None + else 0 + ) + + len(tables_to_create) + + 1 ) + transform_progress.update(1) with self._begin(conn): self._drop_extracted_tables(conn, pfx) + with conn.cursor() as cur: + for _, table in tables_to_create: + cur.execute(table.as_string()) + transform_progress.update(1) + if not keep_raw: self._drop_raw_table(conn, pfx) - with conn.cursor() as cur: - for table in tables_to_create: - cur.execute(table[1].as_string()) + total = 0 with conn.cursor() as cur: - cur.execute( - sql.SQL( - """ -CREATE TABLE {catalog_table} ( - table_name text -) -""", - ) - .format(catalog_table=pfx.catalog_table.id) - .as_string(), - ) - total = 0 + create_catalog = sql.SQL( + """CREATE TABLE {catalog_table} (table_name text)""", + ).format(catalog_table=pfx.catalog_table.id) + cur.execute(create_catalog.as_string()) if len(tables_to_create) > 0: + insert_catalog = sql.SQL( + "INSERT INTO {catalog_table} VALUES ($1)", + ).format(catalog_table=pfx.catalog_table.id) cur.executemany( - sql.SQL("INSERT INTO {catalog_table} VALUES ($1)") - .format( - catalog_table=pfx.catalog_table.id, - ) - .as_string(), + insert_catalog.as_string(), [(pfx.catalog_table_row(t[0]),) for t in tables_to_create], ) - cur.execute( - sql.SQL("SELECT COUNT(*) FROM {table}") - .format(table=pfx.output_table("").id) - .as_string(), + count = sql.SQL("SELECT COUNT(*) FROM {table}").format( + table=pfx.output_table(None).id, ) + cur.execute(count.as_string()) total = cast("tuple[int]", cur.fetchone())[0] + transform_progress.update(1) self._transform_complete(conn, pfx, total, transform_started) - return [t[0] for t in tables_to_create] + return [pfx.catalog_table_row(t[0]) for t in tables_to_create] def index_prefix(self, prefix: str, progress: tqdm[NoReturn] | None = None) -> None: pfx = Prefix(prefix) @@ -422,6 +391,7 @@ def _transform_complete( "final_rowcount" = $2 
,"transform_complete" = $3 ,"transform_time" = $4 + ,"index_time" = NULL ,"data_refresh_start" = "load_start" ,"data_refresh_end" = "download_complete" WHERE "table_prefix" = $1 diff --git a/tests/test_expansion.py b/tests/test_expansion.py index 094dfa4..9efed25 100644 --- a/tests/test_expansion.py +++ b/tests/test_expansion.py @@ -52,6 +52,7 @@ def case_typed_columns() -> ExpansionTC: "timestamptz": "2028-01-23T00:00:00.000+00:00", "integer": 1, "numeric": 1.2, + "bigint": 1774374169585, "text": "value", "boolean": false, "uuid": "88888888-8888-1888-8888-888888888888" @@ -60,9 +61,10 @@ def case_typed_columns() -> ExpansionTC: b""" { "id": "id2", - "timestamptz": "2025-06-20T17:37:58.675+00:00", + "timestamptz": "2025-06-20T17:37:58.675", "integer": 2, - "numeric": 2.3, + "numeric": 2, + "bigint": 2, "text": "00000000-0000-1000-A000-000000000000", "boolean": false, "uuid": "11111111-1111-1111-8111-111111111111" @@ -82,6 +84,7 @@ def case_typed_columns() -> ExpansionTC: for a in [ ("integer", "integer", "INTEGER"), ("numeric", "numeric", "DECIMAL(18,3)"), + ("bigint", "bigint", "BIGINT"), ("text", "text", "VARCHAR"), ("uuid", "uuid", "UUID"), ("boolean", "boolean", "BOOLEAN"), diff --git a/tests/test_json_operators.py b/tests/test_json_operators.py index ac43355..75a2672 100644 --- a/tests/test_json_operators.py +++ b/tests/test_json_operators.py @@ -27,7 +27,7 @@ class JsonTC: assertion_params: tuple[Any, ...] -def case_jsonb_object_keys() -> JsonTC: +def case_jsonb_each() -> JsonTC: return JsonTC( """ {assertion} @@ -35,7 +35,7 @@ def case_jsonb_object_keys() -> JsonTC: FROM (SELECT 'k1' jkey UNION SELECT 'k2' jkey) as e FULL OUTER JOIN ( SELECT k.ld_key as jkey - FROM j, jsonb_object_keys(j.jc->'obj') k(ld_key) + FROM j, jsonb_each(j.jc->'obj') k(ld_key) ) as a USING (jkey) WHERE e.jkey IS NULL or a.jkey IS NULL) as q;""",