dsync

package
v0.13.0
Published: Feb 27, 2024 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Overview

Package dsync provides distributed synchronization support for microservices, along with common usage patterns built on distributed locks, such as lock-based service leader election.

Index

Constants

This section is empty.

Variables

var (
	ErrLockUnavailable    = fmt.Errorf("lock is held by another session")
	ErrSessionUnavailable = fmt.Errorf("session is not available")
	ErrSyncManagerStopped = fmt.Errorf("sync manager stopped")
)
var Module = &bootstrap.Module{
	Name:       "distributed",
	Precedence: bootstrap.DistributedLockPrecedence,
	Options: []fx.Option{
		appconfig.FxEmbeddedDefaults(defaultConfigFS),
		fx.Provide(provideSyncManager),
		fx.Invoke(initialize),
	},
}

Functions

func Use

func Use()

Types

type ConsulLock

type ConsulLock struct {
	// contains filtered or unexported fields
}

ConsulLock implements the Lock interface using the Consul lock described at https://www.consul.io/docs/guides/leader-election.html. The implementation is a modified version of api.Lock. The major differences are:

- The session is created and maintained outside the lock. No session is created when attempting to lock.
- "Lock or wait" vs. "try lock and return" is not predetermined via options.

func (*ConsulLock) Key

func (l *ConsulLock) Key() string

func (*ConsulLock) Lock

func (l *ConsulLock) Lock(ctx context.Context) error

Lock attempts to acquire the lock and blocks while doing so. A cancellable context.Context can be provided to abort the lock attempt.

The channel returned by Lost is closed if the lock is lost or an error occurs. This channel could be closed at any time due to session invalidation, communication errors, operator intervention, etc. It is NOT safe to assume that the lock is held until Release() unless the session is specifically created without any associated health checks. By default, Consul sessions prefer liveness over safety, and an application must be able to handle the lock being lost.

func (*ConsulLock) Lost

func (l *ConsulLock) Lost() <-chan struct{}

func (*ConsulLock) Release

func (l *ConsulLock) Release() error

func (*ConsulLock) TryLock

func (l *ConsulLock) TryLock(ctx context.Context) error

type ConsulLockOption

type ConsulLockOption struct {
	Context       context.Context
	SessionFunc   func(context.Context) (string, error)
	Key           string        // Must be set and have write permissions
	Valuer        LockValuer    // cannot be nil, valuer to associate with the lock. Default to static json marshaller
	QueryWaitTime time.Duration // how long we block per GET to check if lock acquisition is possible
	RetryDelay    time.Duration // how long we wait after a retryable error (usually network error)
}

type ConsulLockOptions

type ConsulLockOptions func(opt *ConsulLockOption)

type ConsulSessionOption

type ConsulSessionOption struct {
	Name       string
	TTL        time.Duration
	LockDelay  time.Duration
	RetryDelay time.Duration
}

type ConsulSessionOptions

type ConsulSessionOptions func(opt *ConsulSessionOption)

type ConsulSyncManager

type ConsulSyncManager struct {
	// contains filtered or unexported fields
}

ConsulSyncManager implements SyncManager leveraging Consul's session feature. See:

https://learn.hashicorp.com/tutorials/consul/application-leader-elections?in=consul/developer-configuration
https://learn.hashicorp.com/tutorials/consul/distributed-semaphore
https://www.consul.io/docs/dynamic-app-config/sessions

func NewConsulLockManager

func NewConsulLockManager(ctx *bootstrap.ApplicationContext, conn *consul.Connection, opts ...ConsulSessionOptions) (ret *ConsulSyncManager)

func (*ConsulSyncManager) Lock

func (m *ConsulSyncManager) Lock(key string, opts ...LockOptions) (Lock, error)

func (*ConsulSyncManager) Start

func (m *ConsulSyncManager) Start(_ context.Context) error

func (*ConsulSyncManager) Stop

func (m *ConsulSyncManager) Stop(ctx context.Context) error

type Lock

type Lock interface {
	// Key the unique identifier of the lock
	Key() string

	// Lock blocks until lock is acquired or context is cancelled/timed out.
	// Invoking Lock after lock is acquired (or re-acquired after some error) returns immediately
	Lock(ctx context.Context) error

	// TryLock differs from Lock in the following ways:
	// - TryLock stops blocking when the lock is held by another instance/session
	// - TryLock stops blocking when an unrecoverable error happens during lock acquisition
	// Note: TryLock may temporarily block when connectivity to the external infra service is not available
	TryLock(ctx context.Context) error

	// Release releases the lock and stops the process from maintaining the active lock.
	// Release must be used after Lock or TryLock is used. Invoking Release multiple times has no effect.
	// Note: the Lost channel stops signalling after Release, until Lock or TryLock is called again
	Release() error

	// Lost channel signals long-running goroutines when the lock is lost (due to network error or operator intervention).
	// When the Lost channel is signalled, there is no need to re-invoke Lock.Lock or Lock.TryLock for lock re-acquisition,
	// but all tasks relying on the lock should stop.
	Lost() <-chan struct{}
}

Lock is a distributed mutex lock backed by an external infrastructure service such as Consul or Redis. After the lock is acquired (Lock.Lock or Lock.TryLock returns without error), it might be revoked by an operator or by the external infra service. The Lock keeps trying to acquire or re-acquire the lock until Lock.Release is manually invoked.

A long-running goroutine should monitor the Lost channel after the lock is acquired. When the Lost channel is signalled, there is no need to re-invoke Lock.Lock or Lock.TryLock, since the internal loop tries to re-acquire the lock. However, any existing tasks relying on this lock should be stopped, because there is no guarantee that the lock will be re-acquired.

func LeadershipLock

func LeadershipLock() Lock

LeadershipLock returns the globally maintained lock for leadership election. To check leadership, use Lock.TryLock and check the error. Example:

if e := LeadershipLock().TryLock(ctx); e == nil {
	// do what a leader should do
}

This function panics if it is called too soon during startup.

Note: the Lock.Lost() channel should be monitored by long-running goroutines, since leadership could be revoked at any time by operators.

func LockWithKey

func LockWithKey(key string, opts ...LockOptions) Lock

LockWithKey returns a distributed Lock with the given key. This function panics if the internal SyncManager is not initialized yet or if the key is not provided.

type LockOption

type LockOption struct {
	Valuer LockValuer
}

type LockOptions

type LockOptions func(opt *LockOption)
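LockOptions follows the functional-options shape: each option is a function that mutates a LockOption. The sketch below shows how such options are typically written and applied. The three types are local mirrors of the documented ones; withStaticValue, resolve, and the "{}" default are assumptions made for illustration and are not part of the package.

```go
package main

import "fmt"

// Local mirrors of the documented types, so the sketch compiles on its own.
type LockValuer func() []byte

type LockOption struct {
	Valuer LockValuer
}

type LockOptions func(opt *LockOption)

// withStaticValue is a hypothetical helper (not part of dsync) that returns
// an option setting a fixed value as the lock's metadata.
func withStaticValue(v string) LockOptions {
	return func(opt *LockOption) {
		opt.Valuer = func() []byte { return []byte(v) }
	}
}

// resolve applies options in order on top of an assumed default, the way a
// function accepting opts ...LockOptions would usually do.
func resolve(opts ...LockOptions) LockOption {
	opt := LockOption{Valuer: func() []byte { return []byte("{}") }}
	for _, fn := range opts {
		fn(&opt)
	}
	return opt
}

func main() {
	fmt.Println(string(resolve().Valuer()))
	fmt.Println(string(resolve(withStaticValue("owner=svc-a")).Valuer()))
}
```

Because options are applied in order, a later option overrides an earlier one that sets the same field.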

type LockValuer

type LockValuer func() []byte

LockValuer is used to annotate the lock in the external infra service. Its output is treated literally and serves as the lock's metadata.

func NewJsonLockValuer

func NewJsonLockValuer(v interface{}) LockValuer

NewJsonLockValuer is the default implementation of LockValuer.
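The source of NewJsonLockValuer is not shown here, so the following is only a sketch of what a JSON-based LockValuer could look like. jsonValuer is a hypothetical name, and returning nil on a marshalling error is an assumption rather than documented behavior.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// LockValuer mirrors the documented type.
type LockValuer func() []byte

// jsonValuer is a hypothetical JSON-based LockValuer: it marshals v each
// time it is called and returns nil if marshalling fails (an assumption).
func jsonValuer(v interface{}) LockValuer {
	return func() []byte {
		data, err := json.Marshal(v)
		if err != nil {
			return nil
		}
		return data
	}
}

func main() {
	valuer := jsonValuer(map[string]string{"owner": "svc-a"})
	fmt.Println(string(valuer()))
}
```

Marshalling inside the returned function means the value reflects the state of v at the time the lock is written, not at the time the valuer was created.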

type SyncManager

type SyncManager interface {
	// Lock returns a distributed lock for given key.
	// For the same key, the same Lock is returned. The returned Lock is goroutine-safe
	// Note: the returned Lock is in idle mode
	Lock(key string, opts ...LockOptions) (Lock, error)
}
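The "same key, same Lock" contract can be met with a mutex-guarded map. The sketch below shows that idea only. lockCache and keyedLock are hypothetical, and this is not a description of how ConsulSyncManager stores its locks.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// keyedLock is a hypothetical placeholder for a lock handle.
type keyedLock struct{ key string }

// lockCache hands out one handle per key and is safe for concurrent use.
type lockCache struct {
	mu    sync.Mutex
	locks map[string]*keyedLock
}

func newLockCache() *lockCache {
	return &lockCache{locks: make(map[string]*keyedLock)}
}

// Lock returns the handle for key, creating it on first use.
func (c *lockCache) Lock(key string) (*keyedLock, error) {
	if key == "" {
		return nil, errors.New("key is required")
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	if l, ok := c.locks[key]; ok {
		return l, nil
	}
	l := &keyedLock{key: key}
	c.locks[key] = l
	return l, nil
}

func main() {
	c := newLockCache()
	a, _ := c.Lock("jobs/cleanup")
	b, _ := c.Lock("jobs/cleanup")
	other, _ := c.Lock("jobs/report")
	fmt.Println(a == b, a == other)
}
```

Returning the cached handle lets every caller in the process observe the same lock state for a given key.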

type SyncManagerLifecycle

type SyncManagerLifecycle interface {
	Start(ctx context.Context) error
	Stop(ctx context.Context) error
}
