Documentation ¶
Overview ¶
Package actor provides a mailbox-based actor model implementation for building concurrent, message-driven systems.
Actors are the fundamental unit of computation in this package. Each actor:
- Has a unique identity
- Processes messages sequentially from its mailbox
- Can schedule background tasks via [HandlerCtx.Schedule]
- Can be paused, resumed, and stepped for debugging/testing
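The sequential-mailbox property can be illustrated with a minimal sketch that does not use this package. The `mailboxActor` type and `processAll` helper are hypothetical names for illustration only: one goroutine drains a buffered channel, so messages are handled one at a time, in arrival order.

```go
package main

import "fmt"

// mailboxActor is a hypothetical stand-in, not this package's BaseActor.
type mailboxActor struct {
	id      string
	mailbox chan string
	done    chan struct{}
	seen    []string // only touched by the run goroutine until done is closed
}

func newMailboxActor(id string, size int) *mailboxActor {
	a := &mailboxActor{
		id:      id,
		mailbox: make(chan string, size),
		done:    make(chan struct{}),
	}
	go a.run()
	return a
}

// run drains the mailbox in a single goroutine, so handling is sequential.
func (a *mailboxActor) run() {
	defer close(a.done)
	for msg := range a.mailbox {
		a.seen = append(a.seen, msg)
	}
}

// processAll sends msgs, shuts the actor down, and returns the handling order.
func processAll(msgs []string) []string {
	a := newMailboxActor("demo", 4)
	for _, m := range msgs {
		a.mailbox <- m
	}
	close(a.mailbox) // closing the mailbox stops the sketch actor
	<-a.done
	return a.seen
}

func main() {
	fmt.Println(processAll([]string{"a", "b", "c"}))
}
```

Because only one goroutine reads the mailbox, the handler state (`seen` here) needs no lock.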
Creating Actors ¶
The simplest way to create an actor is using typed handlers:
	myActor := actor.TypedHandlers(
		actor.HandleMsg[CreateUserCmd](func(hc actor.HandlerCtx, cmd CreateUserCmd) error {
			// Handle the command
			return nil
		}),
		// OUT is User, so the handler returns *User (HandleRequest handlers return *OUT).
		actor.HandleRequest[GetUserQuery, User](func(hc actor.HandlerCtx, q GetUserQuery) (*User, error) {
			// Handle request and return response
			return &User{ID: q.ID}, nil
		}),
		actor.HandleEvery(time.Minute, func(hc actor.HandlerCtx) error {
			// Periodic task executed every minute
			return nil
		}),
	).ToActor(actor.Options{})
Message Handling ¶
Messages are dispatched by type name to registered handlers:
- HandleMsg registers a one-way message handler (fire-and-forget)
- HandleRequest registers a request-response handler
- HandleEvery registers a periodic task that runs at fixed intervals
- DefaultHandler registers a fallback for unmatched message types
- Init registers initialization logic run when the actor starts
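Dispatch by type name with a fallback can be sketched roughly as follows. This is not the package's registry: `registry`, `handlerFunc`, and `newDemoRegistry` are hypothetical, and the sketch assumes JSON payloads as described for Envelope.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// handlerFunc is a hypothetical raw handler signature for this sketch.
type handlerFunc func(data []byte) (any, error)

// registry maps message type names to handlers, with an optional fallback.
type registry struct {
	handlers map[string]handlerFunc
	fallback handlerFunc
}

// dispatch looks up the handler by type name and falls back when none matches.
func (r *registry) dispatch(msgType string, data []byte) (any, error) {
	if h, ok := r.handlers[msgType]; ok {
		return h(data)
	}
	if r.fallback != nil {
		return r.fallback(data)
	}
	return nil, fmt.Errorf("no handler for message type %q", msgType)
}

type CreateUserCmd struct {
	Name string `json:"name"`
}

func newDemoRegistry() *registry {
	return &registry{
		handlers: map[string]handlerFunc{
			"CreateUserCmd": func(data []byte) (any, error) {
				var cmd CreateUserCmd
				if err := json.Unmarshal(data, &cmd); err != nil {
					return nil, err
				}
				return "created " + cmd.Name, nil
			},
		},
		// The fallback receives the raw payload, undecoded.
		fallback: func(data []byte) (any, error) {
			return "unhandled", nil
		},
	}
}

func main() {
	r := newDemoRegistry()
	fmt.Println(r.dispatch("CreateUserCmd", []byte(`{"name":"Alice"}`)))
	fmt.Println(r.dispatch("Unknown", nil))
}
```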
Sending Messages ¶
Use Request for request-response patterns:
	// OUT is User; Request returns *OUT, so user has type *User.
	user, err := actor.Request[GetUserQuery, User](ctx, myActor, GetUserQuery{ID: "123"})
Use Publish for fire-and-forget messages:
	err := actor.Publish[CreateUserCmd](ctx, myActor, CreateUserCmd{Name: "Alice"})
Background Tasks ¶
Handlers can schedule background work via [HandlerCtx.Schedule]:
	actor.HandleMsg[ProcessDataCmd](func(hc actor.HandlerCtx, cmd ProcessDataCmd) error {
		hc.Schedule(func() {
			// This runs asynchronously, outside the actor's mailbox processing
			processInBackground(cmd.Data)
		})
		return nil
	})
The actor waits for all scheduled tasks to complete during shutdown.
Self-Request Detection ¶
The actor automatically detects when a handler attempts to send a request back to itself (which would deadlock) and returns ErrSelfRequest.
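One plausible way to detect a self-request is to tag the handler's context with the owning actor's identity and compare it with the target. How the package actually does this is not documented here; `withActorID`, `checkRequest`, and `errSelfRequest` are hypothetical names for this sketch.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// errSelfRequest mirrors the role of ErrSelfRequest in this sketch.
var errSelfRequest = errors.New("self-request would deadlock")

// actorIDKey is an unexported context key type, avoiding key collisions.
type actorIDKey struct{}

// withActorID marks ctx as belonging to a handler running inside the given actor.
func withActorID(ctx context.Context, id string) context.Context {
	return context.WithValue(ctx, actorIDKey{}, id)
}

// checkRequest rejects a request when the calling handler's actor is the target.
func checkRequest(ctx context.Context, targetID string) error {
	if caller, ok := ctx.Value(actorIDKey{}).(string); ok && caller == targetID {
		return errSelfRequest
	}
	return nil
}

// demo issues one request to the same actor and one to a different actor.
func demo() (same, other error) {
	ctx := withActorID(context.Background(), "users")
	return checkRequest(ctx, "users"), checkRequest(ctx, "orders")
}

func main() {
	same, other := demo()
	fmt.Println(same)
	fmt.Println(other)
}
```

The deadlock arises because the handler would wait for a reply that can only be produced after the handler itself returns.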
Lifecycle Control ¶
Actors support pause/resume for debugging and testing:
	myActor.Pause()  // Stop processing messages
	myActor.Step()   // Process exactly one message
	myActor.Resume() // Continue normal processing
	<-myActor.Done() // Wait for actor shutdown
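Pause, step, and resume can be modeled with a control channel next to the mailbox. The sketch below is an assumption about how such a loop might look, not the package's implementation; `steppableActor`, `control`, and `do` are hypothetical. While paused, the mailbox case is disabled by selecting on a nil channel.

```go
package main

import "fmt"

// control is a hypothetical control message; done is closed once it has taken effect.
type control struct {
	op   string
	done chan struct{}
}

type steppableActor struct {
	mailbox chan string
	ctl     chan control
	out     chan string // handled messages, exposed for observation in this sketch
}

func newSteppableActor() *steppableActor {
	a := &steppableActor{
		mailbox: make(chan string, 8),
		ctl:     make(chan control),
		out:     make(chan string, 8),
	}
	go a.run() // the loop is never stopped in this sketch
	return a
}

func (a *steppableActor) run() {
	paused := false
	for {
		inbox := a.mailbox
		if paused {
			inbox = nil // receiving from a nil channel blocks, disabling the case
		}
		select {
		case c := <-a.ctl:
			switch c.op {
			case "pause":
				paused = true
			case "resume":
				paused = false
			case "step":
				select {
				case m := <-a.mailbox:
					a.out <- m
				default: // mailbox empty: nothing to step
				}
			}
			close(c.done)
		case m := <-inbox:
			a.out <- m
		}
	}
}

// do sends a control operation and waits until the loop has applied it.
func (a *steppableActor) do(op string) {
	c := control{op: op, done: make(chan struct{})}
	a.ctl <- c
	<-c.done
}

func demo() (whilePaused int, order []string) {
	a := newSteppableActor()
	a.do("pause")
	for _, m := range []string{"m1", "m2", "m3"} {
		a.mailbox <- m
	}
	whilePaused = len(a.out) // nothing is handled while paused
	a.do("step")
	order = append(order, <-a.out)
	a.do("resume")
	order = append(order, <-a.out, <-a.out)
	return whilePaused, order
}

func main() {
	fmt.Println(demo())
}
```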
Index ¶
- Variables
- func Publish[IN any](ctx context.Context, r requester, i IN) error
- func RawRequest(ctx context.Context, r requester, msgType string, data []byte) (any, error)
- func Read[T any, R any](s *State[T], op func(*T) R) R
- func ReadAsync[T any, R any](s *State[T], op func(*T) R) <-chan R
- func Request[IN any, OUT any](ctx context.Context, r requester, i IN) (*OUT, error)
- type Actor
- type ActorMetrics
- type BaseActor
- func (a *BaseActor) Done() <-chan struct{}
- func (a *BaseActor) EnableStepMode() error
- func (a *BaseActor) Pause() error
- func (a *BaseActor) Resume() error
- func (a *BaseActor) Send(ctx context.Context, e Envelope) error
- func (a *BaseActor) Step() error
- func (a *BaseActor) Stop()
- func (a *BaseActor) TrySend(cmd Envelope) bool
- type Envelope
- type HandleOption
- type HandleOpts
- type HandlerCtx
- type HandlerInitFunc
- type HandlerRegistrar
- type HandlerRegistration
- func DefaultHandler(h func(HandlerCtx, any) (any, error)) HandlerRegistration
- func HandleEvery(interval time.Duration, msgHandler func(h HandlerCtx) error) HandlerRegistration
- func HandleMsg[IN any](msgHandler func(h HandlerCtx, i IN) error) HandlerRegistration
- func HandleMsgWithOpts[IN any](msgHandler func(h HandlerCtx, i IN) error, opts ...HandleOption) HandlerRegistration
- func HandleRequest[IN any, OUT any](h func(h HandlerCtx, i IN) (*OUT, error)) HandlerRegistration
- func HandleRequestWithOpts[IN any, OUT any](h func(h HandlerCtx, i IN) (*OUT, error), opts ...HandleOption) HandlerRegistration
- func Init(initFunc HandlerInitFunc) HandlerRegistration
- type MsgHandlerFunc
- type OnPanic
- type Options
- type RawHandler
- type Reply
- type Scheduler
- type State
- type StateOp
- type StateReadOp
- type StateTask
- type TypedHandlerRegistry
- func (t *TypedHandlerRegistry) HandleMessage(hc HandlerCtx, mt string, data []byte) (any, error)
- func (t *TypedHandlerRegistry) InitHandler(hc HandlerCtx) error
- func (t *TypedHandlerRegistry) Register(msgType string, typeFactory func() any, msgHandler MsgHandlerFunc, ...)
- func (t *TypedHandlerRegistry) ToActor(opts Options) Actor
Constants ¶
This section is empty.
Variables ¶
var ErrSelfRequest = errors.New("self-request would deadlock: actor cannot send request to itself")
ErrSelfRequest is returned when an actor attempts to send a request to itself, which would cause a deadlock.
Functions ¶
func Publish ¶
Publish sends a fire-and-forget message to an actor. Unlike Request, Publish does not expect a return value from the handler.
func RawRequest ¶
RawRequest sends a pre-serialized message to an actor and waits for the response. Use Request for type-safe messaging.
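The relationship between a typed request and a raw one can be sketched as a thin generic wrapper: encode the input, send it by type name, then assert the response type. This is an assumption about the layering, not the package's code; `rawFunc`, `typedRequest`, and `fakeActor` are hypothetical, and the sketch assumes the bare Go type name is used for routing.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"reflect"
)

// rawFunc is a hypothetical stand-in for sending a pre-serialized message.
type rawFunc func(msgType string, data []byte) (any, error)

// typedRequest encodes in as JSON, routes it by IN's type name, and checks
// that the response is a *OUT.
func typedRequest[IN any, OUT any](raw rawFunc, in IN) (*OUT, error) {
	data, err := json.Marshal(in)
	if err != nil {
		return nil, err
	}
	msgType := reflect.TypeOf((*IN)(nil)).Elem().Name()
	res, err := raw(msgType, data)
	if err != nil {
		return nil, err
	}
	out, ok := res.(*OUT)
	if !ok {
		return nil, fmt.Errorf("unexpected response type %T", res)
	}
	return out, nil
}

type GetUserQuery struct {
	ID string `json:"id"`
}

type User struct {
	ID string
}

// fakeActor plays the receiving side for the demo.
func fakeActor(msgType string, data []byte) (any, error) {
	if msgType != "GetUserQuery" {
		return nil, errors.New("unknown message type")
	}
	var q GetUserQuery
	if err := json.Unmarshal(data, &q); err != nil {
		return nil, err
	}
	return &User{ID: q.ID}, nil
}

func lookup(id string) (string, error) {
	u, err := typedRequest[GetUserQuery, User](fakeActor, GetUserQuery{ID: id})
	if err != nil {
		return "", err
	}
	return u.ID, nil
}

func main() {
	fmt.Println(lookup("123"))
}
```

Note that OUT is instantiated as `User`, so the typed call yields a `*User`, matching the `(*OUT, error)` return shape in the Request signature.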
Types ¶
type Actor ¶
type Actor interface {
	// Send enqueues a message for processing. Blocks until the message
	// is enqueued, the context is canceled, or the actor is stopped.
	Send(ctx context.Context, msg Envelope) error
	// Pause stops message processing until Resume or Step is called.
	Pause() error
	// Resume continues normal message processing after a Pause.
	Resume() error
	// Step processes exactly one message when paused.
	Step() error
	// Done returns a channel that is closed when the actor stops.
	Done() <-chan struct{}
}
Actor is the interface for interacting with an actor instance.
func New ¶
func New(opt Options, handler RawHandler) Actor
New creates and starts a new actor with the given options and handler. The actor begins processing messages immediately in a background goroutine.
type ActorMetrics ¶
type ActorMetrics interface {
	// Message handling
	MessageDuration(msgType string) metrics.Timer
	MessageProcessed(msgType string, success bool)
	MessagePanic(msgType string)
	// Mailbox
	MailboxDepth(actorID string, depth int)
	// Scheduler
	SchedulerInflight(actorID string, count int)
	SchedulerTaskDuration() metrics.Timer
	SchedulerTaskCompleted(success bool)
}
ActorMetrics defines the metrics interface for the Actor pillar. All methods are safe for concurrent use.
func NopActorMetrics ¶
func NopActorMetrics() ActorMetrics
NopActorMetrics returns a no-op ActorMetrics implementation.
type BaseActor ¶
type BaseActor struct {
	// contains filtered or unexported fields
}
BaseActor is the core actor implementation that processes messages sequentially from its mailbox. Create via New rather than directly.
func (*BaseActor) Done ¶
func (a *BaseActor) Done() <-chan struct{}
Done returns a channel that is closed when the actor stops.
func (*BaseActor) EnableStepMode ¶
EnableStepMode makes the actor process messages only when Step is called.
func (*BaseActor) Send ¶
Send enqueues a message, blocking until it is enqueued, ctx is canceled, or the actor is stopped.
type Envelope ¶
type Envelope struct {
	Type  string     // Message type name for handler dispatch
	Data  []byte     // JSON-encoded message payload
	Reply chan Reply // Channel for sending the response
}
Envelope wraps a message for delivery to an actor's mailbox.
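A request-reply round trip over an envelope might look like the sketch below. The `Envelope` and `Reply` types are local copies of the shapes documented on this page so that the example compiles on its own; `newEnvelope`, `serve`, and `roundTrip` are hypothetical helpers, and the one-slot buffer on the reply channel is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local copies of the documented shapes, so the sketch is self-contained.
type Reply struct {
	Result any
	Error  error
}

type Envelope struct {
	Type  string
	Data  []byte
	Reply chan Reply
}

type GetUserQuery struct {
	ID string `json:"id"`
}

// newEnvelope JSON-encodes payload. The reply channel has capacity 1 so the
// receiver never blocks when replying (an assumption of this sketch).
func newEnvelope(msgType string, payload any) (Envelope, error) {
	data, err := json.Marshal(payload)
	if err != nil {
		return Envelope{}, err
	}
	return Envelope{Type: msgType, Data: data, Reply: make(chan Reply, 1)}, nil
}

// serve is a hypothetical receiver that decodes each envelope and replies.
func serve(mailbox <-chan Envelope) {
	for e := range mailbox {
		var q GetUserQuery
		if err := json.Unmarshal(e.Data, &q); err != nil {
			e.Reply <- Reply{Error: err}
			continue
		}
		e.Reply <- Reply{Result: "user-" + q.ID}
	}
}

func roundTrip(id string) (any, error) {
	mailbox := make(chan Envelope, 1)
	go serve(mailbox)
	defer close(mailbox) // lets serve exit once the demo is done

	env, err := newEnvelope("GetUserQuery", GetUserQuery{ID: id})
	if err != nil {
		return nil, err
	}
	mailbox <- env
	r := <-env.Reply
	return r.Result, r.Error
}

func main() {
	fmt.Println(roundTrip("123"))
}
```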
type HandleOption ¶
type HandleOption func(*HandleOpts)
HandleOption configures handler registration behavior.
func WithInitFunc ¶
func WithInitFunc(init HandlerInitFunc) HandleOption
WithInitFunc adds an initialization function to be called on actor startup.
func WithMessageType ¶
func WithMessageType(msgType string) HandleOption
WithMessageType overrides the message type name used for routing. By default, the type name is derived from the Go type using reflection.
type HandleOpts ¶
type HandleOpts struct {
	// MessageType overrides the default type name derived from the Go type.
	MessageType string
	// InitFunc is called during actor initialization.
	InitFunc HandlerInitFunc
}
HandleOpts configures handler registration.
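The option pattern and the reflection-based default name can be sketched as below. The lowercase `handleOpts`, `handleOption`, `withMessageType`, and `resolveType` are hypothetical mirrors of the documented names. Whether the package uses the bare or the package-qualified type name is not stated here; this sketch uses the bare name.

```go
package main

import (
	"fmt"
	"reflect"
)

// handleOpts mirrors the MessageType field of the documented HandleOpts.
type handleOpts struct {
	MessageType string
}

// handleOption mutates handleOpts, following the functional-options pattern.
type handleOption func(*handleOpts)

func withMessageType(name string) handleOption {
	return func(o *handleOpts) { o.MessageType = name }
}

// typeName derives a default routing name from the Go type T.
func typeName[T any]() string {
	return reflect.TypeOf((*T)(nil)).Elem().Name()
}

// resolveType starts from the reflected default and applies any overrides.
func resolveType[T any](opts ...handleOption) string {
	o := handleOpts{MessageType: typeName[T]()}
	for _, opt := range opts {
		opt(&o)
	}
	return o.MessageType
}

type CreateUserCmd struct {
	Name string
}

func main() {
	fmt.Println(resolveType[CreateUserCmd]())
	fmt.Println(resolveType[CreateUserCmd](withMessageType("user.create")))
}
```

An explicit name is useful when sender and receiver do not share the same Go type, or when a type is renamed but the wire name must stay stable.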
type HandlerCtx ¶
type HandlerCtx interface {
	context.Context
	// Log returns the actor's logger.
	Log() *slog.Logger
	// Schedule runs f asynchronously outside the actor's mailbox processing.
	// Use for I/O-bound or long-running operations that shouldn't block message handling.
	Schedule(f scheduleFunc)
	// Request sends a request to another actor and waits for the response.
	// Returns [ErrSelfRequest] if the request would be sent to the same actor.
	Request(ctx context.Context, req any) (res any, err error)
	// contains filtered or unexported methods
}
HandlerCtx is the context passed to message handlers, providing access to logging, scheduling background tasks, and making requests to other actors.
type HandlerInitFunc ¶
type HandlerInitFunc func(hc HandlerCtx) error
HandlerInitFunc is called during actor initialization.
type HandlerRegistrar ¶
type HandlerRegistrar interface {
	// Register adds a handler for a message type.
	Register(msgType string, f func() any, handle MsgHandlerFunc, init HandlerInitFunc)
}
HandlerRegistrar allows registering message handlers with the actor.
type HandlerRegistration ¶
type HandlerRegistration func(registrar HandlerRegistrar)
HandlerRegistration is a function that registers handlers with a registrar. Create these using HandleMsg, HandleRequest, HandleEvery, etc.
func DefaultHandler ¶
func DefaultHandler(h func(HandlerCtx, any) (any, error)) HandlerRegistration
DefaultHandler registers a fallback handler for messages without a specific handler. The message is passed as-is (not deserialized) to the handler function.
func HandleEvery ¶
func HandleEvery(interval time.Duration, msgHandler func(h HandlerCtx) error) HandlerRegistration
HandleEvery registers a periodic task that runs at the given interval. The handler is called via the actor's mailbox, ensuring sequential execution with other handlers.
func HandleMsg ¶
func HandleMsg[IN any](msgHandler func(h HandlerCtx, i IN) error) HandlerRegistration
HandleMsg registers a fire-and-forget message handler for type IN. Use this for commands that don't return a value.
func HandleMsgWithOpts ¶
func HandleMsgWithOpts[IN any](msgHandler func(h HandlerCtx, i IN) error, opts ...HandleOption) HandlerRegistration
HandleMsgWithOpts registers a message handler with additional options.
func HandleRequest ¶
func HandleRequest[IN any, OUT any](h func(h HandlerCtx, i IN) (*OUT, error)) HandlerRegistration
HandleRequest registers a request-response handler. The handler receives a message of type IN and returns a response of type *OUT.
func HandleRequestWithOpts ¶
func HandleRequestWithOpts[IN any, OUT any](h func(h HandlerCtx, i IN) (*OUT, error), opts ...HandleOption) HandlerRegistration
HandleRequestWithOpts registers a request-response handler with additional options.
func Init ¶
func Init(initFunc HandlerInitFunc) HandlerRegistration
Init registers an initialization function called when the actor starts. Use this to set up state, start background goroutines, or perform other setup.
type MsgHandlerFunc ¶
type MsgHandlerFunc func(hc HandlerCtx, msg any) (any, error)
MsgHandlerFunc is the signature for message handler functions.
type OnPanic ¶
OnPanic is called when an actor's handler panics. It receives the recovered value, stack trace, and the message that caused the panic.
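Panic recovery around a handler can be sketched with `recover` and `runtime/debug.Stack`. The exact OnPanic signature is not shown on this page, so the `onPanic` type below is a hypothetical shape based on the description (recovered value, stack trace, offending message), and `safeHandle` is an illustrative helper.

```go
package main

import (
	"fmt"
	"runtime/debug"
)

// onPanic is a hypothetical callback shape; the real OnPanic type may differ.
type onPanic func(recovered any, stack []byte, msgType string)

// safeHandle runs handle and converts a panic into an error after notifying cb.
func safeHandle(msgType string, handle func() error, cb onPanic) (err error) {
	defer func() {
		if r := recover(); r != nil {
			if cb != nil {
				cb(r, debug.Stack(), msgType)
			}
			err = fmt.Errorf("handler for %s panicked: %v", msgType, r)
		}
	}()
	return handle()
}

// demo triggers a panic inside a handler and records what the callback saw.
func demo() (reported string, err error) {
	err = safeHandle("CreateUserCmd",
		func() error { panic("boom") },
		func(recovered any, stack []byte, msgType string) {
			reported = fmt.Sprint(recovered)
		})
	return reported, err
}

func main() {
	fmt.Println(demo())
}
```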
type Options ¶
type Options struct {
	// MailboxSize is the capacity of the message queue. Defaults to 1024.
	MailboxSize int
	// ControlSize is the capacity of the control channel. Defaults to 16.
	ControlSize int
	// Context is the parent context for the actor's lifecycle.
	Context context.Context
	// Logger for actor operations. Defaults to slog.Default().
	Logger *slog.Logger
	// OnPanic is called when a handler panics. Defaults to logging the panic.
	OnPanic OnPanic
	// MaxConcurrentTasks caps the number of tasks run via HandlerCtx.Schedule.
	// If 0 or negative, defaults to 32.
	MaxConcurrentTasks int
	// Metrics for actor instrumentation. If nil, a no-op implementation is used.
	Metrics ActorMetrics
}
Options configures actor creation.
type RawHandler ¶
type RawHandler interface {
	// InitHandler is called once when the actor starts, before processing messages.
	InitHandler(hc HandlerCtx) error
	// HandleMessage processes a message and returns a response.
	HandleMessage(hc HandlerCtx, mt string, data []byte) (any, error)
}
RawHandler is the low-level interface for handling actor messages. Most users should use TypedHandlers instead of implementing this directly.
type Reply ¶
type Reply struct {
	Result any   // Handler return value (nil for fire-and-forget)
	Error  error // Handler error, if any
}
Reply carries the result of a message handler execution.
type Scheduler ¶
type Scheduler interface {
	// Schedule queues a function for asynchronous execution.
	Schedule(f scheduleFunc)
	// Wait blocks until all in-flight tasks complete.
	Wait()
}
Scheduler manages execution of background tasks with optional concurrency limits.
func NewScheduler ¶
NewScheduler creates a simple scheduler that limits the number of concurrently running tasks to max. If max <= 0, concurrency is unlimited. The scheduler respects context cancellation for graceful shutdown.
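A concurrency-limited scheduler of this kind is commonly built from a buffered channel used as a semaphore plus a `sync.WaitGroup`. The sketch below is one such construction under that assumption, not the package's implementation; `limitedScheduler` and `runTasks` are hypothetical, and context cancellation is omitted for brevity.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// limitedScheduler caps concurrently running tasks; a nil sem means unlimited.
type limitedScheduler struct {
	sem chan struct{}
	wg  sync.WaitGroup
}

func newLimitedScheduler(max int) *limitedScheduler {
	s := &limitedScheduler{}
	if max > 0 {
		s.sem = make(chan struct{}, max)
	}
	return s
}

// Schedule starts f in its own goroutine, waiting for a free slot first.
func (s *limitedScheduler) Schedule(f func()) {
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		if s.sem != nil {
			s.sem <- struct{}{}        // acquire a slot
			defer func() { <-s.sem }() // release it when f returns
		}
		f()
	}()
}

// Wait blocks until every scheduled task has finished.
func (s *limitedScheduler) Wait() { s.wg.Wait() }

// runTasks schedules n tasks and reports how many ran and the peak concurrency.
func runTasks(n, max int) (ran, peak int64) {
	s := newLimitedScheduler(max)
	var running, maxSeen, count atomic.Int64
	for i := 0; i < n; i++ {
		s.Schedule(func() {
			cur := running.Add(1)
			for {
				old := maxSeen.Load()
				if cur <= old || maxSeen.CompareAndSwap(old, cur) {
					break
				}
			}
			count.Add(1)
			running.Add(-1)
		})
	}
	s.Wait()
	return count.Load(), maxSeen.Load()
}

func main() {
	ran, peak := runTasks(20, 3)
	fmt.Println(ran, peak <= 3)
}
```

Calling `Wait` during shutdown is what lets an owner block until all scheduled work has completed.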
func NewSchedulerWithMetrics ¶
func NewSchedulerWithMetrics(max int, ctx context.Context, actorID string, metrics ActorMetrics) Scheduler
NewSchedulerWithMetrics creates a scheduler with metrics support.
type State ¶
type State[T any] struct {
	// contains filtered or unexported fields
}
func (*State[T]) MarshalJSON ¶
func (*State[T]) UnmarshalJSON ¶
type StateReadOp ¶
type TypedHandlerRegistry ¶
type TypedHandlerRegistry struct {
	// contains filtered or unexported fields
}
TypedHandlerRegistry manages message handlers for an actor, dispatching incoming messages to the appropriate typed handler based on message type.
func TypedHandlers ¶
func TypedHandlers(handlers ...HandlerRegistration) *TypedHandlerRegistry
TypedHandlers creates a new handler registry with the given handlers. This is the primary way to define actor message handlers.
Example:
	registry := actor.TypedHandlers(
		actor.HandleMsg[MyCommand](handleMyCommand),
		actor.HandleRequest[MyQuery, *MyResponse](handleMyQuery),
	)
	myActor := registry.ToActor(actor.Options{})
func (*TypedHandlerRegistry) HandleMessage ¶
func (t *TypedHandlerRegistry) HandleMessage(hc HandlerCtx, mt string, data []byte) (any, error)
HandleMessage dispatches a message to the registered handler for its type.
func (*TypedHandlerRegistry) InitHandler ¶
func (t *TypedHandlerRegistry) InitHandler(hc HandlerCtx) error
InitHandler initializes all registered handlers. Called by the actor on startup.
func (*TypedHandlerRegistry) Register ¶
func (t *TypedHandlerRegistry) Register(msgType string, typeFactory func() any, msgHandler MsgHandlerFunc, init HandlerInitFunc)
Register adds a handler for a message type. This is typically called indirectly via HandleMsg, HandleRequest, etc.
func (*TypedHandlerRegistry) ToActor ¶
func (t *TypedHandlerRegistry) ToActor(opts Options) Actor
ToActor creates and starts an actor using this handler registry.