callbackstream

package
v1.18.0-rc.4
Published: May 21, 2026 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Overview

Package callbackstream coordinates the app-initiated actor callback stream. The Dapr gRPC API handler registers each incoming stream with the Manager; the actor transport sends callback requests through the Manager and awaits correlated responses.

The design mirrors the pubsub SubscribeTopicEventsAlpha1 stream: the app is the gRPC client, disconnect fails in-flight work immediately, and multiple concurrent connections from the same app are permitted.

Multi-connection routing (rolling-restart semantic): Send always picks the most recently registered connection. Older connections do not receive any new Send traffic once a newer one registers, but their in-flight requests stay routed to them — Deliver looks up the response by request id on the connection that owns it, so the work an older stream had already accepted completes naturally. This matches the rolling-restart scenario where pod B opens its stream before pod A finishes draining: A wraps up its in-flight invocations while every new callback goes to B. See TestMultipleConns_OlderConnDrainsInFlight in callbackstream_test.go for the regression guard.
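
The routing rule above can be sketched with plain maps and slices. This is only an illustration under stated assumptions: router, conn, register, send, and deliver are hypothetical names, the sketch is single-goroutine, and it does not reproduce the package's actual types or synchronization.

```go
package main

import "fmt"

// conn is a hypothetical stand-in for one callback stream.
type conn struct {
	id      uint64
	pending map[string]chan string // request id -> waiting caller
}

// router is a hypothetical, single-goroutine stand-in for the Manager.
type router struct {
	conns []*conn // ordered by registration time, newest last
}

// register appends a connection; it becomes the target of all new sends.
func (r *router) register(id uint64) *conn {
	c := &conn{id: id, pending: make(map[string]chan string)}
	r.conns = append(r.conns, c)
	return c
}

// send records a pending request on the newest connection and returns
// that connection together with the channel the caller would wait on.
func (r *router) send(reqID string) (*conn, chan string) {
	if len(r.conns) == 0 {
		return nil, nil
	}
	c := r.conns[len(r.conns)-1]
	ch := make(chan string, 1)
	c.pending[reqID] = ch
	return c, ch
}

// deliver completes a request only on the connection that owns it.
func (c *conn) deliver(reqID, payload string) bool {
	ch, ok := c.pending[reqID]
	if !ok {
		return false
	}
	delete(c.pending, reqID)
	ch <- payload
	return true
}

func main() {
	r := &router{}
	a := r.register(1)
	ownerA, respA := r.send("req-1") // A is the only connection

	b := r.register(2)
	ownerB, _ := r.send("req-2") // B is now the newest connection

	a.deliver("req-1", "done on A") // A still finishes its in-flight work
	fmt.Println(ownerA.id, ownerB.id, <-respA, b.deliver("req-1", "x"))
}
```

Keeping the pending table per connection, rather than global, is what lets the older stream drain without any extra routing state.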

Connection-set state (the active stream list and the latest registration config) is owned by a single goroutine that drains a kit/events/loop queue. Every state mutation goes through Handle, so the Manager itself needs no locks. Per-connection request correlation uses sync.Map, which keeps explicit mutexes out of this package (sync.Map may still synchronize internally), because Send and Deliver may interleave at high frequency and don't need full serialization.

Constants

This section is empty.

Variables

var ErrDisconnected = errors.New("actor host disconnected before responding")

ErrDisconnected is returned when an in-flight callback fails because the app closed the stream.

var ErrNoConnection = errors.New("actor host is not connected")

ErrNoConnection is returned when the actor transport tries to send a callback but no app has opened the stream.
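
A caller might branch on these two sentinels with errors.Is. The sketch below redeclares the errors locally so it compiles on its own; classify and wrapInvoke are hypothetical helpers, and whether the package wraps these errors before returning them is an assumption (errors.Is handles both cases).

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the documented sentinels, so the sketch is self-contained.
var (
	ErrDisconnected = errors.New("actor host disconnected before responding")
	ErrNoConnection = errors.New("actor host is not connected")
)

// classify is a hypothetical helper that labels an error from a callback.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrNoConnection):
		return "no-connection" // no app has opened the stream
	case errors.Is(err, ErrDisconnected):
		return "disconnected" // the stream closed while the call was in flight
	default:
		return "other"
	}
}

// wrapInvoke is a hypothetical helper that adds context while keeping the
// sentinel matchable through %w.
func wrapInvoke(err error) error {
	return fmt.Errorf("invoke actor: %w", err)
}

func main() {
	fmt.Println(classify(wrapInvoke(ErrDisconnected)))
	fmt.Println(classify(ErrNoConnection))
}
```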

Functions

This section is empty.

Types

type Connection

type Connection struct {
	ID     uint64
	Config *config.ApplicationConfig

	// Outbox is drained by the gRPC handler's send loop. It is intentionally
	// never closed (concurrent senders would panic on send-on-closed).
	// Consumers should exit on conn.Done() instead of ranging on this chan.
	Outbox chan *runtimev1pb.SubscribeActorEventsResponseAlpha1
	// contains filtered or unexported fields
}

Connection represents a single live actor host stream. It exposes the Outbox channel for outbound requests and keeps an unexported pending map for correlating responses. The owning handler is responsible for draining Outbox and routing incoming responses back through Deliver.

func (*Connection) Deliver

Deliver routes an app → daprd message to the pending caller that owns the correlated request id. Called by the gRPC handler's recv loop for every inbound message. The initial_request variant is consumed by the handler itself and never reaches Deliver.

func (*Connection) Done

func (c *Connection) Done() <-chan struct{}

Done returns a channel that is closed when the connection is terminated. Handlers that drain Outbox should select on this channel to exit.
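
A handler's send loop could follow this shape. This is a minimal sketch, not the gRPC handler itself: drain is a hypothetical function, string stands in for the protobuf message type, and the send callback stands in for the stream's send call.

```go
package main

import "fmt"

// drain forwards messages from outbox to send until done is closed.
// The outbox is never closed, so the loop selects instead of ranging.
func drain(outbox <-chan string, done <-chan struct{}, send func(string)) {
	for {
		select {
		case <-done:
			return
		case msg := <-outbox:
			send(msg)
		}
	}
}

func main() {
	outbox := make(chan string)
	done := make(chan struct{})
	finished := make(chan struct{})

	var sent []string
	go func() {
		defer close(finished)
		drain(outbox, done, func(m string) { sent = append(sent, m) })
	}()

	outbox <- "req-1"
	outbox <- "req-2"
	close(done) // stands in for the connection being terminated
	<-finished  // the drain loop has exited; sent is safe to read
	fmt.Println(sent)
}
```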

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager holds the set of active actor callback streams. Concurrent callers register/close streams and pick connections without locks: state mutations are serialized through an event loop, and the hot read path (Send / CurrentConfig / HasConnection) reads an atomic.Pointer snapshot maintained by that loop.

func NewManager

func NewManager() *Manager

NewManager returns a new Manager with no active connections. The caller must invoke Run to drive the event loop.

func (*Manager) Close

func (m *Manager) Close(conn *Connection, cause error)

Close terminates a connection: cancels its context, fails every pending request, and removes it from the manager. Safe to call multiple times.

Close cancels the connection synchronously so concurrent Send callers observe the cancellation immediately. The loop only handles the bookkeeping (conns slice fixup, snapshot recompute).
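
One way to get synchronous, repeatable cancellation with a recorded cause is context.WithCancelCause from the standard library. Whether the package uses it is an assumption; conn, newConn, newRootConn, closeWith, and wait are hypothetical names for this sketch.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

var (
	errDisconnected = errors.New("actor host disconnected before responding")
	errOther        = errors.New("a later close cause")
)

// conn is a hypothetical stand-in holding a cancellable context.
type conn struct {
	ctx    context.Context
	cancel context.CancelCauseFunc
}

func newConn(parent context.Context) *conn {
	ctx, cancel := context.WithCancelCause(parent)
	return &conn{ctx: ctx, cancel: cancel}
}

func newRootConn() *conn { return newConn(context.Background()) }

// closeWith cancels synchronously. Calling it again is harmless:
// only the first cause is recorded.
func (c *conn) closeWith(cause error) { c.cancel(cause) }

// wait blocks the way a pending caller would and reports the close cause
// if the connection ends before a response arrives.
func (c *conn) wait(resp <-chan string) (string, error) {
	select {
	case r := <-resp:
		return r, nil
	case <-c.ctx.Done():
		return "", context.Cause(c.ctx)
	}
}

func main() {
	c := newRootConn()
	c.closeWith(errDisconnected)
	c.closeWith(errOther) // ignored: the connection is already cancelled

	_, err := c.wait(make(chan string))
	fmt.Println(errors.Is(err, errDisconnected))
}
```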

func (*Manager) CurrentConfig

func (m *Manager) CurrentConfig() *config.ApplicationConfig

CurrentConfig returns the most recently registered ApplicationConfig, or nil if no stream has registered yet. It never blocks: it returns a pure snapshot of state. Callers that need to react to stream arrival should drive the lifecycle imperatively from the API handler (see pkg/api/grpc/actorcallbacks.go), mirroring how pkg/api/grpc/subscribe.go reacts to SubscribeTopicEventsAlpha1.

func (*Manager) Handle

func (m *Manager) Handle(_ context.Context, ev event) error

Handle is the loop's serial event handler. It runs on the only goroutine that touches m.conns, so no locks are needed there. The atomic snapshot (m.latest) is updated here too; that is what every hot-path read sees.
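
The split between loop-owned state and an atomic snapshot might look like the following. It is a simplified sketch: event, manager, handle, and hasConnection are hypothetical, connection ids stand in for full connections, and the real event type and snapshot contents are not shown in this documentation.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// event is a hypothetical loop event: add or remove a connection id.
type event struct {
	id     uint64
	remove bool
}

// manager is a hypothetical stand-in for the documented Manager.
type manager struct {
	conns  []uint64               // touched only by handle (the loop goroutine)
	latest atomic.Pointer[uint64] // newest connection id, nil when none
}

// handle applies one event and republishes the snapshot.
// It assumes every call comes from the same goroutine.
func (m *manager) handle(ev event) {
	if ev.remove {
		kept := m.conns[:0]
		for _, id := range m.conns {
			if id != ev.id {
				kept = append(kept, id)
			}
		}
		m.conns = kept
	} else {
		m.conns = append(m.conns, ev.id)
	}

	if len(m.conns) == 0 {
		m.latest.Store(nil)
		return
	}
	newest := m.conns[len(m.conns)-1]
	m.latest.Store(&newest)
}

// hasConnection is a hot-path read: one atomic load, no locks.
func (m *manager) hasConnection() bool {
	return m.latest.Load() != nil
}

func main() {
	m := &manager{}
	m.handle(event{id: 1})
	m.handle(event{id: 2})
	m.handle(event{id: 2, remove: true})
	fmt.Println(*m.latest.Load(), m.hasConnection())
}
```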

func (*Manager) HasConnection

func (m *Manager) HasConnection() bool

HasConnection reports whether at least one app is currently streaming.

func (*Manager) Register

func (m *Manager) Register(ctx context.Context, cfg *config.ApplicationConfig) *Connection

Register installs a new connection. The caller is responsible for draining Outbox and calling Deliver for each inbound response. When the stream ends, the caller must invoke Close (or let the context cancel).

Register blocks until the loop has published the new connection on the snapshot read path. This guarantees that callers which immediately invoke Send observe the connection — a property the test harness and the production API handler both rely on.
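
The "block until published" guarantee can be sketched with an acknowledgement channel carried on the event. The names below (registerEvent, manager, run, register, current) are hypothetical, a plain channel stands in for the kit/events/loop queue, and a string stands in for the connection.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// registerEvent is a hypothetical event; done is closed by the loop
// after the snapshot has been updated.
type registerEvent struct {
	name string
	done chan struct{}
}

// manager is a hypothetical stand-in for the documented Manager.
type manager struct {
	events chan registerEvent
	latest atomic.Pointer[string]
}

func newManager() *manager {
	return &manager{events: make(chan registerEvent)}
}

// run is the single loop goroutine: publish first, then acknowledge.
func (m *manager) run() {
	for ev := range m.events {
		name := ev.name
		m.latest.Store(&name)
		close(ev.done)
	}
}

// register enqueues the event and returns only after it is visible
// to snapshot readers.
func (m *manager) register(name string) {
	ev := registerEvent{name: name, done: make(chan struct{})}
	m.events <- ev
	<-ev.done
}

// current reads the snapshot without going through the loop.
func (m *manager) current() (string, bool) {
	p := m.latest.Load()
	if p == nil {
		return "", false
	}
	return *p, true
}

func main() {
	m := newManager()
	go m.run()

	m.register("conn-a")
	name, ok := m.current() // guaranteed to observe conn-a
	fmt.Println(name, ok)
	close(m.events)
}
```

Because the loop stores the snapshot before closing done, a reader that runs after register returns cannot miss the new connection.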

func (*Manager) Run

func (m *Manager) Run(ctx context.Context) error

Run drives the Manager's event loop until ctx is cancelled. Mirrors the hotreload reconciler / scheduler stream-loop wiring: the loop runs in one goroutine, a sibling watcher closes it on ctx cancel.

func (*Manager) Send

Send issues a callback request over an available connection and blocks until the correlated response arrives, ctx is cancelled, or the connection closes. The caller supplies a builder that stamps the request id onto the concrete oneof payload — this keeps id generation under the Connection's control while leaving payload shape to callers.
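
Since the signature is not shown here, the following is only a sketch of the builder-plus-correlation idea. The request and response structs, send, deliver, echoApp, call, and the "OnActivate" method name are all hypothetical, and ids are simple counters rather than whatever the package generates.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"strconv"
	"sync"
	"sync/atomic"
)

var errDisconnected = errors.New("actor host disconnected before responding")

// request and response are hypothetical stand-ins for the protobuf messages.
type request struct{ id, method string }
type response struct{ id, body string }

type conn struct {
	nextID  atomic.Uint64
	pending sync.Map // request id -> chan response
	outbox  chan request
	done    chan struct{}
}

func newConn() *conn {
	return &conn{outbox: make(chan request, 16), done: make(chan struct{})}
}

// send allocates the id, lets build stamp it onto the payload, then waits
// for the response, connection close, or ctx cancellation.
func (c *conn) send(ctx context.Context, build func(id string) request) (response, error) {
	id := strconv.FormatUint(c.nextID.Add(1), 10)
	ch := make(chan response, 1) // buffered so deliver never blocks
	c.pending.Store(id, ch)
	defer c.pending.Delete(id)

	select {
	case c.outbox <- build(id):
	case <-c.done:
		return response{}, errDisconnected
	case <-ctx.Done():
		return response{}, ctx.Err()
	}
	select {
	case resp := <-ch:
		return resp, nil
	case <-c.done:
		return response{}, errDisconnected
	case <-ctx.Done():
		return response{}, ctx.Err()
	}
}

// deliver hands a response to the caller waiting on its id, if any.
func (c *conn) deliver(resp response) bool {
	v, ok := c.pending.LoadAndDelete(resp.id)
	if !ok {
		return false
	}
	v.(chan response) <- resp
	return true
}

// echoApp is a fake app that answers every request until done is closed.
func echoApp(c *conn) {
	for {
		select {
		case <-c.done:
			return
		case req := <-c.outbox:
			c.deliver(response{id: req.id, body: "ok:" + req.method})
		}
	}
}

// call shows the builder: the caller shapes the payload, send owns the id.
func call(c *conn, method string) (response, error) {
	return c.send(context.Background(), func(id string) request {
		return request{id: id, method: method}
	})
}

func main() {
	c := newConn()
	go echoApp(c)
	resp, err := call(c, "OnActivate")
	fmt.Println(resp.id, resp.body, err)
	close(c.done)
}
```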
