actor

package
v0.30.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 4, 2026 License: MIT Imports: 12 Imported by: 0

Documentation

Overview

Package actor provides a mailbox-based actor model implementation for building concurrent, message-driven systems.

Actors are the fundamental unit of computation in this package. Each actor:

  • Has a unique identity
  • Processes messages sequentially from its mailbox
  • Can schedule background tasks via [HandlerCtx.Schedule]
  • Can be paused, resumed, and stepped for debugging/testing

Creating Actors

The simplest way to create an actor is using typed handlers:

actor := actor.TypedHandlers(
    actor.HandleMsg[CreateUserCmd](func(hc actor.HandlerCtx, cmd CreateUserCmd) error {
        // Handle the command
        return nil
    }),
    actor.HandleRequest[GetUserQuery, *User](func(hc actor.HandlerCtx, q GetUserQuery) (*User, error) {
        // Handle request and return response
        return &User{ID: q.ID}, nil
    }),
    actor.HandleEvery(time.Minute, func(hc actor.HandlerCtx) error {
        // Periodic task executed every minute
        return nil
    }),
).ToActor(actor.Options{})

Message Handling

Messages are dispatched by type name to registered handlers:

  • HandleMsg registers a one-way message handler (fire-and-forget)
  • HandleRequest registers a request-response handler
  • HandleEvery registers a periodic task that runs at fixed intervals
  • DefaultHandler registers a fallback for unmatched message types
  • Init registers initialization logic run when the actor starts

Sending Messages

Use Request for request-response patterns:

user, err := actor.Request[GetUserQuery, *User](ctx, myActor, GetUserQuery{ID: "123"})

Use Publish for fire-and-forget messages:

err := actor.Publish[CreateUserCmd](ctx, myActor, CreateUserCmd{Name: "Alice"})

Background Tasks

Handlers can schedule background work via [HandlerCtx.Schedule]:

actor.HandleMsg[ProcessDataCmd](func(hc actor.HandlerCtx, cmd ProcessDataCmd) error {
    hc.Schedule(func() {
        // This runs asynchronously, outside the actor's mailbox processing
        processInBackground(cmd.Data)
    })
    return nil
})

The actor waits for all scheduled tasks to complete during shutdown.

Self-Request Detection

The actor automatically detects when a handler attempts to send a request back to itself (which would deadlock) and returns ErrSelfRequest.

Lifecycle Control

Actors support pause/resume for debugging and testing:

actor.Pause()       // Stop processing messages
actor.Step()        // Process exactly one message
actor.Resume()      // Continue normal processing
<-actor.Done()      // Wait for actor shutdown

Index

Constants

This section is empty.

Variables

View Source
var ErrSelfRequest = errors.New("self-request would deadlock: actor cannot send request to itself")

ErrSelfRequest is returned when an actor attempts to send a request to itself, which would cause a deadlock.

Functions

func Publish

func Publish[IN any](ctx context.Context, r requester, i IN) error

Publish sends a fire-and-forget message to an actor. Unlike Request, Publish does not expect a return value from the handler.

func RawRequest

func RawRequest(ctx context.Context, r requester, msgType string, data []byte) (any, error)

RawRequest sends a pre-serialized message to an actor and waits for the response. Use Request for type-safe messaging.

func Read

func Read[T any, R any](s *State[T], op func(*T) R) R

Read blocks and returns R

func ReadAsync

func ReadAsync[T any, R any](s *State[T], op func(*T) R) <-chan R

ReadAsync is non-blocking: returns a future chan R immediately

func Request

func Request[IN any, OUT any](ctx context.Context, r requester, i IN) (*OUT, error)

Request sends a request to an actor and waits for the response. The request is serialized as JSON and dispatched based on the type name of IN.

Types

type Actor

type Actor interface {
	// Send enqueues a message for processing. Blocks until the message
	// is enqueued, the context is canceled, or the actor is stopped.
	Send(ctx context.Context, msg Envelope) error
	// Pause stops message processing until Resume or Step is called.
	Pause() error
	// Resume continues normal message processing after a Pause.
	Resume() error
	// Step processes exactly one message when paused.
	Step() error
	// Done returns a channel that is closed when the actor stops.
	Done() <-chan struct{}
}

Actor is the interface for interacting with an actor instance.

func New

func New(opt Options, handler RawHandler) Actor

New creates and starts a new actor with the given options and handler. The actor begins processing messages immediately in a background goroutine.

type ActorMetrics

type ActorMetrics interface {
	// Message handling
	MessageDuration(msgType string) metrics.Timer
	MessageProcessed(msgType string, success bool)
	MessagePanic(msgType string)

	// Mailbox
	MailboxDepth(actorID string, depth int)

	// Scheduler
	SchedulerInflight(actorID string, count int)
	SchedulerTaskDuration() metrics.Timer
	SchedulerTaskCompleted(success bool)
}

ActorMetrics defines the metrics interface for the Actor pillar. All methods are thread-safe.

func NopActorMetrics

func NopActorMetrics() ActorMetrics

NopActorMetrics returns a no-op ActorMetrics implementation.

type BaseActor

type BaseActor struct {
	// contains filtered or unexported fields
}

BaseActor is the core actor implementation that processes messages sequentially from its mailbox. Create via New rather than directly.

func (*BaseActor) Done

func (a *BaseActor) Done() <-chan struct{}

Done is closed when the actor stops.

func (*BaseActor) EnableStepMode

func (a *BaseActor) EnableStepMode() error

EnableStepMode makes the actor process only when Step() is called.

func (*BaseActor) Pause

func (a *BaseActor) Pause() error

Pause prevents further processing until Resume or Step.

func (*BaseActor) Resume

func (a *BaseActor) Resume() error

Resume enables continuous processing (disables step mode).

func (*BaseActor) Send

func (a *BaseActor) Send(ctx context.Context, e Envelope) error

Send enqueues a command (blocking until enqueued, ctx canceled, or actor stopped).

func (*BaseActor) Step

func (a *BaseActor) Step() error

Step permits exactly one message/tick to be processed.

func (*BaseActor) Stop

func (a *BaseActor) Stop()

Stop requests shutdown and waits for completion.

func (*BaseActor) TrySend

func (a *BaseActor) TrySend(cmd Envelope) bool

TrySend attempts a non-blocking enqueue.

type Envelope

type Envelope struct {
	Type  string     // Message type name for handler dispatch
	Data  []byte     // JSON-encoded message payload
	Reply chan Reply // Channel for sending the response
}

Envelope wraps a message for delivery to an actor's mailbox.

type HandleOption

type HandleOption func(*HandleOpts)

HandleOption configures handler registration behavior.

func WithInitFunc

func WithInitFunc(init HandlerInitFunc) HandleOption

WithInitFunc adds an initialization function to be called on actor startup.

func WithMessageType

func WithMessageType(msgType string) HandleOption

WithMessageType overrides the message type name used for routing. By default, the type name is derived from the Go type using reflection.

type HandleOpts

type HandleOpts struct {
	// MessageType overrides the default type name derived from the Go type.
	MessageType string
	// InitFunc is called during actor initialization.
	InitFunc HandlerInitFunc
}

HandleOpts configures handler registration.

type HandlerCtx

type HandlerCtx interface {
	context.Context
	// Log returns the actor's logger.
	Log() *slog.Logger
	// Schedule runs f asynchronously outside the actor's mailbox processing.
	// Use for I/O-bound or long-running operations that shouldn't block message handling.
	Schedule(f scheduleFunc)
	// Request sends a request to another actor and waits for the response.
	// Returns [ErrSelfRequest] if the request would be sent to the same actor.
	Request(ctx context.Context, req any) (res any, err error)
	// contains filtered or unexported methods
}

HandlerCtx is the context passed to message handlers, providing access to logging, scheduling background tasks, and making requests to other actors.

type HandlerInitFunc

type HandlerInitFunc func(hc HandlerCtx) error

HandlerInitFunc is called during actor initialization.

type HandlerRegistrar

type HandlerRegistrar interface {
	// Register adds a handler for a message type.
	Register(msgType string, f func() any, handle MsgHandlerFunc, init HandlerInitFunc)
}

HandlerRegistrar allows registering message handlers with the actor.

type HandlerRegistration

type HandlerRegistration func(registrar HandlerRegistrar)

HandlerRegistration is a function that registers handlers with a registrar. Create these using HandleMsg, HandleRequest, HandleEvery, etc.

func DefaultHandler

func DefaultHandler(h func(HandlerCtx, any) (any, error)) HandlerRegistration

DefaultHandler registers a fallback handler for messages without a specific handler. The message is passed as-is (not deserialized) to the handler function.

func HandleEvery

func HandleEvery(interval time.Duration, msgHandler func(h HandlerCtx) error) HandlerRegistration

HandleEvery registers a periodic task that runs at the given interval. The handler is called via the actor's mailbox, ensuring sequential execution with other handlers.

func HandleMsg

func HandleMsg[IN any](msgHandler func(h HandlerCtx, i IN) error) HandlerRegistration

HandleMsg registers a fire-and-forget message handler for type IN. Use this for commands that don't return a value.

func HandleMsgWithOpts

func HandleMsgWithOpts[IN any](
	msgHandler func(h HandlerCtx, i IN) error,
	opts ...HandleOption,
) HandlerRegistration

HandleMsgWithOpts registers a message handler with additional options.

func HandleRequest

func HandleRequest[IN any, OUT any](h func(h HandlerCtx, i IN) (*OUT, error)) HandlerRegistration

HandleRequest registers a request-response handler. The handler receives a message of type IN and returns a response of type *OUT.

func HandleRequestWithOpts

func HandleRequestWithOpts[IN any, OUT any](
	h func(h HandlerCtx, i IN) (*OUT, error),
	opts ...HandleOption,
) HandlerRegistration

HandleRequestWithOpts registers a request-response handler with additional options.

func Init

func Init(initFunc HandlerInitFunc) HandlerRegistration

Init registers an initialization function called when the actor starts. Use this to set up state, start background goroutines, or perform other setup.

type MsgHandlerFunc

type MsgHandlerFunc func(hc HandlerCtx, msg any) (any, error)

MsgHandlerFunc is the signature for message handler functions.

type OnPanic

type OnPanic func(recovered any, stack []byte, msg any)

OnPanic is called when an actor's handler panics. It receives the recovered value, stack trace, and the message that caused the panic.

type Options

type Options struct {
	// MailboxSize is the capacity of the message queue. Defaults to 1024.
	MailboxSize int
	// ControlSize is the capacity of the control channel. Defaults to 16.
	ControlSize int
	// Context is the parent context for the actor's lifecycle.
	Context context.Context
	// Logger for actor operations. Defaults to slog.Default().
	Logger *slog.Logger
	// OnPanic is called when a handler panics. Defaults to logging the panic.
	OnPanic OnPanic
	// MaxConcurrentTasks caps the number of tasks run via HandlerCtx.Schedule.
	// If 0 or negative, defaults to 32.
	MaxConcurrentTasks int
	// Metrics for actor instrumentation. If nil, a no-op implementation is used.
	Metrics ActorMetrics
}

Options configures actor creation.

type RawHandler

type RawHandler interface {
	// InitHandler is called once when the actor starts, before processing messages.
	InitHandler(hc HandlerCtx) error
	// HandleMessage processes a message and returns a response.
	HandleMessage(hc HandlerCtx, mt string, data []byte) (any, error)
}

RawHandler is the low-level interface for handling actor messages. Most users should use TypedHandlers instead of implementing this directly.

type Reply

type Reply struct {
	Result any   // Handler return value (nil for fire-and-forget)
	Error  error // Handler error, if any
}

Reply carries the result of a message handler execution.

type Scheduler

type Scheduler interface {
	// Schedule queues a function for asynchronous execution.
	Schedule(f scheduleFunc)
	// Wait blocks until all in-flight tasks complete.
	Wait()
}

Scheduler manages execution of background tasks with optional concurrency limits.

func NewScheduler

func NewScheduler(max int, ctx context.Context) Scheduler

NewScheduler creates a simple scheduler that limits the number of concurrently running tasks to max. If max <= 0, concurrency is unlimited. The scheduler respects context cancellation for graceful shutdown.

func NewSchedulerWithMetrics

func NewSchedulerWithMetrics(max int, ctx context.Context, actorID string, metrics ActorMetrics) Scheduler

NewSchedulerWithMetrics creates a scheduler with metrics support.

type State

type State[T any] struct {
	// contains filtered or unexported fields
}

func NewState

func NewState[T any](ctx context.Context, data *T, cb func(*T)) *State[T]

func (*State[T]) MarshalJSON

func (s *State[T]) MarshalJSON() ([]byte, error)

func (*State[T]) Process

func (s *State[T]) Process(ops ...StateOp[T])

func (*State[T]) Submit

func (s *State[T]) Submit(ops ...StateOp[T]) <-chan struct{}

func (*State[T]) UnmarshalJSON

func (s *State[T]) UnmarshalJSON(data []byte) error

type StateOp

type StateOp[T any] func(*T)

type StateReadOp

type StateReadOp[T any, R any] func(*T) R

type StateTask

type StateTask[T any] interface {
	// contains filtered or unexported methods
}

type TypedHandlerRegistry

type TypedHandlerRegistry struct {
	// contains filtered or unexported fields
}

TypedHandlerRegistry manages message handlers for an actor, dispatching incoming messages to the appropriate typed handler based on message type.

func TypedHandlers

func TypedHandlers(handlers ...HandlerRegistration) *TypedHandlerRegistry

TypedHandlers creates a new handler registry with the given handlers. This is the primary way to define actor message handlers.

Example:

registry := actor.TypedHandlers(
    actor.HandleMsg[MyCommand](handleMyCommand),
    actor.HandleRequest[MyQuery, *MyResponse](handleMyQuery),
)
myActor := registry.ToActor(actor.Options{})

func (*TypedHandlerRegistry) HandleMessage

func (t *TypedHandlerRegistry) HandleMessage(hc HandlerCtx, mt string, data []byte) (any, error)

HandleMessage dispatches a message to the registered handler for its type.

func (*TypedHandlerRegistry) InitHandler

func (t *TypedHandlerRegistry) InitHandler(hc HandlerCtx) error

InitHandler initializes all registered handlers. Called by the actor on startup.

func (*TypedHandlerRegistry) Register

func (t *TypedHandlerRegistry) Register(msgType string, typeFactory func() any, msgHandler MsgHandlerFunc, init HandlerInitFunc)

Register adds a handler for a message type. This is typically called indirectly via HandleMsg, HandleRequest, etc.

func (*TypedHandlerRegistry) ToActor

func (t *TypedHandlerRegistry) ToActor(opts Options) Actor

ToActor creates and starts an actor using this handler registry.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL