conversation

package
v3.115.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 8, 2025 License: Apache-2.0 Imports: 4 Imported by: 0

Documentation

Overview

Package conversation contains coordination session internal code that helps implement a typical conversation-like session protocol based on a bidirectional gRPC stream.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Controller

type Controller struct {
	// contains filtered or unexported fields
}

Controller provides a simple mechanism to work with a session protocol using a gRPC bidirectional stream. Creating a bidirectional stream client may be quite tricky because messages are usually being processed independently and in parallel. Moreover, the gRPC client library puts strict limitations on an implementation of the client, e.g. multiple calls of the Send or Recv methods of the stub client must not be performed from different goroutines. Also, there are no guarantees that a message successfully dispatched by the Send method will actually reach the server, neither does the server enjoy same guarantees when delivering messages to the client. This usually ends up having two goroutines (one for sending outgoing messages and another one for receiving incoming ones) and a queue where messages are published to be eventually delivered to the server. The Controller simplifies working with this model providing generic implementation of the message queue and related routines, handling retries of sent and pending operations when the underlying gRPC stream needs to be reconnected.

A typical coordination session looks like this (we are going to skip for now how the gRPC stream is created, handled and kept alive, you can find the details on that in the Session, and focus on the protocol):

  1. The client opens a new gRPC bidirectional stream.
  2. The client sends the SessionStart request and wait until the Failure or the SessionStarted reply.
  3. The server sends the SessionStarted response with the SessionID. At this point the session is started. If the client needs to reconnect the gRPC stream in the future, it will use that SessionID to attach to the previously created session in the SessionStart request.
  4. The client sends the AcquireSemaphore request to acquire a permit to the semaphore in this session with count 5.
  5. After a moment, the client decides to acquire another semaphore, it sends one more AcquireSemaphore request with count 4.
  6. The server replies with the AcquireSemaphoreResult response to the second AcquireSemaphore request to inform the client that the semaphore was successfully acquired.
  7. The server replies with the AcquireSemaphorePending response in order to inform the client that the semaphore from the first request has been acquired by another session.
  8. After a while, the server sends the AcquireSemaphoreResult response which implies that the semaphore from the first request is acquired in the current session.
  9. Then the client sends the ReleaseSemaphore request in order to release the acquired semaphore.
  10. The server replies with the ReleaseSemaphoreResult.
  11. The client terminates the session with the SessionStop request.
  12. The server let the client know that the session is over sending the SessionStopped response and closing the gRPC stream.

We can notice five independent conversations here:

1. StartSession, SessionStarted — points 2–3; 2. AcquireSemaphore, AcquireSemaphoreResult — points 4, 6; 3. AcquireSemaphore, AcquireSemaphorePending, AcquireSemaphoreResult — points 5, 7 and 8; 4. ReleaseSemaphore, ReleaseSemaphoreResult — points 9–10; 5. SessionStop, SessionStopped — points 11–12.

If at any time the client encounters an unrecoverable error (for example, the underlying gRPC stream becomes disconnected), the client will have to replay every conversation from their very beginning. Let us see why it is actually the case. But before we go into that, let us look at the grpc.ClientStream SendMsg method:

"…SendMsg does not wait until the message is received by the server. An untimely stream closure may result in lost messages. To ensure delivery, users should ensure the RPC completed successfully using RecvMsg…"

This is true for both, the client and the server. So when the server replies to the client it does not really know if the response is received by the client. And vice versa, when the client sends a request to the server it has no way to know if the request was delivered to the server unless the server sends another message to the client in reply.

That is why conversation-like protocols typically use idempotent requests. Idempotent requests can be safely retried as long as you keep the original order of the conversations. For example, if the gRPC stream is terminated before the point 6, we cannot know if the server gets the requests. There may be one, two or none AcquireSemaphore requests successfully delivered to and handled by the server. Moreover, the server may have already sent to the client the corresponding responses. Nevertheless, if the requests are idempotent, we can safely retry them all in the newly created gRPC stream and get the same results as we would have got if we had sent them without stream termination. Note that if the stream is terminated before the point 8, we still need to replay the first AcquireSemaphore conversation because we have no knowledge if the server replied with the AcquireSemaphoreResult in the terminated stream.

However, sometimes even idempotent requests cannot be safely retried. Consider the case wherein the point 5 from the original list is:

  1. After a moment, the client decides to modify the AcquireSemaphore request and sends another one with the same semaphore but with count 4.

If then the gRPC stream terminates, there are two likely outcomes:

  1. The server received the first request but the second one was not delivered. The current semaphore count is 5.
  2. The server received and processed the both requests. The current semaphore permit count is 4.

If we retry the both requests, the observed result will be different depending on which outcome occurs:

  1. The first retry will be a noop, the second one will decrease the semaphore count to 4. This is expected behavior.
  2. The first retry will try to increase the semaphore count to 5, it causes an error. This is unexpected.

In order to avoid that we could postpone a conversation if there is another one for the same semaphore which has been sent but has not been yet delivered to the server. For more details, see the WithConflictKey option.

func NewController

func NewController() *Controller

NewController creates a new conversation controller. You usually have one controller per one session.

func (*Controller) Await

func (c *Controller) Await(
	ctx context.Context,
	conversation *Conversation,
) (*Ydb_Coordination.SessionResponse, error)

Await waits until the conversation ends. ctx can be used to cancel the call.

func (*Controller) Close

func (c *Controller) Close(byeConversation *Conversation)

Close fails all conversations if there are any in the queue. It also does not allow pushing more conversations to the queue anymore. You may optionally specify the final conversation if needed.

func (*Controller) OnAttach

func (c *Controller) OnAttach()

OnAttach retries all idempotent conversations if there are any in the queue. You should call this method when the underlying gRPC stream of the session is connected.

func (*Controller) OnDetach

func (c *Controller) OnDetach()

OnDetach fails all non-idempotent conversations if there are any in the queue. You should call this method when the underlying gRPC stream of the session is closed.

func (*Controller) OnRecv

OnRecv consumes a new conversation response and process with the corresponding conversation if any exists for it. The returned value indicates if any conversation considers the incoming message part of it or the controller is closed. You should call this method in the goroutine that handles gRPC stream Recv method.

func (*Controller) OnSend

OnSend blocks until a new conversation request becomes available at the end of the queue. You should call this method in the goroutine that handles gRPC stream Send method. ctx can be used to cancel the call.

func (*Controller) PushBack

func (c *Controller) PushBack(conversation *Conversation) error

PushBack puts a new conversation at the end of the queue.

func (*Controller) PushFront

func (c *Controller) PushFront(conversation *Conversation) error

PushFront puts a new conversation at the beginning of the queue.

type Conversation

type Conversation struct {
	// contains filtered or unexported fields
}

Conversation is a core concept of the conversation package. It is an ordered sequence of connected request/reply messages. For example, the acquiring semaphore conversation may look like this:

1. The client sends the AcquireSemaphore request. 2. The server replies with the AcquireSemaphorePending response. 3. After a while, the server replies with the AcquireSemaphoreResult response. The conversation is ended.

There may be many different conversations carried out simultaneously in one session, so the exact order of all the messages in the session is unspecified. In the example above, there may be other messages (from different conversations) between points 1 and 2, or 2 and 3.

func NewConversation

func NewConversation(request func() *Ydb_Coordination.SessionRequest, opts ...Option) *Conversation

NewConversation creates a new conversation that starts with a specified message.

type Option

type Option func(c *Conversation)

Option configures how we create a new conversation.

func WithAcknowledgeFilter

func WithAcknowledgeFilter(filter ResponseFilter) Option

WithAcknowledgeFilter returns an Option that specifies the filter function that is used to detect an intermediate response message in the conversation. If such a message was found, the conversation continues, but it lets the client know that the server successfully consumed the first request of the conversation.

func WithCancelMessage

func WithCancelMessage(
	message func(req *Ydb_Coordination.SessionRequest) *Ydb_Coordination.SessionRequest,
	filter ResponseFilter,
) Option

WithCancelMessage returns an Option that specifies the message and filter functions that are used to cancel the conversation which has been already sent. This message is sent in the background when the caller cancels the context of the Controller.Await function. The response is never received by the caller and is only used to end the conversation and remove it from the queue.

func WithConflictKey

func WithConflictKey(key string) Option

WithConflictKey returns an Option that specifies the key that is used to find out messages that cannot be delivered to the server until the server acknowledged the request. If there is a conversation with the same conflict key in the queue that has not been yet delivered to the server, the controller will temporarily suspend other conversations with the same conflict key until the first one is acknowledged.

func WithIdempotence

func WithIdempotence(idempotent bool) Option

WithIdempotence returns an Option that enabled retries for this conversation when the underlying gRPC stream reconnects. The controller will replay the whole conversation from scratch unless it is not ended.

func WithResponseFilter

func WithResponseFilter(filter ResponseFilter) Option

WithResponseFilter returns an Option that specifies the filter function that is used to detect the last response message in the conversation. If such a message was found, the conversation is immediately ended and the response becomes available by the Conversation.Await method.

type ResponseFilter

type ResponseFilter func(request *Ydb_Coordination.SessionRequest, response *Ydb_Coordination.SessionResponse) bool

ResponseFilter defines the filter function called by the controller to know if a received message relates to the conversation. If a ResponseFilter returns true, the message is considered to be part of the conversation.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL