transaction

package
v1.1.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 15, 2026 License: Apache-2.0 Imports: 7 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AddOffsetsToTxnRequest

type AddOffsetsToTxnRequest struct {
	TransactionID TransactionID
	ProducerID    ProducerID
	ProducerEpoch ProducerEpoch
	GroupID       string
}

AddOffsetsToTxnRequest adds consumer group offsets to a transaction.

type AddOffsetsToTxnResponse

type AddOffsetsToTxnResponse struct {
	ErrorCode ErrorCode
}

AddOffsetsToTxnResponse confirms offset addition

type AddPartitionsToTxnRequest

type AddPartitionsToTxnRequest struct {
	TransactionID TransactionID
	ProducerID    ProducerID
	ProducerEpoch ProducerEpoch
	Partitions    []PartitionMetadata
}

AddPartitionsToTxnRequest adds partitions to a transaction

type AddPartitionsToTxnResponse

type AddPartitionsToTxnResponse struct {
	Errors map[string]map[int32]ErrorCode // topic -> partition -> error
}

AddPartitionsToTxnResponse confirms partition addition

type CoordinatorConfig

type CoordinatorConfig struct {
	// Default transaction timeout
	DefaultTransactionTimeout time.Duration

	// Maximum transaction timeout allowed
	MaxTransactionTimeout time.Duration

	// How often to check for expired transactions
	ExpirationCheckInterval time.Duration

	// How long to keep completed transaction metadata
	TransactionRetentionTime time.Duration
}

CoordinatorConfig holds configuration for the transaction coordinator

func DefaultCoordinatorConfig

func DefaultCoordinatorConfig() CoordinatorConfig

DefaultCoordinatorConfig returns default configuration

type CoordinatorStats

type CoordinatorStats struct {
	ActiveTransactions    int
	CompletedTransactions int
	AbortedTransactions   int
	TotalProducers        int
}

CoordinatorStats holds coordinator statistics

type EndTxnRequest

type EndTxnRequest struct {
	TransactionID TransactionID
	ProducerID    ProducerID
	ProducerEpoch ProducerEpoch
	Commit        bool // true to commit, false to abort
}

EndTxnRequest commits or aborts a transaction

type EndTxnResponse

type EndTxnResponse struct {
	ErrorCode ErrorCode
}

EndTxnResponse confirms transaction completion

type ErrorCode

type ErrorCode int16

ErrorCode represents transaction-specific error codes

const (
	ErrorNone ErrorCode = iota
	ErrorInvalidProducerEpoch
	ErrorInvalidTransactionState
	ErrorInvalidProducerIDMapping
	ErrorTransactionCoordinatorNotAvailable
	ErrorTransactionCoordinatorFenced
	ErrorProducerFenced
	ErrorInvalidTransactionTimeout
	ErrorConcurrentTransactions
	ErrorTransactionAborted
	ErrorInvalidPartitionList
)

func (ErrorCode) Error

func (e ErrorCode) Error() string

Error returns the error message

func (ErrorCode) String

func (e ErrorCode) String() string

String returns the string representation of ErrorCode

type InitProducerIDRequest

type InitProducerIDRequest struct {
	TransactionID      TransactionID
	TransactionTimeout time.Duration
}

InitProducerIDRequest is sent to get a producer ID

type InitProducerIDResponse

type InitProducerIDResponse struct {
	ProducerID    ProducerID
	ProducerEpoch ProducerEpoch
	ErrorCode     ErrorCode
}

InitProducerIDResponse contains the assigned producer ID

type MemoryTransactionLog

type MemoryTransactionLog struct {
	// contains filtered or unexported fields
}

MemoryTransactionLog is an in-memory implementation of TransactionLog

func NewMemoryTransactionLog

func NewMemoryTransactionLog() *MemoryTransactionLog

NewMemoryTransactionLog creates a new in-memory transaction log

func (*MemoryTransactionLog) Append

Append adds a new entry to the log

func (*MemoryTransactionLog) Clear

func (l *MemoryTransactionLog) Clear()

Clear removes all entries from the log

func (*MemoryTransactionLog) Count

func (l *MemoryTransactionLog) Count() int

Count returns the number of entries in the log

func (*MemoryTransactionLog) Delete

func (l *MemoryTransactionLog) Delete(txnID TransactionID) error

Delete removes an entry from the log

func (*MemoryTransactionLog) Read

Read retrieves an entry from the log

func (*MemoryTransactionLog) ReadAll

func (l *MemoryTransactionLog) ReadAll() ([]*TransactionLogEntry, error)

ReadAll retrieves all entries from the log

type OffsetMetadata

type OffsetMetadata struct {
	Offset   int64
	Metadata string
}

OffsetMetadata contains offset and metadata

type PartitionMetadata

type PartitionMetadata struct {
	Topic     string
	Partition int32
}

PartitionMetadata represents a topic-partition involved in a transaction

type ProducerEpoch

type ProducerEpoch int16

ProducerEpoch is used to fence zombie producers

type ProducerID

type ProducerID int64

ProducerID is a unique identifier assigned to a transactional producer

type ProducerMetadata

type ProducerMetadata struct {
	ProducerID         ProducerID
	ProducerEpoch      ProducerEpoch
	TransactionID      TransactionID
	TransactionTimeout time.Duration
	LastProducerEpoch  ProducerEpoch
	LastTimestamp      time.Time
}

ProducerMetadata contains metadata about a transactional producer

type TransactionCoordinator

type TransactionCoordinator struct {
	// contains filtered or unexported fields
}

TransactionCoordinator manages transactional operations

func NewTransactionCoordinator

func NewTransactionCoordinator(txnLog TransactionLog, config CoordinatorConfig, logger *logging.Logger) *TransactionCoordinator

NewTransactionCoordinator creates a new transaction coordinator

func (*TransactionCoordinator) AddOffsetsToTxn

AddOffsetsToTxn adds consumer group offsets to a transaction

func (*TransactionCoordinator) AddPartitionsToTxn

AddPartitionsToTxn adds partitions to an ongoing transaction

func (*TransactionCoordinator) EndTxn

EndTxn commits or aborts a transaction

func (*TransactionCoordinator) GetTransactionState

func (tc *TransactionCoordinator) GetTransactionState(txnID TransactionID) (TransactionState, error)

GetTransactionState returns the current state of a transaction

func (*TransactionCoordinator) InitProducerID

InitProducerID initializes a producer ID for transactional operations

func (*TransactionCoordinator) Stats

Stats returns coordinator statistics

func (*TransactionCoordinator) Stop

func (tc *TransactionCoordinator) Stop()

Stop stops the coordinator

type TransactionID

type TransactionID string

TransactionID is a unique identifier for a transaction

type TransactionLog

type TransactionLog interface {
	Append(entry *TransactionLogEntry) error
	Read(txnID TransactionID) (*TransactionLogEntry, error)
	ReadAll() ([]*TransactionLogEntry, error)
	Delete(txnID TransactionID) error
}

TransactionLog is the interface for persisting transaction state.

type TransactionLogEntry

type TransactionLogEntry struct {
	TransactionID TransactionID
	ProducerID    ProducerID
	ProducerEpoch ProducerEpoch
	State         TransactionState
	Partitions    []PartitionMetadata
	Timestamp     time.Time
}

TransactionLogEntry represents an entry in the transaction log

type TransactionMarker

type TransactionMarker struct {
	ProducerID    ProducerID
	ProducerEpoch ProducerEpoch
	Commit        bool // true for commit, false for abort
	Timestamp     int64
}

TransactionMarker represents a control message for transaction boundaries

type TransactionMetadata

type TransactionMetadata struct {
	TransactionID      TransactionID
	ProducerID         ProducerID
	ProducerEpoch      ProducerEpoch
	State              TransactionState
	Partitions         []PartitionMetadata
	TransactionTimeout time.Duration
	StartTime          time.Time
	LastUpdateTime     time.Time
}

TransactionMetadata contains metadata about a transaction

func (*TransactionMetadata) IsExpired

func (tm *TransactionMetadata) IsExpired() bool

IsExpired checks if the transaction has exceeded its timeout

type TransactionResult

type TransactionResult int

TransactionResult indicates the outcome of a transaction

const (
	ResultCommit TransactionResult = iota
	ResultAbort
)

type TransactionState

type TransactionState int

TransactionState represents the state of a transaction

const (
	// StateEmpty indicates no transaction is in progress
	StateEmpty TransactionState = iota

	// StateOngoing indicates a transaction is in progress
	StateOngoing

	// StatePrepareCommit indicates the transaction is preparing to commit
	StatePrepareCommit

	// StatePrepareAbort indicates the transaction is preparing to abort
	StatePrepareAbort

	// StateCompleteCommit indicates the transaction has been committed
	StateCompleteCommit

	// StateCompleteAbort indicates the transaction has been aborted
	StateCompleteAbort
)

func (TransactionState) String

func (s TransactionState) String() string

String returns the string representation of TransactionState

type TxnOffsetCommitRequest

type TxnOffsetCommitRequest struct {
	TransactionID TransactionID
	GroupID       string
	ProducerID    ProducerID
	ProducerEpoch ProducerEpoch
	Offsets       map[string]map[int32]OffsetMetadata // topic -> partition -> offset
}

TxnOffsetCommitRequest commits offsets as part of a transaction.

type TxnOffsetCommitResponse

type TxnOffsetCommitResponse struct {
	Errors map[string]map[int32]ErrorCode // topic -> partition -> error
}

TxnOffsetCommitResponse confirms offset commit

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL