Documentation ¶
Overview ¶
Package dsync provides distributed synchronization support for microservices, along with common usage patterns built around distributed locks, such as lock-based service leader election.
Index ¶
Constants ¶
This section is empty.
Variables ¶
var (
	ErrSyncManagerStopped = fmt.Errorf("sync manager stopped")
)
var Module = &bootstrap.Module{
	Name:       "distributed",
	Precedence: bootstrap.DistributedLockPrecedence,
	Options: []fx.Option{
		appconfig.FxEmbeddedDefaults(defaultConfigFS),
		fx.Provide(provideSyncManager),
		fx.Invoke(initialize),
	},
}
Functions ¶
Types ¶
type ConsulLock ¶
type ConsulLock struct {
// contains filtered or unexported fields
}
ConsulLock implements the Lock interface using the Consul lock described at https://www.consul.io/docs/guides/leader-election.html
The implementation is a modified api.Lock. The major differences are:
- The session is created and maintained outside of the lock. No session is created when attempting to lock.
- "Lock or wait" vs. "try lock and return" is not predetermined via options.
func (*ConsulLock) Key ¶
func (l *ConsulLock) Key() string
func (*ConsulLock) Lock ¶
func (l *ConsulLock) Lock(ctx context.Context) error
Lock attempts to acquire the lock and blocks while doing so. A cancellable context.Context can be provided to abort the lock attempt.
Lock returns an error if the attempt is aborted or fails. Once the lock is held, its loss is signalled through the channel returned by Lost(). This can happen at any time due to session invalidation, communication errors, operator intervention, etc. It is NOT safe to assume that the lock is held until Release() unless the session is specifically created without any associated health checks. By default, Consul sessions prefer liveness over safety, and an application must be able to handle the lock being lost.
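The following is a minimal sketch of aborting a blocked lock attempt with a context deadline. It does not use dsync or Consul: busyLock is a hypothetical stand-in whose Lock method blocks until the context ends, which approximates a key held indefinitely by another session. The helper names (lockWithTimeout, timedOut, demoTimeout) are illustrative only.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// busyLock is a hypothetical stand-in (not part of dsync) for a lock that is
// held by another session and never becomes available.
type busyLock struct{}

// Lock blocks until the context is cancelled or times out, then reports why.
func (busyLock) Lock(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

// lockWithTimeout bounds a blocking Lock call with a deadline of d.
func lockWithTimeout(l interface{ Lock(context.Context) error }, d time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), d)
	defer cancel()
	return l.Lock(ctx)
}

// timedOut reports whether the attempt was aborted by the deadline.
func timedOut(d time.Duration) bool {
	return errors.Is(lockWithTimeout(busyLock{}, d), context.DeadlineExceeded)
}

// demoTimeout runs the sketch with a short, arbitrary deadline.
func demoTimeout() bool {
	return timedOut(20 * time.Millisecond)
}

func main() {
	fmt.Println("aborted by deadline:", demoTimeout())
}
```

With a real ConsulLock, the same pattern would apply to the ctx argument of Lock; how the returned error wraps the context error is not stated in this documentation, so checking it with errors.Is is an assumption.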
func (*ConsulLock) Lost ¶
func (l *ConsulLock) Lost() <-chan struct{}
func (*ConsulLock) Release ¶
func (l *ConsulLock) Release() error
type ConsulLockOption ¶
type ConsulLockOption struct {
	Context       context.Context
	SessionFunc   func(context.Context) (string, error)
	Key           string        // Must be set and have write permissions
	Valuer        LockValuer    // cannot be nil, valuer to associate with the lock. Default to static json marshaller
	QueryWaitTime time.Duration // how long we block per GET to check if lock acquisition is possible
	RetryDelay    time.Duration // how long we wait after a retryable error (usually network error)
}
type ConsulLockOptions ¶
type ConsulLockOptions func(opt *ConsulLockOption)
type ConsulSessionOption ¶
type ConsulSessionOptions ¶
type ConsulSessionOptions func(opt *ConsulSessionOption)
type ConsulSyncManager ¶
type ConsulSyncManager struct {
// contains filtered or unexported fields
}
ConsulSyncManager implements SyncManager leveraging Consul's session feature. See:
https://learn.hashicorp.com/tutorials/consul/application-leader-elections?in=consul/developer-configuration
https://learn.hashicorp.com/tutorials/consul/distributed-semaphore
https://www.consul.io/docs/dynamic-app-config/sessions
func NewConsulLockManager ¶
func NewConsulLockManager(ctx *bootstrap.ApplicationContext, conn *consul.Connection, opts ...ConsulSessionOptions) (ret *ConsulSyncManager)
func (*ConsulSyncManager) Lock ¶
func (m *ConsulSyncManager) Lock(key string, opts ...LockOptions) (Lock, error)
type Lock ¶
type Lock interface {
	// Key returns the unique identifier of the lock.
	Key() string
	// Lock blocks until the lock is acquired or the context is cancelled/timed out.
	// Invoking Lock after the lock is acquired (or re-acquired after some error) returns immediately.
	Lock(ctx context.Context) error
	// TryLock differs from Lock in the following ways:
	// - TryLock stops blocking when the lock is held by another instance/session
	// - TryLock stops blocking when an unrecoverable error happens during lock acquisition
	// Note: TryLock may temporarily block when connectivity to the external infra service is not available
	TryLock(ctx context.Context) error
	// Release releases the lock and stops the process from maintaining the active lock.
	// Release must be used after Lock or TryLock is used. Invoking Release multiple times has no effect.
	// Note: the Lost channel stops signalling after Release, until Lock or TryLock is called again
	Release() error
	// Lost channel signals long-running goroutines when the lock is lost (due to network error or operator intervention).
	// When the Lost channel is signalled, there is no need to re-invoke Lock.Lock or Lock.TryLock for lock re-acquisition,
	// but all tasks relying on the lock should stop.
	Lost() <-chan struct{}
}
Lock is a distributed mutex lock backed by an external infrastructure service such as Consul or Redis. After the lock is acquired (Lock.Lock or Lock.TryLock returns without error), the lock might be revoked by an operator or by the external infra service. The Lock keeps trying to acquire/re-acquire the lock until Lock.Release is manually invoked.
Long-running goroutines should monitor the Lost channel after the lock is acquired. When the Lost channel is signalled, there is no need to re-invoke Lock.Lock or Lock.TryLock, since the internal loop will try to re-acquire the lock. However, any existing tasks relying on this lock should be stopped, because there is no guarantee that the lock will be re-acquired.
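The monitoring pattern described above might look like the sketch below. It is self-contained and does not import dsync: the local Lock interface mirrors only the methods used here, and fakeLock is a hypothetical in-memory stand-in that signals loss by closing its channel (whether the real implementation closes the channel or sends on it is not stated here; a receive works in both cases). The names runWhileHeld and demoLost are illustrative.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Lock mirrors the subset of the dsync Lock interface used in this sketch.
type Lock interface {
	Lock(ctx context.Context) error
	Release() error
	Lost() <-chan struct{}
}

// fakeLock is a hypothetical in-memory stand-in, not part of dsync.
// It can always be acquired and reports loss only when revoke is called.
type fakeLock struct {
	once sync.Once
	lost chan struct{}
}

func newFakeLock() *fakeLock { return &fakeLock{lost: make(chan struct{})} }

func (l *fakeLock) Lock(ctx context.Context) error { return ctx.Err() }
func (l *fakeLock) Release() error                 { return nil }
func (l *fakeLock) Lost() <-chan struct{}          { return l.lost }

// revoke simulates an operator or the backing service taking the lock away.
func (l *fakeLock) revoke() { l.once.Do(func() { close(l.lost) }) }

// runWhileHeld acquires the lock, then handles items from work until the lock
// is lost, the context ends, or work is closed. It returns why it stopped.
func runWhileHeld(ctx context.Context, l Lock, work <-chan int, handle func(int)) string {
	if err := l.Lock(ctx); err != nil {
		return "not acquired"
	}
	defer l.Release()
	for {
		select {
		case <-l.Lost():
			// Stop all dependent work; re-acquisition is not guaranteed.
			return "lock lost"
		case <-ctx.Done():
			return "context done"
		case v, ok := <-work:
			if !ok {
				return "work finished"
			}
			handle(v)
		}
	}
}

// demoLost revokes the lock before any work arrives, so the worker stops
// because of the Lost signal.
func demoLost() string {
	l := newFakeLock()
	l.revoke()
	work := make(chan int) // nothing is ever sent in this demo
	return runWhileHeld(context.Background(), l, work, func(int) {})
}

func main() {
	fmt.Println(demoLost())
}
```

The worker does not call Lock again after the Lost signal; it only stops its own tasks, which matches the guidance that re-acquisition is handled internally.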
func LeadershipLock ¶
func LeadershipLock() Lock
LeadershipLock returns the globally maintained lock for leadership election. To check leadership, use Lock.TryLock and check the error. Example:
if e := LeadershipLock().TryLock(ctx); e == nil {
	// do what a leader should do
}
This function panics if it is called too soon during startup. Note: the Lock.Lost() channel should be monitored by long-running goroutines, since leadership could be revoked at any time by operators.
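To illustrate why a nil error from TryLock can be read as "this instance is the leader", here is a minimal in-memory sketch of several instances competing for one leadership slot. Nothing in it comes from dsync: leaderSlot, candidate, errHeldByOther, electLeaders and leaderSummary are hypothetical names, and the slot is a plain mutex-guarded field rather than a Consul session.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// leaderSlot is a hypothetical stand-in for the shared leadership key.
type leaderSlot struct {
	mtx    sync.Mutex
	holder string
}

// candidate mimics the TryLock/Release part of a Lock for one service instance.
type candidate struct {
	name string
	slot *leaderSlot
}

var errHeldByOther = errors.New("lock is held by another instance")

// TryLock returns immediately: nil if this candidate holds (or now holds) the
// slot, an error if another candidate holds it or the context has ended.
func (c *candidate) TryLock(ctx context.Context) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	c.slot.mtx.Lock()
	defer c.slot.mtx.Unlock()
	if c.slot.holder != "" && c.slot.holder != c.name {
		return errHeldByOther
	}
	c.slot.holder = c.name
	return nil
}

// Release frees the slot if this candidate holds it.
func (c *candidate) Release() error {
	c.slot.mtx.Lock()
	defer c.slot.mtx.Unlock()
	if c.slot.holder == c.name {
		c.slot.holder = ""
	}
	return nil
}

// electLeaders lets each named instance try once, in order, against the same
// slot and returns the names whose TryLock succeeded.
func electLeaders(ctx context.Context, names ...string) []string {
	slot := &leaderSlot{}
	var leaders []string
	for _, n := range names {
		c := &candidate{name: n, slot: slot}
		if err := c.TryLock(ctx); err == nil {
			leaders = append(leaders, n) // only the leader runs leader-only work
		}
	}
	return leaders
}

// leaderSummary formats the election result for printing.
func leaderSummary(names ...string) string {
	return fmt.Sprint(electLeaders(context.Background(), names...))
}

func main() {
	fmt.Println(leaderSummary("node-a", "node-b", "node-c"))
}
```

Exactly one candidate succeeds because the slot admits a single holder; the others receive an error and skip the leader-only branch.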
func LockWithKey ¶
func LockWithKey(key string, opts ...LockOptions) Lock
LockWithKey returns a distributed Lock with the given key. This function panics if the internal SyncManager is not initialized yet or if the key is not provided.
type LockOption ¶
type LockOption struct {
Valuer LockValuer
}
type LockOptions ¶
type LockOptions func(opt *LockOption)
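LockOptions follows Go's functional-options pattern: each option is a function that mutates a LockOption. The sketch below shows how such options are typically applied. The three types are local copies of the declarations above so the example compiles on its own; withStaticValue, resolveOptions and resolvedValue are hypothetical helpers, and the "{}" default value is an assumption rather than dsync's documented default.

```go
package main

import "fmt"

// Local copies of the dsync declarations shown above.
type LockValuer func() []byte

type LockOption struct {
	Valuer LockValuer
}

type LockOptions func(opt *LockOption)

// withStaticValue is a hypothetical option (not part of dsync) that sets a
// fixed byte value as the lock's metadata.
func withStaticValue(s string) LockOptions {
	return func(opt *LockOption) {
		opt.Valuer = func() []byte { return []byte(s) }
	}
}

// resolveOptions starts from an assumed default and applies each option in
// order, so later options override earlier ones.
func resolveOptions(opts ...LockOptions) LockOption {
	opt := LockOption{Valuer: func() []byte { return []byte("{}") }}
	for _, fn := range opts {
		fn(&opt)
	}
	return opt
}

// resolvedValue returns the lock value produced after applying opts.
func resolvedValue(opts ...LockOptions) string {
	return string(resolveOptions(opts...).Valuer())
}

func main() {
	fmt.Println(resolvedValue())                            // default value
	fmt.Println(resolvedValue(withStaticValue("worker-1"))) // overridden value
}
```

A caller of LockWithKey or SyncManager.Lock would pass options of this shape as the variadic opts argument.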
type LockValuer ¶
type LockValuer func() []byte
LockValuer is used to annotate the lock in the external infra service. Its output is treated literally and serves as the lock's metadata.
func NewJsonLockValuer ¶
func NewJsonLockValuer(v interface{}) LockValuer
NewJsonLockValuer is the default implementation of LockValuer.
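As an illustration of what a JSON-based LockValuer could look like, the sketch below marshals a value each time the valuer is invoked. It is a hypothetical re-implementation, not the source of NewJsonLockValuer: jsonValuer and holderInfo are made-up names, and falling back to "{}" on a marshalling error is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// LockValuer is a local copy of the dsync type shown above.
type LockValuer func() []byte

// jsonValuer is a hypothetical helper (not dsync's NewJsonLockValuer) that
// returns a LockValuer producing the JSON encoding of v.
func jsonValuer(v interface{}) LockValuer {
	return func() []byte {
		data, err := json.Marshal(v)
		if err != nil {
			// Assumed fallback: an empty JSON object when v cannot be encoded.
			return []byte("{}")
		}
		return data
	}
}

// holderInfo builds example metadata describing which instance holds a lock.
func holderInfo(service string, instance int) string {
	valuer := jsonValuer(map[string]interface{}{
		"service":  service,
		"instance": instance,
	})
	return string(valuer())
}

func main() {
	// encoding/json sorts map keys, so "instance" is emitted before "service".
	fmt.Println(holderInfo("billing", 2))
}
```

Metadata like this makes it easier to see, in the backing store, which instance currently holds a given key.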
type SyncManager ¶
type SyncManager interface {
	// Lock returns a distributed lock for the given key.
	// For the same key, the same Lock is returned. The returned Lock is goroutine-safe.
	// Note: the returned Lock is in idle mode
	Lock(key string, opts ...LockOptions) (Lock, error)
}
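The "same key, same Lock" contract can be sketched with a mutex-guarded map. This is only an illustration of the contract, not ConsulSyncManager's implementation: registry, keyedLock, newRegistry and sameInstance are hypothetical names, and keyedLock carries no real locking behaviour.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// keyedLock is a hypothetical placeholder for a real distributed lock.
type keyedLock struct{ key string }

// Key returns the identifier this lock was created for.
func (l *keyedLock) Key() string { return l.key }

// registry hands out exactly one lock object per key and is safe for
// concurrent use.
type registry struct {
	mtx   sync.Mutex
	locks map[string]*keyedLock
}

func newRegistry() *registry {
	return &registry{locks: map[string]*keyedLock{}}
}

// Lock returns the existing lock for key, creating it on first use.
// The returned lock is idle: nothing is acquired until the caller asks.
func (r *registry) Lock(key string) (*keyedLock, error) {
	if key == "" {
		return nil, errors.New("key is required")
	}
	r.mtx.Lock()
	defer r.mtx.Unlock()
	if l, ok := r.locks[key]; ok {
		return l, nil
	}
	l := &keyedLock{key: key}
	r.locks[key] = l
	return l, nil
}

// sameInstance reports whether both keys resolve to the same lock object.
func sameInstance(r *registry, k1, k2 string) bool {
	a, errA := r.Lock(k1)
	b, errB := r.Lock(k2)
	return errA == nil && errB == nil && a == b
}

func main() {
	r := newRegistry()
	fmt.Println(sameInstance(r, "jobs/cleanup", "jobs/cleanup"))
	fmt.Println(sameInstance(r, "jobs/cleanup", "jobs/report"))
}
```

Returning the same object per key means that goroutines within one process share a single lock state instead of competing with each other through the external service.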