oniontx enables moving persistence logic control (for example: transaction management) from the Persistence (repository) layer
to the Application (service) layer using an owner-defined contract.
The library provides two complementary approaches that can be used independently or together:
mtxpackage: Local ACID transactions for single-resource operationssagapackage: Distributed compensating transactions for multi-resource coordination
Both packages maintain clean architecture principles by keeping transaction control at the application level while repositories remain focused on data access.
- Clean Architecture First: Transactions managed at the application layer, not in repositories
- Dual Transaction Support:
mtxpackage for local ACID transactions (single database)sagapackage for distributed compensating transactions (multiple services/databases)
- Database Agnostic: Ready-to-use implementations for popular databases and libraries
- Testability First: Built-in support for major testing frameworks
- Type-Safe: Full generics support for compile-time safety
- Context-Aware: Proper context propagation throughout transaction boundaries
🔴 NOTE: Use mtx when working with a single database instance.
It manages ACID transactions across multiple repositories.
For multiple repositories, use mtx.Transactor with saga.Sagaⓘ.
The core entity is Transactor — it provides a clean abstraction over database transactions and offers:
- simple implementation for
stdlib - simple implementation for popular libraries
- custom implementation's contract
- simple testing with testing frameworks
test/integration module contains examples
of default Transactor implementations (stdlib, sqlx, pgx, gorm, redis, mongo):
If required, oniontx provides the ability to
implement custom algorithms for managing transactions (see examples).
type (
// Mandatory
TxBeginner[T Tx] interface {
comparable
BeginTx(ctx context.Context) (T, error)
}
// Mandatory
Tx interface {
Rollback(ctx context.Context) error
Commit(ctx context.Context) error
}
// Optional - using to putting/getting transaction from `context.Context`
// (library contains default `СtxOperator` implementation)
СtxOperator[T Tx] interface {
Inject(ctx context.Context, tx T) context.Context
Extract(ctx context.Context) (T, bool)
}
)❗ ️These examples are based on the stdlib package.
TxBeginner and Tx implementations:
// Prepared contracts for execution
package db
import (
"context"
"database/sql"
"github.com/kozmod/oniontx/mtx"
)
// Executor represents common methods of sql.DB and sql.Tx.
type Executor interface {
ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
}
// DB is sql.DB wrapper, implements mtx.TxBeginner.
type DB struct {
*sql.DB
}
func (db *DB) BeginTx(ctx context.Context) (*Tx, error) {
var txOptions sql.TxOptions
for _, opt := range opts {
opt.Apply(&txOptions)
}
tx, err := db.DB.BeginTx(ctx, &txOptions)
return &Tx{Tx: tx}, err
}
// Tx is sql.Tx wrapper, implements mtx.Tx.
type Tx struct {
*sql.Tx
}
func (t *Tx) Rollback(_ context.Context) error {
return t.Tx.Rollback()
}
func (t *Tx) Commit(_ context.Context) error {
return t.Tx.Commit()
}Repositories implementation:
package repoA
import (
"context"
"fmt"
"github.com/kozmod/oniontx/mtx"
"github.com/user/some_project/internal/db"
)
type RepositoryA struct {
Transactor *mtx.Transactor[*db.DB, *db.Tx]
}
func (r RepositoryA) Insert(ctx context.Context, val int) error {
var executor db.Executor
executor, ok := r.Transactor.TryGetTx(ctx)
if !ok {
executor = r.Transactor.TxBeginner()
}
_, err := executor.ExecContext(ctx, "UPDATE some_A SET value = $1", val)
if err != nil {
return fmt.Errorf("update 'some_A': %w", err)
}
return nil
}package repoB
import (
"context"
"fmt"
"github.com/kozmod/oniontx/mtx"
"github.com/user/some_project/internal/db"
)
type RepositoryB struct {
Transactor *mtx.Transactor[*db.DB, *db.Tx]
}
func (r RepositoryB) Insert(ctx context.Context, val int) error {
var executor db.Executor
executor, ok := r.Transactor.TryGetTx(ctx)
if !ok {
executor = r.Transactor.TxBeginner()
}
_, err := executor.ExecContext(ctx, "UPDATE some_A SET value = $1", val)
if err != nil {
return fmt.Errorf("update 'some_A': %w", err)
}
return nil
}UseCase implementation:
package usecase
import (
"context"
"fmt"
)
type (
// transactor is the contract of the mtx.Transactor
transactor interface {
WithinTx(ctx context.Context, fn func(ctx context.Context) error) (err error)
}
// Repo is the contract of repositories
repo interface {
Insert(ctx context.Context, val int) error
}
)
type UseCase struct {
RepoA repo
RepoB repo
Transactor transactor
}
func (s *UseCase) Exec(ctx context.Context, insert int) error {
err := s.Transactor.WithinTx(ctx, func(ctx context.Context) error {
if err := s.RepoA.Insert(ctx, insert); err != nil {
return fmt.Errorf("call repository A: %w", err)
}
if err := s.RepoB.Insert(ctx, insert); err != nil {
return fmt.Errorf("call repository B: %w", err)
}
return nil
})
if err != nil {
return fmt.Errorf(" execute: %w", err)
}
return nil
}Configuring:
package main
import (
"context"
"database/sql"
"os"
"github.com/kozmod/oniontx/mtx"
"github.com/user/some_project/internal/repoA"
"github.com/user/some_project/internal/repoB"
"github.com/user/some_project/internal/usecase"
)
func main() {
var (
database *sql.DB // database pointer
wrapper = &db.DB{DB: database}
operator = mtx.NewContextOperator[*db.DB, *db.Tx](&wrapper)
transactor = mtx.NewTransactor[*db.DB, *db.Tx](wrapper, operator)
repositoryA = repoA.RepositoryA{
Transactor: transactor,
}
repositoryB = repoB.RepositoryB{
Transactor: transactor,
}
useCase = usecase.UseCase{
RepoA: &repositoryA,
RepoB: &repositoryB,
Transactor: transactor,
}
)
err := useCase.Exec(context.Background(), 1)
if err != nil {
os.Exit(1)
}
}Executing the same transaction for different UseCases using the same Transactor instance
UseCases:
package a
import (
"context"
"fmt"
)
type (
// transactor is the contract of the mtx.Transactor
transactor interface {
WithinTx(ctx context.Context, fn func(ctx context.Context) error) (err error)
}
// Repo is the contract of repositories
repoA interface {
Insert(ctx context.Context, val int) error
Delete(ctx context.Context, val float64) error
}
)
type UseCaseA struct {
Repo repoA
Transactor transactor
}
func (s *UseCaseA) Exec(ctx context.Context, insert int, delete float64) error {
err := s.Transactor.WithinTx(ctx, func(ctx context.Context) error {
if err := s.Repo.Insert(ctx, insert); err != nil {
return fmt.Errorf("call repository - insert: %w", err)
}
if err := s.Repo.Delete(ctx, delete); err != nil {
return fmt.Errorf("call repository - delete: %w", err)
}
return nil
})
if err != nil {
return fmt.Errorf("usecaseA - execute: %w", err)
}
return nil
}package b
import (
"context"
"fmt"
)
type (
// transactor is the contract of the mtx.Transactor
transactor interface {
WithinTx(ctx context.Context, fn func(ctx context.Context) error) (err error)
}
// Repo is the contract of repositories
repoB interface {
Insert(ctx context.Context, val string) error
}
// Repo is the contract of the useCase
useCaseA interface {
Exec(ctx context.Context, insert int, delete float64) error
}
)
type UseCaseB struct {
Repo repoB
UseCaseA useCaseA
Transactor transactor
}
func (s *UseCaseB) Exec(ctx context.Context, insertA string, insertB int, delete float64) error {
err := s.Transactor.WithinTx(ctx, func(ctx context.Context) error {
if err := s.Repo.Insert(ctx, insertA); err != nil {
return fmt.Errorf("call repository - insert: %w", err)
}
if err := s.UseCaseA.Exec(ctx, insertB, delete); err != nil {
return fmt.Errorf("call usecaseB - exec: %w", err)
}
return nil
})
if err != nil {
return fmt.Errorf("execute: %w", err)
}
return nil
}Main:
package main
import (
"context"
"database/sql"
"os"
"github.com/kozmod/oniontx/mtx"
"github.com/user/some_project/internal/db"
"github.com/user/some_project/internal/repoA"
"github.com/user/some_project/internal/repoB"
"github.com/user/some_project/internal/usecase/a"
"github.com/user/some_project/internal/usecase/b"
)
func main() {
var (
database *sql.DB // database pointer
wrapper = &db.DB{DB: database}
operator = mtx.NewContextOperator[*db.DB, *db.Tx](&wrapper)
transactor = mtx.NewTransactor[*db.DB, *db.Tx](wrapper, operator)
useCaseA = a.UseCaseA{
Repo: repoA.RepositoryA{
Transactor: transactor,
},
}
useCaseB = b.UseCaseB{
Repo: repoB.RepositoryB{
Transactor: transactor,
},
UseCaseA: &useCaseA,
}
)
err := useCaseB.Exec(context.Background(), "some_to_insert_useCase_A", 1, 1.1)
if err != nil {
os.Exit(1)
}
}Use saga when coordinating operations across multiple services, databases,
or external systems. It implements the In-Progress Saga pattern with compensating actions
to maintain data consistency in distributed environments.
Unlike Distributed Sagas that require a centralized orchestrator or choreography between services, this implementation is designed as an In-Progress Saga where:
- The saga execution happens within a single process/monolith
- All steps are defined and executed locally
- Compensations are called within the same process
- No distributed coordination or persistent saga state is required
The Saga coordinates the execution of a business process consisting of multiple steps.
Each step contains:
- Action: The main operation to execute
- Compensation: A rollback operation that undoes the action if later steps fail
Steps execute sequentially. If any step fails, all previous steps are automatically compensated in reverse order, ensuring system consistency
Example:
// Use StepBuilder for more complex configuration
// This approach provides access to all library features:
// - Panic recovery
// - Retry policies
// - Custom backoff strategies
// - Jitter for load distribution
steps := []saga.Step{
saga.NewStep("first_step").
WithAction(
// Add action with decorators
saga.NewAction(func(ctx context.Context) error {
// Simulate error to demonstrate retry
return fmt.Errorf("first_step_Error")
}).
// Protection against panics — important for production!
// If the action panics, the panic will be caught
// and returned as an error with ErrPanicRecovered
WithPanicRecovery().
// Add retry for action
WithRetry(
// 2 attempts, 1s between attempts
saga.NewBaseRetryOpt(2, 1*time.Second).
// Return all errors which arise during retries
WithReturnAllAroseErr(), ), ).
// Add compensation
WithCompensation(
saga.NewCompensation(func(ctx context.Context, aroseErr error) error {
// Compensation logic.
// aroseErr — error from action that triggered compensation
// This can be useful for logging or strategy selection
return nil
}).
// Compensation can also have retry logic
WithRetry(
saga.NewAdvanceRetryPolicy(
2, // max attempts
1*time.Second, // initial delay
saga.NewExponentialBackoff(), // exponential backoff
).
// Jitter prevents "thundering herd"
WithJitter(
// random delay
saga.NewFullJitter(),
).
// maximum delay
WithMaxDelay(10 * time.Second), ), ),}
// Execute the saga
//
// With this approach:
// 1. If action fails, there will be 2 attempts with exponential backoff
// 2. If all attempts fail, compensations will run
// 3. Compensations will also retry on failure
// 4. Jitter distributes load during mass failures
err := saga.NewSaga(steps).Execute(context.Background())
if err != nil {
// Handle error
}More examples:
test package contains useful examples for creating unit test:


