core

package
v0.0.0-...-77b25ca
Warning

This package is not in the latest version of its module.

Published: Apr 30, 2026 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func ComputeChangeID

func ComputeChangeID(txID, table string, data map[string]interface{}, lsn []byte, op Operation) string

ComputeChangeID computes a content-based hash ID for a change. It uses the same algorithm as store/sqlite to ensure consistency. The ID is deterministic: the same input always produces the same ID.
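The page does not spell out the hashing algorithm, so the following is only a sketch of how a deterministic, content-based ID can be built. The function name sketchChangeID, the use of SHA-256, and the field encoding are all assumptions of this example and not the algorithm shared with store/sqlite. The key point it illustrates is that map keys must be visited in a fixed order, because Go map iteration order is randomized.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"sort"
)

// Operation mirrors the documented core.Operation type.
type Operation int

// sketchChangeID is a hypothetical stand-in for ComputeChangeID.
// It hashes the inputs in a fixed order so that map iteration order
// cannot influence the result.
func sketchChangeID(txID, table string, data map[string]interface{}, lsn []byte, op Operation) string {
	keys := make([]string, 0, len(data))
	for k := range data {
		keys = append(keys, k)
	}
	sort.Strings(keys) // fixed key order makes the hash reproducible

	h := sha256.New()
	fmt.Fprintf(h, "%s\x00%s\x00%x\x00%d\x00", txID, table, lsn, int(op))
	for _, k := range keys {
		fmt.Fprintf(h, "%s=%v\x00", k, data[k])
	}
	return hex.EncodeToString(h.Sum(nil))
}

func main() {
	a := sketchChangeID("tx1", "orders", map[string]interface{}{"id": 1, "qty": 2}, []byte{0x01}, 2)
	b := sketchChangeID("tx1", "orders", map[string]interface{}{"qty": 2, "id": 1}, []byte{0x01}, 2)
	fmt.Println(a == b) // the same content yields the same ID
}
```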

Types

type BatchContext

type BatchContext struct {
	// SkillName is the skill being executed (for sink logging)
	SkillName string

	// BatchID is a unique identifier for each poll cycle (root trace ID)
	// Format: {timestamp-ms}-{uuid-short-8chars}
	BatchID string

	// StartTime is when this poll cycle started
	StartTime time.Time
}

BatchContext carries observability context through the pipeline. It is created at the start of each poll cycle and passed to handlers, skills, and sinks for logging correlation.

func NewBatchContext

func NewBatchContext() *BatchContext

NewBatchContext creates a new BatchContext with a unique BatchID.

type CaptureChange

type CaptureChange map[string]interface{}

CaptureChange represents a single row change captured from CDC. This is the intermediate type used between Capturer and Runtime to avoid import cycles.

type CaptureResult

type CaptureResult struct {
	Changes      []CaptureChange
	BatchID      string       // Unique identifier for this fetch batch
	NextCapturer CapturerName // Which capturer to use for next fetch
}

CaptureResult carries data and metadata from a Capturer fetch operation.

type Capturer

type Capturer interface {
	// Fetch returns the next batch of changes and indicates which capturer to use next.
	Fetch(ctx context.Context) *CaptureResult

	// Stop signals the capturer to stop fetching.
	// Called by Runtime when the overall system is shutting down.
	Stop()
}

Capturer is the interface for data ingestion sources. Each implementation corresponds to a CDC mode: incremental (Change), Snapshot, or Replay. Capturer only handles data fetching; post-fetch processing (Transformer, DLQ, offset) is handled by Runtime.

type CapturerName

type CapturerName string

CapturerName represents the name of a data capturer.

const (
	CapturerCDC      CapturerName = "cdc"
	CapturerSnapshot CapturerName = "snapshot"
	CapturerReplay   CapturerName = "replay"
)

type Change

type Change struct {
	Table         string                 `json:"table"`
	TransactionID string                 `json:"transaction_id"`
	LSN           []byte                 `json:"lsn"`
	Operation     Operation              `json:"operation"`
	Data          map[string]interface{} `json:"data"`
	CommitTime    time.Time              `json:"commit_time"` // Transaction commit time from source database
	ID            string                 `json:"id"`          // Content-based hash ID for deduplication
	TableKeys     string                 `json:"table_keys"`  // Primary key values (comma-separated)
}

Change represents a single row change.

type DataSet

type DataSet struct {
	Columns []string
	Rows    [][]any
}

DataSet represents query results from SQL execution.
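A DataSet stores column names separately from row values. The sketch below assumes that the value at position j of each row belongs to Columns[j], which the page does not state explicitly; the helper name rowMaps is hypothetical.

```go
package main

import "fmt"

// DataSet mirrors the documented struct.
type DataSet struct {
	Columns []string
	Rows    [][]any
}

// rowMaps is a hypothetical helper that pairs each row's values with
// the column names, assuming positional alignment.
func rowMaps(ds DataSet) ([]map[string]any, error) {
	out := make([]map[string]any, 0, len(ds.Rows))
	for i, row := range ds.Rows {
		if len(row) != len(ds.Columns) {
			return nil, fmt.Errorf("row %d has %d values, want %d", i, len(row), len(ds.Columns))
		}
		m := make(map[string]any, len(row))
		for j, col := range ds.Columns {
			m[col] = row[j]
		}
		out = append(out, m)
	}
	return out, nil
}

func main() {
	ds := DataSet{
		Columns: []string{"id", "name"},
		Rows:    [][]any{{1, "alice"}, {2, "bob"}},
	}
	rows, err := rowMaps(ds)
	if err != nil {
		panic(err)
	}
	fmt.Println(rows[1]["name"])
}
```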

type DeliveryReason

type DeliveryReason string

DeliveryReason indicates why a transaction was delivered.

const (
	DeliveryReasonCommitTime   DeliveryReason = "commit_time"   // All involved tables indicated transaction complete
	DeliveryReasonPollInterval DeliveryReason = "poll_interval" // Poll interval gating safety boundary
	DeliveryReasonTimeout      DeliveryReason = "timeout"       // maxWaitTime timeout fallback
	DeliveryReasonBatchLimit   DeliveryReason = "batch_limit"   // Batch size limits reached
)

type LSN

type LSN []byte

LSN represents a Log Sequence Number.

func MinLSN

func MinLSN(lsns []LSN) (LSN, error)

MinLSN returns the minimum LSN from a slice.

func ParseLSN

func ParseLSN(s string) (LSN, error)

ParseLSN creates an LSN from a hex string.

func (LSN) Compare

func (l LSN) Compare(other LSN) int

Compare returns -1 if l < other, 0 if they are equal, and 1 if l > other.

func (LSN) IsZero

func (l LSN) IsZero() bool

IsZero reports whether the LSN is all zeros.

func (LSN) String

func (l LSN) String() string

String returns the hex representation.
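The LSN helpers above can be approximated with the standard library. This is a sketch under several assumptions that the page does not confirm: hex strings carry no "0x" prefix, LSNs being compared have equal length (so byte-wise order matches numeric order), an empty LSN counts as zero, and MinLSN returns an error for an empty slice. The lowercase names parseLSN and minLSN mark the local stand-ins.

```go
package main

import (
	"bytes"
	"encoding/hex"
	"errors"
	"fmt"
)

// LSN mirrors the documented type.
type LSN []byte

// parseLSN decodes a plain hex string (assumed to have no "0x" prefix).
func parseLSN(s string) (LSN, error) {
	b, err := hex.DecodeString(s)
	if err != nil {
		return nil, fmt.Errorf("parse LSN %q: %w", s, err)
	}
	return LSN(b), nil
}

// Compare orders LSNs byte by byte; this equals numeric order only
// when both values have the same length.
func (l LSN) Compare(other LSN) int { return bytes.Compare(l, other) }

// IsZero reports whether every byte is zero (true for an empty LSN here).
func (l LSN) IsZero() bool {
	for _, b := range l {
		if b != 0 {
			return false
		}
	}
	return true
}

func (l LSN) String() string { return hex.EncodeToString(l) }

// minLSN returns the smallest LSN; the empty-slice error is an assumption.
func minLSN(lsns []LSN) (LSN, error) {
	if len(lsns) == 0 {
		return nil, errors.New("no LSNs given")
	}
	lowest := lsns[0]
	for _, l := range lsns[1:] {
		if l.Compare(lowest) < 0 {
			lowest = l
		}
	}
	return lowest, nil
}

func main() {
	a, _ := parseLSN("0000002a000001c80003")
	b, _ := parseLSN("0000002a000001c80001")
	lowest, _ := minLSN([]LSN{a, b})
	fmt.Println(a.Compare(b), lowest)
}
```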

type Operation

type Operation int

Operation represents the type of change.

const (
	OpDelete       Operation = 1
	OpInsert       Operation = 2
	OpUpdateBefore Operation = 3
	OpUpdateAfter  Operation = 4
)

func (Operation) String

func (o Operation) String() string

String returns the operation name.
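The page does not list the names that String returns, so the sketch below uses hypothetical labels purely to show the usual shape of such a method, including a fallback for values outside the four constants. The numbering 1 to 4 coincides with the values SQL Server CDC uses in its __$operation column (delete, insert, update before, update after), although the page does not say which source database the package targets.

```go
package main

import "fmt"

// Operation and its constants mirror the documented declarations.
type Operation int

const (
	OpDelete       Operation = 1
	OpInsert       Operation = 2
	OpUpdateBefore Operation = 3
	OpUpdateAfter  Operation = 4
)

// String returns a label for the operation. The label text is an
// assumption of this sketch, not the package's actual output.
func (o Operation) String() string {
	switch o {
	case OpDelete:
		return "DELETE"
	case OpInsert:
		return "INSERT"
	case OpUpdateBefore:
		return "UPDATE_BEFORE"
	case OpUpdateAfter:
		return "UPDATE_AFTER"
	default:
		return fmt.Sprintf("Operation(%d)", int(o))
	}
}

func main() {
	for _, op := range []Operation{OpDelete, OpInsert, OpUpdateBefore, OpUpdateAfter, 9} {
		fmt.Println(int(op), op)
	}
}
```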

type RunState

type RunState string

RunState represents the current running state of the system.

const (
	// StateIdle means no operation is running.
	StateIdle RunState = "idle"
	// StatePolling means CDC Poller is running.
	StatePolling RunState = "polling"
	// StateReplay means Replay service is running.
	StateReplay RunState = "replay"
	// StateSnapshot means Snapshot service is running (reserved for future).
	StateSnapshot RunState = "snapshot"
)

type Runtime

type Runtime struct {
	// contains filtered or unexported fields
}

Runtime orchestrates data capture and ETL processing. It manages all three capturers (CDC, Snapshot, Replay) and handles post-fetch logic (Transformer, DLQ, state management). Note: Store is now internal to ChangeCapturer; Runtime does not manage it.

func NewRuntime

func NewRuntime(
	capturers map[CapturerName]Capturer,
	transformer Transformer,
	dlq *dlq.DLQ,
	monitorDB *monitor.DB,
) *Runtime

NewRuntime creates a new Runtime with all capturers initialized.

func (*Runtime) CDC

func (r *Runtime) CDC() Capturer

CDC returns the CDC capturer instance.

func (*Runtime) Close

func (r *Runtime) Close()

Close stops all capturers. Called when runtime is shutting down.

func (*Runtime) Replay

func (r *Runtime) Replay() Capturer

Replay returns the Replay capturer instance.

func (*Runtime) Run

func (r *Runtime) Run(ctx context.Context) error

Run starts the runtime and loops until the context is cancelled. It polls the current capturer on each tick and switches capturers based on NextCapturer.

func (*Runtime) Snapshot

func (r *Runtime) Snapshot() Capturer

Snapshot returns the Snapshot capturer instance.

func (*Runtime) SwitchTo

func (r *Runtime) SwitchTo(name CapturerName)

SwitchTo switches the current capturer to the specified one. Called by Dashboard to manually switch between capturers.

type Sink

type Sink struct {
	Config  SinkConfig
	DataSet *DataSet
	OpType  Operation // Uses core.Operation from transaction.go
}

Sink represents a sink operation from the SQL plugin.

type SinkConfig

type SinkConfig struct {
	Name       string
	Database   string // Database name (maps to platform-configured storage)
	Output     string
	PrimaryKey string
	OnConflict string
}

SinkConfig represents sink configuration.

type StateManager

type StateManager struct {
	// contains filtered or unexported fields
}

StateManager coordinates the running state of Poller, Replay, and Snapshot. It acts as a simple state marker: operations check the state and pause themselves. State is stored in memory and resets to idle on restart.

func NewStateManager

func NewStateManager() *StateManager

NewStateManager creates a new StateManager with initial state idle.

func (*StateManager) CanStart

func (sm *StateManager) CanStart(want RunState) bool

CanStart checks if a new operation can be started. Returns true only if current state is idle and requested state is not idle. Note: For operations that want to "take over", use Set() directly.

func (*StateManager) Current

func (sm *StateManager) Current() RunState

Current returns the current running state.

func (*StateManager) IsIdle

func (sm *StateManager) IsIdle() bool

IsIdle checks if the system is in idle state.

func (*StateManager) Metadata

func (sm *StateManager) Metadata() map[string]any

Metadata returns a copy of the current running metadata. Used for progress tracking (processed count, total count, etc).

func (*StateManager) Set

func (sm *StateManager) Set(state RunState)

Set directly sets the state without checking. Used by operations that want to "take over" (e.g., Replay when Poller is running). Other operations will detect the state change and pause themselves. Setting StateIdle always clears metadata, and transitions to a non-idle state clear it as well.

func (*StateManager) SetMetadata

func (sm *StateManager) SetMetadata(key string, value any)

SetMetadata updates a metadata key-value pair. Used during operation to report progress.
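The StateManager methods documented above can be sketched with a mutex-guarded struct. This is a minimal illustration of the described semantics and not the package's implementation; the lowercase stateManager type and its field layout are assumptions.

```go
package main

import (
	"fmt"
	"sync"
)

type RunState string

const (
	StateIdle    RunState = "idle"
	StatePolling RunState = "polling"
	StateReplay  RunState = "replay"
)

// stateManager is a hypothetical in-memory state marker.
type stateManager struct {
	mu    sync.RWMutex
	state RunState
	meta  map[string]any
}

func newStateManager() *stateManager {
	return &stateManager{state: StateIdle, meta: map[string]any{}}
}

// CanStart is true only when the system is idle and a non-idle state is requested.
func (sm *stateManager) CanStart(want RunState) bool {
	sm.mu.RLock()
	defer sm.mu.RUnlock()
	return sm.state == StateIdle && want != StateIdle
}

// Set replaces the state unconditionally and clears metadata.
func (sm *stateManager) Set(state RunState) {
	sm.mu.Lock()
	defer sm.mu.Unlock()
	sm.state = state
	sm.meta = map[string]any{}
}

func (sm *stateManager) SetMetadata(key string, value any) {
	sm.mu.Lock()
	defer sm.mu.Unlock()
	sm.meta[key] = value
}

// Metadata returns a copy so callers cannot mutate internal state.
func (sm *stateManager) Metadata() map[string]any {
	sm.mu.RLock()
	defer sm.mu.RUnlock()
	out := make(map[string]any, len(sm.meta))
	for k, v := range sm.meta {
		out[k] = v
	}
	return out
}

func (sm *stateManager) Current() RunState {
	sm.mu.RLock()
	defer sm.mu.RUnlock()
	return sm.state
}

func main() {
	sm := newStateManager()
	if sm.CanStart(StatePolling) {
		sm.Set(StatePolling)
	}
	sm.SetMetadata("processed", 10)
	sm.Set(StateReplay) // Replay takes over; progress metadata is reset
	fmt.Println(sm.Current(), len(sm.Metadata()))
}
```

In this sketch, calling CanStart and then Set is a check-then-act sequence and is not atomic, so two goroutines could both pass the check. Whether the real StateManager guards against that is not stated on this page.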

type Transaction

type Transaction struct {
	TraceID        string    `json:"trace_id"` // Human-readable trace ID for log correlation
	ID             string    `json:"id"`       // Internal transaction ID
	Changes        []Change  `json:"changes"`
	CreatedAt      time.Time `json:"created_at"`
	CommitTime     time.Time `json:"commit_time"`     // Transaction commit time (from first change)
	FirstSeenTime  time.Time `json:"first_seen_time"` // When first change was added to buffer
	InvolvedTables []string  `json:"involved_tables"` // Tables that have changes in this transaction
}

Transaction represents a group of changes in the same transaction.

func NewTransaction

func NewTransaction(id string) *Transaction

NewTransaction creates a new transaction with a unique trace ID.

func (*Transaction) AddChange

func (t *Transaction) AddChange(c Change)

AddChange adds a change to the transaction.

type TransactionBuffer

type TransactionBuffer struct {
	// contains filtered or unexported fields
}

TransactionBuffer holds pending transactions waiting for completion.

func NewTransactionBuffer

func NewTransactionBuffer(maxWaitTime time.Duration, pollInterval time.Duration, maxTxCount int, maxBatchBytes int) *TransactionBuffer

NewTransactionBuffer creates a new transaction buffer. pollInterval is derived from the CDC poll configuration and is used for commit-time gating.

func (*TransactionBuffer) Add

func (tb *TransactionBuffer) Add(change Change)

Add adds a change to the buffer. Non-transactional changes (empty TransactionID) are delivered immediately.

func (*TransactionBuffer) Close

func (tb *TransactionBuffer) Close()

Close stops the cleanup loop.

func (*TransactionBuffer) Flush

func (tb *TransactionBuffer) Flush() []*Transaction

Flush returns all pending transactions immediately (for shutdown or rebuild). It marks all of them as complete regardless of timeout status.

func (*TransactionBuffer) GetCompleteTransactions

func (tb *TransactionBuffer) GetCompleteTransactions() []*Transaction

GetCompleteTransactions returns transactions that are ready for delivery. A transaction is delivered when any one of the following conditions holds:

1. All involved tables have max(tableCommitTime) > tx.CommitTime (commit-time gating).
2. now.Sub(tx.CommitTime) > pollInterval * 2 (poll-interval gating safety boundary).
3. Batch limits are reached.

func (*TransactionBuffer) GetTableMaxCommitTime

func (tb *TransactionBuffer) GetTableMaxCommitTime(table string) (time.Time, bool)

GetTableMaxCommitTime returns the max commit time for a table.

func (*TransactionBuffer) IsEmpty

func (tb *TransactionBuffer) IsEmpty() bool

IsEmpty reports whether there are no pending transactions.

func (*TransactionBuffer) SetComplete

func (tb *TransactionBuffer) SetComplete(txID string) *Transaction

SetComplete marks a transaction as complete and returns it for delivery.

func (*TransactionBuffer) SetOnComplete

func (tb *TransactionBuffer) SetOnComplete(fn func(*Transaction))

SetOnComplete sets the callback for completed transactions.

func (*TransactionBuffer) SetOnTimeout

func (tb *TransactionBuffer) SetOnTimeout(fn func(*Transaction, DeliveryReason))

SetOnTimeout sets the callback for timed-out transactions (includes the delivery reason).

func (*TransactionBuffer) Size

func (tb *TransactionBuffer) Size() int

Size returns the number of pending transactions.

func (*TransactionBuffer) UpdateTableMaxCommitTime

func (tb *TransactionBuffer) UpdateTableMaxCommitTime(table string, commitTime time.Time)

UpdateTableMaxCommitTime updates the max commit time for a table. It is called externally when CDC captures new commit times.

type TransactionWithBatch

type TransactionWithBatch struct {
	Transaction *Transaction
	BatchCtx    *BatchContext
}

TransactionWithBatch wraps a Transaction with its BatchContext.

type Transformer

type Transformer interface {
	Transform(ctx context.Context, changes []Change, batchCtx *BatchContext) error
}

Transformer is the interface for ETL processing of CDC changes.
