Documentation
¶
Index ¶
- Constants
- type ChangeRecord
- type ChildPartition
- type ChildPartitionsRecord
- type ColumnType
- type Config
- type Consumer
- type ConsumerFunc
- type DataChangeRecord
- type HeartbeatRecord
- type Mod
- type ModType
- type Option
- type PartitionMetadata
- type PartitionStorage
- type State
- type Subscriber
- type Type
- type TypeCode
Constants ¶
const (
ModType_INSERT = "INSERT"
ModType_UPDATE = "UPDATE"
ModType_DELETE = "DELETE"
)
const (
RootPartitionToken = "Parent0"
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ChangeRecord ¶
type ChangeRecord struct {
DataChangeRecords []*dataChangeRecord `spanner:"data_change_record" json:"data_change_record"`
HeartbeatRecords []*HeartbeatRecord `spanner:"heartbeat_record" json:"heartbeat_record"`
ChildPartitionsRecords []*ChildPartitionsRecord `spanner:"child_partitions_record" json:"child_partitions_record"`
}
type ChildPartition ¶
type ChildPartitionsRecord ¶
type ChildPartitionsRecord struct {
StartTimestamp time.Time `spanner:"start_timestamp" json:"start_timestamp"`
RecordSequence string `spanner:"record_sequence" json:"record_sequence"`
ChildPartitions []*ChildPartition `spanner:"child_partitions" json:"child_partitions"`
}
type ColumnType ¶
type ColumnType struct {
Name string `json:"name"`
Type Type `json:"type"`
IsPrimaryKey bool `json:"is_primary_key,omitempty"`
OrdinalPosition int64 `json:"ordinal_position"`
}
ColumnType is the metadata of the column.
type Consumer ¶
Consumer is the interface to consume the DataChangeRecord.
Consume might be called from multiple goroutines and must be re-entrant safe.
type ConsumerFunc ¶
ConsumerFunc is an adapter that allows an ordinary function to be used as a Consumer. The function receives a DataChangeRecord encoded with json.Marshal.
func (ConsumerFunc) Consume ¶
func (f ConsumerFunc) Consume(change []byte) error
Consume calls f(change), where change is a DataChangeRecord encoded with json.Marshal.
type DataChangeRecord ¶
type DataChangeRecord struct {
CommitTimestamp time.Time `json:"commit_timestamp"`
RecordSequence string `json:"record_sequence"`
ServerTransactionID string `json:"server_transaction_id"`
IsLastRecordInTransactionInPartition bool `json:"is_last_record_in_transaction_in_partition"`
TableName string `json:"table_name"`
ColumnTypes []*ColumnType `json:"column_types"`
Mods []*Mod `json:"mods"`
ModType ModType `json:"mod_type"`
ValueCaptureType string `json:"value_capture_type"`
NumberOfRecordsInTransaction int64 `json:"number_of_records_in_transaction"`
NumberOfPartitionsInTransaction int64 `json:"number_of_partitions_in_transaction"`
TransactionTag string `json:"transaction_tag"`
IsSystemTransaction bool `json:"is_system_transaction"`
}
DataChangeRecord is the change set of the table.
type HeartbeatRecord ¶
type Mod ¶
type Mod struct {
Keys map[string]interface{} `json:"keys,omitempty"`
NewValues map[string]interface{} `json:"new_values,omitempty"`
OldValues map[string]interface{} `json:"old_values,omitempty"`
}
Mod contains the keys and the values of the changed records.
type Option ¶
type Option interface {
Apply(*config)
}
func WithEndTimestamp ¶
WithEndTimestamp sets the end timestamp option for reading change streams.
The value must be within the retention period of the change stream and must be after the start timestamp. If not set, the latest changes are read until canceled.
func WithHeartbeatInterval ¶
WithHeartbeatInterval sets the heartbeat interval for reading change streams.
Default value is 10 seconds.
func WithSpannerRequestPriotiry ¶
func WithSpannerRequestPriotiry(priority spannerpb.RequestOptions_Priority) Option
WithSpannerRequestPriotiry sets the request priority option for reading change streams.
Default value is unspecified, equivalent to high.
func WithStartTimestamp ¶
WithStartTimestamp sets the start timestamp option for reading change streams.
The value must be within the retention period of the change stream and before the current time. The default value is the current timestamp.
type PartitionMetadata ¶
type PartitionMetadata struct {
PartitionToken string `spanner:"PartitionToken" json:"partition_token"`
ParentTokens []string `spanner:"ParentTokens" json:"parent_tokens"`
StartTimestamp time.Time `spanner:"StartTimestamp" json:"start_timestamp"`
EndTimestamp time.Time `spanner:"EndTimestamp" json:"end_timestamp"`
HeartbeatMillis int64 `spanner:"HeartbeatMillis" json:"heartbeat_millis"`
State State `spanner:"State" json:"state"`
Watermark time.Time `spanner:"Watermark" json:"watermark"`
CreatedAt time.Time `spanner:"CreatedAt" json:"created_at"`
ScheduledAt *time.Time `spanner:"ScheduledAt" json:"scheduled_at,omitempty"`
RunningAt *time.Time `spanner:"RunningAt" json:"running_at,omitempty"`
FinishedAt *time.Time `spanner:"FinishedAt" json:"finished_at,omitempty"`
}
func (*PartitionMetadata) IsRootPartition ¶
func (p *PartitionMetadata) IsRootPartition() bool
IsRootPartition returns true if this is the root partition.
type PartitionStorage ¶
type PartitionStorage interface {
GetUnfinishedMinWatermarkPartition(ctx context.Context) (*PartitionMetadata, error)
GetInterruptedPartitions(ctx context.Context, runnerID string) ([]*PartitionMetadata, error)
InitializeRootPartition(ctx context.Context, startTimestamp time.Time, endTimestamp time.Time, heartbeatInterval time.Duration) error
GetAndSchedulePartitions(ctx context.Context, minWatermark time.Time, runnerID string) ([]*PartitionMetadata, error)
AddChildPartitions(ctx context.Context, parentPartition *PartitionMetadata, childPartitionsRecord *ChildPartitionsRecord) error
UpdateToRunning(ctx context.Context, partition *PartitionMetadata) error
RefreshRunner(ctx context.Context, runnerID string) error
UpdateToFinished(ctx context.Context, partition *PartitionMetadata) error
UpdateWatermark(ctx context.Context, partition *PartitionMetadata, watermark time.Time) error
}
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
Subscriber subscribes to a change stream.
func NewSubscriber ¶
func NewSubscriber(
client *spanner.Client,
streamName, runnerID string,
partitionStorage PartitionStorage,
options ...Option,
) *Subscriber
NewSubscriber creates a new subscriber of change streams.
func (*Subscriber) Subscribe ¶
func (s *Subscriber) Subscribe(ctx context.Context, consumer Consumer) error
Subscribe starts subscribing to the change stream.
func (*Subscriber) SubscribeFunc ¶
func (s *Subscriber) SubscribeFunc(ctx context.Context, f ConsumerFunc) error
SubscribeFunc is an adapter that allows an ordinary function to be used as a Consumer.
The function might be called from multiple goroutines and must be re-entrant safe.
type Type ¶
type Type struct {
Code TypeCode `json:"code"`
ArrayElementType TypeCode `json:"array_element_type,omitempty"`
}
Type is the type of the column.
type TypeCode ¶
type TypeCode string
const (
TypeCode_NONE TypeCode = ""
TypeCode_BOOL TypeCode = "BOOL"
TypeCode_INT64 TypeCode = "INT64"
TypeCode_FLOAT64 TypeCode = "FLOAT64"
TypeCode_TIMESTAMP TypeCode = "TIMESTAMP"
TypeCode_DATE TypeCode = "DATE"
TypeCode_STRING TypeCode = "STRING"
TypeCode_BYTES TypeCode = "BYTES"
TypeCode_NUMERIC TypeCode = "NUMERIC"
TypeCode_JSON TypeCode = "JSON"
TypeCode_ARRAY TypeCode = "ARRAY"
)