screamer

package
v0.1.1
Warning

This package is not in the latest version of its module.

Published: Dec 10, 2024 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation


Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	DSN               string
	Stream            string
	MetadataTable     *string
	Start             *time.Time
	End               *time.Time
	HeartbeatInterval *time.Duration
	PartitionDSN      *string
	Priority          int32
}

type Consumer

type Consumer interface {
	Consume(change []byte) error
}

Consumer is the interface that consumes a DataChangeRecord.

Consume may be called from multiple goroutines at the same time, so implementations must be re-entrant and safe for concurrent use.
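The concurrency requirement can be illustrated with a minimal sketch. The Consumer interface is re-declared locally so the example compiles on its own; countingConsumer, totals, and consumeConcurrently are hypothetical names that are not part of this package, and the goroutine fan-out only simulates how a subscriber might deliver records.

```go
package main

import (
	"fmt"
	"sync"
)

// Consumer mirrors the interface documented above so that this sketch
// compiles without importing the package.
type Consumer interface {
	Consume(change []byte) error
}

// countingConsumer is a hypothetical Consumer that counts records and bytes.
// The mutex guards both fields because Consume may run on several goroutines.
type countingConsumer struct {
	mu      sync.Mutex
	records int
	bytes   int
}

// Consume records one change under the lock.
func (c *countingConsumer) Consume(change []byte) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.records++
	c.bytes += len(change)
	return nil
}

// totals returns a consistent snapshot of the counters.
func (c *countingConsumer) totals() (records, bytes int) {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.records, c.bytes
}

// consumeConcurrently simulates concurrent delivery by calling Consume
// from n goroutines and returns the first error, if any.
func consumeConcurrently(c Consumer, n int, change []byte) error {
	var wg sync.WaitGroup
	errs := make(chan error, n)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := c.Consume(change); err != nil {
				errs <- err
			}
		}()
	}
	wg.Wait()
	close(errs)
	return <-errs // nil when no call failed, because the channel is closed
}

func main() {
	c := &countingConsumer{}
	if err := consumeConcurrently(c, 100, []byte(`{}`)); err != nil {
		fmt.Println("consume failed:", err)
		return
	}
	records, bytes := c.totals()
	fmt.Println(records, bytes) // prints "100 200"
}
```

Without the mutex, the two increments would race and the totals could come out lower than expected.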

type ConsumerFunc

type ConsumerFunc func([]byte) error

ConsumerFunc is an adapter that allows an ordinary function to be used as a Consumer. The function receives the DataChangeRecord encoded with json.Marshal.

func (ConsumerFunc) Consume

func (f ConsumerFunc) Consume(change []byte) error

Consume calls f(change), where change is the DataChangeRecord encoded with json.Marshal.
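A minimal sketch of the adapter in use, assuming only that the payload is a JSON object. Consumer and ConsumerFunc are re-declared locally so the example is self-contained; decodeChange and the key "example_field" are hypothetical, and the payload is decoded into a generic map because the fields of DataChangeRecord are not listed here.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Consumer and ConsumerFunc mirror the declarations documented above so that
// this sketch compiles without importing the package.
type Consumer interface {
	Consume(change []byte) error
}

type ConsumerFunc func([]byte) error

// Consume calls f(change).
func (f ConsumerFunc) Consume(change []byte) error { return f(change) }

// decodeChange is a hypothetical helper. It unmarshals the JSON payload into
// a generic map rather than guessing at the DataChangeRecord field names.
func decodeChange(change []byte) (map[string]any, error) {
	var record map[string]any
	if err := json.Unmarshal(change, &record); err != nil {
		return nil, fmt.Errorf("decode change record: %w", err)
	}
	return record, nil
}

func main() {
	// An ordinary function literal becomes a Consumer through the adapter.
	var c Consumer = ConsumerFunc(func(change []byte) error {
		record, err := decodeChange(change)
		if err != nil {
			return err
		}
		fmt.Println("fields:", len(record))
		return nil
	})

	// "example_field" is a made-up key used only for illustration.
	if err := c.Consume([]byte(`{"example_field": "value"}`)); err != nil {
		fmt.Println("consume failed:", err)
	}
	// prints "fields: 1"
}
```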

type Option

type Option interface {
	Apply(*config)
}

func WithEndTimestamp

func WithEndTimestamp(endTimestamp time.Time) Option

WithEndTimestamp sets the end timestamp for reading change streams.

The value must be within the retention period of the change stream and must be after the start timestamp. If it is not set, the subscriber reads the latest changes until it is canceled.

func WithHeartbeatInterval

func WithHeartbeatInterval(heartbeatInterval time.Duration) Option

WithHeartbeatInterval sets the heartbeat interval for reading change streams.

The default is 10 seconds.

func WithSpannerRequestPriotiry

func WithSpannerRequestPriotiry(priority spannerpb.RequestOptions_Priority) Option

WithSpannerRequestPriotiry sets the request priority for reading change streams.

The default is unspecified, which is equivalent to high.

func WithStartTimestamp

func WithStartTimestamp(startTimestamp time.Time) Option

WithStartTimestamp sets the start timestamp for reading change streams.

The value must be within the retention period of the change stream and before the current time. The default is the current timestamp.
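The way these options combine with their documented defaults can be sketched with the functional-options pattern. Everything below is a hypothetical stand-in: settings, option, withStart, withEnd, withHeartbeat, and resolve are not the package's own identifiers, and whether the package validates timestamps locally is an assumption. The retention-period check is omitted because it depends on the change stream itself.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// settings is a hypothetical stand-in for the package's unexported config.
type settings struct {
	start     time.Time
	end       *time.Time // nil means "read until canceled"
	heartbeat time.Duration
}

// option mirrors the shape of the documented Option interface.
type option interface{ apply(*settings) }

type optionFunc func(*settings)

func (f optionFunc) apply(s *settings) { f(s) }

func withStart(t time.Time) option {
	return optionFunc(func(s *settings) { s.start = t })
}

func withEnd(t time.Time) option {
	return optionFunc(func(s *settings) { s.end = &t })
}

func withHeartbeat(d time.Duration) option {
	return optionFunc(func(s *settings) { s.heartbeat = d })
}

// resolve starts from the documented defaults (start = now, no end,
// 10-second heartbeat), applies the options in order, and then checks the
// ordering constraints described above.
func resolve(now time.Time, opts ...option) (settings, error) {
	s := settings{start: now, heartbeat: 10 * time.Second}
	for _, o := range opts {
		o.apply(&s)
	}
	if s.start.After(now) {
		return s, errors.New("start timestamp must not be after the current time")
	}
	if s.end != nil && !s.end.After(s.start) {
		return s, errors.New("end timestamp must be after the start timestamp")
	}
	return s, nil
}

func main() {
	now := time.Now()
	s, err := resolve(now,
		withStart(now.Add(-time.Hour)),
		withEnd(now.Add(time.Hour)),
	)
	if err != nil {
		fmt.Println("invalid options:", err)
		return
	}
	fmt.Println(s.heartbeat, s.end != nil) // prints "10s true"
}
```

Applying options over a defaults struct keeps the constructor signature stable as new settings are added, which is presumably why NewSubscriber accepts a variadic list of Option values.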

type PartitionStorage

type PartitionStorage interface {
	GetUnfinishedMinWatermarkPartition(ctx context.Context) (*model.PartitionMetadata, error)
	GetInterruptedPartitions(ctx context.Context) ([]*model.PartitionMetadata, error)
	InitializeRootPartition(ctx context.Context, startTimestamp time.Time, endTimestamp time.Time, heartbeatInterval time.Duration) error
	GetSchedulablePartitions(ctx context.Context, minWatermark time.Time) ([]*model.PartitionMetadata, error)
	AddChildPartitions(ctx context.Context, parentPartition *model.PartitionMetadata, childPartitionsRecord *model.ChildPartitionsRecord) error
	UpdateToScheduled(ctx context.Context, partitions []*model.PartitionMetadata) error
	UpdateToRunning(ctx context.Context, partition *model.PartitionMetadata) error
	UpdateToFinished(ctx context.Context, partition *model.PartitionMetadata) error
	UpdateWatermark(ctx context.Context, partition *model.PartitionMetadata, watermark time.Time) error
}

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

Subscriber subscribes to a change stream.

func NewSubscriber

func NewSubscriber(
	client *spanner.Client,
	streamName string,
	partitionStorage PartitionStorage,
	options ...Option,
) *Subscriber

NewSubscriber creates a new subscriber for the change stream named streamName.

func (*Subscriber) Subscribe

func (s *Subscriber) Subscribe(ctx context.Context, consumer Consumer) error

Subscribe starts subscribing to the change stream.

func (*Subscriber) SubscribeFunc

func (s *Subscriber) SubscribeFunc(ctx context.Context, f ConsumerFunc) error

SubscribeFunc is an adapter that allows an ordinary function to be used as a Consumer.

The function may be called from multiple goroutines at the same time, so it must be re-entrant and safe for concurrent use.
