Documentation
¶
Index ¶
- type AddOffsetsToTxnRequest
- type AddOffsetsToTxnResponse
- type AddPartitionsToTxnRequest
- type AddPartitionsToTxnResponse
- type CoordinatorConfig
- type CoordinatorStats
- type EndTxnRequest
- type EndTxnResponse
- type ErrorCode
- type InitProducerIDRequest
- type InitProducerIDResponse
- type MemoryTransactionLog
- func (l *MemoryTransactionLog) Append(entry *TransactionLogEntry) error
- func (l *MemoryTransactionLog) Clear()
- func (l *MemoryTransactionLog) Count() int
- func (l *MemoryTransactionLog) Delete(txnID TransactionID) error
- func (l *MemoryTransactionLog) Read(txnID TransactionID) (*TransactionLogEntry, error)
- func (l *MemoryTransactionLog) ReadAll() ([]*TransactionLogEntry, error)
- type OffsetMetadata
- type PartitionMetadata
- type ProducerEpoch
- type ProducerID
- type ProducerMetadata
- type TransactionCoordinator
- func (tc *TransactionCoordinator) AddOffsetsToTxn(req *AddOffsetsToTxnRequest) (*AddOffsetsToTxnResponse, error)
- func (tc *TransactionCoordinator) AddPartitionsToTxn(req *AddPartitionsToTxnRequest) (*AddPartitionsToTxnResponse, error)
- func (tc *TransactionCoordinator) EndTxn(req *EndTxnRequest) (*EndTxnResponse, error)
- func (tc *TransactionCoordinator) GetTransactionState(txnID TransactionID) (TransactionState, error)
- func (tc *TransactionCoordinator) InitProducerID(req *InitProducerIDRequest) (*InitProducerIDResponse, error)
- func (tc *TransactionCoordinator) Stats() CoordinatorStats
- func (tc *TransactionCoordinator) Stop()
- type TransactionID
- type TransactionLog
- type TransactionLogEntry
- type TransactionMarker
- type TransactionMetadata
- type TransactionResult
- type TransactionState
- type TxnOffsetCommitRequest
- type TxnOffsetCommitResponse
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AddOffsetsToTxnRequest ¶
type AddOffsetsToTxnRequest struct {
TransactionID TransactionID
ProducerID ProducerID
ProducerEpoch ProducerEpoch
GroupID string
}
AddOffsetsToTxnRequest adds consumer group offsets to transaction
type AddOffsetsToTxnResponse ¶
type AddOffsetsToTxnResponse struct {
ErrorCode ErrorCode
}
AddOffsetsToTxnResponse confirms offset addition
type AddPartitionsToTxnRequest ¶
type AddPartitionsToTxnRequest struct {
TransactionID TransactionID
ProducerID ProducerID
ProducerEpoch ProducerEpoch
Partitions []PartitionMetadata
}
AddPartitionsToTxnRequest adds partitions to a transaction
type AddPartitionsToTxnResponse ¶
type AddPartitionsToTxnResponse struct {
Errors map[string]map[int32]ErrorCode // topic -> partition -> error
}
AddPartitionsToTxnResponse confirms partition addition
type CoordinatorConfig ¶
type CoordinatorConfig struct {
// Default transaction timeout
DefaultTransactionTimeout time.Duration
// Maximum transaction timeout allowed
MaxTransactionTimeout time.Duration
// How often to check for expired transactions
ExpirationCheckInterval time.Duration
// How long to keep completed transaction metadata
TransactionRetentionTime time.Duration
}
CoordinatorConfig holds configuration for the transaction coordinator
func DefaultCoordinatorConfig ¶
func DefaultCoordinatorConfig() CoordinatorConfig
DefaultCoordinatorConfig returns default configuration
type CoordinatorStats ¶
type CoordinatorStats struct {
ActiveTransactions int
CompletedTransactions int
AbortedTransactions int
TotalProducers int
}
CoordinatorStats holds coordinator statistics
type EndTxnRequest ¶
type EndTxnRequest struct {
TransactionID TransactionID
ProducerID ProducerID
ProducerEpoch ProducerEpoch
Commit bool // true to commit, false to abort
}
EndTxnRequest commits or aborts a transaction
type EndTxnResponse ¶
type EndTxnResponse struct {
ErrorCode ErrorCode
}
EndTxnResponse confirms transaction completion
type ErrorCode ¶
type ErrorCode int16
ErrorCode represents transaction-specific error codes
const ( ErrorNone ErrorCode = iota ErrorInvalidProducerEpoch ErrorInvalidTransactionState ErrorInvalidProducerIDMapping ErrorTransactionCoordinatorNotAvailable ErrorTransactionCoordinatorFenced ErrorProducerFenced ErrorInvalidTransactionTimeout ErrorConcurrentTransactions ErrorTransactionAborted ErrorInvalidPartitionList )
type InitProducerIDRequest ¶
type InitProducerIDRequest struct {
TransactionID TransactionID
TransactionTimeout time.Duration
}
InitProducerIDRequest is sent to get a producer ID
type InitProducerIDResponse ¶
type InitProducerIDResponse struct {
ProducerID ProducerID
ProducerEpoch ProducerEpoch
ErrorCode ErrorCode
}
InitProducerIDResponse contains the assigned producer ID
type MemoryTransactionLog ¶
type MemoryTransactionLog struct {
// contains filtered or unexported fields
}
MemoryTransactionLog is an in-memory implementation of TransactionLog
func NewMemoryTransactionLog ¶
func NewMemoryTransactionLog() *MemoryTransactionLog
NewMemoryTransactionLog creates a new in-memory transaction log
func (*MemoryTransactionLog) Append ¶
func (l *MemoryTransactionLog) Append(entry *TransactionLogEntry) error
Append adds a new entry to the log
func (*MemoryTransactionLog) Clear ¶
func (l *MemoryTransactionLog) Clear()
Clear removes all entries from the log
func (*MemoryTransactionLog) Count ¶
func (l *MemoryTransactionLog) Count() int
Count returns the number of entries in the log
func (*MemoryTransactionLog) Delete ¶
func (l *MemoryTransactionLog) Delete(txnID TransactionID) error
Delete removes an entry from the log
func (*MemoryTransactionLog) Read ¶
func (l *MemoryTransactionLog) Read(txnID TransactionID) (*TransactionLogEntry, error)
Read retrieves an entry from the log
func (*MemoryTransactionLog) ReadAll ¶
func (l *MemoryTransactionLog) ReadAll() ([]*TransactionLogEntry, error)
ReadAll retrieves all entries from the log
type OffsetMetadata ¶
OffsetMetadata contains offset and metadata
type PartitionMetadata ¶
PartitionMetadata represents a topic-partition involved in a transaction
type ProducerID ¶
type ProducerID int64
ProducerID is a unique identifier assigned to a transactional producer
type ProducerMetadata ¶
type ProducerMetadata struct {
ProducerID ProducerID
ProducerEpoch ProducerEpoch
TransactionID TransactionID
TransactionTimeout time.Duration
LastProducerEpoch ProducerEpoch
LastTimestamp time.Time
}
ProducerMetadata contains metadata about a transactional producer
type TransactionCoordinator ¶
type TransactionCoordinator struct {
// contains filtered or unexported fields
}
TransactionCoordinator manages transactional operations
func NewTransactionCoordinator ¶
func NewTransactionCoordinator(txnLog TransactionLog, config CoordinatorConfig, logger *logging.Logger) *TransactionCoordinator
NewTransactionCoordinator creates a new transaction coordinator
func (*TransactionCoordinator) AddOffsetsToTxn ¶
func (tc *TransactionCoordinator) AddOffsetsToTxn(req *AddOffsetsToTxnRequest) (*AddOffsetsToTxnResponse, error)
AddOffsetsToTxn adds consumer group offsets to a transaction
func (*TransactionCoordinator) AddPartitionsToTxn ¶
func (tc *TransactionCoordinator) AddPartitionsToTxn(req *AddPartitionsToTxnRequest) (*AddPartitionsToTxnResponse, error)
AddPartitionsToTxn adds partitions to an ongoing transaction
func (*TransactionCoordinator) EndTxn ¶
func (tc *TransactionCoordinator) EndTxn(req *EndTxnRequest) (*EndTxnResponse, error)
EndTxn commits or aborts a transaction
func (*TransactionCoordinator) GetTransactionState ¶
func (tc *TransactionCoordinator) GetTransactionState(txnID TransactionID) (TransactionState, error)
GetTransactionState returns the current state of a transaction
func (*TransactionCoordinator) InitProducerID ¶
func (tc *TransactionCoordinator) InitProducerID(req *InitProducerIDRequest) (*InitProducerIDResponse, error)
InitProducerID initializes a producer ID for transactional operations
func (*TransactionCoordinator) Stats ¶
func (tc *TransactionCoordinator) Stats() CoordinatorStats
Stats returns coordinator statistics
func (*TransactionCoordinator) Stop ¶
func (tc *TransactionCoordinator) Stop()
Stop stops the coordinator
type TransactionID ¶
type TransactionID string
TransactionID is a unique identifier for a transaction
type TransactionLog ¶
type TransactionLog interface {
Append(entry *TransactionLogEntry) error
Read(txnID TransactionID) (*TransactionLogEntry, error)
ReadAll() ([]*TransactionLogEntry, error)
Delete(txnID TransactionID) error
}
TransactionLog interface for persisting transaction state
type TransactionLogEntry ¶
type TransactionLogEntry struct {
TransactionID TransactionID
ProducerID ProducerID
ProducerEpoch ProducerEpoch
State TransactionState
Partitions []PartitionMetadata
Timestamp time.Time
}
TransactionLogEntry represents an entry in the transaction log
type TransactionMarker ¶
type TransactionMarker struct {
ProducerID ProducerID
ProducerEpoch ProducerEpoch
Commit bool // true for commit, false for abort
Timestamp int64
}
TransactionMarker represents a control message for transaction boundaries
type TransactionMetadata ¶
type TransactionMetadata struct {
TransactionID TransactionID
ProducerID ProducerID
ProducerEpoch ProducerEpoch
State TransactionState
Partitions []PartitionMetadata
TransactionTimeout time.Duration
StartTime time.Time
LastUpdateTime time.Time
}
TransactionMetadata contains metadata about a transaction
func (*TransactionMetadata) IsExpired ¶
func (tm *TransactionMetadata) IsExpired() bool
IsExpired checks if the transaction has exceeded its timeout
type TransactionResult ¶
type TransactionResult int
TransactionResult indicates the outcome of a transaction
const ( ResultCommit TransactionResult = iota ResultAbort )
type TransactionState ¶
type TransactionState int
TransactionState represents the state of a transaction
const ( // StateEmpty indicates no transaction is in progress StateEmpty TransactionState = iota // StateOngoing indicates transaction is in progress StateOngoing // StatePrepareCommit indicates transaction is preparing to commit StatePrepareCommit // StatePrepareAbort indicates transaction is preparing to abort StatePrepareAbort // StateCompleteCommit indicates transaction has been committed StateCompleteCommit // StateCompleteAbort indicates transaction has been aborted StateCompleteAbort )
func (TransactionState) String ¶
func (s TransactionState) String() string
String returns the string representation of TransactionState
type TxnOffsetCommitRequest ¶
type TxnOffsetCommitRequest struct {
TransactionID TransactionID
GroupID string
ProducerID ProducerID
ProducerEpoch ProducerEpoch
Offsets map[string]map[int32]OffsetMetadata // topic -> partition -> offset
}
TxnOffsetCommitRequest commits offsets as part of transaction
type TxnOffsetCommitResponse ¶
type TxnOffsetCommitResponse struct {
Errors map[string]map[int32]ErrorCode // topic -> partition -> error
}
TxnOffsetCommitResponse confirms offset commit