#692 Refactor the usage of bookkeeping database with Slick framework so it can be DB engine agnostic #709

Merged
yruslan merged 9 commits into main from feature/692-use-slick-profile-per-db-engine
Feb 20, 2026

Conversation


@yruslan yruslan commented Feb 17, 2026

Closes #692

Summary by CodeRabbit

  • Refactor
    • Broadened database profile and table abstractions for wider JDBC compatibility; internal instantiation updated with no user-facing behavior changes.
  • Bug Fixes
    • Fixed HTML-escaping in pipeline notification emails to prevent rendering issues.
    • Improved shutdown sequence to ensure bookkeeping resources are closed reliably.
  • Tests
    • Added SQLite-backed token lock integration tests to increase coverage.


coderabbitai bot commented Feb 17, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.


Walkthrough

Refactors JDBC bookkeeping to be profile-parameterized: replaces Postgres-specific table singletons with JdbcProfile-bound table traits and per-profile SlickUtils; updates constructors to accept PramenDb/slickProfile, migrates table access to wrapper instances, and updates tests and factories accordingly.
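
The core pattern, sketched below with illustrative names rather than Pramen's actual definitions, binds an abstract JdbcProfile into each table trait so the same mapping compiles against any engine's Slick API:

import slick.jdbc.JdbcProfile

// A minimal sketch of the profile-parameterized table-trait pattern described
// in the walkthrough; the table and column names are illustrative.
trait TicketTable {
  val profile: JdbcProfile
  import profile.api._

  class Tickets(tag: Tag) extends Table[(String, Long)](tag, "tickets") {
    def token   = column[String]("token", O.PrimaryKey, O.Length(128))
    def expires = column[Long]("expires")
    def * = (token, expires)
  }

  // lazy, so the query is only built after a concrete profile is supplied
  lazy val records = TableQuery[Tickets]
}

// Bound to a concrete profile at construction time, e.g.:
// val sqliteTickets = new TicketTable { override val profile = slick.jdbc.SQLiteProfile }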

Changes

  • Table trait abstractions
    Files: pramen/core/src/main/scala/.../bookkeeper/model/BookkeepingTable.scala, .../bookkeeper/model/MetadataTable.scala, .../bookkeeper/model/OffsetTable.scala, .../bookkeeper/model/SchemaTable.scala, .../journal/model/JournalTable.scala, .../lock/model/LockTicketTable.scala
    Introduce profile-parameterized table traits (inner Table classes + lazy val records) and replace fixed Postgres imports with profile.api._.

  • Removed legacy table mappings
    Files: pramen/core/src/main/scala/.../bookkeeper/model/BookkeepingRecords.scala, .../MetadataRecords.scala, .../OffsetRecords.scala, .../journal/model/JournalTasks.scala
    Remove standalone Slick table mapping objects and TableQuery singletons; functionality moved to trait-based wrappers.

  • JDBC components & factories
    Files: pramen/core/src/main/scala/.../bookkeeper/Bookkeeper.scala, .../bookkeeper/BookkeeperJdbc.scala, .../bookkeeper/OffsetManagerJdbc.scala, .../journal/JournalJdbc.scala, .../metadata/MetadataManagerJdbc.scala, .../lock/TokenLockFactoryJdbc.scala, .../lock/TokenLockJdbc.scala
    Constructors updated to accept slickProfile: JdbcProfile or PramenDb; components bind per-profile SlickUtils and use wrapper .records/table instances; BookkeeperJdbc adds fromPramenDb/fromJdbcConfig factories and conditional DB close behavior.

  • Pramen DB & wrappers
    Files: pramen/core/src/main/scala/.../rdb/PramenDb.scala
    PramenDb now holds profile-bound wrapper instances (bookkeepingTable, offsetTable, journalTable, lockTicketTable, metadataTable, schemaTable), uses slickProfile for migrations, and exposes wrappers for consumers.

  • Slick utilities
    Files: pramen/core/src/main/scala/.../utils/SlickUtils.scala
    Convert SlickUtils from singleton to instance class SlickUtils(profile: JdbcProfile); move the timing constant to the companion object; make DB helpers profile-scoped instance methods.

  • Tests and call sites
    Files: pramen/core/src/test/.../IncrementalPipelineLongFixture.scala, .../metadata/MetadataManagerJdbcSuite.scala, .../tests/bookkeeper/BookkeeperJdbcSuite.scala, .../tests/bookkeeper/OffsetManagerJdbcSuite.scala, .../tests/journal/JournalJdbcSuite.scala, .../tests/lock/TokenLockJdbcSuite.scala, .../tests/lock/TokenLockSQLiteSuite.scala
    Update constructor usages to pass pramenDb.slickProfile and table wrappers (or use BookkeeperJdbc.fromPramenDb); add an SQLite token-lock integration test and adjust helpers.

  • Small behavioral / misc changes
    Files: pramen/core/src/main/scala/.../state/PipelineStateImpl.scala, .../app/AppContextImpl.scala, .../notify/pipeline/PipelineNotificationBuilderHtml.scala
    Replace IllegalStateException with OsSignalException for unexpected exit paths; ensure the bookkeeper is closed during shutdown; HTML-escape fields in notification HTML.

Sequence Diagram(s)

sequenceDiagram
  participant App as PramenApp
  participant PramenDb as PramenDb
  participant Component as Bookkeeper/Offset/TokenLock/Journal
  participant DB as JDBC_DB

  App->>PramenDb: create(jdbcConfig, slickDb, slickProfile)
  PramenDb->>Component: construct via fromPramenDb / new Component(pramenDb,...)
  Component->>Component: instantiate SlickUtils(slickProfile) & bind TableWrapper(slickProfile)
  Component->>DB: slickUtils.ensureDbConnected(slickDb)
  Component->>DB: run query / action via profile.api._
  DB-->>Component: results / status
  Component-->>App: return state / data

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Poem

🐰 I nibble code and hop about,
Traits and profiles leap about,
Tables wrapped, Slick set free,
Cross-vendor DBs — whee for me!

🚥 Pre-merge checks | ✅ 5

✅ Passed checks (5 passed)
  • Description check: Passed. Skipped because CodeRabbit's high-level summary is enabled.
  • Title check: Passed. The title accurately summarizes the main objective: refactoring bookkeeping database usage to be DB engine agnostic by using Slick profiles dynamically.
  • Linked Issues check: Passed. All coding requirements from issue #692 are met: the Slick profile is now dynamically selected based on the JDBC driver, PostgreSQL-only assumptions are removed, and the bookkeeping, locking, journal, and offset management components are DB-engine agnostic.
  • Out of Scope Changes check: Passed. All changes are directly aligned with the refactoring objective. Minor changes to PipelineStateImpl, AppContextImpl, and PipelineNotificationBuilderHtml are supportive of the main refactoring effort.
  • Docstring Coverage check: Passed. No functions found in the changed files to evaluate docstring coverage; check skipped.


@coderabbitai coderabbitai bot left a comment

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockJdbc.scala (1)

76-81: ⚠️ Potential issue | 🟡 Minor

PostgreSQL-specific exception handler inconsistent with multi-database support.

Line 78 catches org.postgresql.util.PSQLException, introducing a PostgreSQL-specific dependency in code designed to support multiple JDBC databases (PostgreSQL, MySQL, SQLite, JTDS). While it won't cause runtime failures on other databases—the pattern simply won't match and execution falls through to the generic fallback on line 80—it creates an architectural inconsistency and requires the PostgreSQL driver on the classpath even when using only other databases.

The fallback on line 80 (ex.getMessage.contains("constraint")) provides coverage for other databases, but relies on message string matching rather than a proper exception type. Consider consolidating to a database-agnostic exception handling strategy, or document why the PostgreSQL dependency is retained.
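
A hypothetical helper illustrating the suggested strategy: classify a failure as a constraint violation without referencing a vendor exception class (SQLState class "23" is the standard code for integrity constraint violations; the helper name is an assumption, not existing Pramen code):

import java.sql.SQLException

object SqlErrors {
  // True when the failure is an integrity constraint violation, regardless of driver.
  def isConstraintViolation(ex: Throwable): Boolean = ex match {
    case sql: SQLException if Option(sql.getSQLState).exists(_.startsWith("23")) => true
    case other => Option(other.getMessage).exists(_.contains("constraint")) // message fallback
  }
}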

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockJdbc.scala`
around lines 76 - 81, The code currently matches
org.postgresql.util.PSQLException (introducing a Postgres-only dependency);
instead remove that specific case and handle database-agnostic constraint
violations by matching java.sql.SQLException and checking SQLState (SQLState
starting with "23" indicates integrity constraint violation) or falling back to
the existing message-contains("constraint") check; update the match in
TokenLockJdbc (replace the case Failure(_: org.postgresql.util.PSQLException)
with case Failure(ex: SQLException) if ex.getSQLState != null &&
ex.getSQLState.startsWith("23") => tryAcquireExistingTicket()) so no PostgreSQL
driver class is referenced while preserving behavior.
pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala (1)

138-149: ⚠️ Potential issue | 🟠 Major

addColumn uses hardcoded double-quoted identifiers incompatible with MySQL and SQL Server.

The addColumn method manually constructs SQL with double-quoted identifiers ("table", "column"), but the recent refactor (#682) added support for MySQL and SQL Server via getProfile(). These databases require different quoting:

  • MySQL: backticks (`column`) by default; double quotes work only with ANSI_QUOTES mode enabled
  • SQL Server: square brackets ([column]) by default; double quotes work only with SET QUOTED_IDENTIFIER ON

Since initTable() correctly uses Slick's profile-aware schema generation (schema.createIfNotExists), addColumn() should similarly leverage the profile's quoting mechanism instead of manual sqlu interpolation.

As written, this breaks database initialization on MySQL and SQL Server during schema migrations (lines 96–122).
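
A sketch of profile-aware quoting for a hand-built statement, assuming Slick's JdbcProfile.quoteIdentifier helper (the function name and shape here are illustrative, not Pramen's actual addColumn):

import slick.jdbc.JdbcProfile

object DdlSketch {
  // The COLUMN keyword is omitted because SQL Server accepts only "ADD", while
  // PostgreSQL, MySQL, and SQLite accept both forms.
  def addColumnSql(profile: JdbcProfile, table: String, column: String, dataType: String): String = {
    val t = profile.quoteIdentifier(table)  // "t", `t`, or [t] depending on the profile
    val c = profile.quoteIdentifier(column)
    s"ALTER TABLE $t ADD $c $dataType"
  }
}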

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala` around
lines 138 - 149, The addColumn method builds SQL with hardcoded double quotes
which breaks MySQL/SQL Server; update addColumn to use the configured Slick
profile's identifier quoting instead of literal " characters: obtain the active
profile via getProfile(), use its quoting helper (or implement a small helper
that matches getProfile() to return the correct quote chars — backtick for
MySQL, square brackets for SQL Server, double quotes for Postgres/others) to
wrap table and column names, then build and run the ALTER TABLE statement with
db.run(sqlu"...") using those properly quoted identifiers (keep references to
addColumn, getProfile(), db.run and sqlu). Ensure the thrown RuntimeException
remains but the SQL uses profile-aware quoting.
🧹 Nitpick comments (8)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerJdbc.scala (1)

104-125: commitRerun performs update-then-delete without a transaction boundary.

The update (lines 112-117) and the subsequent delete of previous batches (lines 120-124) are executed as separate db.run calls. If the delete fails after the update succeeds, stale offset records for previous batches remain. This is pre-existing behavior and not introduced by this PR, but given that the method is meant to atomically rewrite offsets for a rerun, wrapping both operations in a single .transactionally block would be more robust — especially now that multiple DB engines are in scope.

♻️ Suggested transactional wrapping
     slickUtils.ensureDbConnected(db)
     db.run(
-      offsetTable.records
+      (offsetTable.records
         .filter(r => r.pramenTableName === request.tableName && r.infoDate === request.infoDate.toString && r.createdAt === request.createdAt.toEpochMilli)
         .map(r => (r.minOffset, r.maxOffset, r.committedAt))
-        .update((minOffset.valueString, maxOffset.valueString, Some(committedAt)))
-    ).execute()
-
-    // Cleaning up previous batches
-    db.run(
-      offsetTable.records
+        .update((minOffset.valueString, maxOffset.valueString, Some(committedAt)))
+      ).andThen(
+      offsetTable.records
         .filter(r => r.pramenTableName === request.tableName && r.infoDate === request.infoDate.toString && r.createdAt =!= request.createdAt.toEpochMilli)
         .delete
+      ).transactionally
     ).execute()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerJdbc.scala`
around lines 104 - 125, commitRerun currently issues two separate db.run calls
(an update then a delete) which can leave stale records if the delete fails;
change it to run both operations in a single transactional DBIO by composing the
two queries (the update on offsetTable.records filtered by
request.tableName/infoDate/createdAt and the delete on offsetTable.records
filtered by request.tableName/infoDate with createdAt =!=
request.createdAt.toEpochMilli) into a DBIO.sequence or DBIO.seq and call
.transactionally before executing, remove the separate .execute() calls and run
the single transactional DBIO so both update and cleanup succeed or roll back
together.
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/OffsetTable.scala (1)

29-30: Same cross-DB portability note for minOffset and maxOffset as with MetadataTable.value.

The minOffset and maxOffset string columns have no O.Length constraint. On some DB engines (e.g., H2, DB2), this may default to a short VARCHAR that could truncate large offset strings. Consider adding an explicit length or O.SqlType("TEXT") for consistent cross-engine behavior, especially given this PR's goal of DB-engine agnosticism.
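
For illustration, either option looks like this (the profile import, table, and sizes are assumptions, not the real schema; production code would import profile.api._):

import slick.jdbc.SQLiteProfile.api._

// Explicit sizes/types on string columns make the generated DDL predictable across engines.
class OffsetsExample(tag: Tag) extends Table[(String, String)](tag, "offsets_example") {
  def minOffset = column[String]("min_offset", O.Length(128))     // portable VARCHAR(128)
  def maxOffset = column[String]("max_offset", O.SqlType("TEXT")) // force an unbounded type
  def * = (minOffset, maxOffset)
}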

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/OffsetTable.scala`
around lines 29 - 30, The minOffset and maxOffset column definitions currently
lack length/type constraints which can lead to truncation on some DB engines;
update the declarations for def minOffset and def maxOffset to include an
explicit size or text type (e.g., add O.Length(x) or O.SqlType("TEXT") in the
column(...) call) so the columns are created with a sufficient/portable type
across databases.
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/MetadataTable.scala (1)

29-29: Optional: Consider adding O.Length or O.SqlType to the value column for clarity and consistency.

The value column at line 29 lacks an explicit length constraint. While this is consistent with other variable-content columns in the codebase (e.g., schemaJson in SchemaTable, minOffset/maxOffset in OffsetTable), adding an explicit constraint would clarify intent and improve cross-DB portability documentation. Without a constraint, Slick defers to the database's default mapping, which varies by engine.

Consider adding O.SqlType("TEXT") or O.Length if metadata values have known size bounds, or document the decision to allow database-specific defaults if unbounded values are intentional.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/MetadataTable.scala`
at line 29, The MetadataTable's value column is declared as def value =
column[String]("value") without any explicit size or SQL type, which can lead to
inconsistent DB defaults; update the declaration for value in MetadataTable to
include an explicit column option such as O.SqlType("TEXT") or an appropriate
O.Length(...) (or document why leaving it unbounded is intentional) so database
mapping is explicit and portable across engines.
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala (1)

36-55: Clean refactoring to parameterized profile.

The constructor change and the pattern of creating per-profile table wrappers is consistent and correct. The import slickProfile.api._ brings in the right path-dependent Slick DSL for the given profile.

One observation: the same table wrapper instantiation pattern (anonymous new BookkeepingTable { override val profile = slickProfile }) is also done in PramenDb.scala. When BookkeeperJdbc is created via the fromJdbcConfig companion method (line 361), it creates its own wrappers, which is fine. But when constructed from Bookkeeper.fromConfig (which passes dbOpt.get.slickDb), the caller already has a PramenDb instance with the same wrappers. This duplication is acceptable for isolation, but if it becomes a maintenance concern, you could consider accepting the table wrappers directly or reusing the PramenDb instance.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala`
around lines 36 - 55, The ctor of BookkeeperJdbc currently constructs new
per-profile table wrappers (BookkeepingTable, SchemaTable, OffsetTable,
MetadataTable) which duplicates wrappers already present in PramenDb; refactor
to allow reusing them by adding an overloaded BookkeeperJdbc constructor or
additional optional parameters that accept the table wrapper instances or a
PramenDb reference, update the factory methods Bookkeeper.fromConfig and the
companion fromJdbcConfig to pass through the existing PramenDb's wrappers (or
the PramenDb itself) when available, and ensure the existing default ctor still
constructs wrappers for callers that don't provide them so behavior remains
backward-compatible.
pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTable.scala (2)

42-46: Inconsistent column naming convention (pre-existing).

Columns pipelineId, pipelineName, and environmentName (lines 42-44) use camelCase while other columns use snake_case (e.g., job_name, watcher_table_name, started_at). This is inherited from the existing schema migrations in PramenDb.scala (lines 97-99), so not introduced by this PR, but worth noting for future cleanup.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTable.scala`
around lines 42 - 46, The column names pipelineId, pipelineName, and
environmentName in JournalTable use camelCase while the rest use snake_case;
rename these column definitions to pipeline_id, pipeline_name, and
environment_name (preserving types and O.Length) and update all usages (queries,
mappings, case class field references) that reference JournalTable.pipelineId,
pipelineName, and environmentName so they point to the new identifiers; ensure
the names match the existing DB migrations in PramenDb (so the DB column names
remain consistent) and run/verify compilation and tests to catch any remaining
references.

48-52: 22-field tuple projection is at the Scala 2 limit.

The * projection maps to a 22-element tuple, which is the maximum arity for tupled/unapply in Scala 2. Adding another column will require switching to Slick's HList-based mapping or a nested tuple approach. This is a pre-existing constraint, not introduced by this PR.
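
For reference, Slick's nested-tuple escape hatch looks like the sketch below; the three-column table is purely illustrative, but the same grouping technique keeps each tuple under the 22-arity limit for wider tables:

import slick.jdbc.SQLiteProfile.api._ // illustrative profile choice

case class Wide(a: String, b: Int, c: Long)

// The tuple groups keep each arity under 22, and the custom <> functions
// flatten/unflatten the nested tuple into the row case class.
class WideTable(tag: Tag) extends Table[Wide](tag, "wide_example") {
  def a = column[String]("a")
  def b = column[Int]("b")
  def c = column[Long]("c")

  def * = ((a, b), c).shaped <> (
    { case ((a, b), c) => Wide(a, b, c) },
    (w: Wide) => Some(((w.a, w.b), w.c))
  )
}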

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTable.scala`
around lines 48 - 52, The current def * projection in JournalTable returns a
22-field tuple and relies on JournalTask.tupled/JournalTask.unapply which hits
Scala 2's 22-arity limit; change the mapping to avoid a single 22-tuple by
either (a) splitting the columns into two (or more) nested tuples and provide a
custom mapping function that converts the nested tuple(s) to/from JournalTask,
or (b) switch to an HList-based mapping (Slick shapeless support) for >22
fields; update def * to use the new nested-tuple or HList mapping and remove the
direct JournalTask.tupled/unapply usage so the projection no longer exceeds the
22-arity limit.
pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala (1)

45-62: Table wrappers in PramenDb are also duplicated in consumer classes.

PramenDb creates public table wrappers (bookkeepingTable, offsetTable, journalTable, lockTicketTable, metadataTable) that are also independently created in BookkeeperJdbc, TokenLockJdbc, etc. This means two sets of TableQuery instances exist for the same tables when using PramenDb-originated connections. While functionally harmless, you could consider passing the PramenDb instance (or its wrappers) directly to consumer classes to avoid the duplication.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala` around
lines 45 - 62, PramenDb currently instantiates public table wrappers
(bookkeepingTable, offsetTable, journalTable, lockTicketTable, metadataTable)
while consumer classes recreate their own TableQuery instances — change consumer
constructors (e.g., BookkeeperJdbc, TokenLockJdbc, any classes creating
duplicate tables) to accept either the PramenDb instance or explicit table
wrapper instances (BookkeepingTable, OffsetTable, JournalTable, LockTicketTable,
MetadataTable) and use those passed-in wrappers instead of constructing new
ones; update places that construct these consumers to pass
pramenDb.bookkeepingTable / pramenDb.lockTicketTable etc., and remove the
duplicate TableQuery creation from the consumer classes so only PramenDb
provides the shared table wrappers.
pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongFixture.scala (1)

216-216: Inconsistent accessor: pramenDb.db vs pramenDb.slickDb.

All other test files in this PR use pramenDb.slickDb (e.g., OffsetManagerJdbcSuite, JournalJdbcSuite), but this fixture uses pramenDb.db. While they resolve to the same Database instance, using a consistent accessor across the codebase improves readability and avoids confusion about whether they differ.

Consider replacing all occurrences of pramenDb.db in this file with pramenDb.slickDb to match the rest of the PR.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongFixture.scala`
at line 216, The fixture uses pramenDb.db when creating the OffsetManagerJdbc
instance (val om = new OffsetManagerJdbc(pramenDb.db, pramenDb.slickProfile,
123L); see OffsetManagerJdbc and om), which is inconsistent with other tests
that use pramenDb.slickDb; change the accessor from pramenDb.db to
pramenDb.slickDb throughout this fixture so the Database reference is accessed
the same way as in OffsetManagerJdbcSuite and JournalJdbcSuite.

github-actions bot commented Feb 17, 2026

Unit Test Coverage

Overall project: 84.34% (-0.33%) 🍏
Files changed: 91.62% 🍏

Module coverage:
  • pramen:core (Jacoco Report): 86.28% (-0.36%) 🍏

File coverage (module pramen:core, Jacoco Report):
  • TokenLockFactoryJdbc.scala: 100% 🍏
  • JournalTable.scala: 99.74% (-0.26%) 🍏
  • BookkeepingTable.scala: 99.44% (-0.56%) 🍏
  • OffsetTable.scala: 99.4% (-0.6%) 🍏
  • MetadataTable.scala: 99.22% (-0.78%) 🍏
  • LockTicketTable.scala: 98.8% (-1.2%) 🍏
  • SchemaTable.scala: 98.77% (-1.23%) 🍏
  • MetadataManagerJdbc.scala: 95.37% 🍏
  • Bookkeeper.scala: 92% 🍏
  • AppContextImpl.scala: 91.41% 🍏
  • PipelineStateImpl.scala: 90.33% 🍏
  • PipelineNotificationBuilderHtml.scala: 89.94% 🍏
  • OffsetManagerJdbc.scala: 89.02% 🍏
  • JournalJdbc.scala: 85.78% 🍏
  • BookkeeperJdbc.scala: 69.6% (-5.97%)
  • SlickUtils.scala: 61.45% 🍏
  • TokenLockJdbc.scala: 57.17% (-16.52%)
  • PramenDb.scala: 55.95% (-13.55%)

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (4)
pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala (1)

219-219: Carrying thread stack traces into the failure exception is a solid improvement; minor semantic note on class choice.

Using OsSignalException here is functionally correct and better than the previous IllegalStateException (the stack traces are now preserved). However, OsSignalException is semantically modelled for OS signals (the class name, and its message template "The process was interrupted by $signalName." is designed for "SIGINT", "SIGTERM", etc.). Passing "System.exit()" as the signalName will surface the message "The process was interrupted by System.exit()." in pipeline notifications and emails, which could confuse operators — System.exit() is a programmatic JVM call, not an OS signal.

Consider either:

  • Using a more neutral exception type or a dedicated UnexpectedExitException for this path, or
  • Adjusting OsSignalException's message template to accommodate both cases (e.g., "The process was terminated by $signalName.").
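
A sketch of the first option, with an illustrative stack-trace payload type (Pramen's actual model may differ):

// Hypothetical dedicated exception for programmatic exit paths, keeping the
// (name, stack traces) shape of OsSignalException but with neutral wording.
final case class UnexpectedExitException(
    reason: String,
    nonDaemonStackTraces: Seq[(String, Array[StackTraceElement])]
) extends RuntimeException(s"The process was terminated by $reason.")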
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala`
at line 219, The current call
setFailureException(OsSignalException("System.exit()", nonDaemonStackTraces))
uses OsSignalException which implies an OS signal; update the failure exception
to a semantically neutral or dedicated type instead: either replace
OsSignalException with a new UnexpectedExitException (or similar) that accepts
the programmatic exit name and stack traces, or modify OsSignalException's
message template to use a neutral wording like "terminated by" so it can accept
"System.exit()" without misleading operators; change the instantiation at the
setFailureException call and ensure the chosen exception class
(UnexpectedExitException or revised OsSignalException) accepts the same
parameters (signal/exit name and nonDaemonStackTraces).
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockSQLiteSuite.scala (1)

38-44: Dead code in before block.

The UsingUtils.using(RdbJdbc(jdbcConfig)) block (lines 40-42) has an empty body (the DROP SCHEMA DDL is commented out). This creates an RdbJdbc connection and immediately closes it, which serves no purpose for SQLite. Consider removing the no-op block to avoid confusion.

♻️ Proposed cleanup
   before {
     if (pramenDb != null) pramenDb.close()
-    UsingUtils.using(RdbJdbc(jdbcConfig)) { rdb =>
-      //rdb.executeDDL("DROP SCHEMA PUBLIC CASCADE;")
-    }
     pramenDb = PramenDb(jdbcConfig)
   }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockSQLiteSuite.scala`
around lines 38 - 44, The UsingUtils.using(RdbJdbc(jdbcConfig)) call in the
before block is a no-op (its body is commented out) and should be removed to
avoid opening and closing an unnecessary RdbJdbc connection; update the before
block in TokenLockSQLiteSuite to delete the
UsingUtils.using(RdbJdbc(jdbcConfig)) { ... } block (or restore the intended DDL
if the drop schema was meant to run), keeping the existing pramenDb close/create
logic (pramenDb.close() and pramenDb = PramenDb(jdbcConfig)).
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerJdbc.scala (1)

29-37: Clean refactoring to profile-parameterized Slick usage.

The constructor change and the per-profile OffsetTable / SlickUtils wiring are consistent with the broader refactoring pattern across the codebase. The approach of instantiating table wrappers as anonymous trait implementations is used identically in PramenDb.scala.

One minor observation: PramenDb already exposes a public offsetTable: OffsetTable (see PramenDb.scala line 49). If callers construct OffsetManagerJdbc with the same slickProfile that PramenDb already holds, a second OffsetTable instance is created. This is functionally harmless (lightweight wrapper), but you could consider accepting the pre-built OffsetTable from PramenDb to avoid duplication.
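
If deduplication ever becomes worthwhile, the constructor could take the wrapper as an optional parameter; a sketch with names mirroring this comment, not Pramen's actual signatures:

import slick.jdbc.JdbcProfile

trait OffsetTable { val profile: JdbcProfile /* table definitions elided */ }

// Hypothetical constructor shape: accept a pre-built table wrapper and fall
// back to a profile-bound instance only when none is supplied.
class OffsetManagerSketch(slickProfile: JdbcProfile,
                          providedTable: Option[OffsetTable] = None) {
  private val offsetTable: OffsetTable =
    providedTable.getOrElse(new OffsetTable { override val profile = slickProfile })
}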

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerJdbc.scala`
around lines 29 - 37, OffsetManagerJdbc currently constructs its own OffsetTable
and SlickUtils (via new OffsetTable { override val profile = slickProfile } and
new SlickUtils(slickProfile) causing duplicate OffsetTable instances when
PramenDb already provides one; change OffsetManagerJdbc to accept an OffsetTable
(and optionally SlickUtils) as constructor parameters (e.g., add a param
offsetTable: OffsetTable) and use that provided instance instead of creating a
new anonymous OffsetTable inside the class, falling back to constructing a
profile-bound OffsetTable only if none is supplied; reference symbols:
OffsetManagerJdbc, OffsetTable, SlickUtils, and PramenDb.offsetTable.
pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala (1)

181-193: Driver-to-profile mapping covers a good range of supported databases.

The mapping correctly handles both old and new MySQL drivers, as well as both Microsoft and jTDS SQL Server drivers. The IllegalArgumentException for unrecognized drivers provides a clear error message.

Consider whether Oracle Database support (oracle.jdbc.OracleDriver → slick.jdbc.OracleProfile) should be included, given its prevalence in enterprise environments. This could also be deferred to a follow-up if Oracle isn't currently needed.
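
For context, a driver-class-to-profile mapping of this kind typically looks like the sketch below; the exact driver strings Pramen's getProfile() handles may differ:

import slick.jdbc._

object ProfileSketch {
  // Illustrative mapping from JDBC driver class names to Slick profiles.
  def profileForDriver(driverClass: String): JdbcProfile = driverClass match {
    case "org.postgresql.Driver"                              => PostgresProfile
    case "com.mysql.cj.jdbc.Driver" | "com.mysql.jdbc.Driver" => MySQLProfile
    case "org.sqlite.JDBC"                                    => SQLiteProfile
    case "com.microsoft.sqlserver.jdbc.SQLServerDriver"
       | "net.sourceforge.jtds.jdbc.Driver"                   => SQLServerProfile
    case "oracle.jdbc.OracleDriver"                           => OracleProfile
    case other => throw new IllegalArgumentException(s"Unsupported JDBC driver: $other")
  }
}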

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala` around
lines 181 - 193, The getProfile method currently maps several JDBC drivers to
Slick profiles but omits Oracle; add pattern matching for Oracle drivers (e.g.,
"oracle.jdbc.OracleDriver" and "oracle.jdbc.driver.OracleDriver") to return
slick.jdbc.OracleProfile in the driver match within getProfile, and update any
relevant tests or documentation to include Oracle support and error messaging
remains unchanged for unknown drivers.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In
`@pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockSQLiteSuite.scala`:
- Around line 96-111: The test currently uses eventually which may pass
immediately; to verify TokenLockJdbc's background refresh (tokenExpiresSeconds
overridden to 2L on lock1) ensure the test waits past the TTL before checking
that lock2 cannot acquire the lock: after acquiring lock1 (and before the final
assertions) insert a delay longer than 2 seconds (e.g., Thread.sleep > 2000ms)
to let the original ticket expire had it not been refreshed, then
assert(!lock2.tryAcquire()) and assert(!lock1.tryAcquire()) to confirm the
background watcher in TokenLockJdbc keeps the ticket alive.


@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala (1)

764-771: ⚠️ Potential issue | 🟡 Minor

diff.tableName is unescaped in the same row block.

While columnName/dataType/oldType/newType are now properly escaped, diff.tableName at line 765 is passed to TextElement without escapeHTML. A table name containing <, >, or & (e.g., from a schema with non-standard naming) would produce malformed HTML in the notification email. Given the fix is being applied to the neighbouring cells, escaping this field here maintains consistency.

🛡️ Proposed fix
-        TextElement(diff.tableName),
+        TextElement(StringUtils.escapeHTML(diff.tableName)),
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala`
around lines 764 - 771, In PipelineNotificationBuilderHtml, the
tableBuilder.withRow call builds a row using TextElement(diff.tableName) without
escaping HTML; update that usage to wrap diff.tableName with the same escapeHTML
helper used for column/data-type cells (i.e., replace
TextElement(diff.tableName) with TextElement(escapeHTML(diff.tableName))) so
table names containing <, > or & are properly escaped and consistent with the
other cells.
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala (1)

317-329: ⚠️ Potential issue | 🟡 Minor

saveSchemaRaw is missing ensureDbConnected unlike saveSchema.

saveSchema (Line 303) calls slickUtils.ensureDbConnected(pramenDb.slickDb) before executing queries, but saveSchemaRaw does not. This inconsistency could cause issues if the DB connection has been lost before saveSchemaRaw is called.

Proposed fix
   private[pramen] def saveSchemaRaw(table: String, infoDate: String, schema: String): Unit = {
     try {
+      slickUtils.ensureDbConnected(pramenDb.slickDb)
       pramenDb.slickDb.run(
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala`
around lines 317 - 329, The saveSchemaRaw method omits the DB connection check
present in saveSchema; call slickUtils.ensureDbConnected(pramenDb.slickDb) at
the start of saveSchemaRaw before running any pramenDb.slickDb queries to ensure
the connection is valid, then proceed with the existing delete and insert logic
and existing NonFatal exception handling (i.e., keep the try/catch and log.error
as-is).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@pramen/core/src/main/scala/za/co/absa/pramen/core/app/AppContextImpl.scala`:
- Around line 42-47: In AppContextImpl.close(), if bookkeeper.close() throws,
closable.close() never runs causing resource leaks; change the method to ensure
both resources are closed regardless of exceptions by wrapping
bookkeeper.close() in a try block and invoking closable.close() in a finally (or
use try-with-resources style/utility) and nulling closable afterwards, and make
sure to capture and rethrow or aggregate exceptions so errors from both
bookkeeper.close() and closable.close() are not silently lost; update the
close() implementation referencing AppContextImpl.close, bookkeeper, and
closable accordingly.

In
`@pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockSQLiteSuite.scala`:
- Around line 37-40: The File constructor arguments are reversed causing dbFile
to point to the wrong path and leaving stale SQLite data between tests; in
TokenLockSQLiteSuite locate where dbFile is created (variable dbFile) and change
the constructor to new File(tempDir, "pramen.sqlite") so the file refers to
$tempDir/pramen.sqlite, then keep the existing exists() and delete() checks to
remove the DB before each test run.
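
A sketch of the close ordering suggested for AppContextImpl above; the field names are stand-ins from the prompt, not the actual class:

import java.io.Closeable

class AppContextSketch(bookkeeper: Closeable, private var closable: Closeable) {
  // try/finally guarantees the second resource is closed even if the first throws.
  def close(): Unit = {
    try {
      bookkeeper.close()
    } finally {
      if (closable != null) {
        closable.close()
        closable = null
      }
    }
  }
}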

---

Duplicate comments:
In
`@pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockSQLiteSuite.scala`:
- Around line 94-107: Test is already correct: TokenLockJdbc is instantiated
with overridden tokenExpiresSeconds = 2L and try-finally ensures
lock1.release(); no changes needed — keep the current test using TokenLockJdbc,
tokenExpiresSeconds, tryAcquire(), Thread.sleep(3000) and release() as written.

Comment on lines +57 to +92
"Token lock" should {
"be able to acquire and release locks" in {
val lock1 = getLock("token1")

assert(lock1.tryAcquire())
assert(!lock1.tryAcquire())

val lock2 = getLock("token1")
assert(!lock2.tryAcquire())

lock1.release()

assert(lock2.tryAcquire())
assert(!lock2.tryAcquire())

lock2.release()
}

"multiple token locks should not affect each other" in {
val lock1 = getLock("token1")
val lock2 = getLock("token2")

assert(lock1.tryAcquire())
assert(lock2.tryAcquire())

assert(!lock1.tryAcquire())
assert(!lock2.tryAcquire())

lock1.release()

assert(lock1.tryAcquire())
assert(!lock2.tryAcquire())

lock1.release()
lock2.release()
}

⚠️ Potential issue | 🟡 Minor

Missing try-finally for lock release may leave background threads running after assertion failures.

If any assertion in test 1 or test 2 fails before release() is called, TokenLockJdbc's background refresh thread stays alive. The subsequent before block closes pramenDb, and that stale thread will encounter errors accessing the now-closed DB. Combined with the reversed File args that prevent SQLite cleanup, this can cause cascading flakiness.

🛡️ Suggested fix for test 1 (apply same pattern to test 2)
     "be able to acquire and release locks" in {
       val lock1 = getLock("token1")

-      assert(lock1.tryAcquire())
-      assert(!lock1.tryAcquire())
-
-      val lock2 = getLock("token1")
-      assert(!lock2.tryAcquire())
-
-      lock1.release()
-
-      assert(lock2.tryAcquire())
-      assert(!lock2.tryAcquire())
-
-      lock2.release()
+      try {
+        assert(lock1.tryAcquire())
+        assert(!lock1.tryAcquire())
+
+        val lock2 = getLock("token1")
+        try {
+          assert(!lock2.tryAcquire())
+          lock1.release()
+          assert(lock2.tryAcquire())
+          assert(!lock2.tryAcquire())
+        } finally {
+          lock2.release()
+        }
+      } finally {
+        lock1.release()
+      }
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockSQLiteSuite.scala`
around lines 57 - 92, Tests acquire TokenLock instances via getLock (lock1,
lock2) but may not release them if an assertion fails, leaving TokenLockJdbc's
background refresh thread running; wrap each test's lock usage in a try { ... }
finally { lock.release() } (or multiple locks: release each acquired lock in
finally) so that every acquired TokenLock (references: getLock, TokenLockJdbc,
lock1.release, lock2.release) is guaranteed to be released even on assertion
failure; apply the same try-finally pattern to both tests in this file.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala (2)

299-330: ⚠️ Potential issue | 🟠 Major

Non-atomic delete + insert risks permanent schema data loss.

Both saveSchema and saveSchemaRaw issue two independent slickDb.run(...) calls — a delete followed by an insert — with no transaction wrapping. If the delete succeeds and the insert fails (connection drop, constraint violation, etc.), the schema record for that (table, infoDate) pair is permanently gone with no way to recover it.

This is distinct from deleteTable's intentional non-transactional multi-table sweep (which is acceptable because clients retry later). Here, losing a schema record is non-recoverable within normal execution flow.

Wrap both statements in a single transactional DBIOAction:

🔒 Proposed fix: wrap delete+insert in a transaction
-    try {
-      slickUtils.ensureDbConnected(pramenDb.slickDb)
-      pramenDb.slickDb.run(
-        pramenDb.schemaTable.records.filter(t => t.pramenTableName === table && t.infoDate === infoDateStr).delete
-      ).execute()
-
-      pramenDb.slickDb.run(
-        pramenDb.schemaTable.records += SchemaRecord(table, infoDate.toString, schema.json)
-      ).execute()
-    } catch {
-      case NonFatal(ex) => log.error(s"Unable to write to the bookkeeping schema table.", ex)
-    }
+    try {
+      slickUtils.ensureDbConnected(pramenDb.slickDb)
+      val upsertAction = (
+        pramenDb.schemaTable.records.filter(t => t.pramenTableName === table && t.infoDate === infoDateStr).delete
+          andThen (pramenDb.schemaTable.records += SchemaRecord(table, infoDate.toString, schema.json))
+      ).transactionally
+      pramenDb.slickDb.run(upsertAction).execute()
+    } catch {
+      case NonFatal(ex) => log.error(s"Unable to write to the bookkeeping schema table.", ex)
+    }

Apply the same pattern to saveSchemaRaw.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala`
around lines 299 - 330, The delete followed by insert in saveSchema and
saveSchemaRaw is non-atomic and can permanently lose schema rows if the insert
fails; instead build a single DBIO composed action (e.g. DBIO.seq(deleteAction,
insertAction).transactionally or
deleteAction.andThen(insertAction).transactionally) using the same
pramenDb.schemaTable.records filter delete and the += SchemaRecord insert, then
call pramenDb.slickDb.run(...) once on that transactional DBIO after
slickUtils.ensureDbConnected(pramenDb.slickDb), preserving the existing NonFatal
error handling.

285-297: ⚠️ Potential issue | 🟡 Minor

getLatestSchema is the only query method without error handling.

Every other *FromStorage method wraps the slickUtils.executeQuery call in try { ... } catch { case NonFatal(ex) => throw new RuntimeException(...) }. getLatestSchema has no such guard, so raw Slick/JDBC exceptions (connection errors, serialization failures) propagate directly instead of being wrapped with context.

🛡️ Proposed fix: add consistent error handling
-    slickUtils.executeQuery(pramenDb.slickDb, query)
-      .map(schemaRecord => TableSchema(schemaRecord.pramenTableName, schemaRecord.infoDate, schemaRecord.schemaJson))
-      .flatMap(tableSchema =>
-        TableSchema.toSchemaAndDate(tableSchema)
-      )
-      .headOption
+    try {
+      slickUtils.executeQuery(pramenDb.slickDb, query)
+        .map(schemaRecord => TableSchema(schemaRecord.pramenTableName, schemaRecord.infoDate, schemaRecord.schemaJson))
+        .flatMap(tableSchema => TableSchema.toSchemaAndDate(tableSchema))
+        .headOption
+    } catch {
+      case NonFatal(ex) => throw new RuntimeException(s"Unable to read from the bookkeeping schema table.", ex)
+    }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala`
around lines 285 - 297, The getLatestSchema method lacks the consistent error
handling used elsewhere: wrap the slickUtils.executeQuery call (the query built
from pramenDb.schemaTable.records for table/infoDate) in a try { ... } catch {
case NonFatal(ex) => throw new RuntimeException(...) } block, catching NonFatal
and rethrowing a RuntimeException that includes contextual details (table and
infoDate) while preserving the original exception as the cause; ensure the rest
of the pipeline (mapping to TableSchema and calling TableSchema.toSchemaAndDate)
remains unchanged so behavior and return type (Option[(StructType, LocalDate)])
are preserved.
🧹 Nitpick comments (1)
pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala (1)

752-772: HTML-escaping for schema diff cells looks correct.

User-supplied columnName, dataType, oldType, newType, and diff.tableName are all properly routed through StringUtils.escapeHTML before being embedded in the HTML cell content. The hardcoded <b>…</b> bold tags are not in the escape path (intentionally), so formatting is preserved while injection via field names/types is blocked.

The two unescaped values on lines 769–770 (diff.infoDateOld.toString, diff.infoDateNew.toString) are LocalDate instances whose toString yields only digits and hyphens (YYYY-MM-DD), so they carry no XSS risk. However, escaping them would make the escaping policy consistent across the entire row.

♻️ Optional: escape date cells for consistency
-        TextElement(diff.infoDateOld.toString),
-        TextElement(diff.infoDateNew.toString)
+        TextElement(StringUtils.escapeHTML(diff.infoDateOld.toString)),
+        TextElement(StringUtils.escapeHTML(diff.infoDateNew.toString))
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala`
around lines 752 - 772, The row currently builds date cells using
diff.infoDateOld.toString and diff.infoDateNew.toString without escaping; for
consistency with other cells, pass these values through StringUtils.escapeHTML
before creating TextElement so all user-derived content uses the same escaping
policy—update the call site where tableBuilder.withRow is invoked in
PipelineNotificationBuilderHtml (the block assembling the row for schema diffs
that references diff.tableName, changeCell, oldColumnCell, newColumnCell) to
wrap diff.infoDateOld.toString and diff.infoDateNew.toString with
StringUtils.escapeHTML when constructing their TextElement values.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Outside diff comments:
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala`:
- Around line 299-330: The delete followed by insert in saveSchema and
saveSchemaRaw is non-atomic and can permanently lose schema rows if the insert
fails; instead build a single DBIO composed action (e.g. DBIO.seq(deleteAction,
insertAction).transactionally or
deleteAction.andThen(insertAction).transactionally) using the same
pramenDb.schemaTable.records filter delete and the += SchemaRecord insert, then
call pramenDb.slickDb.run(...) once on that transactional DBIO after
slickUtils.ensureDbConnected(pramenDb.slickDb), preserving the existing NonFatal
error handling.
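To illustrate the transactional rewrite this prompt describes, a sketch assuming the names it references (pramenDb, slickUtils, SchemaRecord), plus an assumed pramenDb.slickProfile import and a string-typed infoDate column; this is not the committed fix:

import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.util.control.NonFatal

// Sketch: compose delete + insert into one transactional action so a failed
// insert rolls back the delete instead of losing the existing schema row.
// SchemaRecord's field order and the infoDate representation are assumptions.
def saveSchemaTransactionally(table: String, infoDate: String, schemaJson: String): Unit = {
  import pramenDb.slickProfile.api._

  val records = pramenDb.schemaTable.records

  val deleteAction = records
    .filter(r => r.pramenTableName === table && r.infoDate === infoDate)
    .delete
  val insertAction = records += SchemaRecord(table, infoDate, schemaJson)

  // .transactionally makes the pair atomic on engines that support transactions
  val action = deleteAction.andThen(insertAction).transactionally

  try {
    slickUtils.ensureDbConnected(pramenDb.slickDb)
    Await.result(pramenDb.slickDb.run(action), Duration.Inf)
  } catch {
    case NonFatal(ex) => throw new RuntimeException("Unable to write to the bookkeeping schema table.", ex)
  }
}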

Comment on lines 44 to 49
try {
bookkeeper.close()
} finally {
closable.close()
closable = null
}

⚠️ Potential issue | 🟡 Minor

closable = null is unreachable if closable.close() throws — risk of double-close.

If closable.close() throws, closable = null on Line 48 is skipped. A subsequent call to close() will pass the if (closable != null) guard again and invoke both bookkeeper.close() and closable.close() a second time.

Set the field to null (via a local capture) before calling close() to make the guard idempotent regardless of exceptions:

🛡️ Proposed fix
     try {
       bookkeeper.close()
     } finally {
-      closable.close()
-      closable = null
+      val c = closable
+      closable = null
+      c.close()
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pramen/core/src/main/scala/za/co/absa/pramen/core/app/AppContextImpl.scala`
around lines 44 - 49, In AppContextImpl (the close/cleanup logic around
bookkeeper.close() and closable.close()), make the closable field idempotent by
nulling the field before invoking its close to avoid double-close if close()
throws: capture the current closable into a local val, set the class-level
closable = null, then call local.close(); keep the try/finally around
bookkeeper.close() as-is but ensure you reference the same local variable when
closing the resource so the if (closable != null) guard can’t result in a second
close after an exception.
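As a generic illustration of that pattern (ResourceHolder is hypothetical, not the PR's class), clearing the field before closing keeps a guarded close idempotent even when close() throws:

// Hypothetical holder demonstrating the idempotent-close pattern.
final class ResourceHolder(initial: AutoCloseable) {
  private var closable: AutoCloseable = initial

  def close(): Unit = {
    if (closable != null) {
      val c = closable
      closable = null // clear the guard before closing...
      c.close()       // ...so a throwing close() cannot trigger a second close
    }
  }
}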

Comment on lines +199 to 207
     val listQuery = pramenDb.bookkeepingTable.records
       .filter(r => r.pramenTableName === tableNameTrimmed || r.pramenTableName.like(likePattern, escape))
       .map(_.pramenTableName)
       .distinct

-    val tablesToDelete = SlickUtils.executeQuery(db, listQuery).sorted
+    val tablesToDelete = slickUtils.executeQuery(pramenDb.slickDb, listQuery).sorted

     if (tablesToDelete.length > 100)
       throw new IllegalArgumentException(s"The table wildcard '$tableName' matches more than 100 tables (${tablesToDelete.length}). To avoid accidental deletions, please refine the wildcard.")

⚠️ Potential issue | 🟡 Minor

listQuery execution sits outside the try-catch block.

Line 204 executes a DB query but is not covered by the try { ... } catch { case NonFatal(ex) => throw new RuntimeException(...) } that starts at Line 213. A connection failure or query error during the listing phase will throw a raw exception with no contextual message, unlike every other DB operation in this class.

🛡️ Proposed fix: extend the try-catch to cover the list query
-    val tablesToDelete = slickUtils.executeQuery(pramenDb.slickDb, listQuery).sorted
-
-    if (tablesToDelete.length > 100)
-      throw new IllegalArgumentException(...)
-
-    val deletionQuery = ...
-
-    try {
-      val deletedBkCount = ...
+    val tablesToDelete = try {
+      slickUtils.executeQuery(pramenDb.slickDb, listQuery).sorted
+    } catch {
+      case NonFatal(ex) => throw new RuntimeException(s"Unable to list tables matching '$patternForLogging' from the bookkeeping table.", ex)
+    }
+
+    if (tablesToDelete.length > 100)
+      throw new IllegalArgumentException(...)
+
+    val deletionQuery = ...
+
+    try {
+      val deletedBkCount = ...
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala`
around lines 199 - 207, The code executes listQuery / slickUtils.executeQuery
(producing tablesToDelete) outside the existing try-catch in BookkeeperJdbc, so
move the execution of listQuery and the subsequent length check into the
surrounding try { ... } catch { case NonFatal(ex) => ... } block (or extend that
try to start before val listQuery is executed) so any DB/query failures are
caught and rethrown with the same RuntimeException handling used for other DB
operations; ensure you reference listQuery, tablesToDelete, and the existing
try-catch around the deletion logic when making the change.
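For context, the listing relies on SQL LIKE with an explicit escape character. The PR's actual wildcard-to-pattern conversion is not shown in this hunk; a plausible sketch (wildcardToLike is a hypothetical helper):

// Hypothetical conversion from a '*' wildcard to a LIKE pattern; the PR's
// real helper may differ.
def wildcardToLike(wildcard: String, escape: Char = '!'): String =
  wildcard.flatMap {
    case '*'              => "%"          // '*' matches any run of characters
    case c @ ('%' | '_')  => s"$escape$c" // escape LIKE metacharacters
    case c if c == escape => s"$escape$escape"
    case c                => c.toString
  }

// wildcardToLike("sales_*") == "sales!_%" ('_' escaped, '*' mapped to '%')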

@yruslan yruslan force-pushed the feature/692-use-slick-profile-per-db-engine branch from cbe0b65 to 091add6 on February 20, 2026 08:55
@coderabbitai coderabbitai bot left a comment


Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala (2)

285-297: ⚠️ Potential issue | 🟡 Minor

getLatestSchema is the only read method with no exception handling.

Every other read operation in this class wraps slickUtils.executeQuery in try { ... } catch { case NonFatal(ex) => throw new RuntimeException(..., ex) }. The DB call at line 291 is unguarded, so raw Slick/JDBC exceptions propagate directly to callers instead of being mapped to a descriptive RuntimeException.

🛡️ Proposed fix: add consistent error handling
-    slickUtils.executeQuery(pramenDb.slickDb, query)
-      .map(schemaRecord => TableSchema(schemaRecord.pramenTableName, schemaRecord.infoDate, schemaRecord.schemaJson))
-      .flatMap(tableSchema =>
-        TableSchema.toSchemaAndDate(tableSchema)
-      )
-      .headOption
+    try {
+      slickUtils.executeQuery(pramenDb.slickDb, query)
+        .map(schemaRecord => TableSchema(schemaRecord.pramenTableName, schemaRecord.infoDate, schemaRecord.schemaJson))
+        .flatMap(tableSchema =>
+          TableSchema.toSchemaAndDate(tableSchema)
+        )
+        .headOption
+    } catch {
+      case NonFatal(ex) => throw new RuntimeException(s"Unable to read the latest schema for table '$table' from the bookkeeping schema table.", ex)
+    }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala`
around lines 285 - 297, The getLatestSchema method currently calls
slickUtils.executeQuery (using pramenDb.slickDb and the query built from
pramenDb.schemaTable) without exception handling; wrap the
slickUtils.executeQuery(...) chain inside a try { ... } catch { case
NonFatal(ex) => throw new RuntimeException("Failed to read latest schema for
table " + table + " and infoDate " + infoDate, ex) } so any Slick/JDBC
exceptions are caught and rethrown as a descriptive RuntimeException; preserve
the existing mapping via TableSchema(...) and TableSchema.toSchemaAndDate inside
the try block.

96-101: ⚠️ Potential issue | 🟡 Minor

IllegalStateException from the integrity guard is caught and re-wrapped with a misleading DB-error message.

Line 97 throws IllegalStateException ("More than one record was returned…") — a data integrity check, not a DB failure. Because it is inside the try block, NonFatal catches it and re-throws it as RuntimeException("Unable to read from the bookkeeping table."). The original intent ("More than one record…") is buried as the cause; the outer message is misleading for on-call diagnostics.

🛡️ Proposed fix: bind the result of the try expression, then guard outside it
-    try {
-      val records = slickUtils.executeQuery(pramenDb.slickDb, query)
+    val records = try {
+      slickUtils.executeQuery(pramenDb.slickDb, query)
         .map(DataChunk.fromRecord)
         .toArray[DataChunk]
-
-      if (records.length > 1)
-        throw new IllegalStateException(s"More than one record was returned when only one was expected. Table: $table, infoDate: $infoDate")
-      records.headOption
     } catch {
       case NonFatal(ex) => throw new RuntimeException(s"Unable to read from the bookkeeping table.", ex)
     }
+
+    if (records.length > 1)
+      throw new IllegalStateException(s"More than one record was returned when only one was expected. Table: $table, infoDate: $infoDate")
+    records.headOption
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala`
around lines 96 - 101, The integrity-check IllegalStateException thrown by the
guard in BookkeeperJdbc (the line: if (records.length > 1) throw new
IllegalStateException(...)) is currently inside the try block, so the catch
NonFatal(...) wraps it in a misleading RuntimeException; move the integrity
check out of the try block so the IllegalStateException propagates unchanged:
bind the query result via val records = try { ... } catch { ... }, then perform
the length check and return records.headOption after the try-catch, so the
specific "More than one record..." IllegalStateException is never re-wrapped.
🧹 Nitpick comments (2)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala (1)

302-313: saveSchema silently swallows failures while all other write methods rethrow.

saveRecordCountToStorage, deleteNonCurrentBatchRecords, and deleteTable all rethrow as RuntimeException on error. saveSchema (and saveSchemaRaw) only log.error and return normally. A caller has no way to distinguish a successful write from a failed one, which can cause silent schema-tracking inconsistencies.

saveSchemaRaw is a migration utility where silent failure may be intentional, but the production saveSchema path warrants a rethrow consistent with the rest of the class.

♻️ Proposed fix for `saveSchema`
     } catch {
-      case NonFatal(ex) => log.error(s"Unable to write to the bookkeeping schema table.", ex)
+      case NonFatal(ex) => throw new RuntimeException(s"Unable to write to the bookkeeping schema table.", ex)
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala`
around lines 302 - 313, The saveSchema method currently swallows exceptions
while similar methods (saveRecordCountToStorage, deleteNonCurrentBatchRecords,
deleteTable) rethrow; update saveSchema to log the error and then rethrow a
RuntimeException (including the original exception) so callers can detect
failures, mirroring the error handling pattern used by those methods; leave
saveSchemaRaw behavior unchanged if it is intended for migrations.
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockSQLiteSuite.scala (1)

50-55: Temp directory may leak if pramenDb.close() throws.

FileUtils.deleteDirectory is only reached if pramenDb.close() succeeds. A try-finally ensures the directory is always cleaned up.

♻️ Suggested fix
  override def afterAll(): Unit = {
-   if (pramenDb != null) pramenDb.close()
-   FileUtils.deleteDirectory(new File(tempDir))
-
-   super.afterAll()
+   try {
+     if (pramenDb != null) pramenDb.close()
+   } finally {
+     FileUtils.deleteDirectory(new File(tempDir))
+     super.afterAll()
+   }
  }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockSQLiteSuite.scala`
around lines 50 - 55, The cleanup in afterAll currently calls pramenDb.close()
and only then deletes tempDir, so if pramenDb.close() throws the temp directory
may leak; update afterAll to ensure FileUtils.deleteDirectory(new File(tempDir))
always runs by wrapping the close in a try (or checking/pramenDb != null) and
placing the deleteDirectory call in a finally block, and still call
super.afterAll() after cleanup; reference the afterAll method, pramenDb,
tempDir, FileUtils.deleteDirectory and super.afterAll in your change.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Duplicate comments:
In `@pramen/core/src/main/scala/za/co/absa/pramen/core/app/AppContextImpl.scala`:
- Around line 44-49: The null-assignment for closable currently comes after
closable.close() inside the finally block and is skipped if close() throws;
update the close sequence in AppContextImpl so you capture the current closable
into a local val (e.g., val toClose = closable), set closable = null, and only
then call toClose.close() inside the finally, so the guard if (closable != null)
stays idempotent even if close() throws; keep the surrounding try around
bookkeeper.close() as-is.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala`:
- Around line 199-204: The call to slickUtils.executeQuery(pramenDb.slickDb,
listQuery) happens before the surrounding try-catch in BookkeeperJdbc, so
connection/Slick failures there escape uncaught; move the execution of listQuery
(the evaluation of tablesToDelete) inside the existing try block that wraps
other DB operations in the same method (or add a dedicated try-catch around it)
and ensure the catch rethrows a RuntimeException with a contextual message,
matching the pattern used for other DB calls in BookkeeperJdbc so failures
surface with consistent diagnostic information.

In
`@pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockSQLiteSuite.scala`:
- Around line 58-92: The two tests ("be able to acquire and release locks" and
"multiple token locks should not affect each other") must ensure locks are
always released even if assertions fail: wrap each lock acquisition/usage in a
try { ... } finally { lock.release() } (or nested try-finally for multiple
locks) so that getLock("token1") and getLock("token2") releases are guaranteed;
apply the same pattern used in the third test (which already uses try-finally)
to both tests to prevent TokenLockJdbc background refresh threads from running
against a closed DB.
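A sketch of the try-finally pattern this prompt asks for, in ScalaTest FlatSpec style; the test name is illustrative, and tryAcquire/release are the assumed lock API:

// Sketch: release in finally so a failed assertion cannot leave the lock's
// background refresh thread running against a closed database.
"a token lock" should "be released even when an assertion fails" in {
  val lock = factory.getLock("token1") // factory: the suite's TokenLockFactoryJdbc

  val acquired = lock.tryAcquire()     // tryAcquire/release: assumed API
  try {
    assert(acquired)
    // ... further assertions exercising the lock ...
  } finally {
    if (acquired) lock.release()       // guaranteed cleanup
  }
}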

@yruslan yruslan merged commit d04fd5b into main Feb 20, 2026
7 checks passed
@yruslan yruslan deleted the feature/692-use-slick-profile-per-db-engine branch February 20, 2026 09:37