README
¶
oniontx 
oniontx enables moving persistence logic control (for example: transaction management) from the Persistence (repository) layer
to the Application (service) layer using an owner-defined contract.
The library provides two complementary approaches that can be used independently or together:
mtxpackage: Local ACID transactions for single-resource operationssagapackage: Distributed compensating transactions for multi-resource coordination
Both packages maintain clean architecture principles by keeping transaction control at the application level while repositories remain focused on data access.
💡 Key Features
- Clean Architecture First: Transactions managed at the application layer, not in repositories
- Dual Transaction Support:
mtxpackage for local ACID transactions (single database)sagapackage for distributed compensating transactions (multiple services/databases)
- Database Agnostic: Ready-to-use implementations for popular databases and libraries
- Testability First: Built-in support for major testing frameworks
- Type-Safe: Full generics support for compile-time safety
- Context-Aware: Proper context propagation throughout transaction boundaries
Package mtx - Local Transactions
🔴 NOTE: Use mtx when working with a single database instance.
It manages ACID transactions across multiple repositories.
For multiple repositories, use mtx.Transactor with saga.Sagaⓘ.
The core entity is Transactor — it provides a clean abstraction over database transactions and offers:
- simple implementation for
stdlib - simple implementation for popular libraries
- custom implementation's contract
- simple testing with testing frameworks
Default implementation examples for libs
test/integration module contains examples
of default Transactor implementations (stdlib, sqlx, pgx, gorm, redis, mongo):
Custom implementation
If required, oniontx provides the ability to
implement custom algorithms for managing transactions (see examples).
Interfaces:
type (
// Mandatory
TxBeginner[T Tx] interface {
comparable
BeginTx(ctx context.Context) (T, error)
}
// Mandatory
Tx interface {
Rollback(ctx context.Context) error
Commit(ctx context.Context) error
}
// Optional - using to putting/getting transaction from `context.Context`
// (library contains default `СtxOperator` implementation)
СtxOperator[T Tx] interface {
Inject(ctx context.Context, tx T) context.Context
Extract(ctx context.Context) (T, bool)
}
)
Examples
❗ ️These examples are based on the stdlib package.
TxBeginner and Tx implementations:
// Prepared contracts for execution
package db
import (
"context"
"database/sql"
"github.com/kozmod/oniontx/mtx"
)
// Executor represents common methods of sql.DB and sql.Tx.
type Executor interface {
ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
}
// DB is sql.DB wrapper, implements mtx.TxBeginner.
type DB struct {
*sql.DB
}
func (db *DB) BeginTx(ctx context.Context) (*Tx, error) {
var txOptions sql.TxOptions
for _, opt := range opts {
opt.Apply(&txOptions)
}
tx, err := db.DB.BeginTx(ctx, &txOptions)
return &Tx{Tx: tx}, err
}
// Tx is sql.Tx wrapper, implements mtx.Tx.
type Tx struct {
*sql.Tx
}
func (t *Tx) Rollback(_ context.Context) error {
return t.Tx.Rollback()
}
func (t *Tx) Commit(_ context.Context) error {
return t.Tx.Commit()
}
Repositories implementation:
package repoA
import (
"context"
"fmt"
"github.com/kozmod/oniontx/mtx"
"github.com/user/some_project/internal/db"
)
type RepositoryA struct {
Transactor *mtx.Transactor[*db.DB, *db.Tx]
}
func (r RepositoryA) Insert(ctx context.Context, val int) error {
var executor db.Executor
executor, ok := r.Transactor.TryGetTx(ctx)
if !ok {
executor = r.Transactor.TxBeginner()
}
_, err := executor.ExecContext(ctx, "UPDATE some_A SET value = $1", val)
if err != nil {
return fmt.Errorf("update 'some_A': %w", err)
}
return nil
}
package repoB
import (
"context"
"fmt"
"github.com/kozmod/oniontx/mtx"
"github.com/user/some_project/internal/db"
)
type RepositoryB struct {
Transactor *mtx.Transactor[*db.DB, *db.Tx]
}
func (r RepositoryB) Insert(ctx context.Context, val int) error {
var executor db.Executor
executor, ok := r.Transactor.TryGetTx(ctx)
if !ok {
executor = r.Transactor.TxBeginner()
}
_, err := executor.ExecContext(ctx, "UPDATE some_A SET value = $1", val)
if err != nil {
return fmt.Errorf("update 'some_A': %w", err)
}
return nil
}
UseCase implementation:
package usecase
import (
"context"
"fmt"
)
type (
// transactor is the contract of the mtx.Transactor
transactor interface {
WithinTx(ctx context.Context, fn func(ctx context.Context) error) (err error)
}
// Repo is the contract of repositories
repo interface {
Insert(ctx context.Context, val int) error
}
)
type UseCase struct {
RepoA repo
RepoB repo
Transactor transactor
}
func (s *UseCase) Exec(ctx context.Context, insert int) error {
err := s.Transactor.WithinTx(ctx, func(ctx context.Context) error {
if err := s.RepoA.Insert(ctx, insert); err != nil {
return fmt.Errorf("call repository A: %w", err)
}
if err := s.RepoB.Insert(ctx, insert); err != nil {
return fmt.Errorf("call repository B: %w", err)
}
return nil
})
if err != nil {
return fmt.Errorf(" execute: %w", err)
}
return nil
}
Configuring:
package main
import (
"context"
"database/sql"
"os"
"github.com/kozmod/oniontx/mtx"
"github.com/user/some_project/internal/repoA"
"github.com/user/some_project/internal/repoB"
"github.com/user/some_project/internal/usecase"
)
func main() {
var (
database *sql.DB // database pointer
wrapper = &db.DB{DB: database}
operator = mtx.NewContextOperator[*db.DB, *db.Tx](&wrapper)
transactor = mtx.NewTransactor[*db.DB, *db.Tx](wrapper, operator)
repositoryA = repoA.RepositoryA{
Transactor: transactor,
}
repositoryB = repoB.RepositoryB{
Transactor: transactor,
}
useCase = usecase.UseCase{
RepoA: &repositoryA,
RepoB: &repositoryB,
Transactor: transactor,
}
)
err := useCase.Exec(context.Background(), 1)
if err != nil {
os.Exit(1)
}
}
Execution transaction in the different use cases
Executing the same transaction for different UseCases using the same Transactor instance
UseCases:
package a
import (
"context"
"fmt"
)
type (
// transactor is the contract of the mtx.Transactor
transactor interface {
WithinTx(ctx context.Context, fn func(ctx context.Context) error) (err error)
}
// Repo is the contract of repositories
repoA interface {
Insert(ctx context.Context, val int) error
Delete(ctx context.Context, val float64) error
}
)
type UseCaseA struct {
Repo repoA
Transactor transactor
}
func (s *UseCaseA) Exec(ctx context.Context, insert int, delete float64) error {
err := s.Transactor.WithinTx(ctx, func(ctx context.Context) error {
if err := s.Repo.Insert(ctx, insert); err != nil {
return fmt.Errorf("call repository - insert: %w", err)
}
if err := s.Repo.Delete(ctx, delete); err != nil {
return fmt.Errorf("call repository - delete: %w", err)
}
return nil
})
if err != nil {
return fmt.Errorf("usecaseA - execute: %w", err)
}
return nil
}
package b
import (
"context"
"fmt"
)
type (
// transactor is the contract of the mtx.Transactor
transactor interface {
WithinTx(ctx context.Context, fn func(ctx context.Context) error) (err error)
}
// Repo is the contract of repositories
repoB interface {
Insert(ctx context.Context, val string) error
}
// Repo is the contract of the useCase
useCaseA interface {
Exec(ctx context.Context, insert int, delete float64) error
}
)
type UseCaseB struct {
Repo repoB
UseCaseA useCaseA
Transactor transactor
}
func (s *UseCaseB) Exec(ctx context.Context, insertA string, insertB int, delete float64) error {
err := s.Transactor.WithinTx(ctx, func(ctx context.Context) error {
if err := s.Repo.Insert(ctx, insertA); err != nil {
return fmt.Errorf("call repository - insert: %w", err)
}
if err := s.UseCaseA.Exec(ctx, insertB, delete); err != nil {
return fmt.Errorf("call usecaseB - exec: %w", err)
}
return nil
})
if err != nil {
return fmt.Errorf("execute: %w", err)
}
return nil
}
Main:
package main
import (
"context"
"database/sql"
"os"
"github.com/kozmod/oniontx/mtx"
"github.com/user/some_project/internal/db"
"github.com/user/some_project/internal/repoA"
"github.com/user/some_project/internal/repoB"
"github.com/user/some_project/internal/usecase/a"
"github.com/user/some_project/internal/usecase/b"
)
func main() {
var (
database *sql.DB // database pointer
wrapper = &db.DB{DB: database}
operator = mtx.NewContextOperator[*db.DB, *db.Tx](&wrapper)
transactor = mtx.NewTransactor[*db.DB, *db.Tx](wrapper, operator)
useCaseA = a.UseCaseA{
Repo: repoA.RepositoryA{
Transactor: transactor,
},
}
useCaseB = b.UseCaseB{
Repo: repoB.RepositoryB{
Transactor: transactor,
},
UseCaseA: &useCaseA,
}
)
err := useCaseB.Exec(context.Background(), "some_to_insert_useCase_A", 1, 1.1)
if err != nil {
os.Exit(1)
}
}
Package saga - Distributed Transactions
Use saga when coordinating operations across multiple services, databases,
or external systems. It implements the In-Progress Saga pattern with compensating actions
to maintain data consistency in distributed environments.
Unlike Distributed Sagas that require a centralized orchestrator or choreography between services, this implementation is designed as an In-Progress Saga where:
- The saga execution happens within a single process/monolith
- All steps are defined and executed locally
- Compensations are called within the same process
- No distributed coordination or persistent saga state is required
The Saga coordinates the execution of a business process consisting of multiple steps.
Each step contains:
- Action: The main operation to execute
- Compensation: A rollback operation that undoes the action if later steps fail
Steps execute sequentially. If any step fails, all previous steps are automatically compensated, ensuring system consistency.
Example:
steps := []saga.Step{
saga.NewStep("first_step").
WithAction(
// Add action with decorators
saga.NewAction(func(ctx context.Context, track saga.Track) error {
err := fmt.Errorf("first_step_Error")
return err
}).
// Protection against panics — important for production!
// If the action panics, the panic will be caught
// and returned as an error with ErrPanicRecovered
WithPanicRecovery().
// Add retry for action
WithRetry(
// 2 attempts, 1s between attempts
saga.NewBaseRetryOpt(2, 1*time.Second),
),
).
// Add compensation
WithCompensation(
saga.NewCompensation(func(ctx context.Context, track saga.Track) error {
// Compensation logic.
// Use track.GetData() to inspect what failed
data := track.GetData()
if len(data.Action.Errors) > 0 {
log.Printf("Compensating for error: %v", data.Action.Errors[0])
}
return performCompensation(ctx)
}).
// Compensation can also have retry logic
WithRetry(
saga.NewAdvanceRetryPolicy(
2, // max attempts
1*time.Second, // initial delay
saga.NewExponentialBackoff(), // exponential backoff
).
// Jitter prevents "thundering herd" during mass failures
WithJitter(
saga.NewFullJitter(), // random delay
).
// maximum delay cap
WithMaxDelay(10*time.Second),
),
).
WithCompensationRequired(),
}
// Execute the saga
//
// With this approach:
// 1. If action fails, it will be retried according to the retry policy
// 2. If all attempts fail, compensations will run
// 3. Compensations will also retry on failure with exponential backoff
// 4. Jitter distributes load during mass failure scenarios
result, err := saga.NewSaga(steps).Execute(context.Background())
if err != nil {
// Handle the `Result` and errors
}
More examples:
Testing
test package contains useful examples for creating unit test:
Directories
¶
| Path | Synopsis |
|---|---|
|
gorm
module
|
|
|
internal
|
|
|
Package mtx provides a flexible transaction management system with support for nested transactions, panic recovery, and context-based transaction propagation.
|
Package mtx provides a flexible transaction management system with support for nested transactions, panic recovery, and context-based transaction propagation. |
|
pgx
module
|
|
|
sqlx
module
|
|
|
stdlib
module
|
|
|
test
module
|

