Documentation ¶
Index ¶
- func ComputeChangeID(txID, table string, data map[string]interface{}, lsn []byte, op Operation) string
- type BatchContext
- type CaptureChange
- type CaptureResult
- type Capturer
- type CapturerName
- type Change
- type DataSet
- type DeliveryReason
- type LSN
- type Operation
- type RunState
- type Runtime
- type Sink
- type SinkConfig
- type StateManager
- type Transaction
- type TransactionBuffer
- func (tb *TransactionBuffer) Add(change Change)
- func (tb *TransactionBuffer) Close()
- func (tb *TransactionBuffer) Flush() []*Transaction
- func (tb *TransactionBuffer) GetCompleteTransactions() []*Transaction
- func (tb *TransactionBuffer) GetTableMaxCommitTime(table string) (time.Time, bool)
- func (tb *TransactionBuffer) IsEmpty() bool
- func (tb *TransactionBuffer) SetComplete(txID string) *Transaction
- func (tb *TransactionBuffer) SetOnComplete(fn func(*Transaction))
- func (tb *TransactionBuffer) SetOnTimeout(fn func(*Transaction, DeliveryReason))
- func (tb *TransactionBuffer) Size() int
- func (tb *TransactionBuffer) UpdateTableMaxCommitTime(table string, commitTime time.Time)
- type TransactionWithBatch
- type Transformer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ComputeChangeID ¶
func ComputeChangeID(txID, table string, data map[string]interface{}, lsn []byte, op Operation) string
ComputeChangeID computes a content-based hash ID for a change. It uses the same algorithm as store/sqlite to ensure consistency. The ID is deterministic: same input always produces the same ID.
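The hashing algorithm itself is not shown on this page. As a rough illustration of what "deterministic, content-based" can mean in Go, the sketch below hashes the inputs with SHA-256 after sorting the map keys. The name sketchChangeID, the field encoding, and the use of a plain string for the operation are all assumptions made for this example; the real ComputeChangeID may differ in every one of them.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"sort"
)

// sketchChangeID is a hypothetical stand-in for ComputeChangeID.
// It writes every input to the hash in a fixed order, so equal inputs
// always yield the same ID.
func sketchChangeID(txID, table string, data map[string]interface{}, lsn []byte, op string) string {
	// Go randomizes map iteration order, so the keys are sorted first.
	keys := make([]string, 0, len(data))
	for k := range data {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	h := sha256.New()
	fmt.Fprintf(h, "%s\x00%s\x00%x\x00%s\x00", txID, table, lsn, op)
	for _, k := range keys {
		fmt.Fprintf(h, "%s=%v\x00", k, data[k])
	}
	return hex.EncodeToString(h.Sum(nil))
}

func main() {
	data := map[string]interface{}{"id": 1, "name": "a"}
	a := sketchChangeID("tx1", "users", data, []byte{0x01}, "insert")
	b := sketchChangeID("tx1", "users", data, []byte{0x01}, "insert")
	fmt.Println(a == b) // true: same input, same ID
}
```

Sorting the keys is the step that matters here: without it, two calls with the same map could feed the hash in different orders and produce different IDs.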
Types ¶
type BatchContext ¶
type BatchContext struct {
// SkillName is the skill being executed (for sink logging)
SkillName string
// BatchID is a unique identifier for each poll cycle (root trace ID)
// Format: {timestamp-ms}-{uuid-short-8chars}
BatchID string
// StartTime is when this poll cycle started
StartTime time.Time
}
BatchContext carries observability context through the pipeline. It is created at the start of each poll cycle and passed to handlers, skills, and sinks for logging correlation.
func NewBatchContext ¶
func NewBatchContext() *BatchContext
NewBatchContext creates a new BatchContext with a unique BatchID
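For readers who want to see the documented BatchID shape, {timestamp-ms}-{uuid-short-8chars}, here is a minimal sketch. It is not the package's implementation: sketchBatchID is a hypothetical helper, and the 8-character suffix is produced from 4 random bytes rather than from a truncated UUID, which is an assumption.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"time"
)

// sketchBatchID builds an ID shaped like {timestamp-ms}-{8 hex chars}.
// The random suffix stands in for the shortened UUID (assumption).
func sketchBatchID(unixMilli int64) (string, error) {
	suffix := make([]byte, 4) // 4 random bytes encode to 8 hex characters
	if _, err := rand.Read(suffix); err != nil {
		return "", err
	}
	return fmt.Sprintf("%d-%s", unixMilli, hex.EncodeToString(suffix)), nil
}

func main() {
	id, err := sketchBatchID(time.Now().UnixMilli())
	if err != nil {
		panic(err)
	}
	fmt.Println(id)
}
```

Putting the millisecond timestamp first keeps IDs roughly sortable by poll-cycle start time when scanning logs.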
type CaptureChange ¶
type CaptureChange map[string]interface{}
CaptureChange represents a single row change captured from CDC. This is the intermediate type used between Capturer and Runtime to avoid import cycles.
type CaptureResult ¶
type CaptureResult struct {
Changes []CaptureChange
BatchID string // Unique identifier for this fetch batch
NextCapturer CapturerName // Which capturer to use for next fetch
}
CaptureResult carries data and metadata from a Capturer fetch operation.
type Capturer ¶
type Capturer interface {
// Fetch returns the next batch of changes and indicates which capturer to use next.
Fetch(ctx context.Context) *CaptureResult
// Stop signals the capturer to stop fetching.
// Called by Runtime when the overall system is shutting down.
Stop()
}
Capturer is the interface for data ingestion sources. Each implementation corresponds to a CDC mode: incremental (Change), Snapshot, or Replay. Capturer only handles data fetching — post-fetch processing (Transformer, DLQ, offset) is handled by Runtime.
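To make the contract concrete, the following sketch implements the interface with an in-memory source. The types are re-declared locally so the example compiles on its own; pagedSnapshot, drain, and the rule "hand over to CDC once the rows are exhausted" are hypothetical and only illustrate how NextCapturer can steer the caller.

```go
package main

import (
	"context"
	"fmt"
)

// Local copies of the documented types, so the sketch is self-contained.
type CapturerName string

const (
	CapturerCDC      CapturerName = "cdc"
	CapturerSnapshot CapturerName = "snapshot"
)

type CaptureChange map[string]interface{}

type CaptureResult struct {
	Changes      []CaptureChange
	BatchID      string
	NextCapturer CapturerName
}

type Capturer interface {
	Fetch(ctx context.Context) *CaptureResult
	Stop()
}

// pagedSnapshot is a hypothetical Capturer that serves in-memory rows in pages.
type pagedSnapshot struct {
	rows     []CaptureChange
	pageSize int
	offset   int
	stopped  bool
}

func (p *pagedSnapshot) Fetch(ctx context.Context) *CaptureResult {
	res := &CaptureResult{BatchID: fmt.Sprintf("snap-%d", p.offset), NextCapturer: CapturerSnapshot}
	if p.stopped || ctx.Err() != nil {
		return res // no changes once stopped or cancelled
	}
	end := p.offset + p.pageSize
	if end >= len(p.rows) {
		end = len(p.rows)
		res.NextCapturer = CapturerCDC // rows exhausted: hand over to CDC
	}
	res.Changes = p.rows[p.offset:end]
	p.offset = end
	return res
}

func (p *pagedSnapshot) Stop() { p.stopped = true }

// drain fetches until the capturer names a different successor or returns nothing.
func drain(c Capturer, self CapturerName) (int, CapturerName) {
	total := 0
	for {
		r := c.Fetch(context.Background())
		total += len(r.Changes)
		if r.NextCapturer != self || len(r.Changes) == 0 {
			return total, r.NextCapturer
		}
	}
}

func main() {
	c := &pagedSnapshot{rows: []CaptureChange{{"id": 1}, {"id": 2}, {"id": 3}}, pageSize: 2}
	fmt.Println(drain(c, CapturerSnapshot)) // 3 cdc
}
```

Note that the capturer only fetches; nothing in it transforms rows or records offsets, matching the split of responsibilities described above.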
type CapturerName ¶
type CapturerName string
CapturerName represents the name of a data capturer.
const (
CapturerCDC CapturerName = "cdc"
CapturerSnapshot CapturerName = "snapshot"
CapturerReplay CapturerName = "replay"
)
type Change ¶
type Change struct {
Table string `json:"table"`
TransactionID string `json:"transaction_id"`
LSN []byte `json:"lsn"`
Operation Operation `json:"operation"`
Data map[string]interface{} `json:"data"`
CommitTime time.Time `json:"commit_time"` // Transaction commit time from source database
ID string `json:"id"` // Content-based hash ID for deduplication
TableKeys string `json:"table_keys"` // Primary key values (comma-separated)
}
Change represents a single row change
type DeliveryReason ¶
type DeliveryReason string
DeliveryReason indicates why a transaction was delivered
const (
DeliveryReasonCommitTime DeliveryReason = "commit_time" // All involved tables indicated transaction complete
DeliveryReasonPollInterval DeliveryReason = "poll_interval" // Poll interval gating safety boundary
DeliveryReasonTimeout DeliveryReason = "timeout" // maxWaitTime timeout fallback
DeliveryReasonBatchLimit DeliveryReason = "batch_limit" // Batch size limits reached
)
type LSN ¶
type LSN []byte
LSN represents a Log Sequence Number
type RunState ¶
type RunState string
RunState represents the current running state of the system.
const (
// StateIdle means no operation is running.
StateIdle RunState = "idle"
// StatePolling means CDC Poller is running.
StatePolling RunState = "polling"
// StateReplay means Replay service is running.
StateReplay RunState = "replay"
// StateSnapshot means Snapshot service is running (reserved for future).
StateSnapshot RunState = "snapshot"
)
type Runtime ¶
type Runtime struct {
// contains filtered or unexported fields
}
Runtime orchestrates data capture and ETL processing. It manages all three capturers (CDC, Snapshot, Replay) and handles post-fetch logic (Transformer, DLQ, state management). Note: Store is now internal to ChangeCapturer; Runtime does not manage it.
func NewRuntime ¶
func NewRuntime(
capturers map[CapturerName]Capturer,
transformer Transformer,
dlq *dlq.DLQ,
monitorDB *monitor.DB,
) *Runtime
NewRuntime creates a new Runtime with all capturers initialized.
func (*Runtime) Close ¶
func (r *Runtime) Close()
Close stops all capturers. Called when runtime is shutting down.
func (*Runtime) Run ¶
Run starts the runtime and loops until context is cancelled. It polls the current capturer on each tick and switches capturers based on NextCapturer.
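The polling loop described above can be sketched roughly as follows. This is not the Runtime's code: runSketch, nextCapturer, and the handoff stub are hypothetical, the types are trimmed local copies, and staying on the current capturer when NextCapturer is unknown is an assumed fallback. Transformer, DLQ, and state handling are left out entirely.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

type CapturerName string

type CaptureResult struct {
	NextCapturer CapturerName
}

type Capturer interface {
	Fetch(ctx context.Context) *CaptureResult
}

// handoff is a stub capturer that always names the same successor.
type handoff struct{ next CapturerName }

func (h handoff) Fetch(context.Context) *CaptureResult {
	return &CaptureResult{NextCapturer: h.next}
}

// nextCapturer follows res.NextCapturer when it names a registered capturer
// and otherwise stays on the current one (an assumed fallback).
func nextCapturer(caps map[CapturerName]Capturer, current CapturerName, res *CaptureResult) CapturerName {
	if res == nil {
		return current
	}
	if _, ok := caps[res.NextCapturer]; ok {
		return res.NextCapturer
	}
	return current
}

// runSketch polls the current capturer on every tick until ctx is cancelled
// and returns the capturer that was current when it stopped.
// It assumes start is a key of caps.
func runSketch(ctx context.Context, caps map[CapturerName]Capturer, start CapturerName, tick time.Duration) CapturerName {
	ticker := time.NewTicker(tick)
	defer ticker.Stop()
	current := start
	for {
		select {
		case <-ctx.Done():
			return current
		case <-ticker.C:
			current = nextCapturer(caps, current, caps[current].Fetch(ctx))
		}
	}
}

func main() {
	caps := map[CapturerName]Capturer{
		"snapshot": handoff{next: "cdc"}, // snapshot hands over to CDC
		"cdc":      handoff{next: "cdc"}, // CDC keeps itself
	}
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()
	fmt.Println(runSketch(ctx, caps, "snapshot", 10*time.Millisecond))
}
```

Keeping the switch decision in a small pure function makes it easy to test without timers; the ticker loop then only wires it to the clock and the context.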
func (*Runtime) SwitchTo ¶
func (r *Runtime) SwitchTo(name CapturerName)
SwitchTo switches the current capturer to the specified one. Called by Dashboard to manually switch between capturers.
type Sink ¶
type Sink struct {
Config SinkConfig
DataSet *DataSet
OpType Operation // Uses core.Operation from transaction.go
}
Sink represents a sink operation from a SQL plugin.
type SinkConfig ¶
type SinkConfig struct {
Name string
Database string // Database name (maps to platform-configured storage)
Output string
PrimaryKey string
OnConflict string
}
SinkConfig represents sink configuration
type StateManager ¶
type StateManager struct {
// contains filtered or unexported fields
}
StateManager coordinates the running state of Poller, Replay, and Snapshot. It acts as a simple state marker: operations check the state and pause themselves. State is stored in memory and resets to idle on restart.
func NewStateManager ¶
func NewStateManager() *StateManager
NewStateManager creates a new StateManager with initial state idle.
func (*StateManager) CanStart ¶
func (sm *StateManager) CanStart(want RunState) bool
CanStart checks if a new operation can be started. Returns true only if current state is idle and requested state is not idle. Note: For operations that want to "take over", use Set() directly.
func (*StateManager) Current ¶
func (sm *StateManager) Current() RunState
Current returns the current running state.
func (*StateManager) IsIdle ¶
func (sm *StateManager) IsIdle() bool
IsIdle checks if the system is in idle state.
func (*StateManager) Metadata ¶
func (sm *StateManager) Metadata() map[string]any
Metadata returns a copy of the current running metadata. Used for progress tracking (processed count, total count, etc).
func (*StateManager) Set ¶
func (sm *StateManager) Set(state RunState)
Set directly sets the state without checking. Used by operations that want to "take over" (e.g., Replay when Poller is running). Other operations will detect the state change and pause themselves. StateIdle always clears metadata. Non-idle state transitions also clear metadata.
func (*StateManager) SetMetadata ¶
func (sm *StateManager) SetMetadata(key string, value any)
SetMetadata updates a metadata key-value pair. Used during operation to report progress.
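The StateManager behavior documented above (CanStart only from idle, Set as an unconditional takeover, metadata cleared on transitions, Metadata returning a copy) can be sketched as a small mutex-guarded struct. stateSketch is a hypothetical simplification written for this example, not the package's type, and its locking strategy is an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

type RunState string

const (
	StateIdle    RunState = "idle"
	StatePolling RunState = "polling"
	StateReplay  RunState = "replay"
)

// stateSketch is a hypothetical, simplified stand-in for StateManager.
type stateSketch struct {
	mu    sync.RWMutex
	state RunState
	meta  map[string]any
}

func newStateSketch() *stateSketch {
	return &stateSketch{state: StateIdle, meta: map[string]any{}}
}

// CanStart reports whether a non-idle operation may begin from idle.
func (s *stateSketch) CanStart(want RunState) bool {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.state == StateIdle && want != StateIdle
}

// Set overrides the state without checking. Metadata is cleared when the
// new state is idle or differs from the current one.
func (s *stateSketch) Set(state RunState) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if state == StateIdle || state != s.state {
		s.meta = map[string]any{}
	}
	s.state = state
}

func (s *stateSketch) Current() RunState {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.state
}

func (s *stateSketch) SetMetadata(key string, value any) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.meta[key] = value
}

// Metadata returns a copy so callers cannot mutate internal state.
func (s *stateSketch) Metadata() map[string]any {
	s.mu.RLock()
	defer s.mu.RUnlock()
	out := make(map[string]any, len(s.meta))
	for k, v := range s.meta {
		out[k] = v
	}
	return out
}

func main() {
	sm := newStateSketch()
	fmt.Println(sm.CanStart(StatePolling)) // true
	sm.Set(StatePolling)
	sm.SetMetadata("processed", 10)
	fmt.Println(sm.CanStart(StateReplay)) // false
	sm.Set(StateReplay)                   // takeover: a poller would see this and pause
	fmt.Println(sm.Current(), len(sm.Metadata())) // replay 0
}
```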
type Transaction ¶
type Transaction struct {
TraceID string `json:"trace_id"` // Human-readable trace ID for log correlation
ID string `json:"id"` // Internal transaction ID
Changes []Change `json:"changes"`
CreatedAt time.Time `json:"created_at"`
CommitTime time.Time `json:"commit_time"` // Transaction commit time (from first change)
FirstSeenTime time.Time `json:"first_seen_time"` // When first change was added to buffer
InvolvedTables []string `json:"involved_tables"` // Tables that have changes in this transaction
}
Transaction represents a group of changes in the same transaction
func NewTransaction ¶
func NewTransaction(id string) *Transaction
NewTransaction creates a new transaction with a unique trace ID
func (*Transaction) AddChange ¶
func (t *Transaction) AddChange(c Change)
AddChange adds a change to the transaction
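The field comments on Transaction suggest that adding a change also maintains CommitTime (taken from the first change) and InvolvedTables. The sketch below shows one way that bookkeeping could look; addChange and the trimmed-down types are hypothetical, and the real AddChange may do more or less than this.

```go
package main

import (
	"fmt"
	"time"
)

// Trimmed local copies of the documented types.
type Change struct {
	Table      string
	CommitTime time.Time
}

type Transaction struct {
	ID             string
	Changes        []Change
	CommitTime     time.Time
	InvolvedTables []string
}

// addChange is a hypothetical version of Transaction.AddChange.
func (t *Transaction) addChange(c Change) {
	if len(t.Changes) == 0 {
		t.CommitTime = c.CommitTime // commit time comes from the first change
	}
	t.Changes = append(t.Changes, c)
	for _, name := range t.InvolvedTables {
		if name == c.Table {
			return // table already recorded
		}
	}
	t.InvolvedTables = append(t.InvolvedTables, c.Table)
}

func main() {
	tx := &Transaction{ID: "tx1"}
	now := time.Now()
	tx.addChange(Change{Table: "orders", CommitTime: now})
	tx.addChange(Change{Table: "order_items", CommitTime: now})
	tx.addChange(Change{Table: "orders", CommitTime: now})
	fmt.Println(len(tx.Changes), tx.InvolvedTables) // 3 [orders order_items]
}
```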
type TransactionBuffer ¶
type TransactionBuffer struct {
// contains filtered or unexported fields
}
TransactionBuffer holds pending transactions waiting for completion
func NewTransactionBuffer ¶
func NewTransactionBuffer(maxWaitTime time.Duration, pollInterval time.Duration, maxTxCount int, maxBatchBytes int) *TransactionBuffer
NewTransactionBuffer creates a new transaction buffer. pollInterval is derived from the CDC poll configuration and is used for commit-time gating.
func (*TransactionBuffer) Add ¶
func (tb *TransactionBuffer) Add(change Change)
Add adds a change to the buffer. Non-transactional changes (empty TransactionID) are delivered immediately.
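The split between buffered and immediately delivered changes can be illustrated with a small sketch. bufferSketch and its deliver callback are hypothetical; the real TransactionBuffer keeps additional state (timeouts, limits, callbacks) that is not modeled here, and this version is not safe for concurrent use.

```go
package main

import "fmt"

// Trimmed local copy of the documented Change type.
type Change struct {
	Table         string
	TransactionID string
}

// bufferSketch is a hypothetical, single-goroutine stand-in for TransactionBuffer.
type bufferSketch struct {
	pending map[string][]Change // buffered changes keyed by transaction ID
	deliver func([]Change)      // called for changes that bypass buffering
}

// add buffers transactional changes and delivers the rest right away.
func (b *bufferSketch) add(c Change) {
	if c.TransactionID == "" {
		b.deliver([]Change{c}) // non-transactional: nothing to wait for
		return
	}
	b.pending[c.TransactionID] = append(b.pending[c.TransactionID], c)
}

func main() {
	delivered := 0
	b := &bufferSketch{
		pending: map[string][]Change{},
		deliver: func(cs []Change) { delivered += len(cs) },
	}
	b.add(Change{Table: "users"})
	b.add(Change{Table: "orders", TransactionID: "tx1"})
	b.add(Change{Table: "items", TransactionID: "tx1"})
	fmt.Println(delivered, len(b.pending["tx1"])) // 1 2
}
```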
func (*TransactionBuffer) Flush ¶
func (tb *TransactionBuffer) Flush() []*Transaction
Flush returns all pending transactions immediately (for shutdown or rebuild). It marks all of them as complete regardless of timeout status.
func (*TransactionBuffer) GetCompleteTransactions ¶
func (tb *TransactionBuffer) GetCompleteTransactions() []*Transaction
GetCompleteTransactions returns transactions that are ready for delivery. A transaction is delivered when any one of the following conditions holds:
1. All involved tables have max(tableCommitTime) > tx.CommitTime (commit-time gating).
2. now.Sub(tx.CommitTime) > pollInterval * 2 (poll-interval gating safety boundary).
3. Batch limits are reached.
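The first two delivery conditions can be expressed as a small decision function. This is only a sketch under stated assumptions: readyReason, txSketch, tableTimes, and the at helper are hypothetical names, the 5-second pollInterval is an arbitrary demo value, a table with no recorded commit time is treated as not yet ahead, and the batch-limit condition is omitted because its inputs are not described here.

```go
package main

import (
	"fmt"
	"time"
)

const pollInterval = 5 * time.Second // assumed value for the demo

// tableTimes maps a table name to the max commit time seen for it.
type tableTimes map[string]time.Time

type txSketch struct {
	CommitTime     time.Time
	InvolvedTables []string
}

// at returns a fixed timestamp n seconds after the Unix epoch.
func at(n int64) time.Time { return time.Unix(n, 0) }

// readyReason reports whether tx can be delivered and, if so, why.
func readyReason(tx txSketch, seen tableTimes, now time.Time) (string, bool) {
	// Condition 1: every involved table has moved past tx.CommitTime.
	allAhead := len(tx.InvolvedTables) > 0
	for _, table := range tx.InvolvedTables {
		latest, ok := seen[table]
		if !ok || !latest.After(tx.CommitTime) {
			allAhead = false
			break
		}
	}
	if allAhead {
		return "commit_time", true
	}
	// Condition 2: the transaction is older than two poll intervals.
	if now.Sub(tx.CommitTime) > 2*pollInterval {
		return "poll_interval", true
	}
	return "", false
}

func main() {
	tx := txSketch{CommitTime: at(100), InvolvedTables: []string{"orders", "items"}}
	fmt.Println(readyReason(tx, tableTimes{"orders": at(101), "items": at(101)}, at(102))) // commit_time true
	fmt.Println(readyReason(tx, tableTimes{"orders": at(101)}, at(102)))                  // not ready yet
	fmt.Println(readyReason(tx, tableTimes{"orders": at(101)}, at(111)))                  // poll_interval true
}
```

In the second call the items table has no newer commit time and only 2 seconds have passed, so the transaction keeps waiting; in the third call 11 seconds have passed, which exceeds two poll intervals.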
func (*TransactionBuffer) GetTableMaxCommitTime ¶
func (tb *TransactionBuffer) GetTableMaxCommitTime(table string) (time.Time, bool)
GetTableMaxCommitTime returns the max commit time for a table
func (*TransactionBuffer) IsEmpty ¶
func (tb *TransactionBuffer) IsEmpty() bool
IsEmpty returns true if there are no pending transactions
func (*TransactionBuffer) SetComplete ¶
func (tb *TransactionBuffer) SetComplete(txID string) *Transaction
SetComplete marks a transaction as complete and returns it for delivery
func (*TransactionBuffer) SetOnComplete ¶
func (tb *TransactionBuffer) SetOnComplete(fn func(*Transaction))
SetOnComplete sets the callback for completed transactions
func (*TransactionBuffer) SetOnTimeout ¶
func (tb *TransactionBuffer) SetOnTimeout(fn func(*Transaction, DeliveryReason))
SetOnTimeout sets the callback for timed-out transactions (includes delivery reason)
func (*TransactionBuffer) Size ¶
func (tb *TransactionBuffer) Size() int
Size returns the number of pending transactions
func (*TransactionBuffer) UpdateTableMaxCommitTime ¶
func (tb *TransactionBuffer) UpdateTableMaxCommitTime(table string, commitTime time.Time)
UpdateTableMaxCommitTime updates the max commit time for a table. It is called externally when CDC captures new commit times.
type TransactionWithBatch ¶
type TransactionWithBatch struct {
Transaction *Transaction
BatchCtx *BatchContext
}
TransactionWithBatch wraps a Transaction with its BatchContext
type Transformer ¶
type Transformer interface {
Transform(ctx context.Context, changes []Change, batchCtx *BatchContext) error
}
Transformer is the interface for ETL processing of CDC changes.
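As an illustration of the interface, the sketch below implements a Transformer that simply counts changes per table. tableCounter and the apply helper are hypothetical, the types are trimmed local copies, and rejecting a change without a table name is an assumed validation rule chosen only to show an error path.

```go
package main

import (
	"context"
	"fmt"
)

// Trimmed local copies of the documented types.
type Change struct {
	Table string
}

type BatchContext struct {
	BatchID string
}

type Transformer interface {
	Transform(ctx context.Context, changes []Change, batchCtx *BatchContext) error
}

// tableCounter is a hypothetical Transformer that tallies changes per table.
type tableCounter struct {
	counts map[string]int
}

func (t *tableCounter) Transform(ctx context.Context, changes []Change, batchCtx *BatchContext) error {
	if err := ctx.Err(); err != nil {
		return err // stop early if the caller has already cancelled
	}
	for _, c := range changes {
		if c.Table == "" {
			return fmt.Errorf("batch %s: change without a table", batchCtx.BatchID)
		}
		t.counts[c.Table]++
	}
	return nil
}

// apply runs a Transformer with a background context and a demo BatchContext.
func apply(tr Transformer, changes []Change) error {
	return tr.Transform(context.Background(), changes, &BatchContext{BatchID: "demo"})
}

func main() {
	tc := &tableCounter{counts: map[string]int{}}
	err := apply(tc, []Change{{Table: "users"}, {Table: "users"}, {Table: "orders"}})
	fmt.Println(tc.counts["users"], tc.counts["orders"], err) // 2 1 <nil>
}
```

Including the BatchID in the error message ties a failure back to the poll cycle that produced it, which is the correlation role BatchContext is described as serving.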