actor

package module
v0.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 13, 2025 License: MIT Imports: 6 Imported by: 0

README

Actor Package

Introduction to Actors

The actor model is a conceptual model for concurrent computation that treats "actors" as the universal primitives of concurrent computation. Originating from Carl Hewitt's work in the 1970s and popularized by languages like Erlang and frameworks like Akka, actors provide a high-level abstraction for building robust, concurrent, and distributed systems.

At its core, an actor is an independent unit of computation that encapsulates:

  • State: An actor can maintain private state that it alone can modify.
  • Behavior: An actor defines how it reacts to messages it receives.
  • Mailbox: Each actor has a mailbox to queue incoming messages.

Actors communicate exclusively through asynchronous message passing. When an actor receives a message, it can:

  1. Send a finite number of messages to other actors.
  2. Create a finite number of new actors.
  3. Designate the behavior to be used for the next message it receives (which can be the same behavior).

Concurrency is managed by the actor system, allowing many actors to This model inherently promotes loose coupling, as actors do not share state execute concurrently without explicit lock management by the developer for actor state.

Motivation for this Package

In large, long-lived systems like lnd, managing complexity, concurrency, and component lifecycles becomes increasingly challenging. This actor package is introduced to address several key motivations:

Structured Message Passing

To move away from direct, synchronous method calls between major components, especially where concurrency or complex state interactions are involved. Message passing encourages clearer, more auditable interactions and helps manage concurrent access to component state.

Eliminating "God Structs"

Over time, systems can develop large "god structs" that hold references to numerous sub-systems. This leads to tight coupling, makes dependency management difficult, and can obscure the flow of control and data. Actors, by encapsulating state and behavior and interacting via messages, help break down these monolithic structures into more manageable, independent units.

Decoupled Lifecycles

Often, the lifecycle of a sub-system is unnecessarily tied to a parent system, or access to a sub-system requires traversing through a central "manager" object. Actors can have independent lifecycles managed by an actor system, allowing for more granular control over starting, stopping, and restarting components.

An example of such interaction is when an RPC call needs to go through several other structs to obtain a reference to a given sub-system, in order to make a direct method call on that sub-system.

With the model described in this document, the RPC server just needs to know about what is effectively an abstract address of that sub-system. It can then use that to obtain something similar to a mailbox to do the method call.

This allows for a more decoupled architecture, as the RPC server doesn't need to know the exact "shape" of the method to call, just which message to send. Refactors of the sub-system won't break the RPC server, as long as the message (which can be constructed via a dedicated constructor) is the same.


This package provides a foundational actor framework tailored for Go, enabling developers to build components that are easier to reason about, test, and maintain in a concurrent environment.

Core Concepts

Let's explore the fundamental building blocks provided by this package.

Messages

Actors communicate by sending and receiving messages. Any type that an actor needs to process must implement the actor.Message interface. A simple way to do this is by embedding actor.BaseMessage:

package mymodule

import "github.com/lightningnetwork/lnd/actor"

// MyRequest is a custom message type.
type MyRequest struct {
    // Embed BaseMessage to satisfy the Message interface.
    actor.BaseMessage 
    Data string
}

// MessageType returns a string identifier for this message type.
func (m *MyRequest) MessageType() string {
    return "MyRequest"
}

// MyResponse might be a corresponding response type.
type MyResponse struct {
    actor.BaseMessage
    Reply string
}

func (m *MyResponse) MessageType() string {
    return "MyResponse"
}

The MessageType() method provides a string representation of the message type, which can be useful for debugging or routing.

Actor Behavior

The logic of an actor (how it responds to messages) is defined by its ActorBehavior. This is an interface that you implement:

package actor

// ActorBehavior defines the logic for how an actor processes incoming messages.
type ActorBehavior[M Message, R any] interface {
    Receive(actorCtx context.Context, msg M) fn.Result[R]
}

The Receive method passes in a caller context (useful for shutdown detection) and the incoming message. It returns an fn.Result[R], which can encapsulate either a successful response of type R or an error.

For simple cases, you can use actor.FunctionBehavior to adapt a Go function into an ActorBehavior:

import (
    "context"
    "fmt"
    "github.com/lightningnetwork/lnd/actor"
    "github.com/lightningnetwork/lnd/fn/v2"
)

// myActorLogic defines the processing for MyRequest messages.
func myActorLogic(ctx context.Context, msg *MyRequest) fn.Result[*MyResponse] {
    // In a real actor, you might interact with state or other services.
    // The actor's context (ctx) can be checked for shutdown signals.
    select {
    case <-ctx.Done():
        return fn.Err[*MyResponse](errors.New("actor shutting down"))
    default:
    }

    response := &MyResponse{Reply: fmt.Sprintf("Processed: %s", msg.Data)}
    return fn.Ok(response)
}

// Create a behavior from the function.
behavior := actor.NewFunctionBehavior(myActorLogic)

For more complex cases, you can implement the Receive method on a new struct, and pass that around directly.

Service Keys and Actor References: The Interaction Layer

Direct interaction with an actor's internal state or its concrete struct is discouraged. Instead, communication and discovery are managed through two key abstractions: ServiceKey and ActorRef. These provide a layer of indirection, promoting loose coupling and location transparency (though the current implementation is in-process).

ServiceKey[M Message, R any]

A ServiceKey is a type-safe identifier used for registering actors that provide a particular service and for discovering them later. The generic type parameters M (the type of message the actor handles) and R (the type of response the actor produces for Ask operations) ensure that you discover actors compatible with the interactions you intend to perform.

// Define a service key for actors that handle MyRequest and produce MyResponse.
myServiceKey := actor.NewServiceKey[*MyRequest, *MyResponse]("my-custom-service")

// Later, this key would be used with a Receptionist (part of an ActorSystem)
// to find ActorRefs for actors offering this service.
ActorRef[M Message, R any]

An ActorRef is a lightweight, shareable reference to an actor. It's the primary means by which you send messages to an actor. It is also generic over the message type M and response type R that the target actor handles.

You typically obtain an ActorRef by looking it up in a Receptionist using a ServiceKey (covered later when discussing the ActorSystem), or directly from an actor instance via its .Ref() method (e.g., sampleActor.Ref() if you have the Actor instance).

There are two main ways to send messages using an ActorRef:

  1. Tell (Fire-and-Forget): Used for sending messages when you don't need a direct reply. The call returns immediately after attempting to enqueue the message.

    // Assuming 'actorRef' is an ActorRef[*MyRequest, *MyResponse] obtained for an actor.
    requestMsg := &MyRequest{Data: "A fire-and-forget message"}
    actorRef.Tell(context.Background(), requestMsg)
    // The message is now in the actor's mailbox (or will be shortly).
    

    The context.Context passed to Tell can be used to cancel the send operation if, for example, the actor's mailbox is full and the send would block for too long.

  2. Ask (Request-Response): Used when you need a response from the actor. This returns a Future[R], which represents the eventual reply.

    // Assuming 'actorRef' is an ActorRef[*MyRequest, *MyResponse].
    askMsg := &MyRequest{Data: "A request needing a response"}
    futureResponse := actorRef.Ask(context.Background(), askMsg)
    

    A Future[R] represents a result that will be available at some point. You can block until it's ready using Await:

    // Await the result. It's good practice to use a context with a timeout.
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    
    result := futureResponse.Await(ctx)
    response, err := result.Unpack() 
    if err != nil {
        fmt.Printf("Ask failed: %v\n", err)
        // return or handle error
    } else {
        fmt.Printf("Received reply: %s\n", response.Reply)
    }
    

    The Future interface also offers non-blocking ways to handle results, like OnComplete (for callbacks) and ThenApply (for chaining transformations). A more restricted TellOnlyRef[M] is also available if only fire-and-forget semantics are required (obtained via an actor's TellRef() method).

Actors

An Actor is the concrete entity that runs a behavior, manages a mailbox, and has a lifecycle. You create an actor using actor.NewActor with an ActorConfig:

cfg := actor.ActorConfig[*MyRequest, *MyResponse]{
    ID:          "my-sample-actor",
    Behavior:    behavior,
    MailboxSize: 10,
    // Dead Letter Office (covered later)
    DLO:         nil,
}
sampleActor := actor.NewActor(cfg)

An actor doesn't start processing messages until its Start() method is called. This launches a dedicated goroutine for the actor.

sampleActor.Start()

To stop an actor, you call its Stop() method. This cancels the actor's internal context, causing its goroutine to clean up and exit.

// Sometime later...
sampleActor.Stop()

Visualizing Actor Relationships

The following diagram illustrates the primary components of the actor package and their relationships. It provides a high-level overview of how actors are managed, discovered, and interacted with.

classDiagram
    direction TB
    
    class ActorSystem {
      +Receptionist
      +DeadLetters
      +Shutdown()
    }
    
    class Receptionist {
      +Find(ServiceKey) ActorRef[]
      +Register(ServiceKey, ActorRef)
    }
    
    class DeadLetterOffice {
      +Receive(undeliverable Message)
    }
    
    class ServiceKey {
      +Spawn(ActorSystem, Behavior) ActorRef
    }
    
    class Actor {
      -mailbox
      -behavior
      +Ref() ActorRef
      +Start()
      +Stop()
    }
    
    class ActorRef {
      <<Interface>>
      +Tell(Message)
      +Ask(Message) Future
    }
    
    class Message {
      <<Interface>>
    }
    
    class Future {
      +Await() Result
    }
    
    class Router {
      +Tell(Message)
      +Ask(Message) Future
    }
    
    %% Core system relationships
    ActorSystem *-- Receptionist : has
    ActorSystem *-- DeadLetterOffice : provides
    ActorSystem o-- "manages" Actor
    
    %% Actor and communication
    Actor --> ActorRef : provides
    Actor ..> Message : processes
    ActorRef ..> Message : sends
    ActorRef ..> Future : returns for Ask
    
    %% Service discovery and routing
    Receptionist o-- ServiceKey : uses for lookup
    ServiceKey ..> Actor : creates
    Router --> ActorRef : routes to
    Router --> Receptionist : discovers actors via
    
    note for ActorSystem "Central manager for actor lifecycle and service discovery"
    note for Actor "Independent unit with encapsulated state and behavior"
    note for ActorRef "Location-transparent handle for sending messages"
    note for Message "Data exchanged between actors"
    note for ServiceKey "Type-safe identifier for actor registration and discovery"
    note for Router "Distributes messages among multiple actors"
    note for DeadLetterOffice "Handles messages that cannot be delivered"

The Actor System

While individual actors are useful, they often need to be managed and coordinated. The ActorSystem serves this purpose.

system := actor.NewActorSystem()
// Ensures all actors in the system are stopped.
defer system.Shutdown()
Actor Lifecycle and Registration

The ActorSystem can manage the lifecycle of actors. You can register actors with the system:

// Using 'behavior' from earlier and 'myServiceKey' defined in the 
// "Service Keys and Actor References" section.

// RegisterWithSystem creates, starts, and registers the actor.
actorRefFromSystem := actor.RegisterWithSystem(
    system, "system-managed-actor", myServiceKey, behavior,
)

Alternatively, a ServiceKey itself provides a Spawn method for convenience:

actorRefSpawned := myServiceKey.Spawn(system, "spawned-actor", behavior)

Actors registered with the system are automatically stopped when system.Shutdown() is called. You can also stop and remove individual actors using system.StopAndRemoveActor(actorID).

A ServiceKey is essentially the mailbox address of an actor.

Receptionist: Service Discovery

Actors often need to find other actors to communicate with. The Receptionist facilitates this. Actors are registered with the receptionist using a ServiceKey, which is type-safe.

// Get the system's receptionist.
receptionist := system.Receptionist()

// Find actors registered for a specific service key.
foundRefs := actor.FindInReceptionist(receptionist, myServiceKey)
if len(foundRefs) > 0 {
    targetActor := foundRefs[0]
    targetActor.Tell(context.Background(), &MyRequest{Data: "Hello from a discoverer!"})
} else {
    fmt.Println("No actors found for service key:", myServiceKey)
}

When an actor is stopped (e.g., via ServiceKey.Unregister or system shutdown), it should also be unregistered from the receptionist.

Dead Letter Office (DLO)

What happens to messages that cannot be delivered? For example, if an actor is stopped while messages are still in its mailbox, or if a message is sent to an actor that doesn't exist (though the current ActorRef design makes the latter less likely for direct sends).

The ActorSystem provides a default DeadLetterActor. When an actor is configured (via ActorConfig.DLO), undeliverable messages (e.g., those drained from its mailbox upon shutdown) can be routed to this DLO. This allows for logging, auditing, or potential manual intervention for "lost" messages.

// Actors created via RegisterWithSystem or ServiceKey.Spawn
// are automatically configured to use the system's DLO.
// system.DeadLetters() returns an ActorRef to the system's DLO.

Routers: Distributing Work

Sometimes, you might have multiple actors performing the same kind of task, and you want to distribute messages among them. A Router can do this. It's not an actor itself but acts as a dispatcher.

A Router uses a RoutingStrategy to pick one actor from a group registered under a ServiceKey.

// Assume 'system' and 'myServiceKey' are set up, and multiple actors
// are registered with 'myServiceKey'.

// Create a round-robin routing strategy.
roundRobinStrategy := actor.NewRoundRobinStrategy[*MyRequest, *MyResponse]()

// Create a router for 'myServiceKey' using this strategy.
// Messages sent to this router will be forwarded to one of the actors
// registered under 'myServiceKey'.
// The router also needs a DLO for messages it can't route (e.g., if no actors are available).
serviceRouter := actor.NewRouter(
    system.Receptionist(),
    myServiceKey,
    roundRobinStrategy,
    system.DeadLetters(),
)

// Now, interact with the router as if it were an ActorRef:
serviceRouter.Tell(context.Background(), &MyRequest{Data: "Message via router"})

futureReplyFromRouter := serviceRouter.Ask(context.Background(), &MyRequest{Data: "Ask via router"})
// ... await futureReplyFromRouter ...

If the router cannot find any available actors for the ServiceKey (e.g., none are registered or running), Tell operations will typically send the message to the router's configured DLO, and Ask operations will return a Future completed with ErrNoActorsAvailable.

Documentation

Index

Examples

Constants

This section is empty.

Variables

View Source
var ErrActorTerminated = fmt.Errorf("actor terminated")

ErrActorTerminated indicates that an operation failed because the target actor was terminated or in the process of shutting down.

View Source
var ErrNoActorsAvailable = errors.New("no actors available for service key")

ErrNoActorsAvailable is returned when a router cannot find any actors registered for its service key to forward a message to.

Functions

func RegisterWithReceptionist

func RegisterWithReceptionist[M Message, R any](
	r *Receptionist, key ServiceKey[M, R], ref ActorRef[M, R])

RegisterWithReceptionist registers an actor with a service key in the given receptionist. This is a package-level generic function because methods cannot have their own type parameters in Go (as of the current version). It appends the actor reference to the list associated with the key's name.

func UnregisterFromReceptionist

func UnregisterFromReceptionist[M Message, R any](r *Receptionist,
	key ServiceKey[M, R], refToRemove ActorRef[M, R]) bool

UnregisterFromReceptionist removes an actor reference from a service key in the given receptionist. It returns true if the reference was found and removed, and false otherwise. This is a package-level generic function because methods cannot have their own type parameters in Go.

Types

type Actor

type Actor[M Message, R any] struct {
	// contains filtered or unexported fields
}

Actor represents a concrete actor implementation. It encapsulates a behavior, manages its internal state implicitly through that behavior, and processes messages from its mailbox sequentially in its own goroutine.

Example

ExampleActor demonstrates creating a single actor, sending it a message directly using Ask, and then unregistering and stopping it.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/lightningnetwork/lnd/actor"
	"github.com/lightningnetwork/lnd/fn/v2"
)

// BasicGreetingMsg is a simple message type for the basic actor example.
type BasicGreetingMsg struct {
	actor.BaseMessage
	Name string
}

// MessageType implements actor.Message.
func (m BasicGreetingMsg) MessageType() string { return "BasicGreetingMsg" }

// BasicGreetingResponse is a simple response type.
type BasicGreetingResponse struct {
	Greeting string
}

// ExampleActor demonstrates creating a single actor, sending it a message
// directly using Ask, and then unregistering and stopping it.
func main() {
	system := actor.NewActorSystem()
	defer system.Shutdown()

	//nolint:ll
	greeterKey := actor.NewServiceKey[BasicGreetingMsg, BasicGreetingResponse](
		"basic-greeter",
	)

	actorID := "my-greeter"
	greeterBehavior := actor.NewFunctionBehavior(
		func(ctx context.Context,
			msg BasicGreetingMsg) fn.Result[BasicGreetingResponse] {

			return fn.Ok(BasicGreetingResponse{
				Greeting: "Hello, " + msg.Name + " from " +
					actorID,
			})
		},
	)

	// Spawn the actor. This registers it with the system and receptionist,
	// and starts it. It returns an ActorRef.
	greeterRef := greeterKey.Spawn(system, actorID, greeterBehavior)
	fmt.Printf("Actor %s spawned.\n", greeterRef.ID())

	// Send a message directly to the actor's reference.
	askCtx, askCancel := context.WithTimeout(
		context.Background(), 1*time.Second,
	)
	defer askCancel()
	futureResponse := greeterRef.Ask(
		askCtx, BasicGreetingMsg{Name: "World"},
	)

	awaitCtx, awaitCancel := context.WithTimeout(
		context.Background(), 1*time.Second,
	)
	defer awaitCancel()
	result := futureResponse.Await(awaitCtx)

	result.WhenErr(func(err error) {
		fmt.Printf("Error awaiting response: %v\n", err)
	})
	result.WhenOk(func(response BasicGreetingResponse) {
		fmt.Printf("Received: %s\n", response.Greeting)
	})

	// Unregister the actor. This also stops the actor.
	unregistered := greeterKey.Unregister(system, greeterRef)
	if unregistered {
		fmt.Printf("Actor %s unregistered and stopped.\n",
			greeterRef.ID())
	} else {
		fmt.Printf("Failed to unregister actor %s.\n", greeterRef.ID())
	}

	// Verify it's no longer in the receptionist.
	refsAfterUnregister := actor.FindInReceptionist(
		system.Receptionist(), greeterKey,
	)
	fmt.Printf("Actors for key '%s' after unregister: %d\n",
		"basic-greeter", len(refsAfterUnregister))

}
Output:

Actor my-greeter spawned.
Received: Hello, World from my-greeter
Actor my-greeter unregistered and stopped.
Actors for key 'basic-greeter' after unregister: 0
Example (Stateful)

ExampleActor_stateful demonstrates creating an actor whose behavior is defined by a struct with methods, allowing it to maintain internal state.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/lightningnetwork/lnd/actor"
	"github.com/lightningnetwork/lnd/fn/v2"
)

// CounterMsg is a message type for the stateful counter actor.
// It can be used to increment the counter or get its current value.
type CounterMsg struct {
	actor.BaseMessage
	Increment int
	GetValue  bool
	Who       string
}

// MessageType implements actor.Message.
func (m CounterMsg) MessageType() string { return "CounterMsg" }

// CounterResponse is a response type for the counter actor.
type CounterResponse struct {
	Value     int
	Responder string
}

// StatefulCounterActor demonstrates an actor that maintains internal state (a
// counter) and processes messages to modify or query that state.
type StatefulCounterActor struct {
	counter int
	actorID string
}

// NewStatefulCounterActor creates a new counter actor.
func NewStatefulCounterActor(id string) *StatefulCounterActor {
	return &StatefulCounterActor{
		actorID: id,
	}
}

// Receive is the message handler for the StatefulCounterActor.
// It implements the actor.ActorBehavior interface implicitly when wrapped.
func (s *StatefulCounterActor) Receive(ctx context.Context,
	msg CounterMsg) fn.Result[CounterResponse] {

	if msg.Increment > 0 {
		// For increment, we can just acknowledge or return the new
		// value. Messages are sent serially, so we don't need to worry
		// about a mutex here.
		s.counter += msg.Increment

		return fn.Ok(CounterResponse{
			Value:     s.counter,
			Responder: s.actorID,
		})
	}

	if msg.GetValue {
		return fn.Ok(CounterResponse{
			Value:     s.counter,
			Responder: s.actorID,
		})
	}

	return fn.Err[CounterResponse](fmt.Errorf("invalid CounterMsg"))
}

// ExampleActor_stateful demonstrates creating an actor whose behavior is defined
// by a struct with methods, allowing it to maintain internal state.
func main() {
	system := actor.NewActorSystem()
	defer system.Shutdown()

	counterServiceKey := actor.NewServiceKey[CounterMsg, CounterResponse](
		"struct-counter-service",
	)

	// Create an instance of our stateful actor logic.
	actorID := "counter-actor-1"
	counterLogic := NewStatefulCounterActor(actorID)

	// Spawn the actor.
	// The counterLogic instance itself satisfies the ActorBehavior
	// interface because its Receive method matches the required signature.
	counterRef := counterServiceKey.Spawn(system, actorID, counterLogic)
	fmt.Printf("Actor %s spawned.\n", counterRef.ID())

	// Send messages to increment the counter.
	for i := 1; i <= 3; i++ {
		askCtx, askCancel := context.WithTimeout(
			context.Background(), 1*time.Second,
		)
		futureResp := counterRef.Ask(askCtx,
			CounterMsg{
				Increment: i,
				Who:       fmt.Sprintf("Incrementer-%d", i),
			},
		)
		awaitCtx, awaitCancel := context.WithTimeout(
			context.Background(), 1*time.Second,
		)
		resp := futureResp.Await(awaitCtx)

		resp.WhenOk(func(r CounterResponse) {
			fmt.Printf("Incremented by %d, new value: %d "+
				"(from %s)\n", i, r.Value, r.Responder)
		})
		resp.WhenErr(func(e error) {
			fmt.Printf("Error incrementing: %v\n", e)
		})
		awaitCancel()
		askCancel()
	}

	// Send a message to get the current value.
	askCtx, askCancel := context.WithTimeout(
		context.Background(), 1*time.Second,
	)
	futureResp := counterRef.Ask(
		askCtx, CounterMsg{GetValue: true, Who: "Getter"},
	)

	awaitCtx, awaitCancel := context.WithTimeout(
		context.Background(), 1*time.Second,
	)

	finalValueResp := futureResp.Await(awaitCtx)
	finalValueResp.WhenOk(func(r CounterResponse) {
		fmt.Printf("Final counter value: %d (from %s)\n",
			r.Value, r.Responder)
	})
	finalValueResp.WhenErr(func(e error) {
		fmt.Printf("Error getting value: %v\n", e)
	})
	awaitCancel()
	askCancel()

}
Output:

Actor counter-actor-1 spawned.
Incremented by 1, new value: 1 (from counter-actor-1)
Incremented by 2, new value: 3 (from counter-actor-1)
Incremented by 3, new value: 6 (from counter-actor-1)
Final counter value: 6 (from counter-actor-1)

func NewActor

func NewActor[M Message, R any](cfg ActorConfig[M, R]) *Actor[M, R]

NewActor creates a new actor instance with the given ID and behavior. It initializes the actor's internal structures but does not start its message processing goroutine. The Start() method must be called to begin processing messages.

func (*Actor[M, R]) Ref

func (a *Actor[M, R]) Ref() ActorRef[M, R]

Ref returns an ActorRef for this actor. This allows clients to interact with the actor (send messages) without having direct access to the Actor struct itself, promoting encapsulation and location transparency.

func (*Actor[M, R]) Start

func (a *Actor[M, R]) Start()

Start initiates the actor's message processing loop in a new goroutine. This method should be called once after the actor is created.

func (*Actor[M, R]) Stop

func (a *Actor[M, R]) Stop()

Stop signals the actor to terminate its processing loop and shut down. This is achieved by cancelling the actor's internal context. The actor's goroutine will exit once it detects the context cancellation.

func (*Actor[M, R]) TellRef

func (a *Actor[M, R]) TellRef() TellOnlyRef[M]

TellRef returns a TellOnlyRef for this actor. This allows clients to send messages to the actor using only the "tell" pattern (fire-and-forget), without having access to "ask" capabilities.

type ActorBehavior

type ActorBehavior[M Message, R any] interface {
	// Receive processes a message and returns a Result. The provided
	// context is the actor's internal context, which can be used to
	// detect actor shutdown requests.
	Receive(actorCtx context.Context, msg M) fn.Result[R]
}

ActorBehavior defines the logic for how an actor processes incoming messages. It is a strategy interface that encapsulates the actor's reaction to messages.

type ActorConfig

type ActorConfig[M Message, R any] struct {
	// ID is the unique identifier for the actor.
	ID string

	// Behavior defines how the actor responds to messages.
	Behavior ActorBehavior[M, R]

	// DLO is a reference to the dead letter office for this actor system.
	// If nil, undeliverable messages during shutdown or due to a full
	// mailbox (if such logic were added) might be dropped.
	DLO ActorRef[Message, any]

	// MailboxSize defines the buffer capacity of the actor's mailbox.
	MailboxSize int
}

ActorConfig holds the configuration parameters for creating a new Actor. It is generic over M (Message type) and R (Response type) to accommodate the actor's specific behavior.

type ActorFunc

type ActorFunc[M Message, R any] func(context.Context, M) fn.Result[R]

ActorFunc is a function type that represents an actor which functions purely based on a simple function processor.

type ActorRef

type ActorRef[M Message, R any] interface {
	TellOnlyRef[M]

	// Ask sends a message and returns a Future for the response.
	// The Future will be completed with the actor's reply or an error
	// if the operation fails (e.g., context cancellation before send).
	Ask(ctx context.Context, msg M) Future[R]
}

ActorRef is a reference to an actor that supports both "tell" and "ask" operations. It embeds TellOnlyRef and adds the Ask method for request-response interactions.

func FindInReceptionist

func FindInReceptionist[M Message, R any](
	r *Receptionist, key ServiceKey[M, R]) []ActorRef[M, R]

FindInReceptionist returns all actors registered with a service key in the given receptionist. This is a package-level generic function because methods cannot have their own type parameters. It performs a type assertion to ensure that only ActorRefs matching the ServiceKey's generic types (M, R) are returned, providing type safety.

func RegisterWithSystem

func RegisterWithSystem[M Message, R any](as *ActorSystem, id string, key ServiceKey[M, R],
	behavior ActorBehavior[M, R],
) ActorRef[M, R]

RegisterWithSystem creates an actor with the given ID, service key, and behavior within the specified ActorSystem. It starts the actor, adds it to the system's management, registers it with the receptionist using the provided key, and returns its ActorRef.

type ActorSystem

type ActorSystem struct {
	// contains filtered or unexported fields
}

ActorSystem manages the lifecycle of actors and provides coordination services such as a receptionist for actor discovery and a dead letter office for undeliverable messages. It also handles the graceful shutdown of all managed actors.

func NewActorSystem

func NewActorSystem() *ActorSystem

NewActorSystem creates a new actor system using the default configuration.

func NewActorSystemWithConfig

func NewActorSystemWithConfig(config SystemConfig) *ActorSystem

NewActorSystemWithConfig creates a new actor system with custom configuration

func (*ActorSystem) DeadLetters

func (as *ActorSystem) DeadLetters() ActorRef[Message, any]

DeadLetters returns a reference to the system's dead letter actor. Messages that cannot be delivered to their intended recipient (e.g., if an Ask context is cancelled before enqueuing) may be routed here if not otherwise handled.

func (*ActorSystem) Receptionist

func (as *ActorSystem) Receptionist() *Receptionist

Receptionist returns the system's receptionist, which can be used for actor service discovery (finding actors by ServiceKey).

func (*ActorSystem) Shutdown

func (as *ActorSystem) Shutdown() error

Shutdown gracefully stops the actor system. It iterates through all managed actors, including the dead letter actor, and calls their Stop method. After initiating the stop for all actors, it cancels the main system context. This method is safe for concurrent use.

func (*ActorSystem) StopAndRemoveActor

func (as *ActorSystem) StopAndRemoveActor(id string) bool

StopAndRemoveActor stops a specific actor by its ID and removes it from the ActorSystem's management. It returns true if the actor was found and stopped, false otherwise.

type BaseMessage

type BaseMessage struct{}

BaseMessage is a helper struct that can be embedded in message types defined outside the actor package to satisfy the Message interface's unexported messageMarker method.

type FunctionBehavior

type FunctionBehavior[M Message, R any] struct {
	// contains filtered or unexported fields
}

FunctionBehavior adapts a function to the ActorBehavior interface.

func FunctionBehaviorFromSimple

func FunctionBehaviorFromSimple[M Message, R any](
	sFunc func(M) (R, error)) *FunctionBehavior[M, R]

FunctionBehaviorFromSimple adapts a simpler function to the ActorBehavior interface.

func NewFunctionBehavior

func NewFunctionBehavior[M Message, R any](
	fn ActorFunc[M, R]) *FunctionBehavior[M, R]

NewFunctionBehavior creates a behavior from a function.

func (*FunctionBehavior[M, R]) Receive

func (b *FunctionBehavior[M, R]) Receive(ctx context.Context,
	msg M) fn.Result[R]

Receive implements ActorBehavior interface for the function.

TODO(roasbeef): just base it off the function direct instead?

type Future

type Future[T any] interface {
	// Await blocks until the result is available or the context is
	// cancelled, then returns it.
	Await(ctx context.Context) fn.Result[T]

	// ThenApply registers a function to transform the result of a future.
	// The original future is not modified, a new instance of the future is
	// returned. If the passed context is cancelled while waiting for the
	// original future to complete, the new future will complete with the
	// context's error.
	ThenApply(ctx context.Context, fn func(T) T) Future[T]

	// OnComplete registers a function to be called when the result of the
	// future is ready. If the passed context is cancelled before the future
	// completes, the callback function will be invoked with the context's
	// error.
	OnComplete(ctx context.Context, fn func(fn.Result[T]))
}

Future represents the result of an asynchronous computation. It allows consumers to wait for the result (Await), apply transformations upon completion (ThenApply), or register a callback to be executed when the result is available (OnComplete).

type MapInputRef added in v0.0.3

type MapInputRef[In Message, Out Message] struct {
	// contains filtered or unexported fields
}

MapInputRef wraps a TellOnlyRef and transforms incoming messages before forwarding them to the target ref. This allows adapting a ref that expects message type Out to accept message type In, eliminating the need for intermediate adapter actors.

This is particularly useful for notification patterns where a source actor sends events of a specific type, but consumers want to receive events in their own domain-specific type.

Example usage:

// roundActorRef accepts round.ConfirmationEvent
// chainsource sends chainsource.ConfirmationEvent
adaptedRef := actor.NewMapInputRef(
    roundActorRef,
    func(cs chainsource.ConfirmationEvent) round.ConfirmationEvent {
        return round.ConfirmationEvent{
            TxID:        cs.Txid,
            BlockHeight: cs.BlockHeight,
            // ... transform fields
        }
    },
)
// Now adaptedRef can be used as TellOnlyRef[chainsource.ConfirmationEvent]

func NewMapInputRef added in v0.0.3

func NewMapInputRef[In Message, Out Message](
	targetRef TellOnlyRef[Out], mapFn func(In) Out) *MapInputRef[In, Out]

NewMapInputRef creates a new message-transforming wrapper around a TellOnlyRef. The mapFn function is called for each message to transform it from type In to type Out before forwarding to targetRef.

func (*MapInputRef[In, Out]) ID added in v0.0.3

func (m *MapInputRef[In, Out]) ID() string

ID returns a unique identifier for this actor. The ID includes the "map-input-" prefix to indicate this is a transformation wrapper.

func (*MapInputRef[In, Out]) Tell added in v0.0.3

func (m *MapInputRef[In, Out]) Tell(ctx context.Context, msg In)

Tell transforms the incoming message using the map function and forwards it to the target ref. If the context is cancelled before the message can be sent to the target actor's mailbox, the message may be dropped.

type Message

type Message interface {

	// MessageType returns the type name of the message for
	// routing/filtering.
	MessageType() string
	// contains filtered or unexported methods
}

Message is a sealed interface for actor messages. Actors will receive messages conforming to this interface. The interface is "sealed" by the unexported messageMarker method, meaning only types that can satisfy it (e.g., by embedding BaseMessage or being in the same package) can be Messages.

type PriorityMessage

type PriorityMessage interface {
	Message

	// Priority returns the processing priority of this message (higher =
	// more important).
	Priority() int
}

PriorityMessage is an extension of the Message interface for messages that carry a priority level. This can be used by actor mailboxes or schedulers to prioritize message processing.

type Promise

type Promise[T any] interface {
	// Future returns the Future interface associated with this Promise.
	// Consumers can use this to Await the result or register callbacks.
	Future() Future[T]

	// Complete attempts to set the result of the future. It returns true if
	// this call successfully set the result (i.e., it was the first to
	// complete it), and false if the future had already been completed.
	Complete(result fn.Result[T]) bool
}

Promise is an interface that allows for the completion of an associated Future. It provides a way to set the result of an asynchronous operation. The producer of an asynchronous result uses a Promise to set the outcome, while consumers use the associated Future to retrieve it.

func NewPromise

func NewPromise[T any]() Promise[T]

NewPromise creates a new Promise. The associated Future, which consumers can use to await the result, can be obtained via the Future() method. The Future is completed by calling the Complete() method on this Promise.

type Receptionist

type Receptionist struct {
	// contains filtered or unexported fields
}

Receptionist provides service discovery for actors. Actors can be registered under a ServiceKey and later discovered by other actors or system components.

type RoundRobinStrategy

type RoundRobinStrategy[M Message, R any] struct {
	// contains filtered or unexported fields
}

RoundRobinStrategy implements a round-robin selection strategy. It is generic over M and R to match the RoutingStrategy interface, though its logic doesn't depend on these types directly for the selection mechanism itself.

func NewRoundRobinStrategy

func NewRoundRobinStrategy[M Message, R any]() *RoundRobinStrategy[M, R]

NewRoundRobinStrategy creates a new RoundRobinStrategy, initialized for round-robin selection.

func (*RoundRobinStrategy[M, R]) Select

func (s *RoundRobinStrategy[M, R]) Select(refs []ActorRef[M, R]) (ActorRef[M, R], error)

Select picks an actor from the list using a round-robin algorithm.

type Router

type Router[M Message, R any] struct {
	// contains filtered or unexported fields
}

Router is a message-dispatching component that fronts multiple actors registered under a specific ServiceKey. It uses a RoutingStrategy to distribute messages to one of the available actors. It is generic over M (Message type) and R (Response type) to match the actors it routes to.

Example

ExampleRouter demonstrates creating multiple actors under the same service key and using a router to dispatch messages to them.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/lightningnetwork/lnd/actor"
	"github.com/lightningnetwork/lnd/fn/v2"
)

// RouterGreetingMsg is a message type for the router example.
type RouterGreetingMsg struct {
	actor.BaseMessage
	Name string
}

// MessageType implements actor.Message.
func (m RouterGreetingMsg) MessageType() string { return "RouterGreetingMsg" }

// RouterGreetingResponse is a response type for the router example.
type RouterGreetingResponse struct {
	Greeting  string
	HandlerID string
}

// ExampleRouter demonstrates creating multiple actors under the same service
// key and using a router to dispatch messages to them.
func main() {
	system := actor.NewActorSystem()
	defer system.Shutdown()

	//nolint:ll
	routerGreeterKey := actor.NewServiceKey[RouterGreetingMsg, RouterGreetingResponse](
		"router-greeter-service",
	)

	// Behavior for the first greeter actor.
	actorID1 := "router-greeter-1"
	greeterBehavior1 := actor.NewFunctionBehavior(
		func(ctx context.Context,
			msg RouterGreetingMsg) fn.Result[RouterGreetingResponse] {

			return fn.Ok(RouterGreetingResponse{
				Greeting:  "Greetings, " + msg.Name + "!",
				HandlerID: actorID1,
			})
		},
	)
	routerGreeterKey.Spawn(system, actorID1, greeterBehavior1)
	fmt.Printf("Actor %s spawned.\n", actorID1)

	// Behavior for the second greeter actor.
	actorID2 := "router-greeter-2"
	greeterBehavior2 := actor.NewFunctionBehavior(
		func(ctx context.Context,
			msg RouterGreetingMsg) fn.Result[RouterGreetingResponse] {

			return fn.Ok(RouterGreetingResponse{
				Greeting:  "Salutations, " + msg.Name + "!",
				HandlerID: actorID2,
			})
		},
	)
	routerGreeterKey.Spawn(system, actorID2, greeterBehavior2)
	fmt.Printf("Actor %s spawned.\n", actorID2)

	// Create a router for the "router-greeter-service".
	greeterRouter := actor.NewRouter(
		system.Receptionist(), routerGreeterKey,
		actor.NewRoundRobinStrategy[RouterGreetingMsg,
			RouterGreetingResponse](),
		system.DeadLetters(),
	)
	fmt.Printf("Router %s created for service key '%s'.\n",
		greeterRouter.ID(), "router-greeter-service")

	// Send messages through the router.
	names := []string{"Alice", "Bob", "Charlie", "David"}
	for _, name := range names {
		askCtx, askCancel := context.WithTimeout(
			context.Background(), 1*time.Second,
		)
		futureResponse := greeterRouter.Ask(
			askCtx, RouterGreetingMsg{Name: name},
		)

		awaitCtx, awaitCancel := context.WithTimeout(
			context.Background(), 1*time.Second,
		)
		result := futureResponse.Await(awaitCtx)

		result.WhenErr(func(err error) {
			fmt.Printf("For %s: Error - %v\n", name, err)
		})
		result.WhenOk(func(response RouterGreetingResponse) {
			fmt.Printf("For %s: Received '%s' from %s\n",
				name, response.Greeting, response.HandlerID)
		})
		awaitCancel()
		askCancel()
	}

}
Output:

Actor router-greeter-1 spawned.
Actor router-greeter-2 spawned.
Router router(router-greeter-service) created for service key 'router-greeter-service'.
For Alice: Received 'Greetings, Alice!' from router-greeter-1
For Bob: Received 'Salutations, Bob!' from router-greeter-2
For Charlie: Received 'Greetings, Charlie!' from router-greeter-1
For David: Received 'Salutations, David!' from router-greeter-2

func NewRouter

func NewRouter[M Message, R any](receptionist *Receptionist,
	key ServiceKey[M, R], strategy RoutingStrategy[M, R],
	dlo ActorRef[Message, any]) *Router[M, R]

NewRouter creates a new Router for a given service key and strategy. The receptionist is used to discover actors registered with the service key. The router itself is not an actor but a message dispatcher that behaves like an ActorRef from the sender's perspective.

func (*Router[M, R]) Ask

func (r *Router[M, R]) Ask(ctx context.Context, msg M) Future[R]

Ask sends a message to one of the actors managed by the router, selected by the routing strategy, and returns a Future for the response. If no actors are available (ErrNoActorsAvailable), the Future will be completed with this error. If the send context is cancelled before the message can be enqueued in the chosen actor's mailbox, the Future will be completed with the context's error.

func (*Router[M, R]) ID

func (r *Router[M, R]) ID() string

ID provides an identifier for the router. Since a router isn't an actor itself but a dispatcher for a service, its ID can be based on the service key.

func (*Router[M, R]) Tell

func (r *Router[M, R]) Tell(ctx context.Context, msg M)

Tell sends a message to one of the actors managed by the router, selected by the routing strategy. If no actors are available or the send context is cancelled before the message can be enqueued in the target actor's mailbox, the message may be dropped. Errors during actor selection (e.g., ErrNoActorsAvailable) are currently not propagated from Tell, aligning with its fire-and-forget nature. Such errors could be logged internally if needed.

type RoutingStrategy

type RoutingStrategy[M Message, R any] interface {
	// Select chooses an ActorRef from the provided slice. It returns the
	// selected actor or an error if no actor can be selected (e.g., if the
	// list is empty or another strategy-specific issue occurs).
	Select(refs []ActorRef[M, R]) (ActorRef[M, R], error)
}

RoutingStrategy defines the interface for selecting an actor from a list of available actors. The M (Message) and R (Response) type parameters ensure that the strategy is compatible with the types of actors it will be selecting.

type ServiceKey

type ServiceKey[M Message, R any] struct {
	// contains filtered or unexported fields
}

ServiceKey is a type-safe identifier used for registering and discovering actors via the Receptionist. The generic type parameters M (Message) and R (Response) ensure that only actors handling compatible message/response types are associated with and retrieved for this key.

func NewServiceKey

func NewServiceKey[M Message, R any](name string) ServiceKey[M, R]

NewServiceKey creates a new service key with the given name. The name is used as the lookup key within the Receptionist.

func (ServiceKey[M, R]) Spawn

func (sk ServiceKey[M, R]) Spawn(as *ActorSystem, id string,
	behavior ActorBehavior[M, R]) ActorRef[M, R]

Spawn registers an actor for this service key within the given ActorSystem. It's a convenience method that calls RegisterWithSystem, starting the actor and registering it with the receptionist.

func (ServiceKey[M, R]) Unregister

func (sk ServiceKey[M, R]) Unregister(as *ActorSystem,
	refToRemove ActorRef[M, R]) bool

Unregister removes an actor reference associated with this service key from the ActorSystem's receptionist and also stops the actor. It returns true if the actor was successfully unregistered from the receptionist AND successfully stopped and removed from the system's management. Otherwise, it returns false.

func (ServiceKey[M, R]) UnregisterAll

func (sk ServiceKey[M, R]) UnregisterAll(as *ActorSystem) int

UnregisterAll finds all actor references associated with this service key in the ActorSystem's receptionist. For each found actor, it attempts to stop it and remove it from system management, and also unregisters it from the receptionist.

type SystemConfig

type SystemConfig struct {
	// MailboxCapacity is the default capacity for actor mailboxes.
	MailboxCapacity int
}

SystemConfig holds configuration parameters for the ActorSystem.

func DefaultConfig

func DefaultConfig() SystemConfig

DefaultConfig returns a default configuration for the ActorSystem.

type TellOnlyRef

type TellOnlyRef[M Message] interface {
	// Tell sends a message without waiting for a response. If the
	// context is cancelled before the message can be sent to the actor's
	// mailbox, the message may be dropped.
	Tell(ctx context.Context, msg M)

	// ID returns the unique identifier for this actor.
	ID() string
}

TellOnlyRef is a reference to an actor that only supports "tell" operations. This is useful for scenarios where only fire-and-forget message passing is needed, or to restrict capabilities.

Example

ExampleTellOnlyRef demonstrates using a TellOnlyRef for fire-and-forget messaging with an actor.

package main

import (
	"context"
	"fmt"
	"strings"
	"sync"
	"time"

	"github.com/lightningnetwork/lnd/actor"
	"github.com/lightningnetwork/lnd/fn/v2"
)

// LogMsg is a message type for the TellOnly example.
type LogMsg struct {
	actor.BaseMessage
	Text string
}

// MessageType implements actor.Message.
func (m LogMsg) MessageType() string { return "LogMsg" }

// LoggerActorBehavior is a simple actor behavior that logs messages. It doesn't
// produce a meaningful response for Ask, so it's a good candidate for TellOnly
// interactions.
type LoggerActorBehavior struct {
	mu      sync.Mutex
	logs    []string
	actorID string
}

func NewLoggerActorBehavior(id string) *LoggerActorBehavior {
	return &LoggerActorBehavior{actorID: id}
}

// Receive processes LogMsg messages by appending them to an internal log. The
// response type is 'any' as it's not typically used with Ask.
func (l *LoggerActorBehavior) Receive(ctx context.Context,
	msg actor.Message) fn.Result[any] {

	logMessage, ok := msg.(LogMsg)
	if !ok {
		return fn.Err[any](fmt.Errorf("unexpected message "+
			"type: %s", msg.MessageType()))
	}

	l.mu.Lock()
	defer l.mu.Unlock()

	entry := fmt.Sprintf("[%s from %s]: %s", time.Now().Format("15:04:05"),
		l.actorID, logMessage.Text)
	l.logs = append(l.logs, entry)

	// For Tell, the result is often ignored, but we must return something.
	return fn.Ok[any](nil)
}

func (l *LoggerActorBehavior) GetLogs() []string {
	l.mu.Lock()
	defer l.mu.Unlock()

	copiedLogs := make([]string, len(l.logs))
	copy(copiedLogs, l.logs)

	return copiedLogs
}

// ExampleTellOnlyRef demonstrates using a TellOnlyRef for fire-and-forget
// messaging with an actor.
func main() {
	system := actor.NewActorSystem()
	defer system.Shutdown()

	// The logger actor doesn't really have a response type for Ask, so we
	// use 'any'.
	loggerServiceKey := actor.NewServiceKey[actor.Message, any](
		"tell-only-logger-service",
	)

	actorID := "my-logger"
	loggerLogic := NewLoggerActorBehavior(actorID)

	// Spawn the actor.
	fullRef := loggerServiceKey.Spawn(system, actorID, loggerLogic)
	fmt.Printf("Actor %s spawned.\n", fullRef.ID())

	// Get a TellOnlyRef for the actor. We can get this from the Actor
	// instance itself if we had it, or by type assertion if we know the
	// underlying ref supports it. Since fullRef is ActorRef[actor.Message,
	// any], it already satisfies TellOnlyRef[actor.Message].
	//
	// Or, if we had the *Actor instance: tellOnlyLogger =
	// actorInstance.TellRef()
	var tellOnlyLogger actor.TellOnlyRef[actor.Message] = fullRef

	fmt.Printf("Obtained TellOnlyRef for %s.\n", tellOnlyLogger.ID())

	// Send messages using Tell.
	tellOnlyLogger.Tell(
		context.Background(), LogMsg{Text: "First log entry."},
	)
	tellOnlyLogger.Tell(
		context.Background(), LogMsg{Text: "Second log entry."},
	)

	// Allow some time for messages to be processed.
	time.Sleep(10 * time.Millisecond)

	// Retrieve logs directly from the behavior for verification in this
	// example. In a real scenario, this might not be possible or desired.
	logs := loggerLogic.GetLogs()
	fmt.Println("Logged entries:")
	for _, entry := range logs {
		// Strip the timestamp and actor ID for consistent example
		// output. Example entry: "[15:04:05 from my-logger]: Actual log
		// text"
		parts := strings.SplitN(entry, "]: ", 2)
		if len(parts) == 2 {
			fmt.Println(parts[1])
		}
	}

	// Attempting to Ask using tellOnlyLogger would be a compile-time error:
	// tellOnlyLogger.Ask(context.Background(), LogMsg{Text: "This would
	// fail"})

}
Output:

Actor my-logger spawned.
Obtained TellOnlyRef for my-logger.
Logged entries:
First log entry.
Second log entry.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL