Documentation ¶
Index ¶
- Constants
- type AckFunc
- type ChangeRecord
- type ChildPartition
- type ChildPartitionsRecord
- type ColumnType
- type Config
- type Consumer
- type ConsumerFunc
- type ConsumerFuncWithAck
- type ConsumerWithAck
- type DataChangeRecord
- type DataChangeRecordWithPartitionMeta
- type HeartbeatRecord
- type Mod
- type ModType
- type Option
- func WithAckTimeout(d time.Duration) Option
- func WithEndTimestamp(endTimestamp time.Time) Option
- func WithHeartbeatInterval(heartbeatInterval time.Duration) Option
- func WithLogLevel(logLevel string) Option
- func WithMaxConcurrentPartitions(max int) Option
- func WithSerializedConsumer(serialized bool) Option
- func WithSpannerRequestPriotiry(priority spannerpb.RequestOptions_Priority) Option
- func WithStartTimestamp(startTimestamp time.Time) Option
- type PartitionMetadata
- type PartitionMetrics
- type PartitionStorage
- type State
- type Subscriber
- func (s *Subscriber) GetMetrics() PartitionMetrics
- func (s *Subscriber) Subscribe(ctx context.Context, consumer Consumer) error
- func (s *Subscriber) SubscribeFunc(ctx context.Context, f ConsumerFunc) error
- func (s *Subscriber) SubscribeFuncWithAck(ctx context.Context, f ConsumerFuncWithAck) error
- func (s *Subscriber) SubscribeWithAck(ctx context.Context, consumer ConsumerWithAck) error
- type Type
- type TypeCode
Constants ¶
const (
	ModType_INSERT = "INSERT"
	ModType_UPDATE = "UPDATE"
	ModType_DELETE = "DELETE"
)
const (
// RootPartitionToken is the token value for the root partition.
RootPartitionToken = "Parent0"
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AckFunc ¶
type AckFunc func(error)
AckFunc acknowledges or nacks a record. Passing nil signals successful processing; passing a non-nil error nacks the record, which stops partition processing and surfaces the error to the subscriber.
type ChangeRecord ¶
type ChangeRecord struct {
DataChangeRecords []*DataChangeRecord `spanner:"data_change_record" json:"data_change_record"`
HeartbeatRecords []*HeartbeatRecord `spanner:"heartbeat_record" json:"heartbeat_record"`
ChildPartitionsRecords []*ChildPartitionsRecord `spanner:"child_partitions_record" json:"child_partitions_record"`
}
type ChildPartition ¶
type ChildPartitionsRecord ¶
type ChildPartitionsRecord struct {
StartTimestamp time.Time `spanner:"start_timestamp" json:"start_timestamp"`
RecordSequence string `spanner:"record_sequence" json:"record_sequence"`
ChildPartitions []*ChildPartition `spanner:"child_partitions" json:"child_partitions"`
}
type ColumnType ¶
type ColumnType struct {
Name string `json:"name"`
Type Type `json:"type"`
IsPrimaryKey bool `json:"is_primary_key,omitempty"`
OrdinalPosition int64 `json:"ordinal_position"`
}
ColumnType is the metadata of the column.
type Consumer ¶
type Consumer interface {
// Consume processes a marshaled DataChangeRecord.
Consume(change []byte) error
}
Consumer is the interface to consume the DataChangeRecord.
Consume may be called from multiple goroutines and must be safe for concurrent use.
type ConsumerFunc ¶
type ConsumerFunc func(change []byte) error
ConsumerFunc is an adapter to allow the use of ordinary functions as Consumer. The function receives the output of json.Marshal(DataChangeRecord).
func (ConsumerFunc) Consume ¶
func (f ConsumerFunc) Consume(change []byte) error
Consume calls f(change), allowing function types to satisfy the Consumer interface. The input is the output of json.Marshal(DataChangeRecord).
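The adapter pattern and payload shape can be sketched with local stand-ins. Consumer and ConsumerFunc are reproduced here from their documented shapes, and the trimmed record struct reuses two of the json tags documented for DataChangeRecord; the sample payload is hand-written, not produced by the library:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local stand-ins mirroring the documented Consumer and ConsumerFunc shapes.
type Consumer interface {
	Consume(change []byte) error
}

type ConsumerFunc func(change []byte) error

// Consume calls f(change), letting a plain function satisfy Consumer.
func (f ConsumerFunc) Consume(change []byte) error { return f(change) }

// record is a trimmed stand-in for DataChangeRecord, reusing the json tags
// documented for that type.
type record struct {
	TableName string `json:"table_name"`
	ModType   string `json:"mod_type"`
}

// handle decodes a marshaled record and describes the change.
func handle(change []byte) (string, error) {
	var r record
	if err := json.Unmarshal(change, &r); err != nil {
		return "", err
	}
	return fmt.Sprintf("%s on %s", r.ModType, r.TableName), nil
}

func main() {
	var c Consumer = ConsumerFunc(func(change []byte) error {
		s, err := handle(change)
		if err != nil {
			return err
		}
		fmt.Println(s)
		return nil
	})
	// The subscriber passes json.Marshal(DataChangeRecord); this payload is a
	// hand-written sample with the same field names.
	if err := c.Consume([]byte(`{"table_name":"Singers","mod_type":"INSERT"}`)); err != nil {
		panic(err)
	}
}
```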
type ConsumerFuncWithAck ¶
ConsumerFuncWithAck is an adapter to allow the use of ordinary functions as ConsumerWithAck.
type ConsumerWithAck ¶
ConsumerWithAck is the interface to consume DataChangeRecords with explicit per-record acknowledgment.
Consume is called for each record. The ack function must be called exactly once when processing is complete — either with nil to ack, or with an error to nack. Consume itself may return immediately (async processing is allowed); the subscriber will not advance to the next batch until every ack from the current batch has been received.
If Consume returns a non-nil error the subscriber stops immediately without waiting for ack.
Consume may be called from multiple goroutines and must be safe for concurrent use.
type DataChangeRecord ¶
type DataChangeRecord struct {
CommitTimestamp time.Time `json:"commit_timestamp"`
RecordSequence string `json:"record_sequence"`
ServerTransactionID string `json:"server_transaction_id"`
IsLastRecordInTransactionInPartition bool `json:"is_last_record_in_transaction_in_partition"`
TableName string `json:"table_name"`
ColumnTypes []*ColumnType `json:"column_types"`
Mods []*Mod `json:"mods"`
ModType ModType `json:"mod_type"`
ValueCaptureType string `json:"value_capture_type"`
NumberOfRecordsInTransaction int64 `json:"number_of_records_in_transaction"`
NumberOfPartitionsInTransaction int64 `json:"number_of_partitions_in_transaction"`
TransactionTag string `json:"transaction_tag"`
IsSystemTransaction bool `json:"is_system_transaction"`
}
DataChangeRecord is the change set of the table.
type DataChangeRecordWithPartitionMeta ¶ added in v1.2.0
type DataChangeRecordWithPartitionMeta struct {
*DataChangeRecord
PartitionToken string `spanner:"PartitionToken" json:"partition_token"`
StartTimestamp time.Time `spanner:"StartTimestamp" json:"start_timestamp"`
Watermark time.Time `spanner:"Watermark" json:"watermark"`
CreatedAt time.Time `spanner:"CreatedAt" json:"created_at"`
ScheduledAt *time.Time `spanner:"ScheduledAt" json:"scheduled_at,omitempty"`
RunningAt *time.Time `spanner:"RunningAt" json:"running_at,omitempty"`
}
type HeartbeatRecord ¶
type Mod ¶
type Mod struct {
Keys map[string]interface{} `json:"keys,omitempty"`
NewValues map[string]interface{} `json:"new_values,omitempty"`
OldValues map[string]interface{} `json:"old_values,omitempty"`
}
Mod contains the keys and the values of the changed records.
type Option ¶
type Option interface {
Apply(*config)
}
Option configures a Subscriber via functional options.
func WithAckTimeout ¶
WithAckTimeout sets the maximum duration the subscriber will wait for a consumer to acknowledge each batch of records when using ConsumerWithAck. If the timeout elapses before all acks in a batch are received, the subscriber returns an error and stops processing that partition. A value of 0 (default) disables the timeout — the subscriber waits indefinitely.
func WithEndTimestamp ¶
WithEndTimestamp sets the end timestamp option for reading change streams. The value must be within the retention period of the change stream and must be after the start timestamp. If not set, the subscriber keeps reading the latest changes until canceled.
func WithHeartbeatInterval ¶
WithHeartbeatInterval sets the heartbeat interval for reading change streams. Default value is 10 seconds.
func WithLogLevel ¶ added in v1.0.2
WithLogLevel sets the log level for the subscriber.
func WithMaxConcurrentPartitions ¶
WithMaxConcurrentPartitions sets the maximum number of partitions that can be processed concurrently by a single runner. This helps prevent resource exhaustion when dealing with large numbers of partitions. A value of 0 (default) means no limit, maintaining backward compatibility. Recommended value: 100 for production workloads.
func WithSerializedConsumer ¶ added in v1.0.0
WithSerializedConsumer enables or disables serialized processing of records by the Consumer. When true, a mutex ensures that s.consumer.Consume() is called serially, simplifying Consumer implementations that are not safe for concurrent use. This may reduce throughput. Default is false (records may be consumed concurrently, so the Consumer must be safe for concurrent use).
func WithSpannerRequestPriotiry ¶
func WithSpannerRequestPriotiry(priority spannerpb.RequestOptions_Priority) Option
WithSpannerRequestPriotiry sets the request priority option for reading change streams. The default is unspecified, which Spanner treats as high priority.
func WithStartTimestamp ¶
WithStartTimestamp sets the start timestamp option for reading change streams. The value must be within the retention period of the change stream and before the current time. Default value is current timestamp.
type PartitionMetadata ¶
type PartitionMetadata struct {
PartitionToken string `spanner:"PartitionToken" json:"partition_token"`
ParentTokens []string `spanner:"ParentTokens" json:"parent_tokens"`
StartTimestamp time.Time `spanner:"StartTimestamp" json:"start_timestamp"`
EndTimestamp time.Time `spanner:"EndTimestamp" json:"end_timestamp"`
HeartbeatMillis int64 `spanner:"HeartbeatMillis" json:"heartbeat_millis"`
State State `spanner:"State" json:"state"`
Watermark time.Time `spanner:"Watermark" json:"watermark"`
CreatedAt time.Time `spanner:"CreatedAt" json:"created_at"`
ScheduledAt *time.Time `spanner:"ScheduledAt" json:"scheduled_at,omitempty"`
RunningAt *time.Time `spanner:"RunningAt" json:"running_at,omitempty"`
FinishedAt *time.Time `spanner:"FinishedAt" json:"finished_at,omitempty"`
}
PartitionMetadata represents metadata for a change stream partition, including its state, timing, and parent/child relationships.
func (*PartitionMetadata) IsRootPartition ¶
func (p *PartitionMetadata) IsRootPartition() bool
IsRootPartition returns true if this partition is the root partition.
type PartitionMetrics ¶
type PartitionMetrics struct {
// ActivePartitions is the number of partitions currently being processed by this runner.
ActivePartitions int
// MaxConcurrentPartitions is the configured limit (0 = unlimited).
MaxConcurrentPartitions int
// CapacityUsedPercent is the percentage of capacity in use (0-100, or -1 if unlimited).
CapacityUsedPercent float64
// AvailableSlots is the number of additional partitions this runner can accept.
AvailableSlots int
}
PartitionMetrics holds observability metrics for partition processing.
type PartitionStorage ¶
type PartitionStorage interface {
// GetUnfinishedMinWatermarkPartition returns the unfinished partition with the minimum watermark, or nil if none exist.
GetUnfinishedMinWatermarkPartition(ctx context.Context) (*PartitionMetadata, error)
// GetInterruptedPartitions returns partitions that are scheduled or running but have lost their runner.
// The limit parameter restricts the maximum number of partitions to recover.
GetInterruptedPartitions(ctx context.Context, runnerID string, limit int) ([]*PartitionMetadata, error)
// InitializeRootPartition creates or updates the root partition metadata.
InitializeRootPartition(ctx context.Context, startTimestamp time.Time, endTimestamp time.Time, heartbeatInterval time.Duration) error
// GetAndSchedulePartitions finds partitions ready to be scheduled and assigns them to the given runnerID.
// The limit parameter restricts the maximum number of partitions to schedule.
GetAndSchedulePartitions(ctx context.Context, minWatermark time.Time, runnerID string, limit int) ([]*PartitionMetadata, error)
// AddChildPartitions adds new child partitions for a parent partition based on a ChildPartitionsRecord.
AddChildPartitions(ctx context.Context, parentPartition *PartitionMetadata, childPartitionsRecord *ChildPartitionsRecord) error
// UpdateToRunning marks the given partition as running.
UpdateToRunning(ctx context.Context, partition *PartitionMetadata) error
// RefreshRunner updates the liveness timestamp for the given runnerID.
RefreshRunner(ctx context.Context, runnerID string) error
// UpdateToFinished marks the given partition as finished.
UpdateToFinished(ctx context.Context, partition *PartitionMetadata, runnerID string) error
// UpdateWatermark updates the watermark for the given partition.
UpdateWatermark(ctx context.Context, partition *PartitionMetadata, watermark time.Time) error
// GetActiveRunnerCount returns the number of active runners that have refreshed recently.
// Used for metrics and observability.
GetActiveRunnerCount(ctx context.Context) (int, error)
}
PartitionStorage defines the interface for managing change stream partition metadata. Implementations must be concurrency-safe.
type State ¶
type State string
State represents the state of a partition in the change stream lifecycle.
const (
	// StateCreated indicates the partition is newly created and not yet scheduled.
	StateCreated State = "CREATED"
	// StateScheduled indicates the partition is scheduled for processing.
	StateScheduled State = "SCHEDULED"
	// StateRunning indicates the partition is currently being processed.
	StateRunning State = "RUNNING"
	// StateFinished indicates the partition has been fully processed.
	StateFinished State = "FINISHED"
)
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
Subscriber subscribes to a change stream and manages partition processing.
func NewSubscriber ¶
func NewSubscriber(
	client *spanner.Client,
	streamName, runnerID string,
	partitionStorage PartitionStorage,
	options ...Option,
) *Subscriber
NewSubscriber creates a new subscriber of change streams. The returned Subscriber is ready to start processing with Subscribe.
func (*Subscriber) GetMetrics ¶
func (s *Subscriber) GetMetrics() PartitionMetrics
GetMetrics returns current partition processing metrics for observability. This can be used for monitoring dashboards, health checks, and capacity planning.
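How the documented PartitionMetrics fields relate can be shown with a small sketch. The struct is reproduced from its documentation; snapshot is a hypothetical helper (not part of the library) that fills it per the documented semantics, assuming AvailableSlots is the remaining capacity when a limit is set:

```go
package main

import "fmt"

// PartitionMetrics reproduced from the documented struct.
type PartitionMetrics struct {
	ActivePartitions        int
	MaxConcurrentPartitions int
	CapacityUsedPercent     float64
	AvailableSlots          int
}

// snapshot is a hypothetical helper illustrating the documented field
// semantics: percent is -1 when the limit is 0 (unlimited), otherwise
// active/limit*100, with AvailableSlots as the remaining capacity.
func snapshot(active, limit int) PartitionMetrics {
	m := PartitionMetrics{ActivePartitions: active, MaxConcurrentPartitions: limit}
	if limit <= 0 {
		m.CapacityUsedPercent = -1
		return m
	}
	m.CapacityUsedPercent = float64(active) / float64(limit) * 100
	m.AvailableSlots = limit - active
	return m
}

func main() {
	m := snapshot(25, 100)
	fmt.Printf("active=%d used=%.0f%% slots=%d\n",
		m.ActivePartitions, m.CapacityUsedPercent, m.AvailableSlots)
	// A real caller would instead poll s.GetMetrics() periodically and export
	// these fields to a monitoring system.
}
```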
func (*Subscriber) Subscribe ¶
func (s *Subscriber) Subscribe(ctx context.Context, consumer Consumer) error
Subscribe starts subscribing to the change stream and processing records using the provided Consumer. This method blocks until all partitions are processed or the context is canceled.
func (*Subscriber) SubscribeFunc ¶
func (s *Subscriber) SubscribeFunc(ctx context.Context, f ConsumerFunc) error
SubscribeFunc is an adapter to allow the use of ordinary functions as Consumer. The function may be called from multiple goroutines and must be safe for concurrent use.
func (*Subscriber) SubscribeFuncWithAck ¶
func (s *Subscriber) SubscribeFuncWithAck(ctx context.Context, f ConsumerFuncWithAck) error
SubscribeFuncWithAck is an adapter to allow the use of ordinary functions as ConsumerWithAck.
func (*Subscriber) SubscribeWithAck ¶
func (s *Subscriber) SubscribeWithAck(ctx context.Context, consumer ConsumerWithAck) error
SubscribeWithAck starts subscribing to the change stream using a ConsumerWithAck. Each record is delivered with an AckFunc that the consumer must call exactly once. The subscriber will not advance to the next batch until all acks for the current batch are received. This method blocks until all partitions are processed or the context is canceled.
type Type ¶
type Type struct {
Code TypeCode `json:"code"`
ArrayElementType TypeCode `json:"array_element_type,omitempty"`
}
Type is the type of the column.
type TypeCode ¶
type TypeCode string
const (
	TypeCode_NONE      TypeCode = ""
	TypeCode_BOOL      TypeCode = "BOOL"
	TypeCode_INT64     TypeCode = "INT64"
	TypeCode_FLOAT64   TypeCode = "FLOAT64"
	TypeCode_TIMESTAMP TypeCode = "TIMESTAMP"
	TypeCode_DATE      TypeCode = "DATE"
	TypeCode_STRING    TypeCode = "STRING"
	TypeCode_BYTES     TypeCode = "BYTES"
	TypeCode_NUMERIC   TypeCode = "NUMERIC"
	TypeCode_JSON      TypeCode = "JSON"
	TypeCode_ARRAY     TypeCode = "ARRAY"
)