Documentation ¶
Index ¶
- func WithRequestPriotiry(priority spannerpb.RequestOptions_Priority) spannerOption
- type InmemoryPartitionStorage
- func NewInmemory() *InmemoryPartitionStorage
- func (s *InmemoryPartitionStorage) AddChildPartitions(ctx context.Context, parent *screamer.PartitionMetadata, ...) error
- func (s *InmemoryPartitionStorage) GetActiveRunnerCount(ctx context.Context) (int, error)
- func (s *InmemoryPartitionStorage) GetAndSchedulePartitions(ctx context.Context, minWatermark time.Time, runnerID string, limit int) ([]*screamer.PartitionMetadata, error)
- func (s *InmemoryPartitionStorage) GetInterruptedPartitions(ctx context.Context, runnerID string, limit int) ([]*screamer.PartitionMetadata, error)
- func (s *InmemoryPartitionStorage) GetSchedulablePartitions(ctx context.Context, minWatermark time.Time) ([]*screamer.PartitionMetadata, error)
- func (s *InmemoryPartitionStorage) GetUnfinishedMinWatermarkPartition(ctx context.Context) (*screamer.PartitionMetadata, error)
- func (s *InmemoryPartitionStorage) InitializeRootPartition(ctx context.Context, startTimestamp time.Time, endTimestamp time.Time, ...) error
- func (s *InmemoryPartitionStorage) RefreshRunner(ctx context.Context, runnerID string) error
- func (s *InmemoryPartitionStorage) UpdateToFinished(ctx context.Context, partition *screamer.PartitionMetadata, runnerID string) error
- func (s *InmemoryPartitionStorage) UpdateToRunning(ctx context.Context, partition *screamer.PartitionMetadata) error
- func (s *InmemoryPartitionStorage) UpdateToScheduled(ctx context.Context, partitions []*screamer.PartitionMetadata) error
- func (s *InmemoryPartitionStorage) UpdateWatermark(ctx context.Context, partition *screamer.PartitionMetadata, ...) error
- type SpannerPartitionStorage
- func NewSpanner(client *spanner.Client, tableName string, options ...spannerOption) *SpannerPartitionStorage
- func (s *SpannerPartitionStorage) AddChildPartitions(ctx context.Context, parent *screamer.PartitionMetadata, ...) error
- func (s *SpannerPartitionStorage) GetActiveRunnerCount(ctx context.Context) (int, error)
- func (s *SpannerPartitionStorage) GetAndSchedulePartitions(ctx context.Context, minWatermark time.Time, runnerID string, limit int) ([]*screamer.PartitionMetadata, error)
- func (s *SpannerPartitionStorage) GetInterruptedPartitions(ctx context.Context, runnerID string, limit int) ([]*screamer.PartitionMetadata, error)
- func (s *SpannerPartitionStorage) GetUnfinishedMinWatermarkPartition(ctx context.Context) (*screamer.PartitionMetadata, error)
- func (s *SpannerPartitionStorage) InitializeRootPartition(ctx context.Context, startTimestamp time.Time, endTimestamp time.Time, ...) error
- func (s *SpannerPartitionStorage) RefreshRunner(ctx context.Context, runnerID string) error
- func (s *SpannerPartitionStorage) RegisterRunner(ctx context.Context, runnerID string) error
- func (s *SpannerPartitionStorage) RunMigrations(ctx context.Context) error
- func (s *SpannerPartitionStorage) UpdateToFinished(ctx context.Context, partition *screamer.PartitionMetadata, runnerID string) error
- func (s *SpannerPartitionStorage) UpdateToRunning(ctx context.Context, partition *screamer.PartitionMetadata) error
- func (s *SpannerPartitionStorage) UpdateWatermark(ctx context.Context, partition *screamer.PartitionMetadata, ...) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func WithRequestPriotiry ¶
func WithRequestPriotiry(priority spannerpb.RequestOptions_Priority) spannerOption
WithRequestPriotiry sets the priority for Spanner requests. The default is PRIORITY_UNSPECIFIED, which Spanner treats as equivalent to PRIORITY_HIGH.
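The spannerOption type is unexported, so its definition is not shown here. A minimal sketch of the functional-option pattern that a signature like this usually implies is given below. Everything in it (Priority, config, option, withPriority, newConfig) is a hypothetical local stand-in written against the standard library only, not the package's actual implementation.

```go
package main

import "fmt"

// Priority is a local stand-in for spannerpb.RequestOptions_Priority.
type Priority int

const (
	PriorityUnspecified Priority = iota
	PriorityLow
	PriorityMedium
	PriorityHigh
)

// config is a hypothetical settings struct that options mutate.
type config struct {
	priority Priority
}

// option mirrors the general shape of an unexported functional-option type.
type option func(*config)

// withPriority returns an option that records the requested priority.
func withPriority(p Priority) option {
	return func(c *config) { c.priority = p }
}

// newConfig applies the options in order on top of the zero-value defaults.
func newConfig(opts ...option) config {
	var c config
	for _, o := range opts {
		o(&c)
	}
	return c
}

func main() {
	fmt.Println(newConfig().priority == PriorityUnspecified)
	fmt.Println(newConfig(withPriority(PriorityLow)).priority == PriorityLow)
}
```

With this shape, callers that pass no option get the zero value, which matches the documented "unspecified" default.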
Types ¶
type InmemoryPartitionStorage ¶
type InmemoryPartitionStorage struct {
// contains filtered or unexported fields
}
InmemoryPartitionStorage implements PartitionStorage that stores PartitionMetadata in memory.
func NewInmemory ¶
func NewInmemory() *InmemoryPartitionStorage
NewInmemory creates a new instance of InmemoryPartitionStorage.
func (*InmemoryPartitionStorage) AddChildPartitions ¶
func (s *InmemoryPartitionStorage) AddChildPartitions(ctx context.Context, parent *screamer.PartitionMetadata, r *screamer.ChildPartitionsRecord) error
AddChildPartitions adds new child partitions for a parent partition based on a ChildPartitionsRecord.
func (*InmemoryPartitionStorage) GetActiveRunnerCount ¶
func (s *InmemoryPartitionStorage) GetActiveRunnerCount(ctx context.Context) (int, error)
GetActiveRunnerCount always returns 1, because in-memory storage supports only a single runner and does not track multiple runners.
func (*InmemoryPartitionStorage) GetAndSchedulePartitions ¶ added in v0.2.0
func (s *InmemoryPartitionStorage) GetAndSchedulePartitions(ctx context.Context, minWatermark time.Time, runnerID string, limit int) ([]*screamer.PartitionMetadata, error)
GetAndSchedulePartitions finds partitions ready to be scheduled and marks them as scheduled. The limit parameter restricts the maximum number of partitions to schedule.
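The select-then-mark behavior described above can be sketched as follows. This is an illustration only: the partition type is a trimmed-down stand-in for screamer.PartitionMetadata, and the readiness rule (a created partition whose start timestamp is not before minWatermark) is an assumption, since the documentation does not spell it out.

```go
package main

import (
	"fmt"
	"time"
)

type state int

const (
	stateCreated state = iota
	stateScheduled
)

// partition is a hypothetical stand-in for screamer.PartitionMetadata.
type partition struct {
	token       string
	start       time.Time
	state       state
	scheduledAt time.Time
}

// getAndSchedule marks up to limit ready partitions as scheduled and returns them.
// Assumption: "ready" means created and starting at or after minWatermark.
func getAndSchedule(ps []*partition, minWatermark, now time.Time, limit int) []*partition {
	var out []*partition
	for _, p := range ps {
		if len(out) >= limit {
			break
		}
		if p.state != stateCreated || p.start.Before(minWatermark) {
			continue
		}
		p.state = stateScheduled
		p.scheduledAt = now
		out = append(out, p)
	}
	return out
}

var epoch = time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)

// sample builds four created partitions; the first starts before epoch.
func sample() []*partition {
	return []*partition{
		{token: "a", start: epoch.Add(-time.Minute)},
		{token: "b", start: epoch},
		{token: "c", start: epoch.Add(time.Minute)},
		{token: "d", start: epoch.Add(2 * time.Minute)},
	}
}

func main() {
	for _, p := range getAndSchedule(sample(), epoch, epoch, 2) {
		fmt.Println(p.token)
	}
}
```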
func (*InmemoryPartitionStorage) GetInterruptedPartitions ¶
func (s *InmemoryPartitionStorage) GetInterruptedPartitions(ctx context.Context, runnerID string, limit int) ([]*screamer.PartitionMetadata, error)
GetInterruptedPartitions is a no-op for in-memory storage and always returns nil. The limit parameter is ignored as this implementation doesn't track runner assignments.
func (*InmemoryPartitionStorage) GetSchedulablePartitions ¶
func (s *InmemoryPartitionStorage) GetSchedulablePartitions(ctx context.Context, minWatermark time.Time) ([]*screamer.PartitionMetadata, error)
GetSchedulablePartitions returns partitions that are ready to be scheduled based on the minimum watermark.
func (*InmemoryPartitionStorage) GetUnfinishedMinWatermarkPartition ¶
func (s *InmemoryPartitionStorage) GetUnfinishedMinWatermarkPartition(ctx context.Context) (*screamer.PartitionMetadata, error)
GetUnfinishedMinWatermarkPartition returns the unfinished partition with the minimum watermark. Returns nil if there are no unfinished partitions.
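As a rough illustration of the lookup described above, the sketch below scans a slice for the unfinished entry with the earliest watermark. The partition type and its fields are hypothetical stand-ins, not the package's own data model.

```go
package main

import (
	"fmt"
	"time"
)

// partition is a hypothetical stand-in for screamer.PartitionMetadata.
type partition struct {
	token     string
	watermark time.Time
	finished  bool
}

// minUnfinished returns the unfinished partition with the earliest watermark,
// or nil when every partition is finished (or the slice is empty).
func minUnfinished(ps []*partition) *partition {
	var best *partition
	for _, p := range ps {
		if p.finished {
			continue
		}
		if best == nil || p.watermark.Before(best.watermark) {
			best = p
		}
	}
	return best
}

var epoch = time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)

// sample returns one finished partition with the earliest watermark
// and two unfinished ones.
func sample() []*partition {
	return []*partition{
		{token: "a", watermark: epoch, finished: true},
		{token: "b", watermark: epoch.Add(2 * time.Minute)},
		{token: "c", watermark: epoch.Add(time.Minute)},
	}
}

func main() {
	if p := minUnfinished(sample()); p != nil {
		fmt.Println(p.token)
	}
}
```

Callers should check for nil before dereferencing the result, as main does here.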
func (*InmemoryPartitionStorage) InitializeRootPartition ¶
func (s *InmemoryPartitionStorage) InitializeRootPartition(ctx context.Context, startTimestamp time.Time, endTimestamp time.Time, heartbeatInterval time.Duration) error
InitializeRootPartition creates or updates the root partition metadata in memory.
func (*InmemoryPartitionStorage) RefreshRunner ¶ added in v0.3.1
func (s *InmemoryPartitionStorage) RefreshRunner(ctx context.Context, runnerID string) error
RefreshRunner is a no-op for in-memory storage.
func (*InmemoryPartitionStorage) UpdateToFinished ¶
func (s *InmemoryPartitionStorage) UpdateToFinished(ctx context.Context, partition *screamer.PartitionMetadata, runnerID string) error
UpdateToFinished marks the given partition as finished and sets the FinishedAt timestamp.
func (*InmemoryPartitionStorage) UpdateToRunning ¶
func (s *InmemoryPartitionStorage) UpdateToRunning(ctx context.Context, partition *screamer.PartitionMetadata) error
UpdateToRunning marks the given partition as running and sets the RunningAt timestamp.
func (*InmemoryPartitionStorage) UpdateToScheduled ¶
func (s *InmemoryPartitionStorage) UpdateToScheduled(ctx context.Context, partitions []*screamer.PartitionMetadata) error
UpdateToScheduled marks the given partitions as scheduled and sets the ScheduledAt timestamp.
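The three transition methods above each set a state and stamp a matching timestamp. A minimal sketch of that lifecycle, using hypothetical state names and unexported fields that only echo the ScheduledAt, RunningAt, and FinishedAt names mentioned in the docs, might look like this:

```go
package main

import (
	"fmt"
	"time"
)

type state string

const (
	created   state = "CREATED"
	scheduled state = "SCHEDULED"
	running   state = "RUNNING"
	finished  state = "FINISHED"
)

// partition is a hypothetical stand-in for screamer.PartitionMetadata.
type partition struct {
	state       state
	scheduledAt *time.Time
	runningAt   *time.Time
	finishedAt  *time.Time
}

// mark sets the new state and stamps the timestamp field that belongs to it.
func (p *partition) mark(s state, now time.Time) {
	p.state = s
	switch s {
	case scheduled:
		p.scheduledAt = &now
	case running:
		p.runningAt = &now
	case finished:
		p.finishedAt = &now
	}
}

var epoch = time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)

// lifecycle walks one partition through all three transitions, one second apart.
func lifecycle() *partition {
	p := &partition{state: created}
	p.mark(scheduled, epoch)
	p.mark(running, epoch.Add(time.Second))
	p.mark(finished, epoch.Add(2*time.Second))
	return p
}

func main() {
	p := lifecycle()
	fmt.Println(p.state, p.finishedAt.Sub(*p.scheduledAt))
}
```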
func (*InmemoryPartitionStorage) UpdateWatermark ¶
func (s *InmemoryPartitionStorage) UpdateWatermark(ctx context.Context, partition *screamer.PartitionMetadata, watermark time.Time) error
UpdateWatermark updates the watermark for the given partition.
type SpannerPartitionStorage ¶
type SpannerPartitionStorage struct {
// contains filtered or unexported fields
}
SpannerPartitionStorage implements PartitionStorage that stores PartitionMetadata in Cloud Spanner.
func NewSpanner ¶
func NewSpanner(client *spanner.Client, tableName string, options ...spannerOption) *SpannerPartitionStorage
NewSpanner creates a new instance of SpannerPartitionStorage for the given Spanner client and table name. Optional spannerOption values, such as the one returned by WithRequestPriotiry, configure request priority.
func (*SpannerPartitionStorage) AddChildPartitions ¶
func (s *SpannerPartitionStorage) AddChildPartitions(ctx context.Context, parent *screamer.PartitionMetadata, r *screamer.ChildPartitionsRecord) error
AddChildPartitions adds new child partitions for a parent partition based on a ChildPartitionsRecord. Used when a partition splits or merges.
func (*SpannerPartitionStorage) GetActiveRunnerCount ¶
func (s *SpannerPartitionStorage) GetActiveRunnerCount(ctx context.Context) (int, error)
GetActiveRunnerCount returns the number of active runners that have refreshed within the last 5 seconds. Used for metrics and observability to understand cluster size.
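The 5-second liveness window can be illustrated with an in-process analogue. The runner type, the activeCount helper, and the inclusive comparison at exactly 5 seconds are assumptions made for the sketch; the actual implementation queries Spanner and is not shown in this documentation.

```go
package main

import (
	"fmt"
	"time"
)

// runner is a hypothetical stand-in for a row in the runner table.
type runner struct {
	id        string
	updatedAt time.Time
}

// livenessWindow matches the 5-second window named in the documentation.
const livenessWindow = 5 * time.Second

// activeCount counts runners whose last refresh falls within the window.
func activeCount(rs []runner, now time.Time) int {
	n := 0
	for _, r := range rs {
		if now.Sub(r.updatedAt) <= livenessWindow {
			n++
		}
	}
	return n
}

var epoch = time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)

// sample returns three runners refreshed 2s, 4s, and 9s before now.
func sample() ([]runner, time.Time) {
	now := epoch.Add(10 * time.Second)
	return []runner{
		{id: "r1", updatedAt: now.Add(-2 * time.Second)},
		{id: "r2", updatedAt: now.Add(-4 * time.Second)},
		{id: "r3", updatedAt: now.Add(-9 * time.Second)},
	}, now
}

func main() {
	rs, now := sample()
	fmt.Println(activeCount(rs, now))
}
```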
func (*SpannerPartitionStorage) GetAndSchedulePartitions ¶ added in v0.2.0
func (s *SpannerPartitionStorage) GetAndSchedulePartitions(ctx context.Context, minWatermark time.Time, runnerID string, limit int) ([]*screamer.PartitionMetadata, error)
GetAndSchedulePartitions finds partitions ready to be scheduled and assigns them to the given runnerID. Returns the scheduled partitions. The limit parameter restricts the maximum number of partitions to schedule. Uses optimistic concurrency with randomized ordering to reduce conflicts between multiple runners.
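The idea behind randomized ordering is that concurrent runners are less likely to pick the same candidates if each one shuffles before taking its first limit entries. The sketch below shows that idea in isolation with math/rand; the pick and newRNG helpers are hypothetical, and the real method performs its selection inside Spanner rather than in process.

```go
package main

import (
	"fmt"
	"math/rand"
)

// newRNG returns a seeded generator so that runs are reproducible.
func newRNG(seed int64) *rand.Rand {
	return rand.New(rand.NewSource(seed))
}

// pick shuffles a copy of the candidates and returns at most limit of them.
// The input slice is left untouched.
func pick(cands []string, limit int, rng *rand.Rand) []string {
	shuffled := append([]string(nil), cands...)
	rng.Shuffle(len(shuffled), func(i, j int) {
		shuffled[i], shuffled[j] = shuffled[j], shuffled[i]
	})
	if limit < 0 {
		limit = 0
	}
	if limit > len(shuffled) {
		limit = len(shuffled)
	}
	return shuffled[:limit]
}

func main() {
	tokens := []string{"a", "b", "c", "d"}
	// Two runners with different seeds draw from the same candidate set.
	fmt.Println(pick(tokens, 2, newRNG(1)))
	fmt.Println(pick(tokens, 2, newRNG(2)))
}
```

Shuffling only lowers the chance of overlap; it does not remove it, which is why the documented method still relies on optimistic concurrency to resolve conflicts.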
func (*SpannerPartitionStorage) GetInterruptedPartitions ¶
func (s *SpannerPartitionStorage) GetInterruptedPartitions(ctx context.Context, runnerID string, limit int) ([]*screamer.PartitionMetadata, error)
GetInterruptedPartitions returns partitions that are scheduled or running but have lost their runner. Assigns the current runnerID to these partitions for recovery. The limit parameter restricts the maximum number of partitions to recover.
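The recovery step described above can be sketched as a filter plus reassignment. How a runner is judged "lost" is not specified here, so the sketch assumes a caller-supplied set of live runner IDs; the partition type and the recoverInterrupted helper are likewise hypothetical.

```go
package main

import "fmt"

type state int

const (
	stateCreated state = iota
	stateScheduled
	stateRunning
	stateFinished
)

// partition is a hypothetical stand-in for screamer.PartitionMetadata.
type partition struct {
	token    string
	state    state
	runnerID string
}

// recoverInterrupted reassigns up to limit scheduled or running partitions
// whose runner is not in the live set, and returns the reassigned partitions.
func recoverInterrupted(ps []*partition, live map[string]bool, runnerID string, limit int) []*partition {
	var out []*partition
	for _, p := range ps {
		if len(out) >= limit {
			break
		}
		if p.state != stateScheduled && p.state != stateRunning {
			continue
		}
		if live[p.runnerID] {
			continue
		}
		p.runnerID = runnerID
		out = append(out, p)
	}
	return out
}

// sample returns partitions owned by r1 (assumed dead) and r2 (assumed live).
func sample() []*partition {
	return []*partition{
		{token: "a", state: stateRunning, runnerID: "r1"},
		{token: "b", state: stateScheduled, runnerID: "r2"},
		{token: "c", state: stateFinished, runnerID: "r1"},
		{token: "d", state: stateScheduled, runnerID: "r1"},
	}
}

func main() {
	live := map[string]bool{"r2": true}
	for _, p := range recoverInterrupted(sample(), live, "r3", 10) {
		fmt.Println(p.token, p.runnerID)
	}
}
```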
func (*SpannerPartitionStorage) GetUnfinishedMinWatermarkPartition ¶
func (s *SpannerPartitionStorage) GetUnfinishedMinWatermarkPartition(ctx context.Context) (*screamer.PartitionMetadata, error)
GetUnfinishedMinWatermarkPartition returns the unfinished partition with the minimum watermark. Returns nil if there are no unfinished partitions.
func (*SpannerPartitionStorage) InitializeRootPartition ¶
func (s *SpannerPartitionStorage) InitializeRootPartition(ctx context.Context, startTimestamp time.Time, endTimestamp time.Time, heartbeatInterval time.Duration) error
InitializeRootPartition creates or updates the root partition metadata in the table. Used to start a new change stream subscription.
func (*SpannerPartitionStorage) RefreshRunner ¶ added in v0.3.0
func (s *SpannerPartitionStorage) RefreshRunner(ctx context.Context, runnerID string) error
RefreshRunner updates the UpdatedAt timestamp for the given runnerID in the Runner table. Used to indicate liveness of a runner.
func (*SpannerPartitionStorage) RegisterRunner ¶ added in v0.3.0
func (s *SpannerPartitionStorage) RegisterRunner(ctx context.Context, runnerID string) error
RegisterRunner registers a runner in the Runner table with the given runnerID. Used for distributed locking and partition assignment.
func (*SpannerPartitionStorage) RunMigrations ¶ added in v0.3.0
func (s *SpannerPartitionStorage) RunMigrations(ctx context.Context) error
RunMigrations creates or updates the necessary Spanner tables and indexes for partition and runner metadata management. It is idempotent and can be safely called multiple times.
func (*SpannerPartitionStorage) UpdateToFinished ¶
func (s *SpannerPartitionStorage) UpdateToFinished(ctx context.Context, partition *screamer.PartitionMetadata, runnerID string) error
UpdateToFinished marks the given partition as finished and sets the FinishedAt timestamp.
func (*SpannerPartitionStorage) UpdateToRunning ¶
func (s *SpannerPartitionStorage) UpdateToRunning(ctx context.Context, partition *screamer.PartitionMetadata) error
UpdateToRunning marks the given partition as running and sets the RunningAt timestamp.
func (*SpannerPartitionStorage) UpdateWatermark ¶
func (s *SpannerPartitionStorage) UpdateWatermark(ctx context.Context, partition *screamer.PartitionMetadata, watermark time.Time) error
UpdateWatermark updates the watermark for the given partition.