xoutbox

v0.0.2
Published: May 3, 2026 License: MIT Imports: 3 Imported by: 0

README

xoutbox

🚀 A lightweight, production‑ready implementation of the Transactional Outbox Pattern for Go.

xoutbox helps you reliably publish events to message brokers (Kafka, NATS, RabbitMQ, etc.) without losing messages and without distributed transactions.

It makes the database write and the recording of the event atomic by committing both in one transaction (the Outbox Pattern); a background worker then publishes the recorded events.

Repository:
https://github.com/Ali127Dev/xoutbox


✨ Features

✅ Transactional Outbox pattern
✅ Broker‑agnostic (Kafka, NATS, RabbitMQ, etc.)
✅ Pluggable storage (Postgres, MySQL, etc.)
✅ Generic event ID support
✅ Safe concurrent workers
✅ Retry & dead‑letter support
✅ Simple interfaces
✅ Production‑friendly


🧠 The Problem

Imagine a typical service:

db.Save(order)
broker.Publish(OrderCreated)

If the service crashes between these two operations:

  • ✅ Order saved
  • ❌ Event never published

Your system becomes inconsistent.

Distributed transactions (such as two-phase commit across the database and the broker) would close this gap, but they are heavy and usually avoided.


✅ The Solution: Transactional Outbox Pattern

Instead of publishing events directly, we store them in an outbox table inside the same database transaction as the business data. A separate worker reads the table and publishes the events.

Service
   │
   ├── Save business data
   ├── Insert event into outbox
   │
   ▼
Database
   │
   ▼
Outbox Worker
   │
   ▼
Message Broker

Result:

✅ No lost messages
✅ At‑least‑once delivery
✅ Reliable event publishing
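
The write side of the pattern can be sketched as follows. This is a minimal illustration, not code from xoutbox: the `orders` table, the `execer` interface, and the `recordingTx` fake are hypothetical names introduced so the example runs without a database. The `outbox` columns follow the schema example in this README.

```go
package main

import (
	"context"
	"database/sql"
	"fmt"
)

// execer is the subset of *sql.Tx used here, so the sketch can run against a fake.
type execer interface {
	ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
}

// saveOrderWithEvent writes the business row and the outbox row through the
// same execer. With a real *sql.Tx both rows commit or roll back together.
// The "orders" table is hypothetical.
func saveOrderWithEvent(ctx context.Context, tx execer, orderID string, payload []byte) error {
	if _, err := tx.ExecContext(ctx,
		`INSERT INTO orders (id) VALUES ($1)`, orderID); err != nil {
		return fmt.Errorf("save order: %w", err)
	}
	// The payload is passed as a string so it is sent as text for the JSONB column.
	if _, err := tx.ExecContext(ctx,
		`INSERT INTO outbox (id, event_type, payload) VALUES ($1, $2, $3)`,
		"evt-"+orderID, "OrderCreated", string(payload)); err != nil {
		return fmt.Errorf("insert outbox event: %w", err)
	}
	return nil
}

// recordingTx is a stand-in for *sql.Tx that only records the statements.
type recordingTx struct{ queries []string }

func (r *recordingTx) ExecContext(_ context.Context, q string, _ ...any) (sql.Result, error) {
	r.queries = append(r.queries, q)
	return nil, nil
}

// demoStatements runs the sketch against the fake and returns how many
// statements were issued inside the "transaction".
func demoStatements() int {
	tx := &recordingTx{}
	err := saveOrderWithEvent(context.Background(), tx, "42", []byte(`{"order_id":"42"}`))
	if err != nil {
		return -1
	}
	return len(tx.queries)
}

func main() {
	fmt.Println("statements in one transaction:", demoStatements())
}
```

In real code the `execer` would be a `*sql.Tx` obtained from `db.BeginTx`, with `Rollback` on error and `Commit` after both inserts succeed.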


📦 Installation

go get github.com/Ali127Dev/xoutbox

📐 Core Concepts

xoutbox is built around two main interfaces:

  • Publisher
  • Store

These allow the library to remain broker‑agnostic and database‑agnostic.


🧾 Event Model

type Event[T comparable] struct {
    ID          T
    EventType   string
    Payload     []byte
    Status      Status
    RetryCount  int
    MaxRetries  int
    CreatedAt   time.Time
    PublishedAt *time.Time
}

📡 Publisher Interface

type Publisher[T comparable] interface {
    Publish(ctx context.Context, event Event[T]) error
}
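
For tests or local development, a publisher does not have to talk to a broker at all. The sketch below assumes nothing beyond the `Publisher` signature above; `MemoryPublisher` is a hypothetical name, and `Event` is a trimmed local copy of the xoutbox type so that the example compiles on its own.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Event and Publisher are trimmed local copies of the xoutbox types so the
// sketch compiles on its own; real code would import the package instead.
type Event[T comparable] struct {
	ID        T
	EventType string
	Payload   []byte
}

type Publisher[T comparable] interface {
	Publish(ctx context.Context, event Event[T]) error
}

// MemoryPublisher (hypothetical) keeps every published event in a slice.
type MemoryPublisher[T comparable] struct {
	mu     sync.Mutex
	events []Event[T]
}

// Publish rejects cancelled contexts and otherwise stores the event.
func (p *MemoryPublisher[T]) Publish(ctx context.Context, event Event[T]) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	p.mu.Lock()
	defer p.mu.Unlock()
	p.events = append(p.events, event)
	return nil
}

// Len reports how many events have been accepted so far.
func (p *MemoryPublisher[T]) Len() int {
	p.mu.Lock()
	defer p.mu.Unlock()
	return len(p.events)
}

// Compile-time check that MemoryPublisher satisfies the interface.
var _ Publisher[string] = (*MemoryPublisher[string])(nil)

// demoPublish publishes once with a live context and once with a cancelled
// one, returning the stored count and whether the second call was rejected.
func demoPublish() (stored int, rejected bool) {
	p := &MemoryPublisher[string]{}
	_ = p.Publish(context.Background(), Event[string]{ID: "e1", EventType: "OrderCreated"})

	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	err := p.Publish(ctx, Event[string]{ID: "e2", EventType: "OrderCreated"})
	return p.Len(), err != nil
}

func main() {
	fmt.Println(demoPublish())
}
```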

🗄 Database Schema Example

CREATE TABLE outbox (
    id TEXT PRIMARY KEY,
    event_type TEXT NOT NULL,
    payload JSONB NOT NULL,
    status TEXT NOT NULL DEFAULT 'pending',
    retry_count INT NOT NULL DEFAULT 0,
    max_retries INT NOT NULL DEFAULT 5,
    created_at TIMESTAMP NOT NULL DEFAULT NOW(),
    published_at TIMESTAMP
);

🔌 Kafka Publisher (Sarama Example)

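// KafkaPublisher publishes outbox events with a synchronous Sarama producer
// (import path github.com/IBM/sarama), so Publish returns only after the
// broker has acknowledged the message. The producer must be created with
// Config.Producer.Return.Successes = true.
type KafkaPublisher struct {
    producer sarama.SyncProducer
    topic    string
}

func (p *KafkaPublisher) Publish(ctx context.Context, event xoutbox.Event[string]) error {
    // SendMessage takes no context, so honor cancellation before sending.
    if err := ctx.Err(); err != nil {
        return err
    }

    msg := &sarama.ProducerMessage{
        Topic: p.topic,
        Key:   sarama.StringEncoder(event.ID),
        Value: sarama.ByteEncoder(event.Payload),
    }

    // A nil error means the broker accepted the message. With an AsyncProducer,
    // writing to Input() only enqueues the message, so a later delivery error
    // would go unnoticed and the event could be marked as published too early.
    _, _, err := p.producer.SendMessage(msg)
    return err
}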

⚙️ Worker Overview

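1️⃣ Fetch a batch of pending events from the Store
2️⃣ Publish each event through the Publisher
3️⃣ Mark each event as published or failed
4️⃣ Repeat on a fixed interval; workers can run concurrently and safely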
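
The steps above can be sketched as a single pass over one batch. The retry policy here is an assumption, since the README does not spell it out: a failed publish increments the retry count, and an event that reaches its maximum is dead-lettered. The `event` struct and `processBatch` function are hypothetical stand-ins, not part of xoutbox.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// event is a trimmed, hypothetical stand-in for xoutbox.Event[string].
type event struct {
	ID         string
	RetryCount int
	MaxRetries int
	Status     string
}

// processBatch publishes each fetched event and records the outcome.
// Assumed retry policy: a failure increments RetryCount, and an event whose
// RetryCount reaches MaxRetries becomes "dead" instead of being retried.
func processBatch(batch []event, publish func(event) error) []event {
	out := make([]event, 0, len(batch))
	for _, e := range batch {
		if err := publish(e); err != nil {
			e.RetryCount++
			e.Status = "failed"
			if e.RetryCount >= e.MaxRetries {
				e.Status = "dead"
			}
		} else {
			e.Status = "published"
		}
		out = append(out, e)
	}
	return out
}

// demoBatch runs one pass with a publisher that only accepts the event "ok"
// and returns the resulting statuses joined by commas.
func demoBatch() string {
	batch := []event{
		{ID: "ok", MaxRetries: 5},
		{ID: "flaky", MaxRetries: 5},
		{ID: "exhausted", RetryCount: 4, MaxRetries: 5},
	}
	publish := func(e event) error {
		if e.ID == "ok" {
			return nil
		}
		return errors.New("broker unavailable")
	}
	var statuses []string
	for _, e := range processBatch(batch, publish) {
		statuses = append(statuses, e.Status)
	}
	return strings.Join(statuses, ",")
}

func main() {
	fmt.Println(demoBatch())
}
```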


📊 Delivery Semantics

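xoutbox guarantees:

At‑least‑once delivery

An event can be published more than once, for example when a worker crashes after the broker accepted the event but before the event was marked as published. Consumers must therefore be idempotent.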
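
One common way to make a consumer idempotent is to remember which event IDs have already been handled and skip repeats. The sketch below is an in-memory illustration only; `Deduper` is a hypothetical name, and a real consumer would usually persist the processed IDs in the same transaction as the side effect.

```go
package main

import (
	"fmt"
	"sync"
)

// Deduper (hypothetical) remembers which event IDs were already handled.
type Deduper struct {
	mu   sync.Mutex
	seen map[string]bool
}

// Handle runs apply at most once per event ID and reports whether it ran.
// The lock is held while apply runs, which serializes handlers; that keeps
// the sketch simple at the cost of throughput.
func (d *Deduper) Handle(eventID string, apply func()) bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	if d.seen[eventID] {
		return false // duplicate delivery, nothing to do
	}
	apply()
	d.seen[eventID] = true
	return true
}

// demoDedup delivers "e1" twice and "e2" once, and returns how many times
// the side effect was actually applied.
func demoDedup() int {
	d := &Deduper{seen: make(map[string]bool)}
	applied := 0
	for _, id := range []string{"e1", "e1", "e2"} {
		d.Handle(id, func() { applied++ })
	}
	return applied
}

func main() {
	fmt.Println("side effects applied:", demoDedup())
}
```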


❤️ Author

Ali127Dev
https://github.com/Ali127Dev

Documentation

Types

type Event

type Event[T comparable] struct {
	ID        T
	EventType string
	Payload   []byte

	RetryCount int
	MaxRetries int

	Status Status

	CreatedAt   time.Time
	PublishedAt *time.Time
}

type Publisher

type Publisher[T comparable] interface {
	Publish(ctx context.Context, event Event[T]) error
}

type Status

type Status string

const (
	StatusPending    Status = "pending"
	StatusProcessing Status = "processing"
	StatusPublished  Status = "published"
	StatusFailed     Status = "failed"
	StatusDead       Status = "dead"
)

type Store

type Store[T comparable] interface {
	InsertEvent(ctx context.Context, event Event[T]) error
	FetchPending(ctx context.Context, limit int) ([]Event[T], error)
	MarkPublished(ctx context.Context, id T) error
	MarkFailed(ctx context.Context, id T, retryCount int) error
}
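
To illustrate what a `Store` implementation has to do, here is a minimal in-memory sketch with the same four method signatures. It makes assumptions the documentation does not confirm: `FetchPending` claims events by moving them to `processing`, failed events are picked up again on the next fetch, and `MarkFailed` moves an event to `dead` once the retry count reaches `MaxRetries`. `MemoryStore` is a hypothetical name, and `Status` and `Event` are trimmed local copies.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Status and Event are trimmed local copies of the xoutbox types.
type Status string

const (
	StatusPending    Status = "pending"
	StatusProcessing Status = "processing"
	StatusPublished  Status = "published"
	StatusFailed     Status = "failed"
	StatusDead       Status = "dead"
)

type Event[T comparable] struct {
	ID         T
	RetryCount int
	MaxRetries int
	Status     Status
}

// MemoryStore (hypothetical) offers the four Store[T] methods over a map.
type MemoryStore[T comparable] struct {
	mu     sync.Mutex
	order  []T // insertion order, so fetches are oldest-first
	events map[T]*Event[T]
}

func (s *MemoryStore[T]) InsertEvent(_ context.Context, e Event[T]) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	e.Status = StatusPending
	s.events[e.ID] = &e
	s.order = append(s.order, e.ID)
	return nil
}

// FetchPending claims up to limit pending or failed events by moving them to
// "processing". Retrying failed events here is an assumption of this sketch.
func (s *MemoryStore[T]) FetchPending(_ context.Context, limit int) ([]Event[T], error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	var out []Event[T]
	for _, id := range s.order {
		e := s.events[id]
		if len(out) < limit && (e.Status == StatusPending || e.Status == StatusFailed) {
			e.Status = StatusProcessing
			out = append(out, *e)
		}
	}
	return out, nil
}

func (s *MemoryStore[T]) update(id T, fn func(*Event[T])) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	e, ok := s.events[id]
	if !ok {
		return fmt.Errorf("event %v not found", id)
	}
	fn(e)
	return nil
}

func (s *MemoryStore[T]) MarkPublished(_ context.Context, id T) error {
	return s.update(id, func(e *Event[T]) { e.Status = StatusPublished })
}

// MarkFailed stores the new retry count; exhausted events become "dead".
func (s *MemoryStore[T]) MarkFailed(_ context.Context, id T, retryCount int) error {
	return s.update(id, func(e *Event[T]) {
		e.RetryCount, e.Status = retryCount, StatusFailed
		if retryCount >= e.MaxRetries {
			e.Status = StatusDead
		}
	})
}

func demoStore() (first, second int) {
	ctx := context.Background()
	s := &MemoryStore[string]{events: map[string]*Event[string]{}}
	_ = s.InsertEvent(ctx, Event[string]{ID: "a", MaxRetries: 3})
	_ = s.InsertEvent(ctx, Event[string]{ID: "b", MaxRetries: 3})
	batch, _ := s.FetchPending(ctx, 10)
	_ = s.MarkPublished(ctx, "a")
	_ = s.MarkFailed(ctx, "b", 1) // 1 < 3, so "b" stays retryable
	again, _ := s.FetchPending(ctx, 10)
	return len(batch), len(again)
}

func main() {
	fmt.Println(demoStore())
}
```

Written against the real xoutbox types, a store like this and a `Publisher` are the two values that `NewWorker(store, publisher, DefaultWorkerConfig())` takes; calling `Start(ctx)` on the returned worker then runs it.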

type Worker

type Worker[T comparable] struct {
	// contains filtered or unexported fields
}

func NewWorker

func NewWorker[T comparable](store Store[T], publisher Publisher[T], cfg WorkerConfig) *Worker[T]

func (*Worker[T]) Start

func (w *Worker[T]) Start(ctx context.Context) error

type WorkerConfig

type WorkerConfig struct {
	Interval    time.Duration
	BatchSize   int
	Concurrency int
}
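
The documentation does not describe how the worker uses these three fields, so the following is only a plausible sketch of a polling loop: wake up every `Interval`, fetch up to `BatchSize` items, and publish them with at most `Concurrency` goroutines in flight. The `config` struct, `run`, and `demoRun` are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// config mirrors the three WorkerConfig fields; how xoutbox itself uses them
// is not documented, so this loop is an assumption.
type config struct {
	Interval    time.Duration // must be positive, time.NewTicker panics otherwise
	BatchSize   int
	Concurrency int // must be at least 1, or the semaphore below would block
}

// run polls fetch every Interval and publishes each batch with at most
// Concurrency goroutines in flight, until ctx is cancelled.
func run(ctx context.Context, cfg config, fetch func(limit int) []string, publish func(string)) {
	ticker := time.NewTicker(cfg.Interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			sem := make(chan struct{}, cfg.Concurrency) // bounds parallel publishes
			var wg sync.WaitGroup
			for _, id := range fetch(cfg.BatchSize) {
				sem <- struct{}{}
				wg.Add(1)
				go func(id string) {
					defer wg.Done()
					defer func() { <-sem }()
					publish(id)
				}(id)
			}
			wg.Wait() // finish the batch before handling the next tick
		}
	}
}

// demoRun drains a five-item queue and returns how many items were published.
func demoRun() int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	queue := []string{"a", "b", "c", "d", "e"}
	fetch := func(limit int) []string {
		if len(queue) == 0 {
			cancel() // nothing left: stop the loop
			return nil
		}
		n := limit
		if n > len(queue) {
			n = len(queue)
		}
		batch := queue[:n]
		queue = queue[n:]
		return batch
	}

	var mu sync.Mutex
	published := 0
	publish := func(string) {
		mu.Lock()
		published++
		mu.Unlock()
	}

	run(ctx, config{Interval: time.Millisecond, BatchSize: 2, Concurrency: 2}, fetch, publish)
	return published
}

func main() {
	fmt.Println("published:", demoRun())
}
```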

func DefaultWorkerConfig

func DefaultWorkerConfig() WorkerConfig
