screamer

package module v1.3.2

Published: Jul 14, 2025 · License: Apache-2.0 · Imports: 11 · Imported by: 0
README

Screamer... (S)panner (C)hange st(REAM) read(ER)


Cloud Spanner Change Streams Subscriber for Go


New: Built-in Distributed Locking for Scalability

Screamer now includes built-in distributed locking and runner liveness tracking. When you scale up and run multiple Screamer instances, partitions are automatically assigned to available runners, and failover is handled transparently. This enables robust, distributed processing of change streams with high availability.


Synopsis

This library subscribes to change stream records of Google Cloud Spanner in Go. It is heavily inspired by the SpannerIO connector of the Apache Beam SDK and is compatible with the PartitionMetadata data model.

Motivation

To read change streams, Google Cloud offers the Dataflow connector as a scalable and reliable solution, but in some cases the abstraction and capabilities of Dataflow pipelines are more than you need (or simply too expensive). This library aims to make reading change streams straightforward for non-Beam/Dataflow use cases.

Example Usage

package main


import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"os"
	"os/signal"
	"sync"

	"cloud.google.com/go/spanner"
	"github.com/anicoll/screamer"
	"github.com/anicoll/screamer/pkg/partitionstorage"
)

func main() {
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill)
	defer stop()
	runnerID := "runner-1"

	database := fmt.Sprintf("projects/%s/instances/%s/databases/%s", "foo-project", "bar-instance", "baz-database")
	spannerClient, err := spanner.NewClient(ctx, database)
	if err != nil {
		panic(err)
	}
	defer spannerClient.Close()

	partitionMetadataTableName := "PartitionMetadata_FooStream"

	partitionStorage := partitionstorage.NewSpanner(spannerClient, partitionMetadataTableName)
	if err := partitionStorage.RunMigrations(ctx); err != nil {
		panic(err)
	}
	if err := partitionStorage.RegisterRunner(ctx, runnerID); err != nil {
		panic(err)
	}

	changeStreamName := "FooStream"
	subscriber := screamer.NewSubscriber(spannerClient, changeStreamName, runnerID, partitionStorage, screamer.WithLogLevel("debug"))

	fmt.Fprintf(os.Stderr, "Reading the stream...\n")
	logger := &Logger{out: os.Stdout}
	if err := subscriber.Subscribe(ctx, logger); err != nil && !errors.Is(ctx.Err(), context.Canceled) {
		panic(err)
	}
}

type Logger struct {
	out io.Writer
	mu  sync.Mutex
}

// Consume receives a marshaled screamer.DataChangeRecord.
func (l *Logger) Consume(change []byte) error {
	l.mu.Lock()
	defer l.mu.Unlock()
	// change is already JSON; write it through with a trailing newline
	// rather than re-encoding it (json.Encoder would base64-encode a []byte).
	_, err := l.out.Write(append(change, '\n'))
	return err
}

CLI

Installation
$ go install github.com/anicoll/screamer@latest
Usage
NAME:
   screamer screamer

USAGE:
   screamer screamer [command options]

OPTIONS:
   --dsn value                 [$DSN]
   --stream value              [$STREAM]
   --metadata-table value      [$METADATA_TABLE]
   --start value               Start timestamp, RFC3339 format (default: current timestamp) [$START]
   --end value                 End timestamp, RFC3339 format (default: indefinite) [$END]
   --heartbeat-interval value  (default: 3s) [$HEARTBEAT_INTERVAL]
   --partition-dsn value       DSN for the partition metadata table (default: the main --dsn) [$PARTITION_DSN]
   --help, -h                  show help
Example
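A hypothetical invocation, wiring the options above through their environment variables. The project, instance, database, and stream names are placeholders.

```shell
# Placeholder values -- substitute your own project/instance/database.
export DSN="projects/foo-project/instances/bar-instance/databases/baz-database"
export STREAM="FooStream"
export METADATA_TABLE="PartitionMetadata_FooStream"

# Flags may also be passed explicitly instead of via environment variables.
if command -v screamer >/dev/null 2>&1; then
  screamer screamer --heartbeat-interval 3s
fi
```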

Credits

Heavily inspired by the projects below.

Documentation

Constants

const (
	ModType_INSERT = "INSERT"
	ModType_UPDATE = "UPDATE"
	ModType_DELETE = "DELETE"
)
const (
	// RootPartitionToken is the token value for the root partition.
	RootPartitionToken = "Parent0"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type ChangeRecord

type ChangeRecord struct {
	DataChangeRecords      []*DataChangeRecord      `spanner:"data_change_record" json:"data_change_record"`
	HeartbeatRecords       []*HeartbeatRecord       `spanner:"heartbeat_record" json:"heartbeat_record"`
	ChildPartitionsRecords []*ChildPartitionsRecord `spanner:"child_partitions_record" json:"child_partitions_record"`
}

type ChildPartition

type ChildPartition struct {
	Token                 string   `spanner:"token" json:"token"`
	ParentPartitionTokens []string `spanner:"parent_partition_tokens" json:"parent_partition_tokens"`
}

type ChildPartitionsRecord

type ChildPartitionsRecord struct {
	StartTimestamp  time.Time         `spanner:"start_timestamp" json:"start_timestamp"`
	RecordSequence  string            `spanner:"record_sequence" json:"record_sequence"`
	ChildPartitions []*ChildPartition `spanner:"child_partitions" json:"child_partitions"`
}

type ColumnType

type ColumnType struct {
	Name            string `json:"name"`
	Type            Type   `json:"type"`
	IsPrimaryKey    bool   `json:"is_primary_key,omitempty"`
	OrdinalPosition int64  `json:"ordinal_position"`
}

ColumnType is the metadata of the column.

type Config

type Config struct {
	DSN               string
	Stream            string
	MetadataTable     *string
	Start             *time.Time
	End               *time.Time
	HeartbeatInterval *time.Duration
	PartitionDSN      *string
	Priority          int32
}

type Consumer

type Consumer interface {
	// Consume processes a marshaled DataChangeRecord.
	Consume(change []byte) error
}

Consumer is the interface to consume the DataChangeRecord.

Consume might be called from multiple goroutines and must be re-entrant safe.

type ConsumerFunc

type ConsumerFunc func([]byte) error

The ConsumerFunc type is an adapter to allow the use of ordinary functions as Consumers. The function receives json.Marshal(DataChangeRecord).

func (ConsumerFunc) Consume

func (f ConsumerFunc) Consume(change []byte) error

Consume calls f(change), allowing ordinary functions to satisfy the Consumer interface. The input is the result of json.Marshal(DataChangeRecord).
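This adapter is standard Go (compare net/http's http.HandlerFunc). A self-contained mirror of the pattern, with local stand-ins for the package's types:

```go
package main

import "fmt"

// Local stand-ins mirroring the package's Consumer and ConsumerFunc;
// the real types live in github.com/anicoll/screamer.
type Consumer interface {
	Consume(change []byte) error
}

type ConsumerFunc func([]byte) error

// Consume calls f(change), letting a plain function satisfy Consumer.
func (f ConsumerFunc) Consume(change []byte) error { return f(change) }

func main() {
	var c Consumer = ConsumerFunc(func(change []byte) error {
		fmt.Printf("got %d bytes\n", len(change))
		return nil
	})
	_ = c.Consume([]byte(`{"table_name":"Foo"}`))
}
```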

type DataChangeRecord

type DataChangeRecord struct {
	CommitTimestamp                      time.Time     `json:"commit_timestamp"`
	RecordSequence                       string        `json:"record_sequence"`
	ServerTransactionID                  string        `json:"server_transaction_id"`
	IsLastRecordInTransactionInPartition bool          `json:"is_last_record_in_transaction_in_partition"`
	TableName                            string        `json:"table_name"`
	ColumnTypes                          []*ColumnType `json:"column_types"`
	Mods                                 []*Mod        `json:"mods"`
	ModType                              ModType       `json:"mod_type"`
	ValueCaptureType                     string        `json:"value_capture_type"`
	NumberOfRecordsInTransaction         int64         `json:"number_of_records_in_transaction"`
	NumberOfPartitionsInTransaction      int64         `json:"number_of_partitions_in_transaction"`
	TransactionTag                       string        `json:"transaction_tag"`
	IsSystemTransaction                  bool          `json:"is_system_transaction"`
}

DataChangeRecord is the change set of the table.
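Since a Consumer receives json.Marshal(DataChangeRecord), it can unmarshal the payload using the JSON tags above. A sketch decoding a subset of the fields; the sample payload is fabricated for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// change mirrors a subset of DataChangeRecord, using the documented JSON tags.
type change struct {
	TableName string `json:"table_name"`
	ModType   string `json:"mod_type"`
	Mods      []struct {
		Keys      map[string]interface{} `json:"keys,omitempty"`
		NewValues map[string]interface{} `json:"new_values,omitempty"`
	} `json:"mods"`
}

// tableOf extracts the changed table's name from a marshaled record.
func tableOf(payload []byte) (string, error) {
	var c change
	if err := json.Unmarshal(payload, &c); err != nil {
		return "", err
	}
	return c.TableName, nil
}

func main() {
	payload := []byte(`{"table_name":"Singers","mod_type":"INSERT","mods":[{"keys":{"SingerId":"1"},"new_values":{"Name":"Alice"}}]}`)
	name, err := tableOf(payload)
	if err != nil {
		panic(err)
	}
	fmt.Println(name) // Singers
}
```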

type DataChangeRecordWithPartitionMeta added in v1.2.0

type DataChangeRecordWithPartitionMeta struct {
	*DataChangeRecord
	PartitionToken string     `spanner:"PartitionToken" json:"partition_token"`
	StartTimestamp time.Time  `spanner:"StartTimestamp" json:"start_timestamp"`
	Watermark      time.Time  `spanner:"Watermark" json:"watermark"`
	CreatedAt      time.Time  `spanner:"CreatedAt" json:"created_at"`
	ScheduledAt    *time.Time `spanner:"ScheduledAt" json:"scheduled_at,omitempty"`
	RunningAt      *time.Time `spanner:"RunningAt" json:"running_at,omitempty"`
}

type HeartbeatRecord

type HeartbeatRecord struct {
	Timestamp time.Time `spanner:"timestamp" json:"timestamp"`
}

type Mod

type Mod struct {
	Keys      map[string]interface{} `json:"keys,omitempty"`
	NewValues map[string]interface{} `json:"new_values,omitempty"`
	OldValues map[string]interface{} `json:"old_values,omitempty"`
}

Mod contains the keys and the values of the changed records.

type ModType

type ModType string

type Option

type Option interface {
	Apply(*config)
}

Option configures a Subscriber via functional options.

func WithEndTimestamp

func WithEndTimestamp(endTimestamp time.Time) Option

WithEndTimestamp sets the end timestamp option for reading change streams. The value must be within the retention period of the change stream and must be after the start timestamp. If not set, reads latest changes until canceled.

func WithHeartbeatInterval

func WithHeartbeatInterval(heartbeatInterval time.Duration) Option

WithHeartbeatInterval sets the heartbeat interval for reading change streams. Default value is 10 seconds.

func WithLogLevel added in v1.0.2

func WithLogLevel(logLevel string) Option

WithLogLevel sets the log level for the subscriber.

func WithSerializedConsumer added in v1.0.0

func WithSerializedConsumer(serialized bool) Option

WithSerializedConsumer enables or disables serialized processing of records by the Consumer. When true, a mutex ensures that s.consumer.Consume() is called serially, simplifying Consumer implementations that are not re-entrant safe. This may impact performance. Default is false (concurrent consumption is allowed if the Consumer is re-entrant safe).

func WithSpannerRequestPriotiry

func WithSpannerRequestPriotiry(priority spannerpb.RequestOptions_Priority) Option

WithSpannerRequestPriotiry sets the request priority option for reading change streams. The default is unspecified, which Spanner treats as high priority.

func WithStartTimestamp

func WithStartTimestamp(startTimestamp time.Time) Option

WithStartTimestamp sets the start timestamp option for reading change streams. The value must be within the retention period of the change stream and before the current time. Default value is current timestamp.

type PartitionMetadata

type PartitionMetadata struct {
	PartitionToken  string     `spanner:"PartitionToken" json:"partition_token"`
	ParentTokens    []string   `spanner:"ParentTokens" json:"parent_tokens"`
	StartTimestamp  time.Time  `spanner:"StartTimestamp" json:"start_timestamp"`
	EndTimestamp    time.Time  `spanner:"EndTimestamp" json:"end_timestamp"`
	HeartbeatMillis int64      `spanner:"HeartbeatMillis" json:"heartbeat_millis"`
	State           State      `spanner:"State" json:"state"`
	Watermark       time.Time  `spanner:"Watermark" json:"watermark"`
	CreatedAt       time.Time  `spanner:"CreatedAt" json:"created_at"`
	ScheduledAt     *time.Time `spanner:"ScheduledAt" json:"scheduled_at,omitempty"`
	RunningAt       *time.Time `spanner:"RunningAt" json:"running_at,omitempty"`
	FinishedAt      *time.Time `spanner:"FinishedAt" json:"finished_at,omitempty"`
}

PartitionMetadata represents metadata for a change stream partition, including its state, timing, and parent/child relationships.

func (*PartitionMetadata) IsRootPartition

func (p *PartitionMetadata) IsRootPartition() bool

IsRootPartition returns true if this partition is the root partition.

type PartitionStorage

type PartitionStorage interface {
	// GetUnfinishedMinWatermarkPartition returns the unfinished partition with the minimum watermark, or nil if none exist.
	GetUnfinishedMinWatermarkPartition(ctx context.Context) (*PartitionMetadata, error)
	// GetInterruptedPartitions returns partitions that are scheduled or running but have lost their runner.
	GetInterruptedPartitions(ctx context.Context, runnerID string) ([]*PartitionMetadata, error)
	// InitializeRootPartition creates or updates the root partition metadata.
	InitializeRootPartition(ctx context.Context, startTimestamp time.Time, endTimestamp time.Time, heartbeatInterval time.Duration) error
	// GetAndSchedulePartitions finds partitions ready to be scheduled and assigns them to the given runnerID.
	GetAndSchedulePartitions(ctx context.Context, minWatermark time.Time, runnerID string) ([]*PartitionMetadata, error)
	// AddChildPartitions adds new child partitions for a parent partition based on a ChildPartitionsRecord.
	AddChildPartitions(ctx context.Context, parentPartition *PartitionMetadata, childPartitionsRecord *ChildPartitionsRecord) error
	// UpdateToRunning marks the given partition as running.
	UpdateToRunning(ctx context.Context, partition *PartitionMetadata) error
	// RefreshRunner updates the liveness timestamp for the given runnerID.
	RefreshRunner(ctx context.Context, runnerID string) error
	// UpdateToFinished marks the given partition as finished.
	UpdateToFinished(ctx context.Context, partition *PartitionMetadata, runnerID string) error
	// UpdateWatermark updates the watermark for the given partition.
	UpdateWatermark(ctx context.Context, partition *PartitionMetadata, watermark time.Time) error
}

PartitionStorage defines the interface for managing change stream partition metadata. Implementations must be concurrency-safe.

type State

type State string

State represents the state of a partition in the change stream lifecycle.

const (
	// StateCreated indicates the partition is newly created and not yet scheduled.
	StateCreated State = "CREATED"
	// StateScheduled indicates the partition is scheduled for processing.
	StateScheduled State = "SCHEDULED"
	// StateRunning indicates the partition is currently being processed.
	StateRunning State = "RUNNING"
	// StateFinished indicates the partition has been fully processed.
	StateFinished State = "FINISHED"
)
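The comments on these constants imply a linear lifecycle: CREATED → SCHEDULED → RUNNING → FINISHED. A hedged sketch of a transition check; the allowed-transition map is an assumption inferred from those comments, not an exported API of the package.

```go
package main

import "fmt"

type State string

const (
	StateCreated   State = "CREATED"
	StateScheduled State = "SCHEDULED"
	StateRunning   State = "RUNNING"
	StateFinished  State = "FINISHED"
)

// next maps each state to its successor in the assumed linear lifecycle.
var next = map[State]State{
	StateCreated:   StateScheduled,
	StateScheduled: StateRunning,
	StateRunning:   StateFinished,
}

// canTransition reports whether a partition may move directly from one
// state to another under the assumed lifecycle.
func canTransition(from, to State) bool {
	return next[from] == to
}

func main() {
	fmt.Println(canTransition(StateCreated, StateScheduled)) // true
	fmt.Println(canTransition(StateCreated, StateRunning))   // false
}
```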

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

Subscriber subscribes to a change stream and manages partition processing.

func NewSubscriber

func NewSubscriber(
	client *spanner.Client,
	streamName, runnerID string,
	partitionStorage PartitionStorage,
	options ...Option,
) *Subscriber

NewSubscriber creates a new subscriber of change streams. The returned Subscriber is ready to start processing with Subscribe.

func (*Subscriber) Subscribe

func (s *Subscriber) Subscribe(ctx context.Context, consumer Consumer) error

Subscribe starts subscribing to the change stream and processing records using the provided Consumer. This method blocks until all partitions are processed or the context is canceled.

func (*Subscriber) SubscribeFunc

func (s *Subscriber) SubscribeFunc(ctx context.Context, f ConsumerFunc) error

SubscribeFunc is an adapter that allows the use of an ordinary function as a Consumer. The function may be called from multiple goroutines and must be re-entrant safe.

type Type

type Type struct {
	Code             TypeCode `json:"code"`
	ArrayElementType TypeCode `json:"array_element_type,omitempty"`
}

Type is the type of the column.

type TypeCode

type TypeCode string
const (
	TypeCode_NONE      TypeCode = ""
	TypeCode_BOOL      TypeCode = "BOOL"
	TypeCode_INT64     TypeCode = "INT64"
	TypeCode_FLOAT64   TypeCode = "FLOAT64"
	TypeCode_TIMESTAMP TypeCode = "TIMESTAMP"
	TypeCode_DATE      TypeCode = "DATE"
	TypeCode_STRING    TypeCode = "STRING"
	TypeCode_BYTES     TypeCode = "BYTES"
	TypeCode_NUMERIC   TypeCode = "NUMERIC"
	TypeCode_JSON      TypeCode = "JSON"
	TypeCode_ARRAY     TypeCode = "ARRAY"
)

Directories

Path Synopsis
cmd
internal
pkg
