partitionstorage

package
v1.4.2-devel Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 20, 2026 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func WithRequestPriotiry

func WithRequestPriotiry(priority spannerpb.RequestOptions_Priority) spannerOption

WithRequestPriotiry sets the priority option for Spanner requests. Default value is unspecified, equivalent to high.

Types

type InmemoryPartitionStorage

type InmemoryPartitionStorage struct {
	// contains filtered or unexported fields
}

InmemoryPartitionStorage implements PartitionStorage that stores PartitionMetadata in memory.

func NewInmemory

func NewInmemory() *InmemoryPartitionStorage

NewInmemory creates a new instance of InmemoryPartitionStorage.

func (*InmemoryPartitionStorage) AddChildPartitions

AddChildPartitions adds new child partitions for a parent partition based on a ChildPartitionsRecord.

func (*InmemoryPartitionStorage) GetActiveRunnerCount

func (s *InmemoryPartitionStorage) GetActiveRunnerCount(ctx context.Context) (int, error)

GetActiveRunnerCount returns 1 for in-memory storage since it only supports single runner. This is a no-op implementation as in-memory storage doesn't track multiple runners.

func (*InmemoryPartitionStorage) GetAndSchedulePartitions added in v0.2.0

func (s *InmemoryPartitionStorage) GetAndSchedulePartitions(ctx context.Context, minWatermark time.Time, runnerID string, limit int) ([]*screamer.PartitionMetadata, error)

GetAndSchedulePartitions finds partitions ready to be scheduled and marks them as scheduled. The limit parameter restricts the maximum number of partitions to schedule.

func (*InmemoryPartitionStorage) GetInterruptedPartitions

func (s *InmemoryPartitionStorage) GetInterruptedPartitions(ctx context.Context, runnerID string, limit int) ([]*screamer.PartitionMetadata, error)

GetInterruptedPartitions is a no-op for in-memory storage and always returns nil. The limit parameter is ignored as this implementation doesn't track runner assignments.

func (*InmemoryPartitionStorage) GetSchedulablePartitions

func (s *InmemoryPartitionStorage) GetSchedulablePartitions(ctx context.Context, minWatermark time.Time) ([]*screamer.PartitionMetadata, error)

GetSchedulablePartitions returns partitions that are ready to be scheduled based on the minimum watermark.

func (*InmemoryPartitionStorage) GetUnfinishedMinWatermarkPartition

func (s *InmemoryPartitionStorage) GetUnfinishedMinWatermarkPartition(ctx context.Context) (*screamer.PartitionMetadata, error)

GetUnfinishedMinWatermarkPartition returns the unfinished partition with the minimum watermark. Returns nil if there are no unfinished partitions.

func (*InmemoryPartitionStorage) InitializeRootPartition

func (s *InmemoryPartitionStorage) InitializeRootPartition(ctx context.Context, startTimestamp time.Time, endTimestamp time.Time, heartbeatInterval time.Duration) error

InitializeRootPartition creates or updates the root partition metadata in memory.

func (*InmemoryPartitionStorage) RefreshRunner added in v0.3.1

func (s *InmemoryPartitionStorage) RefreshRunner(ctx context.Context, runnerID string) error

RefreshRunner is a no-op for in-memory storage.

func (*InmemoryPartitionStorage) UpdateToFinished

func (s *InmemoryPartitionStorage) UpdateToFinished(ctx context.Context, partition *screamer.PartitionMetadata, runnerID string) error

UpdateToFinished marks the given partition as finished and sets the FinishedAt timestamp.

func (*InmemoryPartitionStorage) UpdateToRunning

func (s *InmemoryPartitionStorage) UpdateToRunning(ctx context.Context, partition *screamer.PartitionMetadata) error

UpdateToRunning marks the given partition as running and sets the RunningAt timestamp.

func (*InmemoryPartitionStorage) UpdateToScheduled

func (s *InmemoryPartitionStorage) UpdateToScheduled(ctx context.Context, partitions []*screamer.PartitionMetadata) error

UpdateToScheduled marks the given partitions as scheduled and sets the ScheduledAt timestamp.

func (*InmemoryPartitionStorage) UpdateWatermark

func (s *InmemoryPartitionStorage) UpdateWatermark(ctx context.Context, partition *screamer.PartitionMetadata, watermark time.Time) error

UpdateWatermark updates the watermark for the given partition.

type SpannerPartitionStorage

type SpannerPartitionStorage struct {
	// contains filtered or unexported fields
}

SpannerPartitionStorage implements PartitionStorage that stores PartitionMetadata in Cloud Spanner.

func NewSpanner

func NewSpanner(client *spanner.Client, tableName string, options ...spannerOption) *SpannerPartitionStorage

NewSpanner creates a new instance of SpannerPartitionStorage for the given Spanner client and table name. Optional spannerOption(s) can be provided to configure request priority.

func (*SpannerPartitionStorage) AddChildPartitions

AddChildPartitions adds new child partitions for a parent partition based on a ChildPartitionsRecord. Used when a partition splits or merges.

func (*SpannerPartitionStorage) GetActiveRunnerCount

func (s *SpannerPartitionStorage) GetActiveRunnerCount(ctx context.Context) (int, error)

GetActiveRunnerCount returns the number of active runners that have refreshed within the last 5 seconds. Used for metrics and observability to understand cluster size.

func (*SpannerPartitionStorage) GetAndSchedulePartitions added in v0.2.0

func (s *SpannerPartitionStorage) GetAndSchedulePartitions(ctx context.Context, minWatermark time.Time, runnerID string, limit int) ([]*screamer.PartitionMetadata, error)

GetAndSchedulePartitions finds partitions ready to be scheduled and assigns them to the given runnerID. Returns the scheduled partitions. The limit parameter restricts the maximum number of partitions to schedule. Uses optimistic concurrency with randomized ordering to reduce conflicts between multiple runners.

func (*SpannerPartitionStorage) GetInterruptedPartitions

func (s *SpannerPartitionStorage) GetInterruptedPartitions(ctx context.Context, runnerID string, limit int) ([]*screamer.PartitionMetadata, error)

GetInterruptedPartitions returns partitions that are scheduled or running but have lost their runner. Assigns the current runnerID to these partitions for recovery. The limit parameter restricts the maximum number of partitions to recover.

func (*SpannerPartitionStorage) GetUnfinishedMinWatermarkPartition

func (s *SpannerPartitionStorage) GetUnfinishedMinWatermarkPartition(ctx context.Context) (*screamer.PartitionMetadata, error)

GetUnfinishedMinWatermarkPartition returns the unfinished partition with the minimum watermark. Returns nil if there are no unfinished partitions.

func (*SpannerPartitionStorage) InitializeRootPartition

func (s *SpannerPartitionStorage) InitializeRootPartition(ctx context.Context, startTimestamp time.Time, endTimestamp time.Time, heartbeatInterval time.Duration) error

InitializeRootPartition creates or updates the root partition metadata in the table. Used to start a new change stream subscription.

func (*SpannerPartitionStorage) RefreshRunner added in v0.3.0

func (s *SpannerPartitionStorage) RefreshRunner(ctx context.Context, runnerID string) error

RefreshRunner updates the UpdatedAt timestamp for the given runnerID in the Runner table. Used to indicate liveness of a runner.

func (*SpannerPartitionStorage) RegisterRunner added in v0.3.0

func (s *SpannerPartitionStorage) RegisterRunner(ctx context.Context, runnerID string) error

RegisterRunner registers a runner in the Runner table with the given runnerID. Used for distributed lock and partition assignment.

func (*SpannerPartitionStorage) RunMigrations added in v0.3.0

func (s *SpannerPartitionStorage) RunMigrations(ctx context.Context) error

RunMigrations creates or updates the necessary Spanner tables and indexes for partition and runner metadata management. It is idempotent and can be safely called multiple times.

func (*SpannerPartitionStorage) UpdateToFinished

func (s *SpannerPartitionStorage) UpdateToFinished(ctx context.Context, partition *screamer.PartitionMetadata, runnerID string) error

UpdateToFinished marks the given partition as finished and sets the FinishedAt timestamp.

func (*SpannerPartitionStorage) UpdateToRunning

func (s *SpannerPartitionStorage) UpdateToRunning(ctx context.Context, partition *screamer.PartitionMetadata) error

UpdateToRunning marks the given partition as running and sets the RunningAt timestamp.

func (*SpannerPartitionStorage) UpdateWatermark

func (s *SpannerPartitionStorage) UpdateWatermark(ctx context.Context, partition *screamer.PartitionMetadata, watermark time.Time) error

UpdateWatermark updates the watermark for the given partition.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL