tasks

package
v0.6.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 15, 2025 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Index

Constants

View Source
// Task type identifiers. Each constant is the string name of one kind of
// task; the names are namespaced with ':' separators.
const (
	// Bucket-level tasks.
	TypeBucketCreate   = "bucket:create"
	TypeBucketDelete   = "bucket:delete"
	TypeBucketSyncTags = "bucket:sync:tags"
	TypeBucketSyncACL  = "bucket:sync:acl"

	// Object-level sync tasks.
	TypeObjectSync     = "object:sync"
	TypeObjectSyncTags = "object:sync:tags"
	TypeObjectSyncACL  = "object:sync:acl"

	// Migration tasks (listing and copying, including versioned objects).
	TypeMigrateBucketListObjects  = "migrate:bucket:list_objects"
	TypeMigrateObjCopy            = "migrate:object:copy"
	TypeMigrateObjectListVersions = "migrate:object:list_versions"
	TypeMigrateVersionedObject    = "migrate:object:copy_versioned"

	// Consistency check tasks.
	TypeConsistencyCheck             = "consistency"
	TypeConsistencyCheckListObjects  = "consistency:list_objects"
	TypeConsistencyCheckListVersions = "consistency:list_versions"

	// API-triggered switch tasks.
	TypeApiZeroDowntimeSwitch = "api:switch_zero_downtime"
	TypeApiSwitchWithDowntime = "api:switch_w_downtime"
)

A list of task types.

Variables

View Source
// Priority maps a queue name, or a queue-name pattern, to an integer
// priority. The entries below are written from highest to lowest value;
// the map itself carries no ordering, so consumers must compare the values.
//
// NOTE(review): keys ending in ":*" and the bare "*" key look like wildcard
// patterns (prefix match / catch-all) — confirm how the consumer of this map
// interprets them.
var Priority = map[string]int{
	string(QueueAPI): 200,
	string(QueueMigrateListObjectsPrefix) + ":*": 100,
	string(QueueConsistencyCheck) + ":*":         50,
	string(QueueMigrateCopyObjectPrefix) + ":*":  10,
	string(QueueEventsPrefix) + ":*":             5,
	"*":                                          1,
}

Priority defines the priority of the queues from highest to lowest.

Functions

func AllReplicationQueues added in v0.6.0

func AllReplicationQueues(id entity.UniversalReplicationID) []string

func ConsistencyCheckQueue added in v0.6.0

func ConsistencyCheckQueue(id entity.ConsistencyCheckID) string

func EventMigrationQueues added in v0.6.0

func EventMigrationQueues(id entity.UniversalReplicationID) []string

func InitMigrationQueues added in v0.6.0

func InitMigrationQueues(id entity.UniversalReplicationID) []string

func NewQueueService added in v0.6.0

func NewQueueService(client *asynq.Client, inspector *asynq.Inspector) *queueService

func Reset added in v0.6.0

func Reset(q *QueueServiceMock)

Types

type BucketCreatePayload

// BucketCreatePayload is a task payload carrying a bucket name and a
// location. Its pointer type exposes SetReplicationID/GetReplicationID.
//
// NOTE(review): presumably the payload for TypeBucketCreate tasks — confirm
// against the enqueue/handler code.
type BucketCreatePayload struct {
	Bucket   string
	Location string
	// contains filtered or unexported fields
}

func (*BucketCreatePayload) GetReplicationID added in v0.6.0

func (r *BucketCreatePayload) GetReplicationID() entity.UniversalReplicationID

func (*BucketCreatePayload) SetReplicationID added in v0.6.0

func (r *BucketCreatePayload) SetReplicationID(id entity.UniversalReplicationID)

type BucketDeletePayload

// BucketDeletePayload is a task payload carrying a bucket name. Its pointer
// type exposes SetReplicationID/GetReplicationID.
//
// NOTE(review): presumably the payload for TypeBucketDelete tasks — confirm.
type BucketDeletePayload struct {
	Bucket string
	// contains filtered or unexported fields
}

func (*BucketDeletePayload) GetReplicationID added in v0.6.0

func (r *BucketDeletePayload) GetReplicationID() entity.UniversalReplicationID

func (*BucketDeletePayload) SetReplicationID added in v0.6.0

func (r *BucketDeletePayload) SetReplicationID(id entity.UniversalReplicationID)

type BucketSyncACLPayload

// BucketSyncACLPayload is a task payload carrying a bucket name. Its pointer
// type exposes SetReplicationID/GetReplicationID.
//
// NOTE(review): presumably the payload for TypeBucketSyncACL tasks — confirm.
type BucketSyncACLPayload struct {
	Bucket string
	// contains filtered or unexported fields
}

func (*BucketSyncACLPayload) GetReplicationID added in v0.6.0

func (r *BucketSyncACLPayload) GetReplicationID() entity.UniversalReplicationID

func (*BucketSyncACLPayload) SetReplicationID added in v0.6.0

func (r *BucketSyncACLPayload) SetReplicationID(id entity.UniversalReplicationID)

type BucketSyncTagsPayload

// BucketSyncTagsPayload is a task payload carrying a bucket name. Its pointer
// type exposes SetReplicationID/GetReplicationID.
//
// NOTE(review): presumably the payload for TypeBucketSyncTags tasks — confirm.
type BucketSyncTagsPayload struct {
	Bucket string
	// contains filtered or unexported fields
}

func (*BucketSyncTagsPayload) GetReplicationID added in v0.6.0

func (r *BucketSyncTagsPayload) GetReplicationID() entity.UniversalReplicationID

func (*BucketSyncTagsPayload) SetReplicationID added in v0.6.0

func (r *BucketSyncTagsPayload) SetReplicationID(id entity.UniversalReplicationID)

type ConsistencyCheckListObjectsPayload added in v0.6.0

// ConsistencyCheckListObjectsPayload is a task payload for the object-listing
// step of a consistency check. It carries a user, a key prefix, the set of
// storage/bucket locations, an index, and the Versioned, IgnoreEtags and
// IgnoreSizes flags.
//
// NOTE(review): Index presumably selects an entry of Locations — confirm
// against the task handler.
type ConsistencyCheckListObjectsPayload struct {
	User        string
	Prefix      string
	Locations   []MigrateLocation
	Index       int
	Versioned   bool
	IgnoreEtags bool
	IgnoreSizes bool
}

type ConsistencyCheckListVersionsPayload added in v0.6.0

// ConsistencyCheckListVersionsPayload is a task payload for the
// version-listing step of a consistency check. It carries a user, a key
// prefix, the set of storage/bucket locations, an index, and two flags.
//
// NOTE(review): the field IgonoreEtags is misspelled; the sibling types
// ConsistencyCheckPayload and ConsistencyCheckListObjectsPayload spell the
// equivalent field IgnoreEtags. Renaming it changes the exported API and,
// if the payload is serialized by field name, the stored form of queued
// tasks — coordinate before fixing.
type ConsistencyCheckListVersionsPayload struct {
	User         string
	Prefix       string
	Locations    []MigrateLocation
	Index        int
	IgonoreEtags bool
	IgnoreSizes  bool
}

type ConsistencyCheckPayload added in v0.5.14

// ConsistencyCheckPayload is a task payload for a consistency check. It
// carries a user, the storage/bucket locations involved, and the Versioned,
// IgnoreEtags and IgnoreSizes flags.
//
// NOTE(review): presumably the payload for TypeConsistencyCheck tasks —
// confirm.
type ConsistencyCheckPayload struct {
	User        string
	Locations   []MigrateLocation
	Versioned   bool
	IgnoreEtags bool
	IgnoreSizes bool
}

type ListObjectVersionsPayload added in v0.6.0

// ListObjectVersionsPayload is a task payload carrying a bucket name and a
// prefix. Its pointer type exposes SetReplicationID/GetReplicationID.
//
// NOTE(review): presumably the payload for TypeMigrateObjectListVersions
// tasks — confirm.
type ListObjectVersionsPayload struct {
	Bucket string
	Prefix string
	// contains filtered or unexported fields
}

func (*ListObjectVersionsPayload) GetReplicationID added in v0.6.0

func (r *ListObjectVersionsPayload) GetReplicationID() entity.UniversalReplicationID

func (*ListObjectVersionsPayload) SetReplicationID added in v0.6.0

func (r *ListObjectVersionsPayload) SetReplicationID(id entity.UniversalReplicationID)

type MigrateBucketListObjectsPayload

// MigrateBucketListObjectsPayload is a task payload carrying a bucket name,
// a prefix and a Versioned flag. Its pointer type exposes
// SetReplicationID/GetReplicationID.
//
// NOTE(review): presumably the payload for TypeMigrateBucketListObjects
// tasks — confirm.
type MigrateBucketListObjectsPayload struct {
	Bucket    string
	Prefix    string
	Versioned bool
	// contains filtered or unexported fields
}

func (*MigrateBucketListObjectsPayload) GetReplicationID added in v0.6.0

func (r *MigrateBucketListObjectsPayload) GetReplicationID() entity.UniversalReplicationID

func (*MigrateBucketListObjectsPayload) SetReplicationID added in v0.6.0

func (r *MigrateBucketListObjectsPayload) SetReplicationID(id entity.UniversalReplicationID)

type MigrateLocation added in v0.5.14

// MigrateLocation names a storage and a bucket. It is the element type of
// the Locations field in the consistency check payloads.
type MigrateLocation struct {
	Storage string
	Bucket  string
}

type MigrateObjCopyPayload

// MigrateObjCopyPayload is a task payload carrying a bucket name and an
// ObjPayload describing one object. Its pointer type exposes
// SetReplicationID/GetReplicationID.
//
// NOTE(review): presumably the payload for TypeMigrateObjCopy tasks —
// confirm.
type MigrateObjCopyPayload struct {
	Bucket string
	Obj    ObjPayload
	// contains filtered or unexported fields
}

func (*MigrateObjCopyPayload) GetReplicationID added in v0.6.0

func (r *MigrateObjCopyPayload) GetReplicationID() entity.UniversalReplicationID

func (*MigrateObjCopyPayload) SetReplicationID added in v0.6.0

func (r *MigrateObjCopyPayload) SetReplicationID(id entity.UniversalReplicationID)

type MigrateVersionedObjectPayload added in v0.6.0

// MigrateVersionedObjectPayload is a task payload carrying a bucket name and
// a prefix. Its pointer type exposes SetReplicationID/GetReplicationID.
//
// NOTE(review): presumably the payload for TypeMigrateVersionedObject tasks;
// whether Prefix holds a full object key or a key prefix is not visible
// here — confirm.
type MigrateVersionedObjectPayload struct {
	Bucket string
	Prefix string
	// contains filtered or unexported fields
}

func (*MigrateVersionedObjectPayload) GetReplicationID added in v0.6.0

func (r *MigrateVersionedObjectPayload) GetReplicationID() entity.UniversalReplicationID

func (*MigrateVersionedObjectPayload) SetReplicationID added in v0.6.0

func (r *MigrateVersionedObjectPayload) SetReplicationID(id entity.UniversalReplicationID)

type ObjInfo

// ObjInfo identifies an object by name and version ID.
type ObjInfo struct {
	Name      string
	VersionID string
}

type ObjPayload

// ObjPayload describes a single object: its name, version ID, ETag, content
// type and size. It is the type of MigrateObjCopyPayload.Obj.
//
// NOTE(review): Size is presumably in bytes — confirm.
type ObjPayload struct {
	Name        string
	VersionID   string
	ETag        string
	ContentType string
	Size        int64
}

type ObjSyncACLPayload

// ObjSyncACLPayload is a task payload carrying a dom.Object. Its pointer
// type exposes SetReplicationID/GetReplicationID.
//
// NOTE(review): presumably the payload for TypeObjectSyncACL tasks — confirm.
type ObjSyncACLPayload struct {
	Object dom.Object
	// contains filtered or unexported fields
}

func (*ObjSyncACLPayload) GetReplicationID added in v0.6.0

func (r *ObjSyncACLPayload) GetReplicationID() entity.UniversalReplicationID

func (*ObjSyncACLPayload) SetReplicationID added in v0.6.0

func (r *ObjSyncACLPayload) SetReplicationID(id entity.UniversalReplicationID)

type ObjSyncTagsPayload

// ObjSyncTagsPayload is a task payload carrying a dom.Object. Its pointer
// type exposes SetReplicationID/GetReplicationID.
//
// NOTE(review): presumably the payload for TypeObjectSyncTags tasks —
// confirm.
type ObjSyncTagsPayload struct {
	Object dom.Object
	// contains filtered or unexported fields
}

func (*ObjSyncTagsPayload) GetReplicationID added in v0.6.0

func (r *ObjSyncTagsPayload) GetReplicationID() entity.UniversalReplicationID

func (*ObjSyncTagsPayload) SetReplicationID added in v0.6.0

func (r *ObjSyncTagsPayload) SetReplicationID(id entity.UniversalReplicationID)

type ObjectSyncPayload

// ObjectSyncPayload is a task payload carrying a dom.Object together with an
// object size and a Deleted flag. Its pointer type exposes
// SetReplicationID/GetReplicationID.
//
// NOTE(review): presumably the payload for TypeObjectSync tasks, with
// ObjSize in bytes and Deleted marking a delete event — confirm against the
// handler.
type ObjectSyncPayload struct {
	Object dom.Object

	//FromVersion int64
	ObjSize int64
	Deleted bool
	// contains filtered or unexported fields
}

func (*ObjectSyncPayload) GetReplicationID added in v0.6.0

func (r *ObjectSyncPayload) GetReplicationID() entity.UniversalReplicationID

func (*ObjectSyncPayload) SetReplicationID added in v0.6.0

func (r *ObjectSyncPayload) SetReplicationID(id entity.UniversalReplicationID)

type Queue added in v0.6.0

// Queue is a string-typed queue name or queue-name prefix.
type Queue string
// Known queue names and prefixes. In Priority, QueueAPI is used as a key on
// its own, while the other values are used with a ":*" suffix appended.
const (
	QueueAPI                      Queue = "api"
	QueueMigrateListObjectsPrefix Queue = "migr_list_obj"
	QueueConsistencyCheck         Queue = "consistency_check"
	QueueMigrateCopyObjectPrefix  Queue = "migr_copy_obj"
	QueueEventsPrefix             Queue = "event"
)

type QueueService added in v0.6.0

// QueueService is the queue management interface of this package: it covers
// counting unprocessed tasks, pausing and resuming a queue, deleting a queue,
// reading per-queue stats, and enqueueing a task. QueueServiceMock declares
// methods with the same signatures.
type QueueService interface {
	// UnprocessedCount reports a single count for the given queues.
	// NOTE(review): presumably the sum across queues, with ignoreNotFound
	// suppressing errors for queues that do not exist — confirm.
	UnprocessedCount(ctx context.Context, ignoreNotFound bool, queues ...string) (int, error)
	// IsPaused reports whether the named queue is paused.
	IsPaused(ctx context.Context, queueName string) (bool, error)
	// Resume resumes the named queue.
	Resume(ctx context.Context, queueName string) error
	// Pause pauses the named queue.
	Pause(ctx context.Context, queueName string) error
	// Delete deletes the named queue.
	// NOTE(review): the exact effect of force is not visible here — confirm.
	Delete(ctx context.Context, queueName string, force bool) error
	// Stats returns the QueueStats of the named queue.
	Stats(ctx context.Context, queueName string) (*QueueStats, error)
	// EnqueueTask enqueues task, which is accepted as any.
	// NOTE(review): presumably one of this package's payload types — confirm
	// which types are accepted.
	EnqueueTask(ctx context.Context, task any) error
}

type QueueServiceMock added in v0.6.0

// QueueServiceMock is a test double whose methods mirror QueueService. Its
// state is two exported maps keyed by string.
//
// NOTE(review): presumably both maps are keyed by queue name, with Queues
// holding the number of unprocessed tasks and Paused the pause state —
// confirm against the method bodies.
type QueueServiceMock struct {
	Queues map[string]int
	Paused map[string]bool
}

func (*QueueServiceMock) Delete added in v0.6.0

func (q *QueueServiceMock) Delete(ctx context.Context, queueName string, force bool) error

func (*QueueServiceMock) EnqueueTask added in v0.6.0

func (q *QueueServiceMock) EnqueueTask(ctx context.Context, task any) error

func (*QueueServiceMock) EventReplicationDone added in v0.6.0

func (q *QueueServiceMock) EventReplicationDone(id entity.UniversalReplicationID)

EventReplicationDone is a test helper that makes the event replication queue empty.

func (*QueueServiceMock) EventReplicationInProgress added in v0.6.0

func (q *QueueServiceMock) EventReplicationInProgress(id entity.UniversalReplicationID)

EventReplicationInProgress is a test helper that makes the event replication queue non-empty.

func (*QueueServiceMock) EventReplicationLag added in v0.6.0

func (q *QueueServiceMock) EventReplicationLag(id entity.UniversalReplicationID, lag int)

func (*QueueServiceMock) InitReplicationDone added in v0.6.0

func (q *QueueServiceMock) InitReplicationDone(id entity.UniversalReplicationID)

InitReplicationDone is a test helper that makes the init replication queues empty.

func (*QueueServiceMock) InitReplicationInProgress added in v0.6.0

func (q *QueueServiceMock) InitReplicationInProgress(id entity.UniversalReplicationID)

InitReplicationInProgress is a test helper that initializes the queues for a replication that is in progress.

func (*QueueServiceMock) IsPaused added in v0.6.0

func (q *QueueServiceMock) IsPaused(ctx context.Context, queueName string) (bool, error)

func (*QueueServiceMock) Pause added in v0.6.0

func (q *QueueServiceMock) Pause(ctx context.Context, queueName string) error

func (*QueueServiceMock) Resume added in v0.6.0

func (q *QueueServiceMock) Resume(ctx context.Context, queueName string) error

func (*QueueServiceMock) Stats added in v0.6.0

func (q *QueueServiceMock) Stats(ctx context.Context, queueName string) (*QueueStats, error)

func (*QueueServiceMock) UnprocessedCount added in v0.6.0

func (q *QueueServiceMock) UnprocessedCount(ctx context.Context, ignoreNotfound bool, queueName ...string) (int, error)

type QueueStats added in v0.6.0

// QueueStats describes the state of a single queue. It is the result type of
// QueueService.Stats.
type QueueStats struct {
	// Number of tasks to be processed in the queue.
	// Includes in_progress, not_started, and retried tasks.
	// In other words, all tasks except failed and processed tasks.
	Unprocessed int
	// Total number of tasks processed.
	ProcessedTotal int

	// Paused indicates whether the queue is paused.
	// If true, tasks in the queue will not be processed.
	Paused bool

	// Total number of bytes that the queue and its tasks require to be stored in redis.
	// It is an approximate memory usage value in bytes since the value is computed by sampling.
	MemoryUsage int64

	// Latency of the queue, measured by the oldest pending task in the queue.
	Latency time.Duration
}

type ReplicationTask added in v0.6.0

// ReplicationTask is implemented by task payloads that carry a replication
// ID. The payload types in this package that declare SetReplicationID and
// GetReplicationID on their pointer receivers satisfy it.
type ReplicationTask interface {
	// SetReplicationID stores id on the task.
	SetReplicationID(id entity.UniversalReplicationID)
	// GetReplicationID returns the task's replication ID.
	GetReplicationID() entity.UniversalReplicationID
}

type SwitchWithDowntimePayload added in v0.5.14

// SwitchWithDowntimePayload is a task payload carrying a replication ID as
// an exported field.
//
// NOTE(review): presumably the payload for TypeApiSwitchWithDowntime tasks —
// confirm.
type SwitchWithDowntimePayload struct {
	ID entity.UniversalReplicationID
}

type ZeroDowntimeReplicationSwitchPayload added in v0.5.14

// ZeroDowntimeReplicationSwitchPayload is a task payload carrying a
// replication ID as an exported field.
//
// NOTE(review): presumably the payload for TypeApiZeroDowntimeSwitch tasks —
// confirm.
type ZeroDowntimeReplicationSwitchPayload struct {
	ID entity.UniversalReplicationID
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL