datafx

package
v0.7.0 Latest
Published: Jun 10, 2025 License: Apache-2.0 Imports: 7 Imported by: 0

README

ajan/datafx

Overview

The datafx package provides a high-level, technology-agnostic data persistence abstraction layer. It sits on top of connfx (the connection/adapter layer) and offers simple, consistent data operations that work with any storage technology, whether Redis, PostgreSQL, MongoDB, DynamoDB, or something else.

The key principle is separation of concerns: connfx handles infrastructure (connections, drivers, protocols) while datafx handles business logic (data operations, transactions, queues). This architecture allows you to write storage-agnostic code that can easily switch between different backends without changing your business logic.

Architecture

datafx (Business Layer)
    ↓ depends on
connfx (Adapter Layer)
    ↓ implements
Storage Technologies (Redis, PostgreSQL, MongoDB, AMQP/RabbitMQ, etc.)

Configuration

datafx depends on connfx for connection management. You configure connections through connfx and then create datafx instances from those connections.

Connection Configuration (via connfx)
// Redis configuration example
redisConfig := &connfx.ConfigTarget{
    Protocol: "redis",
    Host:     "localhost",
    Port:     6379,
    DSN:      "redis://localhost:6379",
}

// PostgreSQL configuration example
postgresConfig := &connfx.ConfigTarget{
    Protocol: "postgres",
    Host:     "localhost",
    Port:     5432,
    DSN:      "postgres://user:pass@localhost:5432/dbname",
}

// MongoDB configuration example
mongoConfig := &connfx.ConfigTarget{
    Protocol: "mongodb",
    Host:     "localhost",
    Port:     27017,
    DSN:      "mongodb://localhost:27017/mydb",
}

// AMQP/RabbitMQ configuration example
amqpConfig := &connfx.ConfigTarget{
    Protocol: "amqp",
    Host:     "localhost",
    Port:     5672,
    DSN:      "amqp://guest:guest@localhost:5672/",
}
Data Types

datafx works with standard Go types and automatically handles JSON marshaling:

// Example user type
type User struct {
    ID    string `json:"id"`
    Name  string `json:"name"`
    Email string `json:"email"`
}

// Example product type
type Product struct {
    ID       string  `json:"id"`
    Name     string  `json:"name"`
    Price    float64 `json:"price"`
    Category string  `json:"category"`
}

// Example message types for queues
type OrderEvent struct {
    OrderID    string    `json:"order_id"`
    CustomerID string    `json:"customer_id"`
    Amount     float64   `json:"amount"`
    Timestamp  time.Time `json:"timestamp"`
}

type NotificationMessage struct {
    UserID  string `json:"user_id"`
    Type    string `json:"type"`
    Content string `json:"content"`
}

Features

  • Technology Agnostic: Same API works with Redis, SQL databases, document stores, message queues, etc.
  • Automatic Marshaling: Handles JSON serialization/deserialization transparently
  • Transaction Support: ACID transactions for compatible storage backends
  • Cache Support: High-performance caching with TTL/expiration support
  • Queue Support: Message queue operations with automatic reconnection and acknowledgment
  • Connection Behaviors: Automatic capability detection (key-value, document, relational, transactional, cache, queue)
  • Type Safety: Compile-time interface verification and generic type support
  • Error Handling: Comprehensive error context with sentinel errors
  • Raw Data Support: Work with []byte directly when needed
  • Extensible: Easy to add new storage adapters without changing business code
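The "Automatic Marshaling" feature can be pictured as a thin JSON layer over the raw byte operations. The following is a self-contained illustration of that idea, not datafx's actual implementation: an in-memory byte store stands in for the backend, and `setJSON`/`getJSON` are hypothetical helpers mirroring how a typed `Set`/`Get` can be layered over `SetRaw`/`GetRaw`.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// memStore stands in for any backend that can store raw bytes.
type memStore struct{ data map[string][]byte }

// setJSON marshals a value to JSON and stores the raw bytes.
func setJSON(s *memStore, key string, value any) error {
	raw, err := json.Marshal(value)
	if err != nil {
		return fmt.Errorf("failed to marshal data: %w", err)
	}
	s.data[key] = raw

	return nil
}

// getJSON loads raw bytes and unmarshals them into dest.
func getJSON(s *memStore, key string, dest any) error {
	raw, ok := s.data[key]
	if !ok {
		return fmt.Errorf("key not found: %s", key)
	}

	return json.Unmarshal(raw, dest)
}

type user struct {
	ID   string `json:"id"`
	Name string `json:"name"`
}

func main() {
	s := &memStore{data: map[string][]byte{}}
	_ = setJSON(s, "user:123", &user{ID: "123", Name: "John"})

	var u user
	_ = getJSON(s, "user:123", &u)
	fmt.Println(u.Name) // John
}
```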

API

Basic Usage
import (
    "context"
    "log"
    "os"

    "github.com/eser/ajan/connfx"
    "github.com/eser/ajan/datafx"
    "github.com/eser/ajan/logfx"
)

func main() {
    ctx := context.Background()

    // Setup connection registry
    logger := logfx.NewLogger(os.Stdout, &logfx.Config{Level: logfx.LevelInfo})
    registry := connfx.NewRegistryWithDefaults(logger)

    // Configure connection
    config := &connfx.ConfigTarget{
        Protocol: "redis",
        Host:     "localhost",
        Port:     6379,
        DSN:      "redis://localhost:6379",
    }

    // Add connection to registry
    conn, err := registry.AddConnection(ctx, connfx.DefaultConnection, config)
    if err != nil {
        log.Fatal(err)
    }

    // Create datafx.Store instance
    data, err := datafx.NewStore(conn)
    if err != nil {
        log.Fatal(err)
    }

    // Use data operations
    user := &User{
        ID:    "user123",
        Name:  "John Doe",
        Email: "john@example.com",
    }

    // Set data (auto-marshaled to JSON)
    err = data.Set(ctx, "user:123", user)

    // Get data (auto-unmarshaled from JSON)
    var retrievedUser User
    err = data.Get(ctx, "user:123", &retrievedUser)

    // Check existence
    exists, err := data.Exists(ctx, "user:123")

    // Update data
    user.Name = "John Smith"
    err = data.Update(ctx, "user:123", user)

    // Remove data
    err = data.Remove(ctx, "user:123")
}
Core Operations
// Set data - automatically marshaled to JSON
err := data.Set(ctx, "user:123", user)

// Get data - automatically unmarshaled from JSON
var user User
err := data.Get(ctx, "user:123", &user)

// Update existing data
err := data.Update(ctx, "user:123", updatedUser)

// Remove data
err := data.Remove(ctx, "user:123")

// Check if key exists
exists, err := data.Exists(ctx, "user:123")
Raw Byte Operations
// Set raw bytes
err := data.SetRaw(ctx, "key", []byte("raw data"))

// Get raw bytes
rawData, err := data.GetRaw(ctx, "key")

// Update raw bytes
err := data.UpdateRaw(ctx, "key", []byte("updated raw data"))
Transactional Operations

For storage backends that support transactions:

// Create transactional store instance
txData, err := datafx.NewTransactionalStore(conn)
if err != nil {
    log.Fatal(err)
}

// Execute operations within a transaction
err = txData.ExecuteTransaction(ctx, func(tx *datafx.TransactionStore) error {
    // All operations within this function are transactional
    user := &User{ID: "123", Name: "John"}

    if err := tx.Set(ctx, "user:123", user); err != nil {
        return err // Transaction will be rolled back
    }

    product := &Product{ID: "456", Name: "Widget", Price: 19.99}
    if err := tx.Set(ctx, "product:456", product); err != nil {
        return err // Transaction will be rolled back
    }

    return nil // Transaction will be committed
})

if err != nil {
    log.Printf("Transaction failed: %v", err)
}
Working with Multiple Connections
// Add multiple connections
cacheConn, err := registry.AddConnection(ctx, "redis-cache", redisConfig)
if err != nil {
    log.Fatal(err)
}

dbConn, err := registry.AddConnection(ctx, "postgres-main", postgresConfig)
if err != nil {
    log.Fatal(err)
}

// Create separate data instances
cache, _ := datafx.NewCache(cacheConn)
database, _ := datafx.NewStore(dbConn)

// Use them independently
cache.Set(ctx, "session:abc", sessionData, time.Hour) // Goes to Redis (with a TTL)
database.Set(ctx, "user:123", userData)               // Goes to PostgreSQL
Connection Discovery by Behavior
// Find all key-value storage connections
kvConnections := registry.GetByBehavior(connfx.ConnectionBehaviorKeyValue)

// Find all transactional connections
txConnections := registry.GetByBehavior(connfx.ConnectionBehaviorTransactional)

// Find all relational database connections
sqlConnections := registry.GetByBehavior(connfx.ConnectionBehaviorRelational)

// Use the first available key-value store
if len(kvConnections) > 0 {
    store, _ := datafx.NewStore(kvConnections[0])
    store.Set(ctx, "temp:data", someData)
}
Cache Operations

For connections that support caching (e.g., Redis):

// Create cache instance
cache, err := datafx.NewCache(conn)
if err != nil {
    log.Fatal(err)
}

// Set with expiration
user := &User{ID: "123", Name: "John"}
err = cache.Set(ctx, "user:123", user, 5*time.Minute)

// Get cached value
var cachedUser User
err = cache.Get(ctx, "user:123", &cachedUser)

// Check TTL
ttl, err := cache.GetTTL(ctx, "user:123")
fmt.Printf("TTL: %v\n", ttl)

// Set expiration on existing key
err = cache.Expire(ctx, "user:123", 10*time.Minute)

// Delete from cache
err = cache.Delete(ctx, "user:123")

// Raw cache operations
err = cache.SetRaw(ctx, "session:abc", []byte("session-data"), time.Hour)
rawData, err := cache.GetRaw(ctx, "session:abc")
Queue Operations

For connections that support message queues (e.g., AMQP/RabbitMQ):

import (
    "context"
    "encoding/json"
    "log"
    "os"
    "time"

    "github.com/eser/ajan/connfx"
    "github.com/eser/ajan/datafx"
    "github.com/eser/ajan/logfx"
)

func main() {
    ctx := context.Background()

    // Setup connection registry
    logger := logfx.NewLogger(os.Stdout, &logfx.Config{Level: logfx.LevelInfo})
    registry := connfx.NewRegistryWithDefaults(logger)

    // Configure AMQP connection
    config := &connfx.ConfigTarget{
        Protocol: "amqp",
        Host:     "localhost",
        Port:     5672,
        DSN:      "amqp://guest:guest@localhost:5672/",
    }

    // Add connection to registry
    conn, err := registry.AddConnection(ctx, "message-broker", config)
    if err != nil {
        log.Fatal(err)
    }

    // Create queue instance
    queue, err := datafx.NewQueue(conn)
    if err != nil {
        log.Fatal(err)
    }

    // Declare a queue
    queueName, err := queue.DeclareQueue(ctx, "user-events")
    if err != nil {
        log.Fatal(err)
    }

    // Publish messages
    orderEvent := &OrderEvent{
        OrderID:    "order-123",
        CustomerID: "customer-456",
        Amount:     99.99,
        Timestamp:  time.Now(),
    }

    err = queue.Publish(ctx, queueName, orderEvent)
    if err != nil {
        log.Fatal(err)
    }

    // Consume messages with custom configuration
    consumerConfig := connfx.DefaultConsumerConfig()
    consumerConfig.AutoAck = false // Manual acknowledgment

    messages, errs := queue.Consume(ctx, queueName, consumerConfig)

    // Handle messages
    go func() {
        for {
            select {
            case msg := <-messages:
                var event OrderEvent
                if err := json.Unmarshal(msg.Body, &event); err != nil {
                    log.Printf("Failed to unmarshal message: %v", err)
                    msg.Nack(false) // Don't requeue invalid messages

                    continue
                }

                // Process the event
                log.Printf("Processing order: %s for customer: %s",
                    event.OrderID, event.CustomerID)

                // Acknowledge successful processing
                if err := msg.Ack(); err != nil {
                    log.Printf("Failed to ack message: %v", err)
                }

            case err := <-errs:
                log.Printf("Queue error: %v", err)
            case <-ctx.Done():
                return
            }
        }
    }()

    // Or use the convenient ProcessMessages method
    err = queue.ProcessMessages(ctx, queueName, consumerConfig,
        func(ctx context.Context, message any) bool {
            event, ok := message.(*OrderEvent)
            if !ok {
                return false // Nack messages of an unexpected type
            }

            log.Printf("Processing order: %s", event.OrderID)

            // Return true to acknowledge, false to nack with requeue
            return true
        },
        &OrderEvent{}, // Message type for unmarshaling
    )
    if err != nil {
        log.Fatal(err)
    }
}
Queue Consumer Configuration
// Default configuration
config := connfx.DefaultConsumerConfig()

// Custom configuration
config := connfx.ConsumerConfig{
    AutoAck:   false, // Manual acknowledgment
    Exclusive: true,  // Exclusive access to queue
    NoLocal:   false, // Receive messages from this connection
    NoWait:    false, // Wait for server response
    Args:      nil,   // Additional arguments
}

// Start consuming with configuration
messages, errors := queue.Consume(ctx, "my-queue", config)
Raw Queue Operations
// Publish raw bytes
rawData := []byte(`{"type": "raw", "data": "some data"}`)
err = queue.PublishRaw(ctx, queueName, rawData)

// Consume with defaults
messages, errors := queue.ConsumeWithDefaults(ctx, queueName)

// Process raw messages
for msg := range messages {
    log.Printf("Received raw message: %s", string(msg.Body))

    // Access headers
    if contentType, ok := msg.Headers["content-type"]; ok {
        log.Printf("Content-Type: %v", contentType)
    }

    // Acknowledge message
    msg.Ack()
}

Connection Behaviors

datafx automatically detects and works with different storage capabilities:

  • ConnectionBehaviorKeyValue: Redis, Memcached, etc.
  • ConnectionBehaviorDocument: MongoDB, CouchDB, etc.
  • ConnectionBehaviorRelational: PostgreSQL, MySQL, SQLite, etc.
  • ConnectionBehaviorTransactional: Any storage supporting ACID transactions
  • ConnectionBehaviorCache: Redis, Memcached, etc. (with TTL/expiration support)
  • ConnectionBehaviorQueue: RabbitMQ, Apache Kafka, AWS SQS, etc.
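Capability detection boils down to checking a connection's advertised behaviors before constructing a higher-level wrapper around it. The sketch below is self-contained so it can run on its own: the behavior names mirror connfx's constants, but the `ConnectionBehavior` type and `hasBehavior` helper here are illustrative, not part of the library.

```go
package main

import "fmt"

// ConnectionBehavior mirrors the behavior constants listed above.
type ConnectionBehavior string

const (
	BehaviorKeyValue ConnectionBehavior = "key-value"
	BehaviorCache    ConnectionBehavior = "cache"
	BehaviorQueue    ConnectionBehavior = "queue"
)

// hasBehavior reports whether a connection advertises a given capability.
func hasBehavior(behaviors []ConnectionBehavior, want ConnectionBehavior) bool {
	for _, b := range behaviors {
		if b == want {
			return true
		}
	}

	return false
}

func main() {
	// A Redis-like connection typically advertises several behaviors.
	redisBehaviors := []ConnectionBehavior{BehaviorKeyValue, BehaviorCache}

	fmt.Println(hasBehavior(redisBehaviors, BehaviorCache)) // true
	fmt.Println(hasBehavior(redisBehaviors, BehaviorQueue)) // false
}
```

Constructors such as NewCache or NewQueue perform this kind of check and return an error (e.g. ErrCacheNotSupported, ErrQueueNotSupported) when the connection lacks the required behavior.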

Error Handling

datafx uses sentinel errors for consistent error handling:

import "errors"

// Check for specific errors
if errors.Is(err, datafx.ErrKeyNotFound) {
    // Handle key not found
}

if errors.Is(err, datafx.ErrConnectionNotSupported) {
    // Handle unsupported connection
}

if errors.Is(err, datafx.ErrTransactionFailed) {
    // Handle transaction failure
}

if errors.Is(err, datafx.ErrQueueNotSupported) {
    // Handle queue not supported
}

if errors.Is(err, datafx.ErrMessageProcessing) {
    // Handle message processing failure
}
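A common use of ErrKeyNotFound is a read-through pattern: treat a miss as a signal to load from the source of truth and backfill the store. To keep the sketch runnable it uses a plain map and a local sentinel in place of datafx.ErrKeyNotFound; with datafx you would match the library's sentinel via errors.Is in exactly the same way.

```go
package main

import (
	"errors"
	"fmt"
)

// errKeyNotFound stands in for datafx.ErrKeyNotFound in this sketch.
var errKeyNotFound = errors.New("key not found")

func cacheGet(cache map[string]string, key string) (string, error) {
	v, ok := cache[key]
	if !ok {
		return "", fmt.Errorf("%w: %s", errKeyNotFound, key)
	}

	return v, nil
}

// readThrough returns the cached value, falling back to loader on a miss
// and backfilling the cache for subsequent reads.
func readThrough(cache map[string]string, key string, loader func(string) string) (string, error) {
	v, err := cacheGet(cache, key)
	if err == nil {
		return v, nil
	}
	if !errors.Is(err, errKeyNotFound) {
		return "", err // a real error, not a miss
	}

	v = loader(key)
	cache[key] = v

	return v, nil
}

func main() {
	cache := map[string]string{}
	loads := 0
	loader := func(key string) string {
		loads++

		return "value-for-" + key
	}

	v1, _ := readThrough(cache, "user:123", loader) // miss: loads and backfills
	v2, _ := readThrough(cache, "user:123", loader) // hit: served from cache
	fmt.Println(v1, v2, loads)                      // value-for-user:123 value-for-user:123 1
}
```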

Extending with New Storage Types

To add support for a new storage technology (e.g., Apache Kafka):

1. Implement connfx.Connection Interface
type KafkaConnection struct {
    client KafkaClient
}

func (kc *KafkaConnection) GetBehaviors() []connfx.ConnectionBehavior {
    return []connfx.ConnectionBehavior{
        connfx.ConnectionBehaviorStreaming,
        connfx.ConnectionBehaviorQueue,
    }
}
// ... implement other Connection methods
2. Implement connfx.QueueRepository Interface
func (kc *KafkaConnection) QueueDeclare(ctx context.Context, name string) (string, error) {
    // Kafka topic creation implementation
}

func (kc *KafkaConnection) Publish(ctx context.Context, queueName string, body []byte) error {
    // Kafka producer implementation
}

func (kc *KafkaConnection) Consume(ctx context.Context, queueName string, config connfx.ConsumerConfig) (<-chan connfx.Message, <-chan error) {
    // Kafka consumer implementation
}
3. Create Factory and Register
kafkaFactory := connfx.NewKafkaFactory()
registry.RegisterFactory(kafkaFactory)

// Now datafx works with Kafka!
queue, _ := datafx.NewQueue(registry.GetDefault())
queue.Publish(ctx, "my-topic", message) // Publishes to Kafka

Best Practices

  1. Use JSON Operations: Prefer Publish()/ProcessMessages() over PublishRaw() for automatic marshaling
  2. Handle Errors: Always check for specific sentinel errors
  3. Use Transactions: For operations that require consistency across multiple keys
  4. Connection Pooling: Let connfx handle connection lifecycle and pooling
  5. Behavior-Based Selection: Use GetByBehavior() for flexible connection selection
  6. Queue Management: Always declare queues before publishing/consuming
  7. Acknowledgment: Use manual acknowledgment for reliable message processing
  8. Separation of Concerns: Keep business logic in datafx, infrastructure concerns in connfx

Benefits

  • Vendor Independence: Switch storage backends without code changes
  • Consistent API: Same operations across all storage types
  • Type Safety: Compile-time verification and generic support
  • Transaction Support: ACID transactions where available
  • Queue Reliability: Automatic reconnection and message acknowledgment
  • Easy Testing: Mock connfx interfaces for unit tests
  • Extensible: Add new storage types without touching existing code
  • Performance: Direct adapter implementations without unnecessary layers

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrCacheNotSupported = errors.New("connection does not support cache operations")
	ErrKeyExpired        = errors.New("key has expired")
	ErrCacheOperation    = errors.New("cache operation failed")
)
View Source
var (
	ErrQueueNotSupported = errors.New("connection does not support queue operations")
	ErrMessageProcessing = errors.New("message processing failed")
	ErrContextCanceled   = errors.New("context canceled")
	ErrQueueOperation    = errors.New("queue operation failed")
)
View Source
var (
	ErrConnectionNotSupported = errors.New("connection does not support required operations")
	ErrKeyNotFound            = errors.New("key not found")
	ErrFailedToMarshal        = errors.New("failed to marshal data")
	ErrFailedToUnmarshal      = errors.New("failed to unmarshal data")
	ErrInvalidData            = errors.New("invalid data")
	ErrRepositoryOperation    = errors.New("repository operation failed")
)
View Source
var (
	ErrTransactionNotSupported = errors.New("connection does not support transactions")
	ErrTransactionFailed       = errors.New("transaction failed")
	ErrTransactionOperation    = errors.New("transaction operation failed")
)

Functions

This section is empty.

Types

type Cache added in v0.7.0

type Cache struct {
	// contains filtered or unexported fields
}

Cache provides high-level cache operations with expiration support.

func NewCache added in v0.7.0

func NewCache(conn connfx.Connection) (*Cache, error)

NewCache creates a new Cache instance from a connfx connection. The connection must support cache operations.

func (*Cache) Delete added in v0.7.0

func (c *Cache) Delete(ctx context.Context, key string) error

Delete removes a key from the cache.

func (*Cache) Exists added in v0.7.0

func (c *Cache) Exists(ctx context.Context, key string) (bool, error)

Exists checks if a key exists in the cache.

func (*Cache) Expire added in v0.7.0

func (c *Cache) Expire(ctx context.Context, key string, expiration time.Duration) error

Expire sets an expiration time for an existing key.

func (*Cache) Get added in v0.7.0

func (c *Cache) Get(ctx context.Context, key string, dest any) error

Get retrieves a value by key and unmarshals it into the provided destination.

func (*Cache) GetConnection added in v0.7.0

func (c *Cache) GetConnection() connfx.Connection

GetConnection returns the underlying connfx connection.

func (*Cache) GetRaw added in v0.7.0

func (c *Cache) GetRaw(ctx context.Context, key string) ([]byte, error)

GetRaw retrieves raw bytes by key.

func (*Cache) GetRepository added in v0.7.0

func (c *Cache) GetRepository() connfx.CacheRepository

GetRepository returns the underlying cache repository.

func (*Cache) GetTTL added in v0.7.0

func (c *Cache) GetTTL(ctx context.Context, key string) (time.Duration, error)

GetTTL returns the time-to-live for a key.

func (*Cache) Set added in v0.7.0

func (c *Cache) Set(ctx context.Context, key string, value any, expiration time.Duration) error

Set stores a value with the given key and expiration time after marshaling it to JSON.

func (*Cache) SetRaw added in v0.7.0

func (c *Cache) SetRaw(
	ctx context.Context,
	key string,
	value []byte,
	expiration time.Duration,
) error

SetRaw stores raw bytes with the given key and expiration time.

type Queue added in v0.7.0

type Queue struct {
	// contains filtered or unexported fields
}

Queue provides high-level message queue operations.

func NewQueue added in v0.7.0

func NewQueue(conn connfx.Connection) (*Queue, error)

NewQueue creates a new Queue instance from a connfx connection. The connection must support queue operations.

func (*Queue) Consume added in v0.7.0

func (q *Queue) Consume(
	ctx context.Context,
	queueName string,
	config connfx.ConsumerConfig,
) (<-chan connfx.Message, <-chan error)

Consume starts consuming messages from a queue with the given configuration. Returns channels for messages and errors.

func (*Queue) ConsumeWithDefaults added in v0.7.0

func (q *Queue) ConsumeWithDefaults(
	ctx context.Context,
	queueName string,
) (<-chan connfx.Message, <-chan error)

ConsumeWithDefaults starts consuming messages from a queue with default configuration.

func (*Queue) DeclareQueue added in v0.7.0

func (q *Queue) DeclareQueue(ctx context.Context, name string) (string, error)

DeclareQueue declares a queue and returns its name.

func (*Queue) GetConnection added in v0.7.0

func (q *Queue) GetConnection() connfx.Connection

GetConnection returns the underlying connfx connection.

func (*Queue) GetRepository added in v0.7.0

func (q *Queue) GetRepository() connfx.QueueRepository

GetRepository returns the underlying queue repository.

func (*Queue) ProcessMessages added in v0.7.0

func (q *Queue) ProcessMessages(
	ctx context.Context,
	queueName string,
	config connfx.ConsumerConfig,
	messageHandler func(ctx context.Context, message any) bool,
	messageType any,
) error

ProcessMessages provides a convenient way to process messages with automatic unmarshalling. The messageHandler function receives the unmarshaled message and should return true to acknowledge the message, or false to negatively acknowledge it.

func (*Queue) ProcessMessagesWithDefaults added in v0.7.0

func (q *Queue) ProcessMessagesWithDefaults(
	ctx context.Context,
	queueName string,
	messageHandler func(ctx context.Context, message any) bool,
	messageType any,
) error

ProcessMessagesWithDefaults processes messages with default consumer configuration.

func (*Queue) Publish added in v0.7.0

func (q *Queue) Publish(ctx context.Context, queueName string, message any) error

Publish sends a message to a queue after marshaling it to JSON.

func (*Queue) PublishRaw added in v0.7.0

func (q *Queue) PublishRaw(ctx context.Context, queueName string, data []byte) error

PublishRaw sends raw bytes to a queue.

type Store added in v0.7.0

type Store struct {
	// contains filtered or unexported fields
}

Store provides high-level data persistence operations.

func NewStore added in v0.7.0

func NewStore(conn connfx.Connection) (*Store, error)

NewStore creates a new Store instance from a connfx connection. The connection must support data repository operations.

func (*Store) Exists added in v0.7.0

func (s *Store) Exists(ctx context.Context, key string) (bool, error)

Exists checks if a key exists.

func (*Store) Get added in v0.7.0

func (s *Store) Get(ctx context.Context, key string, dest any) error

Get retrieves a value by key and unmarshals it into the provided destination.

func (*Store) GetConnection added in v0.7.0

func (s *Store) GetConnection() connfx.Connection

GetConnection returns the underlying connfx connection.

func (*Store) GetRaw added in v0.7.0

func (s *Store) GetRaw(ctx context.Context, key string) ([]byte, error)

GetRaw retrieves raw bytes by key.

func (*Store) GetRepository added in v0.7.0

func (s *Store) GetRepository() connfx.Repository

GetRepository returns the underlying data repository.

func (*Store) Remove added in v0.7.0

func (s *Store) Remove(ctx context.Context, key string) error

Remove deletes a value by key.

func (*Store) Set added in v0.7.0

func (s *Store) Set(ctx context.Context, key string, value any) error

Set stores a value with the given key after marshaling it to JSON.

func (*Store) SetRaw added in v0.7.0

func (s *Store) SetRaw(ctx context.Context, key string, value []byte) error

SetRaw stores raw bytes with the given key.

func (*Store) Update added in v0.7.0

func (s *Store) Update(ctx context.Context, key string, value any) error

Update updates an existing value by key after marshaling it to JSON.

func (*Store) UpdateRaw added in v0.7.0

func (s *Store) UpdateRaw(ctx context.Context, key string, value []byte) error

UpdateRaw updates an existing value with raw bytes by key.

type TransactionStore added in v0.7.0

type TransactionStore struct {
	Repository connfx.Repository
}

TransactionStore provides data operations within a transaction context.

func (*TransactionStore) Exists added in v0.7.0

func (ts *TransactionStore) Exists(ctx context.Context, key string) (bool, error)

Exists checks if a key exists.

func (*TransactionStore) Get added in v0.7.0

func (ts *TransactionStore) Get(ctx context.Context, key string, dest any) error

Get retrieves a value by key and unmarshals it into the provided destination.

func (*TransactionStore) GetRaw added in v0.7.0

func (ts *TransactionStore) GetRaw(ctx context.Context, key string) ([]byte, error)

GetRaw retrieves raw bytes by key.

func (*TransactionStore) Remove added in v0.7.0

func (ts *TransactionStore) Remove(ctx context.Context, key string) error

Remove deletes a value by key.

func (*TransactionStore) Set added in v0.7.0

func (ts *TransactionStore) Set(ctx context.Context, key string, value any) error

Set stores a value with the given key after marshaling it to JSON.

func (*TransactionStore) SetRaw added in v0.7.0

func (ts *TransactionStore) SetRaw(ctx context.Context, key string, value []byte) error

SetRaw stores raw bytes with the given key.

func (*TransactionStore) Update added in v0.7.0

func (ts *TransactionStore) Update(ctx context.Context, key string, value any) error

Update updates an existing value by key after marshaling it to JSON.

func (*TransactionStore) UpdateRaw added in v0.7.0

func (ts *TransactionStore) UpdateRaw(ctx context.Context, key string, value []byte) error

UpdateRaw updates an existing value with raw bytes by key.

type TransactionalStore added in v0.7.0

type TransactionalStore struct {
	*Store
	// contains filtered or unexported fields
}

TransactionalStore provides transactional store operations.

func NewTransactionalStore added in v0.7.0

func NewTransactionalStore(conn connfx.Connection) (*TransactionalStore, error)

NewTransactionalStore creates a new TransactionalStore instance from a connfx connection. The connection must support transactional operations.

func (*TransactionalStore) ExecuteTransaction added in v0.7.0

func (ts *TransactionalStore) ExecuteTransaction(
	ctx context.Context,
	fn func(*TransactionStore) error,
) error

ExecuteTransaction executes a function within a transaction context. If the function returns an error, the transaction is rolled back. If the function succeeds, the transaction is committed.
