coordination

package
v3.113.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 31, 2025 License: Apache-2.0 Imports: 8 Imported by: 1

Documentation

Overview

Example (CreateDropNode)
// Connect to the database, create a coordination node, describe it and drop it again on exit.
ctx := context.TODO()
db, err := ydb.Open(ctx, "grpc://localhost:2136/local")
if err != nil {
	fmt.Printf("failed to connect: %v", err)

	return
}
defer db.Close(ctx) // cleanup resources
// create node
err = db.Coordination().CreateNode(ctx, "/local/test", coordination.NodeConfig{
	Path:                     "",
	SelfCheckPeriodMillis:    1000,
	SessionGracePeriodMillis: 1000,
	ReadConsistencyMode:      coordination.ConsistencyModeRelaxed,
	AttachConsistencyMode:    coordination.ConsistencyModeRelaxed,
	RatelimiterCountersMode:  coordination.RatelimiterCountersModeDetailed,
})
if err != nil {
	fmt.Printf("failed to create node: %v", err)

	return
}
// Drop the node on exit and report a failure instead of silently discarding the returned error.
defer func() {
	dropNodeErr := db.Coordination().DropNode(ctx, "/local/test")
	if dropNodeErr != nil {
		fmt.Printf("failed to drop node: %v\n", dropNodeErr)
	}
}()
// describe node: e is the scheme entry, c is the node config
e, c, err := db.Coordination().DescribeNode(ctx, "/local/test")
if err != nil {
	fmt.Printf("failed to describe node: %v", err)

	return
}
fmt.Printf("node description: %+v\nnode config: %+v\n", e, c)
Example (Semaphore)
// Semaphore walkthrough: create a coordination node, open a session on it, then create, acquire, describe, release
// and delete a semaphore. Deferred calls run in reverse registration order on return: lease release, session close,
// node drop and finally db.Close.
ctx := context.TODO()
db, err := ydb.Open(ctx, "grpc://localhost:2136/local")
if err != nil {
	fmt.Printf("failed to connect: %v", err)

	return
}
defer db.Close(ctx) // cleanup resources
// create node
err = db.Coordination().CreateNode(ctx, "/local/test", coordination.NodeConfig{
	Path:                     "",
	SelfCheckPeriodMillis:    1000,
	SessionGracePeriodMillis: 1000,
	ReadConsistencyMode:      coordination.ConsistencyModeStrict,
	AttachConsistencyMode:    coordination.ConsistencyModeStrict,
	RatelimiterCountersMode:  coordination.RatelimiterCountersModeDetailed,
})
if err != nil {
	fmt.Printf("failed to create node: %v", err)

	return
}
// drop the node on exit and report a failure to do so
defer func() {
	dropNodeErr := db.Coordination().DropNode(ctx, "/local/test")
	if dropNodeErr != nil {
		fmt.Printf("failed to drop node: %v\n", dropNodeErr)
	}
}()

// describe node: e is the scheme entry, c is the node config
e, c, err := db.Coordination().DescribeNode(ctx, "/local/test")
if err != nil {
	fmt.Printf("failed to describe node: %v\n", err)

	return
}
fmt.Printf("node description: %+v\nnode config: %+v\n", e, c)

// open a session on the node; it must be closed to avoid leaking resources
s, err := db.Coordination().Session(ctx, "/local/test")
if err != nil {
	fmt.Printf("failed to create session: %v\n", err)

	return
}
defer s.Close(ctx)
fmt.Printf("session 1 created, id: %d\n", s.SessionID())

// create a semaphore with limit 20 and three bytes of user data attached
err = s.CreateSemaphore(ctx, "my-semaphore", 20, options.WithCreateData([]byte{1, 2, 3}))
if err != nil {
	fmt.Printf("failed to create semaphore: %v", err)

	return
}
fmt.Printf("semaphore my-semaphore created\n")

// acquire 10 tokens of the semaphore
lease, err := s.AcquireSemaphore(ctx, "my-semaphore", 10)
if err != nil {
	fmt.Printf("failed to acquire semaphore: %v", err)

	return
}
// make sure the lease is released on every return path below
defer func() {
	releaseErr := lease.Release()
	if releaseErr != nil {
		fmt.Printf("failed to release lease: %v", releaseErr)
	}
}()

fmt.Printf("session 1 acquired semaphore 10\n")

// force the session to recreate its underlying gRPC stream; the calls below are issued after the reconnect
s.Reconnect()
fmt.Printf("session 1 reconnected\n")

// describe the semaphore, including its owners and waiters
desc, err := s.DescribeSemaphore(
	ctx,
	"my-semaphore",
	options.WithDescribeOwners(true),
	options.WithDescribeWaiters(true),
)
if err != nil {
	fmt.Printf("failed to describe semaphore: %v", err)

	return
}
fmt.Printf("session 1 described semaphore %v\n", desc)

// NOTE(review): the deferred Release registered above still runs after this explicit Release, so the lease is
// released twice on the success path — confirm that a repeated Release is harmless.
err = lease.Release()
if err != nil {
	fmt.Printf("failed to release semaphore: %v", err)

	return
}
fmt.Printf("session 1 released semaphore my-semaphore\n")

// delete the semaphore with the force-delete option enabled
err = s.DeleteSemaphore(ctx, "my-semaphore", options.WithForceDelete(true))
if err != nil {
	fmt.Printf("failed to delete semaphore: %v", err)

	return
}
fmt.Printf("deleted semaphore my-semaphore\n")

Index

Examples

Constants

View Source
// Semaphore limit and acquire-count shortcuts.
const (
	// MaxSemaphoreLimit is the largest value accepted as the limit parameter of the Session.CreateSemaphore method.
	MaxSemaphoreLimit = math.MaxUint64

	// Exclusive is a shortcut for the maximum semaphore limit value. Use it to acquire a semaphore in the exclusive
	// mode when the semaphore was created with the limit value of MaxSemaphoreLimit, which is always the case for
	// ephemeral semaphores.
	Exclusive = MaxSemaphoreLimit

	// Shared is a shortcut for the minimum semaphore limit value (1). Use it to acquire a semaphore in the shared
	// mode when the semaphore was created with the limit value of MaxSemaphoreLimit, which is always the case for
	// ephemeral semaphores.
	Shared = 1
)

Variables

View Source
// Sentinel errors of the coordination session API.
var (
	// ErrOperationStatusUnknown indicates that the request has been sent to the server but no reply has been received.
	// The client usually retries such calls automatically, but there are cases when that is not possible:
	//   - the request is not idempotent (non-idempotent requests are never retried),
	//   - the session was lost and its context is canceled.
	ErrOperationStatusUnknown = errors.New("operation status is unknown")

	// ErrSessionClosed indicates that the Session object is closed.
	ErrSessionClosed = errors.New("session is closed")

	// ErrAcquireTimeout indicates that the Session.AcquireSemaphore method could not acquire the semaphore before the
	// operation timeout elapsed (see options.WithAcquireTimeout).
	ErrAcquireTimeout = errors.New("acquire semaphore timeout")
)

Functions

This section is empty.

Types

type Client added in v3.7.0

// Client is the coordination API entry point: it manages coordination nodes and starts sessions on them.
type Client interface {
	// CreateNode creates a coordination node at path using the given config.
	CreateNode(ctx context.Context, path string, config NodeConfig) (err error)
	// AlterNode applies config to the node at path.
	// NOTE(review): presumably the node must already exist — confirm against the implementation.
	AlterNode(ctx context.Context, path string, config NodeConfig) (err error)
	// DropNode drops the node at path.
	DropNode(ctx context.Context, path string) (err error)
	// DescribeNode returns the scheme entry and the config of the node at path.
	DescribeNode(ctx context.Context, path string) (_ *scheme.Entry, _ *NodeConfig, err error)

	// Session starts a new session. This method blocks until the server session is created. The context provided
	// may be used to cancel the invocation. If the method completes successfully, the session remains alive even if
	// the context is canceled.
	//
	// To ensure resources are not leaked, one of the following actions must be performed:
	//
	//   - call Close on the Session,
	//   - close the Client which the session was created with,
	//   - call any method of the Session until the ErrSessionClosed is returned.
	Session(ctx context.Context, path string, opts ...options.SessionOption) (Session, error)
}

type ConsistencyMode

// ConsistencyMode is the type of the NodeConfig.ReadConsistencyMode and NodeConfig.AttachConsistencyMode settings.
type ConsistencyMode uint
const (
	// ConsistencyModeUnset is the zero value of ConsistencyMode.
	ConsistencyModeUnset ConsistencyMode = iota
	// ConsistencyModeStrict has the value 1.
	// NOTE(review): exact server-side semantics are not visible here — confirm before documenting further.
	ConsistencyModeStrict
	// ConsistencyModeRelaxed has the value 2.
	// NOTE(review): exact server-side semantics are not visible here — confirm before documenting further.
	ConsistencyModeRelaxed
)

func (ConsistencyMode) String

func (t ConsistencyMode) String() string

func (ConsistencyMode) To

type Lease added in v3.60.0

// Lease represents the rights of a session to an acquired semaphore. It is returned by Session.AcquireSemaphore.
type Lease interface {
	// Context returns the context of the lease. It is canceled when the session that created the lease is lost or
	// closed, or when the lease is released by calling the Release method.
	Context() context.Context

	// Release releases the acquired lease to the semaphore. It also cancels the context of the lease. This method does
	// not take a ctx argument, but you can cancel its execution by closing the session or canceling its context.
	Release() error

	// Session returns the session which this lease was created by.
	Session() Session
}

Lease is the object which defines the rights of the session to the acquired semaphore. A Lease stays alive until its context is canceled. This may happen implicitly, when the associated session becomes lost or closed, or explicitly, if someone calls the Release method of the lease.

type NodeConfig added in v3.10.0

// NodeConfig holds the settings of a coordination node. It is the config argument of Client.CreateNode and
// Client.AlterNode and is returned by Client.DescribeNode.
//
// NOTE(review): the field semantics below are inferred from names and types only — confirm before relying on them:
//   - Path: a node path; the package examples pass the path as a separate argument and leave this field empty.
//   - SelfCheckPeriodMillis, SessionGracePeriodMillis: presumably periods expressed in milliseconds.
//   - ReadConsistencyMode, AttachConsistencyMode: one of the ConsistencyMode constants.
//   - RatelimiterCountersMode: one of the RatelimiterCountersMode constants.
type NodeConfig struct {
	Path                     string
	SelfCheckPeriodMillis    uint32
	SessionGracePeriodMillis uint32
	ReadConsistencyMode      ConsistencyMode
	AttachConsistencyMode    ConsistencyMode
	RatelimiterCountersMode  RatelimiterCountersMode
}

type RatelimiterCountersMode added in v3.7.0

// RatelimiterCountersMode is the type of the NodeConfig.RatelimiterCountersMode setting.
type RatelimiterCountersMode uint
const (
	// RatelimiterCountersModeUnset is the zero value of RatelimiterCountersMode.
	RatelimiterCountersModeUnset RatelimiterCountersMode = iota
	// RatelimiterCountersModeAggregated has the value 1.
	// NOTE(review): presumably selects aggregated rate limiter counters — confirm against the service documentation.
	RatelimiterCountersModeAggregated
	// RatelimiterCountersModeDetailed has the value 2.
	// NOTE(review): presumably selects detailed rate limiter counters — confirm against the service documentation.
	RatelimiterCountersModeDetailed
)

func (RatelimiterCountersMode) String added in v3.7.0

func (t RatelimiterCountersMode) String() string

func (RatelimiterCountersMode) To added in v3.7.0

type SemaphoreDescription added in v3.60.0

// SemaphoreDescription is the state of a semaphore as returned by Session.DescribeSemaphore.
type SemaphoreDescription struct {
	// Name is the name of the semaphore.
	Name string

	// Limit is the maximum number of tokens that may be acquired.
	Limit uint64

	// Count is the number of tokens currently acquired by its owners.
	Count uint64

	// Ephemeral semaphores are deleted when there are no owners and waiters left.
	Ephemeral bool

	// Data is user-defined data attached to the semaphore.
	Data []byte

	// Owners is the list of current owners of the semaphore.
	Owners []*SemaphoreSession

	// Waiters is the list of current waiters of the semaphore.
	Waiters []*SemaphoreSession
}

SemaphoreDescription describes the state of a semaphore.

func (*SemaphoreDescription) String added in v3.60.0

func (d *SemaphoreDescription) String() string

type SemaphoreSession added in v3.60.0

// SemaphoreSession is one entry of the SemaphoreDescription.Owners or SemaphoreDescription.Waiters list.
type SemaphoreSession struct {
	// SessionID is the id of the session which tried to acquire the semaphore.
	SessionID uint64

	// Count is the number of tokens for the acquire operation.
	Count uint64

	// OrderID is a monotonically increasing id which determines locking order.
	OrderID uint64

	// Data is user-defined data attached to the acquire operation.
	Data []byte

	// Timeout is the timeout for the operation in the waiter queue. If this is time.Duration(math.MaxInt64), the
	// session will wait for the semaphore until the operation is canceled.
	Timeout time.Duration
}

SemaphoreSession describes an owner or a waiter of this semaphore.

func (*SemaphoreSession) String added in v3.60.0

func (s *SemaphoreSession) String() string

type Session added in v3.60.0

// Session is a coordination service session. It exposes semaphore operations and control over the session lifecycle.
type Session interface {
	// Close closes the coordination service session. It cancels all active requests on the server and notifies every
	// request on the client side that is pending or waiting for a response. It also cancels the session context and
	// tries to stop the session gracefully on the server. If the ctx is canceled, this will not wait for the server
	// session to become stopped and returns immediately with an error. Once this function returns with no error, all
	// subsequent calls will be noop.
	Close(ctx context.Context) error

	// Context returns the context of the session. It is canceled when the underlying server session is over or if the
	// client could not get any successful response from the server before the session timeout (see
	// options.WithSessionTimeout).
	Context() context.Context

	// CreateSemaphore creates a new semaphore. This method waits until the server successfully creates a new semaphore
	// or returns an error.
	//
	// This method is not idempotent. If the request has been sent to the server but no reply has been received, it
	// returns the ErrOperationStatusUnknown error.
	CreateSemaphore(ctx context.Context, name string, limit uint64, opts ...options.CreateSemaphoreOption) error

	// UpdateSemaphore changes semaphore data. This method waits until the server successfully updates the semaphore or
	// returns an error.
	//
	// This method is not idempotent. The client will automatically retry in the case of network or server failure
	// unless it leaves the client state inconsistent.
	//
	// NOTE(review): "not idempotent" combined with automatic retries conflicts with the ErrOperationStatusUnknown
	// documentation, which says non-idempotent requests are never retried — confirm which statement is correct.
	UpdateSemaphore(ctx context.Context, name string, opts ...options.UpdateSemaphoreOption) error

	// DeleteSemaphore deletes an existing semaphore. This method waits until the server successfully deletes the
	// semaphore or returns an error.
	//
	// This method is not idempotent. If the request has been sent to the server but no reply has been received, it
	// returns the ErrOperationStatusUnknown error.
	DeleteSemaphore(ctx context.Context, name string, opts ...options.DeleteSemaphoreOption) error

	// DescribeSemaphore returns the state of the semaphore.
	//
	// This method is idempotent. The client will automatically retry in the case of network or server failure.
	DescribeSemaphore(
		ctx context.Context,
		name string,
		opts ...options.DescribeSemaphoreOption,
	) (*SemaphoreDescription, error)

	// AcquireSemaphore acquires the semaphore. If you acquire an ephemeral semaphore (see options.WithEphemeral), its
	// limit will be set to MaxSemaphoreLimit. Later requests override previous operations with the same semaphore, e.g.
	// to reduce acquired count, change timeout or attached data.
	//
	// This method blocks until the semaphore is acquired, an error is returned from the server or the session is
	// closed. If the operation context was canceled but the server replied that the semaphore was actually acquired,
	// the client will automatically release the semaphore.
	//
	// Semaphore waiting is fair: the semaphore guarantees that other sessions invoking the AcquireSemaphore method
	// acquire permits in the order in which they were called (FIFO). If a session invokes the AcquireSemaphore method
	// multiple times while the first invocation is still in process, the position in the queue remains unchanged.
	//
	// This method is idempotent. The client will automatically retry in the case of network or server failure.
	AcquireSemaphore(
		ctx context.Context,
		name string,
		count uint64,
		opts ...options.AcquireSemaphoreOption,
	) (Lease, error)

	// SessionID returns a server-generated identifier of the session. This value is permanent and unique within the
	// coordination service node.
	SessionID() uint64

	// Reconnect forcibly shuts down the underlying gRPC stream and initiates a new one. This method is highly unlikely
	// to be of use in a typical application but is extremely useful for testing an API implementation.
	Reconnect()
}

Session defines a coordination service backed session.

In general, the Session API is concurrency-friendly: all of its methods can safely be called concurrently.

The client guarantees that sequential calls of the methods are sent to the server in the same order. However, the session client may reorder and suspend some of the requests without violating correctness of the execution. This also applies to the situations when the underlying gRPC stream has been recreated due to network or server issues.

The client automatically keeps the underlying gRPC stream alive by sending keep-alive (ping-pong) requests. If the client can no longer consider the session alive, it immediately cancels the session context which also leads to cancellation of contexts of all semaphore leases created by this session.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL