Documentation
¶
Overview ¶
Package callbackstream coordinates the app-initiated actor callback stream. The Dapr gRPC API handler registers each incoming stream with the Manager; the actor transport sends callback requests through the Manager and awaits correlated responses.
The design mirrors the pubsub SubscribeTopicEventsAlpha1 stream: the app is the gRPC client, disconnect fails in-flight work immediately, and multiple concurrent connections from the same app are permitted.
Multi-connection routing (rolling-restart semantic): Send always picks the most recently registered connection. Older connections do not receive any new Send traffic once a newer one registers, but their in-flight requests stay routed to them — Deliver looks up the response by request id on the connection that owns it, so the work an older stream had already accepted completes naturally. This matches the rolling-restart scenario where pod B opens its stream before pod A finishes draining: A wraps up its in-flight invocations while every new callback goes to B. See TestMultipleConns_OlderConnDrainsInFlight in callbackstream_test.go for the regression guard.
Connection-set state (the active stream list and the latest registration config) is owned by a single goroutine that drains a kit/events/loop queue. Every state mutation goes through Handle, so there are no locks on the Manager itself. Per-connection request correlation uses sync.Map (lock-free) because Send and Deliver may interleave at high frequency and don't need full serialization.
Index ¶
- Variables
- type Connection
- type Manager
- func (m *Manager) Close(conn *Connection, cause error)
- func (m *Manager) CurrentConfig() *config.ApplicationConfig
- func (m *Manager) Handle(_ context.Context, ev event) error
- func (m *Manager) HasConnection() bool
- func (m *Manager) Register(ctx context.Context, cfg *config.ApplicationConfig) *Connection
- func (m *Manager) Run(ctx context.Context) error
- func (m *Manager) Send(ctx context.Context, ...) (*runtimev1pb.SubscribeActorEventsRequestAlpha1, error)
Constants ¶
This section is empty.
Variables ¶
var ErrDisconnected = errors.New("actor host disconnected before responding")
ErrDisconnected is returned when an in-flight callback fails because the app closed the stream.
var ErrNoConnection = errors.New("actor host is not connected")
ErrNoConnection is returned when the actor transport tries to send a callback but no app has opened the stream.
Functions ¶
This section is empty.
Types ¶
type Connection ¶
type Connection struct {
ID uint64
Config *config.ApplicationConfig
// Outbox is drained by the gRPC handler's send loop. It is intentionally
// never closed (concurrent senders would panic on send-on-closed).
// Consumers should exit on conn.Done() instead of ranging on this chan.
Outbox chan *runtimev1pb.SubscribeActorEventsResponseAlpha1
// contains filtered or unexported fields
}
Connection represents a single live actor host stream. It exposes a send channel for outbound requests and a pending map for correlating responses. The owning handler is responsible for draining Outbox and routing incoming responses back through Deliver.
func (*Connection) Deliver ¶
func (c *Connection) Deliver(msg *runtimev1pb.SubscribeActorEventsRequestAlpha1)
Deliver routes an app → daprd message to the pending caller that owns the correlated request id. Called by the gRPC handler's recv loop for every inbound message. The initial_request variant is consumed by the handler itself and never reaches Deliver.
func (*Connection) Done ¶
func (c *Connection) Done() <-chan struct{}
Done returns a channel that is closed when the connection is terminated. Handlers that drain Outbox should select on this channel to exit.
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager holds the set of active actor callback streams. Concurrent callers register/close streams and pick connections without locks: state mutations are serialized through an event loop, and the hot read path (Send / CurrentConfig / HasConnection) reads an atomic.Pointer snapshot maintained by that loop.
func NewManager ¶
func NewManager() *Manager
NewManager returns a new Manager with no active connections. The caller must invoke Run to drive the event loop.
func (*Manager) Close ¶
func (m *Manager) Close(conn *Connection, cause error)
Close terminates a connection: cancels its context, fails every pending request, and removes it from the manager. Safe to call multiple times.
Close cancels the connection synchronously so concurrent Send callers observe the cancellation immediately. The loop only handles the bookkeeping (conns slice fixup, snapshot recompute).
func (*Manager) CurrentConfig ¶
func (m *Manager) CurrentConfig() *config.ApplicationConfig
CurrentConfig returns the most recently registered ApplicationConfig, or nil if no stream has registered yet. It never blocks — a pure snapshot of state. Callers that need to react to stream arrival should drive the lifecycle imperatively from the API handler (see pkg/api/grpc/actorcallbacks.go), mirroring how pkg/api/grpc/subscribe.go reacts to SubscribeTopicEventsAlpha1.
func (*Manager) Handle ¶
Handle is the loop's serial event handler. It is the only goroutine that touches m.conns, so no locks are needed there. The atomic snapshot (m.latest) is updated here too — that's what every hot-path read sees.
func (*Manager) HasConnection ¶
HasConnection reports whether at least one app is currently streaming.
func (*Manager) Register ¶
func (m *Manager) Register(ctx context.Context, cfg *config.ApplicationConfig) *Connection
Register installs a new connection. The caller is responsible for draining Outbox and calling Deliver for each inbound response. When the stream ends, the caller must invoke Close (or let the context cancel).
Register blocks until the loop has published the new connection on the snapshot read path. This guarantees that callers which immediately invoke Send observe the connection — a property the test harness and the production API handler both rely on.
func (*Manager) Run ¶
Run drives the Manager's event loop until ctx is cancelled. Mirrors the hotreload reconciler / scheduler stream-loop wiring: the loop runs in one goroutine, a sibling watcher closes it on ctx cancel.
func (*Manager) Send ¶
func (m *Manager) Send( ctx context.Context, build func(id string) *runtimev1pb.SubscribeActorEventsResponseAlpha1, ) (*runtimev1pb.SubscribeActorEventsRequestAlpha1, error)
Send issues a callback request over an available connection and blocks until the correlated response arrives, ctx is cancelled, or the connection closes. The caller supplies a builder that stamps the request id onto the concrete oneof payload — this keeps id generation under the Connection's control while leaving payload shape to callers.