replica

package
v1.31.21 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 19, 2025 License: BSD-3-Clause Imports: 30 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// RequestKey is used for marshalling request IDs.
	RequestKey       = "request_id"
	// SchemaVersionKey has the value "schema_version".
	// NOTE(review): presumably the counterpart key for schema versions — confirm against callers.
	SchemaVersionKey = "schema_version"
)
View Source
const (
	// iota counts every constant in this block, starting at 0 on the StatusOK
	// line, so the "+ 200" and "+ 300" groups do not start at 200 and 300.
	// Resulting values:
	//
	//	StatusOK                 = 0
	//	StatusClassNotFound      = 201 (iota == 1)
	//	StatusShardNotFound      = 202
	//	StatusNotFound           = 203
	//	StatusAlreadyExisted     = 204
	//	StatusNotReady           = 205
	//	StatusConflict           = 306 (iota == 6)
	//	StatusPreconditionFailed = 307
	//	StatusReadOnly           = 308
	//	StatusObjectNotFound     = 309
	StatusOK            = 0
	StatusClassNotFound = iota + 200
	StatusShardNotFound
	StatusNotFound
	StatusAlreadyExisted
	StatusNotReady
	StatusConflict = iota + 300
	StatusPreconditionFailed
	StatusReadOnly
	StatusObjectNotFound
)

Variables

View Source
var (
	// MsgCLevel is the message text for a consistency level that cannot be
	// achieved.
	MsgCLevel = "cannot achieve consistency level"

	// ErrReplicas is the sentinel error with the message
	// "cannot reach enough replicas".
	ErrReplicas = errors.New("cannot reach enough replicas")
	// ErrRepair is the sentinel error with the message "read repair error".
	ErrRepair   = errors.New("read repair error")
	// ErrRead is the sentinel error with the message "read error".
	ErrRead     = errors.New("read error")

	// ErrNoDiffFound is returned by Finder.CollectShardDifferences when no
	// differences are found.
	ErrNoDiffFound = errors.New("no diff found")
)
View Source
var (
	// ErrConflictExistOrDeleted signals a conflict in which, per its message,
	// the object has been deleted on another replica.
	ErrConflictExistOrDeleted = errors.New("conflict: object has been deleted on another replica")

	// ErrConflictObjectChanged signals that the source object changed since it
	// was last seen and therefore cannot be repaired.
	ErrConflictObjectChanged = errors.New("source object changed during repair")
)

Functions

func StatusText added in v1.31.1

func StatusText(code StatusCode) string

StatusText returns a text for the status code. It returns the empty string if the code is unknown.

func ValidateConfig

func ValidateConfig(class *models.Class, globalCfg replication.GlobalConfig) error

func ValidateConfigUpdate

func ValidateConfigUpdate(old, updated *models.Class, nodeCounter nodeCounter) error

Types

type BatchReply added in v1.31.1

type BatchReply struct {
	// Sender is the hostname of the sender of this reply.
	Sender string
	// IsDigest reports whether this reply comes from a digest read.
	IsDigest bool
	// FullData holds the data returned from a full read request.
	FullData []Replica
	// DigestData holds the data returned from a digest read request.
	DigestData []types.RepairResponse
}

BatchReply is a container for the batch received from a replica. The returned data may result from either a full or a digest read request.

func (BatchReply) UpdateTimeAt added in v1.31.1

func (r BatchReply) UpdateTimeAt(idx int) int64

UpdateTimeAt gets update time from reply

type BoolTuple added in v1.31.1

type BoolTuple tuple[types.RepairResponse]

type Client

type Client interface {
	RClient
	WClient
}

Client is used to read and write objects on replicas

type DeleteBatchResponse

type DeleteBatchResponse struct {
	Batch []UUID2Error `json:"batch,omitempty"`
}

DeleteBatchResponse represents the response returned by DeleteObjects

func (*DeleteBatchResponse) FirstError

func (r *DeleteBatchResponse) FirstError() error

FirstError returns the first found error

type DigestObjectsInRangeReq added in v1.28.5

type DigestObjectsInRangeReq struct {
	InitialUUID strfmt.UUID `json:"initialUUID,omitempty"`
	FinalUUID   strfmt.UUID `json:"finalUUID,omitempty"`
	Limit       int         `json:"limit,omitempty"`
}

type DigestObjectsInRangeResp added in v1.28.5

type DigestObjectsInRangeResp struct {
	Digests []types.RepairResponse `json:"digests,omitempty"`
}

type Error

type Error struct {
	// Code is serialized under the JSON key "code" and is always emitted.
	Code StatusCode `json:"code"`
	// Msg is serialized under the JSON key "msg" and omitted when empty.
	Msg  string     `json:"msg,omitempty"`
	// Err is excluded from JSON (tag "-").
	// NOTE(review): presumably the error returned by Unwrap — confirm.
	Err  error      `json:"-"`
}

Error reports error happening during replication

func NewError

func NewError(code StatusCode, msg string) *Error

NewError creates a new replication error.

func (*Error) Clone

func (e *Error) Clone() *Error

func (*Error) Empty

func (e *Error) Empty() bool

Empty checks whether e is an empty error, which is equivalent to e == nil.

func (*Error) Error

func (e *Error) Error() string

func (*Error) IsStatusCode

func (e *Error) IsStatusCode(sc StatusCode) bool

func (*Error) Timeout

func (e *Error) Timeout() bool

func (*Error) Unwrap

func (e *Error) Unwrap() error

Unwrap returns the underlying error.

type Finder

type Finder struct {
	// contains filtered or unexported fields
}

Finder finds replicated objects

func NewFinder

func NewFinder(className string,
	router router,
	nodeName string,
	client RClient,
	metrics *Metrics,
	l logrus.FieldLogger,
	coordinatorPullBackoffInitialInterval time.Duration,
	coordinatorPullBackoffMaxElapsedTime time.Duration,
	getDeletionStrategy func() string,
) *Finder

NewFinder constructs a new finder instance

func (*Finder) CheckConsistency added in v1.19.0

func (f *Finder) CheckConsistency(ctx context.Context,
	l types.ConsistencyLevel, xs []*storobj.Object,
) error

CheckConsistency for objects belonging to different physical shards.

For each x in xs, the fields BelongsToNode and BelongsToShard must be set and non-empty.

func (*Finder) CollectShardDifferences added in v1.26.0

func (f *Finder) CollectShardDifferences(ctx context.Context,
	shardName string, ht hashtree.AggregatedHashTree, diffTimeoutPerNode time.Duration,
	targetNodeOverrides []additional.AsyncReplicationTargetNodeOverride,
) (diffReader *ShardDifferenceReader, err error)

CollectShardDifferences collects the differences between the local node and the target nodes. It returns a ShardDifferenceReader that contains the differences and the target node name/address. If no differences are found, it returns ErrNoDiffFound. When ErrNoDiffFound is returned as the error, the returned *ShardDifferenceReader may exist and have some (but not all) of its fields set.

func (*Finder) DigestObjectsInRange added in v1.28.5

func (f *Finder) DigestObjectsInRange(ctx context.Context,
	shardName string, host string, initialUUID, finalUUID strfmt.UUID, limit int,
) (ds []types.RepairResponse, err error)

func (*Finder) Exists added in v1.18.0

func (f *Finder) Exists(ctx context.Context,
	l types.ConsistencyLevel,
	shard string,
	id strfmt.UUID,
) (bool, error)

Exists checks whether an object exists that satisfies the given consistency level.

func (*Finder) FindUUIDs added in v1.24.18

func (f *Finder) FindUUIDs(ctx context.Context,
	className, shard string, filters *filters.LocalFilter, l types.ConsistencyLevel,
) (uuids []strfmt.UUID, err error)

func (*Finder) GetOne added in v1.18.0

GetOne gets the object that satisfies the given consistency level.

func (*Finder) LocalNodeName added in v1.31.0

func (f *Finder) LocalNodeName() string

func (*Finder) NodeObject

func (f *Finder) NodeObject(ctx context.Context,
	nodeName,
	shard string,
	id strfmt.UUID,
	props search.SelectProperties, adds additional.Properties,
) (*storobj.Object, error)

NodeObject gets an object from a specific node. It is used mainly for debugging purposes.

func (*Finder) Overwrite added in v1.26.0

func (f *Finder) Overwrite(ctx context.Context,
	host, index, shard string, xs []*objects.VObject,
) ([]types.RepairResponse, error)

Overwrite overwrites the specified objects with the most recent contents.

type FinderClient added in v1.31.1

type FinderClient struct {
	// contains filtered or unexported fields
}

FinderClient extends RClient with consistency checks

func (FinderClient) DigestObjectsInRange added in v1.31.1

func (fc FinderClient) DigestObjectsInRange(ctx context.Context,
	host, index, shard string,
	initialUUID, finalUUID strfmt.UUID, limit int,
) ([]types.RepairResponse, error)

func (FinderClient) DigestReads added in v1.31.1

func (fc FinderClient) DigestReads(ctx context.Context,
	host, index, shard string,
	ids []strfmt.UUID, numRetries int,
) ([]types.RepairResponse, error)

DigestReads reads digests of all specified objects

func (FinderClient) FindUUIDs added in v1.31.1

func (fc FinderClient) FindUUIDs(ctx context.Context,
	host, class, shard string, filters *filters.LocalFilter,
) ([]strfmt.UUID, error)

func (FinderClient) FullRead added in v1.31.1

func (fc FinderClient) FullRead(ctx context.Context,
	host, index, shard string,
	id strfmt.UUID,
	props search.SelectProperties,
	additional additional.Properties,
	numRetries int,
) (Replica, error)

FullRead reads full object

func (FinderClient) FullReads added in v1.31.1

func (fc FinderClient) FullReads(ctx context.Context,
	host, index, shard string,
	ids []strfmt.UUID,
) ([]Replica, error)

FullReads reads full objects.

func (FinderClient) HashTreeLevel added in v1.31.1

func (fc FinderClient) HashTreeLevel(ctx context.Context,
	host, index, shard string, level int, discriminant *hashtree.Bitset,
) (digests []hashtree.Digest, err error)

func (FinderClient) Overwrite added in v1.31.1

func (fc FinderClient) Overwrite(ctx context.Context,
	host, index, shard string,
	xs []*objects.VObject,
) ([]types.RepairResponse, error)

Overwrite overwrites the specified objects with the most recent contents.

type IndexedBatch added in v1.31.1

type IndexedBatch struct {
	// Data holds the objects of the batch.
	Data []*storobj.Object
	// Index is the z-index used to maintain the objects' order.
	Index []int
}

IndexedBatch holds an indexed list of objects

type Metrics added in v1.31.16

type Metrics struct {
	// contains filtered or unexported fields
}

func NewMetrics added in v1.31.16

func NewMetrics(prom *monitoring.PrometheusMetrics) (*Metrics, error)

func (*Metrics) IncReadRepairCount added in v1.31.16

func (m *Metrics) IncReadRepairCount()

func (*Metrics) IncReadRepairFailure added in v1.31.16

func (m *Metrics) IncReadRepairFailure()

func (*Metrics) IncReadsFailed added in v1.31.16

func (m *Metrics) IncReadsFailed()

func (*Metrics) IncReadsSucceedAll added in v1.31.16

func (m *Metrics) IncReadsSucceedAll()

func (*Metrics) IncReadsSucceedSome added in v1.31.16

func (m *Metrics) IncReadsSucceedSome()

func (*Metrics) IncWritesFailed added in v1.31.16

func (m *Metrics) IncWritesFailed()

func (*Metrics) IncWritesSucceedAll added in v1.31.16

func (m *Metrics) IncWritesSucceedAll()

func (*Metrics) IncWritesSucceedSome added in v1.31.16

func (m *Metrics) IncWritesSucceedSome()

func (*Metrics) ObserveReadDuration added in v1.31.16

func (m *Metrics) ObserveReadDuration(d time.Duration)

func (*Metrics) ObserveReadRepairDuration added in v1.31.16

func (m *Metrics) ObserveReadRepairDuration(d time.Duration)

func (*Metrics) ObserveWriteDuration added in v1.31.16

func (m *Metrics) ObserveWriteDuration(d time.Duration)

type ObjResult added in v1.31.1

type ObjResult = _Result[*storobj.Object]

type ObjTuple added in v1.31.1

type ObjTuple tuple[Replica]

type RClient

type RClient interface {
	// FetchObject fetches one object
	FetchObject(_ context.Context, host, index, shard string,
		id strfmt.UUID, props search.SelectProperties,
		additional additional.Properties, numRetries int) (Replica, error)

	// FetchObjects fetches objects specified in ids list.
	FetchObjects(_ context.Context, host, index, shard string,
		ids []strfmt.UUID) ([]Replica, error)

	// OverwriteObjects conditionally updates existing objects.
	OverwriteObjects(_ context.Context, host, index, shard string,
		_ []*objects.VObject) ([]types.RepairResponse, error)

	// DigestObjects finds a list of objects and returns a compact representation
	// of a list of the objects. This is used by the replicator to optimize the
	// number of bytes transferred over the network when fetching a replicated
	// object
	DigestObjects(ctx context.Context, host, index, shard string,
		ids []strfmt.UUID, numRetries int) ([]types.RepairResponse, error)

	FindUUIDs(ctx context.Context, host, index, shard string,
		filters *filters.LocalFilter) ([]strfmt.UUID, error)

	DigestObjectsInRange(ctx context.Context, host, index, shard string,
		initialUUID, finalUUID strfmt.UUID, limit int) ([]types.RepairResponse, error)

	HashTreeLevel(ctx context.Context, host, index, shard string, level int,
		discriminant *hashtree.Bitset) (digests []hashtree.Digest, err error)
}

RClient is the client used to read from remote replicas

type RemoteIncomingRepo added in v1.19.0

type RemoteIncomingRepo interface {
	GetIndexForIncomingReplica(className schema.ClassName) RemoteIndexIncomingRepo
}

type RemoteIncomingSchema added in v1.25.0

type RemoteIncomingSchema interface {
	// WaitForUpdate ensures that the local schema has caught up to schemaVersion
	WaitForUpdate(ctx context.Context, schemaVersion uint64) error
}

type RemoteIndexIncomingRepo added in v1.25.0

type RemoteIndexIncomingRepo interface {
	// Write endpoints
	ReplicateObject(ctx context.Context, shardName, requestID string, object *storobj.Object) SimpleResponse
	ReplicateObjects(ctx context.Context, shardName, requestID string, objects []*storobj.Object, schemaVersion uint64) SimpleResponse
	ReplicateUpdate(ctx context.Context, shardName, requestID string, mergeDoc *objects.MergeDocument) SimpleResponse
	ReplicateDeletion(ctx context.Context, shardName, requestID string, uuid strfmt.UUID, deletionTime time.Time) SimpleResponse
	ReplicateDeletions(ctx context.Context, shardName, requestID string, uuids []strfmt.UUID, deletionTime time.Time, dryRun bool, schemaVersion uint64) SimpleResponse
	ReplicateReferences(ctx context.Context, shardName, requestID string, refs []objects.BatchReference) SimpleResponse
	CommitReplication(shardName, requestID string) interface{}
	AbortReplication(shardName, requestID string) interface{}
	OverwriteObjects(ctx context.Context, shard string, vobjects []*objects.VObject) ([]types.RepairResponse, error)
	// Read endpoints
	FetchObject(ctx context.Context, shardName string, id strfmt.UUID) (Replica, error)
	FetchObjects(ctx context.Context, shardName string, ids []strfmt.UUID) ([]Replica, error)
	DigestObjects(ctx context.Context, shardName string, ids []strfmt.UUID) (result []types.RepairResponse, err error)
	DigestObjectsInRange(ctx context.Context, shardName string,
		initialUUID, finalUUID strfmt.UUID, limit int) (result []types.RepairResponse, err error)
	HashTreeLevel(ctx context.Context, shardName string,
		level int, discriminant *hashtree.Bitset) (digests []hashtree.Digest, err error)
}

type RemoteReplicaIncoming

type RemoteReplicaIncoming struct {
	// contains filtered or unexported fields
}

func (*RemoteReplicaIncoming) AbortReplication

func (rri *RemoteReplicaIncoming) AbortReplication(indexName,
	shardName, requestID string,
) interface{}

func (*RemoteReplicaIncoming) CommitReplication

func (rri *RemoteReplicaIncoming) CommitReplication(indexName,
	shardName, requestID string,
) interface{}

func (*RemoteReplicaIncoming) DigestObjects added in v1.18.0

func (rri *RemoteReplicaIncoming) DigestObjects(ctx context.Context,
	indexName, shardName string, ids []strfmt.UUID,
) (result []types.RepairResponse, err error)

func (*RemoteReplicaIncoming) DigestObjectsInRange added in v1.28.5

func (rri *RemoteReplicaIncoming) DigestObjectsInRange(ctx context.Context,
	indexName, shardName string, initialUUID, finalUUID strfmt.UUID, limit int,
) (result []types.RepairResponse, err error)

func (*RemoteReplicaIncoming) FetchObject added in v1.18.0

func (rri *RemoteReplicaIncoming) FetchObject(ctx context.Context,
	indexName, shardName string, id strfmt.UUID,
) (Replica, error)

func (*RemoteReplicaIncoming) FetchObjects added in v1.18.0

func (rri *RemoteReplicaIncoming) FetchObjects(ctx context.Context,
	indexName, shardName string, ids []strfmt.UUID,
) ([]Replica, error)

func (*RemoteReplicaIncoming) HashTreeLevel added in v1.26.0

func (rri *RemoteReplicaIncoming) HashTreeLevel(ctx context.Context,
	indexName, shardName string, level int, discriminant *hashtree.Bitset,
) (digests []hashtree.Digest, err error)

func (*RemoteReplicaIncoming) OverwriteObjects added in v1.18.0

func (rri *RemoteReplicaIncoming) OverwriteObjects(ctx context.Context,
	indexName, shardName string, vobjects []*objects.VObject,
) ([]types.RepairResponse, error)

func (*RemoteReplicaIncoming) ReplicateDeletion

func (rri *RemoteReplicaIncoming) ReplicateDeletion(ctx context.Context, indexName,
	shardName, requestID string, uuid strfmt.UUID, deletionTime time.Time, schemaVersion uint64,
) SimpleResponse

func (*RemoteReplicaIncoming) ReplicateDeletions

func (rri *RemoteReplicaIncoming) ReplicateDeletions(ctx context.Context, indexName,
	shardName, requestID string, uuids []strfmt.UUID, deletionTime time.Time, dryRun bool, schemaVersion uint64,
) SimpleResponse

func (*RemoteReplicaIncoming) ReplicateObject

func (rri *RemoteReplicaIncoming) ReplicateObject(ctx context.Context, indexName,
	shardName, requestID string, object *storobj.Object, schemaVersion uint64,
) SimpleResponse

func (*RemoteReplicaIncoming) ReplicateObjects

func (rri *RemoteReplicaIncoming) ReplicateObjects(ctx context.Context, indexName,
	shardName, requestID string, objects []*storobj.Object, schemaVersion uint64,
) SimpleResponse

func (*RemoteReplicaIncoming) ReplicateReferences

func (rri *RemoteReplicaIncoming) ReplicateReferences(ctx context.Context, indexName,
	shardName, requestID string, refs []objects.BatchReference, schemaVersion uint64,
) SimpleResponse

func (*RemoteReplicaIncoming) ReplicateUpdate

func (rri *RemoteReplicaIncoming) ReplicateUpdate(ctx context.Context, indexName,
	shardName, requestID string, mergeDoc *objects.MergeDocument, schemaVersion uint64,
) SimpleResponse

type Replica added in v1.31.1

type Replica struct {
	// ID is serialized under the JSON key "id" and omitted when empty.
	ID                      strfmt.UUID     `json:"id,omitempty"`
	// Deleted is always serialized (no omitempty), so false is emitted explicitly.
	Deleted                 bool            `json:"deleted"`
	// Object is serialized under the JSON key "object" and omitted when nil.
	Object                  *storobj.Object `json:"object,omitempty"`
	// LastUpdateTimeUnixMilli is always serialized (no omitempty).
	// NOTE(review): the name suggests Unix time in milliseconds — confirm.
	LastUpdateTimeUnixMilli int64           `json:"lastUpdateTimeUnixMilli"`
}

Replica represents a replicated data item

func (*Replica) MarshalBinary added in v1.31.1

func (r *Replica) MarshalBinary() ([]byte, error)

func (*Replica) UnmarshalBinary added in v1.31.1

func (r *Replica) UnmarshalBinary(data []byte) error

func (Replica) UpdateTime added in v1.31.1

func (r Replica) UpdateTime() int64

UpdateTime returns the update time if it exists and 0 otherwise.

type Replicas added in v1.31.1

type Replicas []Replica

func (Replicas) MarshalBinary added in v1.31.1

func (ro Replicas) MarshalBinary() ([]byte, error)

func (*Replicas) UnmarshalBinary added in v1.31.1

func (ro *Replicas) UnmarshalBinary(data []byte) error

type Replicator

type Replicator struct {
	*Finder
	// contains filtered or unexported fields
}

func NewReplicator

func NewReplicator(className string,
	router router,
	nodeName string,
	getDeletionStrategy func() string,
	client Client,
	promMetrics *monitoring.PrometheusMetrics,
	l logrus.FieldLogger,
) (*Replicator, error)

func (*Replicator) AddReferences

func (r *Replicator) AddReferences(ctx context.Context,
	shard string,
	refs []objects.BatchReference,
	l types.ConsistencyLevel,
	schemaVersion uint64,
) []error

func (*Replicator) AllHostnames added in v1.26.0

func (r *Replicator) AllHostnames() []string

func (*Replicator) DeleteObject

func (r *Replicator) DeleteObject(ctx context.Context,
	shard string,
	id strfmt.UUID,
	deletionTime time.Time,
	l types.ConsistencyLevel,
	schemaVersion uint64,
) error

func (*Replicator) DeleteObjects

func (r *Replicator) DeleteObjects(ctx context.Context,
	shard string,
	uuids []strfmt.UUID,
	deletionTime time.Time,
	dryRun bool,
	l types.ConsistencyLevel,
	schemaVersion uint64,
) []objects.BatchSimpleObject

func (*Replicator) MergeObject

func (r *Replicator) MergeObject(ctx context.Context,
	shard string,
	doc *objects.MergeDocument,
	l types.ConsistencyLevel,
	schemaVersion uint64,
) error

func (*Replicator) PutObject

func (r *Replicator) PutObject(ctx context.Context,
	shard string,
	obj *storobj.Object,
	l types.ConsistencyLevel,
	schemaVersion uint64,
) error

func (*Replicator) PutObjects

func (r *Replicator) PutObjects(ctx context.Context,
	shard string,
	objs []*storobj.Object,
	l types.ConsistencyLevel,
	schemaVersion uint64,
) []error

type ShardDesc added in v1.19.0

type ShardDesc struct {
	Name string
	Node string
}

type ShardDifferenceReader added in v1.26.0

type ShardDifferenceReader struct {
	TargetNodeName    string
	TargetNodeAddress string
	RangeReader       hashtree.AggregatedHashTreeRangeReader
}

type ShardPart added in v1.31.1

type ShardPart struct {
	Shard string // one-to-one mapping between Shard and Node
	Node  string

	Data  []*storobj.Object
	Index []int // index for data
}

ShardPart represents a data partition belonging to a physical shard

func (*ShardPart) Extract added in v1.31.1

func (b *ShardPart) Extract() ([]Replica, []strfmt.UUID)

func (*ShardPart) ObjectIDs added in v1.31.1

func (b *ShardPart) ObjectIDs() []strfmt.UUID

type SimpleResponse

type SimpleResponse struct {
	Errors []Error `json:"errors,omitempty"`
}

func (*SimpleResponse) FirstError

func (r *SimpleResponse) FirstError() error

type StatusCode

type StatusCode int

StatusCode communicates the cause of a failure during replication.

type UUID2Error

type UUID2Error struct {
	// UUID is serialized under the JSON key "uuid" and omitted when empty.
	UUID  string `json:"uuid,omitempty"`
	// Error is serialized under the JSON key "error".
	// NOTE(review): omitempty has no effect on a struct-typed field in
	// encoding/json, so "error" is always emitted, even for a zero Error.
	Error Error  `json:"error,omitempty"`
}

type Vote added in v1.31.1

type Vote struct {
	BatchReply       // reply from a replica
	Count      []int // number of votes per object
	Err        error
}

Vote represents objects received from a specific replica and the number of votes per object.

type WClient added in v1.18.0

type WClient interface {
	PutObject(ctx context.Context, host, index, shard, requestID string,
		obj *storobj.Object, schemaVersion uint64) (SimpleResponse, error)
	DeleteObject(ctx context.Context, host, index, shard, requestID string,
		id strfmt.UUID, deletionTime time.Time, schemaVersion uint64) (SimpleResponse, error)
	PutObjects(ctx context.Context, host, index, shard, requestID string,
		objs []*storobj.Object, schemaVersion uint64) (SimpleResponse, error)
	MergeObject(ctx context.Context, host, index, shard, requestID string,
		mergeDoc *objects.MergeDocument, schemaVersion uint64) (SimpleResponse, error)
	DeleteObjects(ctx context.Context, host, index, shard, requestID string,
		uuids []strfmt.UUID, deletionTime time.Time, dryRun bool, schemaVersion uint64) (SimpleResponse, error)
	AddReferences(ctx context.Context, host, index, shard, requestID string,
		refs []objects.BatchReference, schemaVersion uint64) (SimpleResponse, error)
	Commit(ctx context.Context, host, index, shard, requestID string, resp interface{}) error
	Abort(ctx context.Context, host, index, shard, requestID string) (SimpleResponse, error)
}

WClient is the client used to write to replicas

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL