lifecycle

package
v0.10.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 26, 2026 License: Apache-2.0 Imports: 56 Imported by: 0

Documentation

Overview

Package lifecycle provides the lifecycle migration service.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewCommand

func NewCommand() *cobra.Command

NewCommand creates a new lifecycle command.

func NewService

func NewService(meta metadata.Repo) run.Unit

NewService creates a new lifecycle service.

Types

type GroupConfig added in v0.9.0

type GroupConfig struct {
	*commonv1.Group
	NodeSelector    node.Selector
	QueueClient     queue.Client
	AccumulatedTTL  *commonv1.IntervalRule
	SegmentInterval *commonv1.IntervalRule
	TargetShardNum  uint32
	TargetReplicas  uint32
}

GroupConfig encapsulates the parsed lifecycle configuration for a Group. It contains all necessary information for migration and deletion operations.

func (*GroupConfig) Close added in v0.9.0

func (gc *GroupConfig) Close()

Close releases resources held by the GroupConfig.

type Progress

type Progress struct {
	CompletedGroups             map[string]bool                                            `json:"completed_groups"`
	DeletedStreamGroups         map[string]bool                                            `json:"deleted_stream_groups"`
	DeletedMeasureGroups        map[string]bool                                            `json:"deleted_measure_groups"`
	DeletedTraceGroups          map[string]bool                                            `json:"deleted_trace_groups"`
	CompletedStreamParts        map[string]map[string]map[common.ShardID]map[uint64]bool   `json:"completed_stream_parts"`
	StreamPartErrors            map[string]map[string]map[common.ShardID]map[uint64]string `json:"stream_part_errors"`
	CompletedStreamSeries       map[string]map[string]map[common.ShardID]bool              `json:"completed_stream_series"`
	StreamSeriesErrors          map[string]map[string]map[common.ShardID]string            `json:"stream_series_errors"`
	CompletedStreamElementIndex map[string]map[string]map[common.ShardID]bool              `json:"completed_stream_element_index"`
	StreamElementIndexErrors    map[string]map[string]map[common.ShardID]string            `json:"stream_element_index_errors"`
	StreamPartCounts            map[string]int                                             `json:"stream_part_counts"`
	StreamPartProgress          map[string]int                                             `json:"stream_part_progress"`
	StreamSeriesCounts          map[string]int                                             `json:"stream_series_counts"`
	StreamSeriesProgress        map[string]int                                             `json:"stream_series_progress"`
	StreamElementIndexCounts    map[string]int                                             `json:"stream_element_index_counts"`
	StreamElementIndexProgress  map[string]int                                             `json:"stream_element_index_progress"`
	// Measure part-specific progress tracking
	CompletedMeasureParts  map[string]map[string]map[common.ShardID]map[uint64]bool   `json:"completed_measure_parts"`
	MeasurePartErrors      map[string]map[string]map[common.ShardID]map[uint64]string `json:"measure_part_errors"`
	MeasurePartCounts      map[string]int                                             `json:"measure_part_counts"`
	MeasurePartProgress    map[string]int                                             `json:"measure_part_progress"`
	CompletedMeasureSeries map[string]map[string]map[common.ShardID]bool              `json:"completed_measure_series"`
	MeasureSeriesErrors    map[string]map[string]map[common.ShardID]string            `json:"measure_series_errors"`
	MeasureSeriesCounts    map[string]int                                             `json:"measure_series_counts"`
	MeasureSeriesProgress  map[string]int                                             `json:"measure_series_progress"`
	// Trace part-specific progress tracking
	CompletedTraceShards map[string]map[string]map[common.ShardID]bool   `json:"completed_trace_shards"`
	TraceShardErrors     map[string]map[string]map[common.ShardID]string `json:"trace_shard_errors"`
	TraceShardCounts     map[string]int                                  `json:"trace_shard_counts"`
	TraceShardProgress   map[string]int                                  `json:"trace_shard_progress"`
	CompletedTraceSeries map[string]map[string]map[common.ShardID]bool   `json:"completed_trace_series"`
	TraceSeriesErrors    map[string]map[string]map[common.ShardID]string `json:"trace_series_errors"`
	TraceSeriesCounts    map[string]int                                  `json:"trace_series_counts"`
	TraceSeriesProgress  map[string]int                                  `json:"trace_series_progress"`

	SnapshotStreamDir  string `json:"snapshot_stream_dir"`
	SnapshotMeasureDir string `json:"snapshot_measure_dir"`
	SnapshotTraceDir   string `json:"snapshot_trace_dir"`
	// contains filtered or unexported fields
}

Progress tracks the lifecycle migration progress to support resume after crash.

func LoadProgress

func LoadProgress(path string, l *logger.Logger) *Progress

LoadProgress loads progress from a file if it exists.

func NewProgress

func NewProgress(path string, l *logger.Logger) *Progress

NewProgress creates a new Progress tracker.

func (*Progress) AllGroupsNotFullyCompleted added in v0.9.0

func (p *Progress) AllGroupsNotFullyCompleted(groups []*commonv1.Group) []string

AllGroupsNotFullyCompleted find is there have any group not fully completed.

func (*Progress) ClearErrors added in v0.9.0

func (p *Progress) ClearErrors()

ClearErrors clears all errors for a specific group.

func (*Progress) ClearStreamSeriesErrors added in v0.9.0

func (p *Progress) ClearStreamSeriesErrors(group string)

ClearStreamSeriesErrors clears all errors for a specific stream series.

func (*Progress) GetMeasurePartCount added in v0.9.0

func (p *Progress) GetMeasurePartCount(group string) int

GetMeasurePartCount returns the total number of parts for a measure.

func (*Progress) GetMeasurePartProgress added in v0.9.0

func (p *Progress) GetMeasurePartProgress(group string) int

GetMeasurePartProgress returns the number of completed parts for a measure.

func (*Progress) GetMeasureSeriesCount added in v0.9.0

func (p *Progress) GetMeasureSeriesCount(group string) int

GetMeasureSeriesCount returns the total number of series segments for a measure.

func (*Progress) GetMeasureSeriesProgress added in v0.9.0

func (p *Progress) GetMeasureSeriesProgress(group string) int

GetMeasureSeriesProgress returns the number of completed series segments for a measure.

func (*Progress) GetStreamElementIndexCount added in v0.9.0

func (p *Progress) GetStreamElementIndexCount(group string) int

GetStreamElementIndexCount returns the total number of element index files for a stream.

func (*Progress) GetStreamElementIndexProgress added in v0.9.0

func (p *Progress) GetStreamElementIndexProgress(group string) int

GetStreamElementIndexProgress returns the number of completed element index files for a stream.

func (*Progress) GetStreamPartCount added in v0.9.0

func (p *Progress) GetStreamPartCount(group string) int

GetStreamPartCount returns the total number of parts for a stream.

func (*Progress) GetStreamPartProgress added in v0.9.0

func (p *Progress) GetStreamPartProgress(group string) int

GetStreamPartProgress returns the number of completed parts for a stream.

func (*Progress) GetStreamSeriesCount added in v0.9.0

func (p *Progress) GetStreamSeriesCount(group string) int

GetStreamSeriesCount returns the total number of series segments for a stream.

func (*Progress) GetStreamSeriesErrors added in v0.9.0

func (p *Progress) GetStreamSeriesErrors(group string) map[string]map[common.ShardID]string

GetStreamSeriesErrors returns all errors for a specific stream series.

func (*Progress) GetStreamSeriesProgress added in v0.9.0

func (p *Progress) GetStreamSeriesProgress(group string) int

GetStreamSeriesProgress returns the number of completed series segments for a stream.

func (*Progress) GetTraceSeriesCount added in v0.10.0

func (p *Progress) GetTraceSeriesCount(group string) int

GetTraceSeriesCount gets the total number of series segments for the current trace.

func (*Progress) GetTraceSeriesProgress added in v0.10.0

func (p *Progress) GetTraceSeriesProgress(group string) int

GetTraceSeriesProgress gets the number of completed series segments for the current trace.

func (*Progress) GetTraceShardProgress added in v0.10.0

func (p *Progress) GetTraceShardProgress(group string) int

GetTraceShardProgress gets the number of completed shards for the current trace.

func (*Progress) GetTraceShards added in v0.10.0

func (p *Progress) GetTraceShards(group string) int

GetTraceShards gets the total number of shards for the current trace.

func (*Progress) IsGroupCompleted

func (p *Progress) IsGroupCompleted(group string) bool

IsGroupCompleted checks if a group has been completed.

func (*Progress) IsMeasureGroupDeleted

func (p *Progress) IsMeasureGroupDeleted(group string) bool

IsMeasureGroupDeleted checks if a measure group segments have been deleted.

func (*Progress) IsMeasurePartCompleted added in v0.9.0

func (p *Progress) IsMeasurePartCompleted(group string, segmentID string, shardID common.ShardID, partID uint64) bool

IsMeasurePartCompleted checks if a specific part of a measure has been completed.

func (*Progress) IsMeasureSeriesCompleted added in v0.9.0

func (p *Progress) IsMeasureSeriesCompleted(group string, segmentID string, shardID common.ShardID) bool

IsMeasureSeriesCompleted checks if a specific series segment of a measure has been completed.

func (*Progress) IsStreamElementIndexCompleted added in v0.9.0

func (p *Progress) IsStreamElementIndexCompleted(group string, segmentID string, shardID common.ShardID) bool

IsStreamElementIndexCompleted checks if a specific element index file of a stream has been completed.

func (*Progress) IsStreamGroupDeleted

func (p *Progress) IsStreamGroupDeleted(group string) bool

IsStreamGroupDeleted checks if a stream group segments have been deleted.

func (*Progress) IsStreamPartCompleted added in v0.9.0

func (p *Progress) IsStreamPartCompleted(group string, segmentID string, shardID common.ShardID, partID uint64) bool

IsStreamPartCompleted checks if a specific part of a stream has been completed.

func (*Progress) IsStreamSeriesCompleted added in v0.9.0

func (p *Progress) IsStreamSeriesCompleted(group string, segmentID string, shardID common.ShardID) bool

IsStreamSeriesCompleted checks if a specific series segment of a stream has been completed.

func (*Progress) IsTraceGroupDeleted added in v0.10.0

func (p *Progress) IsTraceGroupDeleted(group string) bool

IsTraceGroupDeleted checks if a trace group has been deleted.

func (*Progress) IsTraceSeriesCompleted added in v0.10.0

func (p *Progress) IsTraceSeriesCompleted(group string, segmentID string, shardID common.ShardID) bool

IsTraceSeriesCompleted checks if a specific series segment of a trace has been completed.

func (*Progress) IsTraceShardCompleted added in v0.10.0

func (p *Progress) IsTraceShardCompleted(group string, segmentID string, shardID common.ShardID) bool

IsTraceShardCompleted checks if a specific part of a trace has been completed.

func (*Progress) MarkGroupCompleted

func (p *Progress) MarkGroupCompleted(group string)

MarkGroupCompleted marks a group as completed.

func (*Progress) MarkMeasureGroupDeleted

func (p *Progress) MarkMeasureGroupDeleted(group string)

MarkMeasureGroupDeleted marks a measure group segments as deleted.

func (*Progress) MarkMeasurePartCompleted added in v0.9.0

func (p *Progress) MarkMeasurePartCompleted(group string, segmentID string, shardID common.ShardID, partID uint64)

MarkMeasurePartCompleted marks a specific part of a measure as completed.

func (*Progress) MarkMeasurePartError added in v0.9.0

func (p *Progress) MarkMeasurePartError(group string, segmentID string, shardID common.ShardID, partID uint64, errorMsg string)

MarkMeasurePartError records an error for a specific part of a measure.

func (*Progress) MarkMeasureSeriesCompleted added in v0.9.0

func (p *Progress) MarkMeasureSeriesCompleted(group string, segmentID string, shardID common.ShardID)

MarkMeasureSeriesCompleted marks a specific series segment of a measure as completed.

func (*Progress) MarkMeasureSeriesError added in v0.9.0

func (p *Progress) MarkMeasureSeriesError(group string, segmentID string, shardID common.ShardID, errorMsg string)

MarkMeasureSeriesError records an error for a specific series segment of a measure.

func (*Progress) MarkStreamElementIndexCompleted added in v0.9.0

func (p *Progress) MarkStreamElementIndexCompleted(group string, segmentID string, shardID common.ShardID)

MarkStreamElementIndexCompleted marks a specific element index file of a stream as completed.

func (*Progress) MarkStreamElementIndexError added in v0.9.0

func (p *Progress) MarkStreamElementIndexError(group string, segmentID string, shardID common.ShardID, errorMsg string)

MarkStreamElementIndexError records an error for a specific element index file of a stream.

func (*Progress) MarkStreamGroupDeleted

func (p *Progress) MarkStreamGroupDeleted(group string)

MarkStreamGroupDeleted marks a stream group segments as deleted.

func (*Progress) MarkStreamPartCompleted added in v0.9.0

func (p *Progress) MarkStreamPartCompleted(group string, segmentID string, shardID common.ShardID, partID uint64)

MarkStreamPartCompleted marks a specific part of a stream as completed.

func (*Progress) MarkStreamPartError added in v0.9.0

func (p *Progress) MarkStreamPartError(group string, segmentID string, shardID common.ShardID, partID uint64, errorMsg string)

MarkStreamPartError records an error for a specific part of a stream.

func (*Progress) MarkStreamSeriesCompleted added in v0.9.0

func (p *Progress) MarkStreamSeriesCompleted(group string, segmentID string, shardID common.ShardID)

MarkStreamSeriesCompleted marks a specific series segment of a stream as completed.

func (*Progress) MarkStreamSeriesError added in v0.9.0

func (p *Progress) MarkStreamSeriesError(group string, segmentID string, shardID common.ShardID, errorMsg string)

MarkStreamSeriesError records an error for a specific series segment of a stream.

func (*Progress) MarkTraceGroupDeleted added in v0.10.0

func (p *Progress) MarkTraceGroupDeleted(group string)

MarkTraceGroupDeleted marks a trace group as deleted.

func (*Progress) MarkTraceSeriesCompleted added in v0.10.0

func (p *Progress) MarkTraceSeriesCompleted(group string, segmentID string, shardID common.ShardID)

MarkTraceSeriesCompleted marks a specific series segment of a trace as completed.

func (*Progress) MarkTraceSeriesError added in v0.10.0

func (p *Progress) MarkTraceSeriesError(group string, segmentID string, shardID common.ShardID, errorMsg string)

MarkTraceSeriesError marks an error for a specific series segment of a trace.

func (*Progress) MarkTraceShardCompleted added in v0.10.0

func (p *Progress) MarkTraceShardCompleted(group string, segmentID string, shardID common.ShardID)

MarkTraceShardCompleted marks a specific shard of a trace as completed.

func (*Progress) MarkTraceShardError added in v0.10.0

func (p *Progress) MarkTraceShardError(group string, segmentID string, shardID common.ShardID, errorMsg string)

MarkTraceShardError marks an error for a specific part of a trace.

func (*Progress) Remove

func (p *Progress) Remove(path string, l *logger.Logger)

Remove deletes the progress file.

func (*Progress) Save

func (p *Progress) Save(path string, l *logger.Logger)

Save writes the progress to the specified file.

func (*Progress) SetMeasurePartCount added in v0.9.0

func (p *Progress) SetMeasurePartCount(group string, totalParts int)

SetMeasurePartCount sets the total number of parts for a measure.

func (*Progress) SetMeasureSeriesCount added in v0.9.0

func (p *Progress) SetMeasureSeriesCount(group string, totalSegments int)

SetMeasureSeriesCount sets the total number of series segments for a measure.

func (*Progress) SetStreamElementIndexCount added in v0.9.0

func (p *Progress) SetStreamElementIndexCount(group string, totalIndexFiles int)

SetStreamElementIndexCount sets the total number of element index files for a stream.

func (*Progress) SetStreamPartCount added in v0.9.0

func (p *Progress) SetStreamPartCount(group string, totalParts int)

SetStreamPartCount sets the total number of parts for a stream.

func (*Progress) SetStreamSeriesCount added in v0.9.0

func (p *Progress) SetStreamSeriesCount(group string, totalSegments int)

SetStreamSeriesCount sets the total number of series segments for a stream.

func (*Progress) SetTraceSeriesCount added in v0.10.0

func (p *Progress) SetTraceSeriesCount(group string, totalSegments int)

SetTraceSeriesCount sets the total number of series segments for the current trace.

func (*Progress) SetTraceShardCount added in v0.10.0

func (p *Progress) SetTraceShardCount(group string, totalShards int)

SetTraceShardCount sets the total number of shards for the current trace.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL