actress

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 15, 2026 License: AGPL-3.0 Imports: 16 Imported by: 0

README

Actress

A Concurrent Actor framework written in Go.

NB: This is still in the idea phase, so concepts are being tested out and things might/will change rapidly. Expect breaking changes between commits.

Overview

Create custom processes where what the processes do are either your own piece of code, or it can be a command called from the Operating system. The processes can communicate by sending events to pass the result from one processes to the next for further processing, or by chaining together process as workflows to create a series of Events that together will provide some end result.

Processes

A process are like a module capable of performing a specific tasks. The nature of the process is determined by an Name and a Function attached to each process. A process have an InCh for receiving events, and an AddEvent function for sending Events. The processes can themselves spawn new processes. Processes can also send Event messages to other processes.

A process can hold state within the Process Function.

Events

To initiate and trigger the execution of the process's function, we send events. Each process has its own unique event name. Events serve as the communication within the system. They can carry data, either with the result of something a process did to pass it on to the next process for further processing, instructions for what a process should do, or both. An event can contain a chain of events to create workflows of what do do and in what order by using the NextEvent feature (see examples for usage).

type Event struct {
	Nr int
	// Name is a unique name to identify the type of the event.
	Name EventName `json:"name" yaml:"name" cbor:"name"`
	// Cmd is usually used for giving instructions or parameters for
	// what an event shall do.
	Cmd []string `json:"cmd" yaml:"cmd" cbor:"cmd"`
	// Instruction got the underlying type of string. This field can
	// be used to give for example an instruction of a single word.
	// For example in switch statements at the receiving actor, or other.
	Instruction Instruction
	// Data usually carries the data from one process to the next. Example
	// could be a file read on process1 is put in the Data field, and
	// passed on to process2 to be unmarshaled.
	Data []byte `json:"data" yaml:"data" cbor:"data"`
	// Data to be transfered internally. Example is to send config directly via
	// the channel between internal actors.
	InternalCh chan chan []byte `json:"-" yaml:"-" cbor:"-"`
	// Err is used for defining the error message when the event is used
	// as an error event.
	Err error `json:"error" yaml:"error" cbor:"error"`
	// NextEvent defines a series of events to be executed like a workflow.
	// The receiving process should check this field for what kind of event
	// to create as the next step in the workflow.
	NextEvent *Event `json:"nextEvent" yaml:"nextEvent" cbor:"nextEvent"`
	// PreviousEvent allows for keeping information about the previous event if needed.
	PreviousEvent *Event `json:"previousEvent" yaml:"previousEvent" cbor:"previousEvent"`
	// Dst node.
	DstNode Node `json:"dstNode" yaml:"dstNode" cbor:"dstNode"`
	// Src node.
	SrcNode Node `json:"srcNode" yaml:"srcNode" cbor:"srcNode"`
}
Event Functions (ETFunc)

Event Functions holds the logic for what a process shall do when an event is received, and what to do with the data the event carries. The Event functions are callback functions that are executed when a process is created.

The programmer can decide if the Process Function should depend on the input from the input channel of the process, or just continously do some work on it's own. For an event function to be triggered to work on events it should hold a for loop that listens on the Process InCh for new Events.

Examples

Check out the test files for examples for how to define an Event and it's Process function, or for more complete examples check out the examples folder.

Quick start
package main

import (
    "context"
    "fmt"
    "log"
    "strings"
    "time"  
    "github.com/postmannen/actress"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Create a new root process.
    cfg, _ := actress.NewConfig("debug")
    rootAct := actress.NewRootProcess(ctx, nil, cfg, nil)

    // Start the root process/actor.
    err := rootAct.Act()
    if err != nil {
        log.Fatal(err)
    }

    // Create a test channel where we receive the end result.
    testCh := make(chan string)

    // Define two event's for two processes.
    const ETTest1 actress.Name = "ETTest1"
    const ETTest2 actress.Name = "ETTest2"

    // Define the first ETFunc function that will be attached to the ETTest1 Name process.
    test1Func := func(ctx context.Context, p *actress.Process) func() {
        fn := func() {
            for {
                select {
                case ev := <-p.InCh:
                    upper := strings.ToUpper(string(ev.Data))
                    // Pass on the processing to the next process, and use the NextEvent we have specified in main
                    // for the Name, and add the result of ToUpper to the data field.
                    p.AddEvent(actress.Event{Name: ev.NextEvent.Name,
                        
                        Data:      []byte(upper)})
                case <-ctx.Done():
                    return
                }
            }
        }
        return fn
    }

    // Define the second ETFunc function that will be attached to the ETTest2 Name process.
    test2Func := func(ctx context.Context, p *actress.Process) func() {
        fn := func() {
            for {
                select {
                case result := <-p.InCh:
                    dots := string(result.Data) + "..."                 
                    // Send the result on the testCh so we are able to to receive it in main().
                    testCh <- string(dots)

                    // Also create an informational error message.
                    p.AddEvent(actress.Event{Name: actress.ERDebug,
                        
                        Err:       fmt.Errorf("info: done with the acting")})

                case <-ctx.Done():
                    return
                }
            }
        }
        return fn
    }

    // Register the event names and event function as processes,
    // and start them with the Act() method.
    actress.NewProcess(ctx, rootAct, ETTest1,  test1Func).Act()
    actress.NewProcess(ctx, rootAct, ETTest2,  test2Func).Act()

    // Pass in an event destined for an ETTest1 Name process, and also specify
    // the next event to be used when passing the result on from ETTest1 to the next
    // process which here is ETTest2.
    rootAct.AddEvent(actress.Event{Name: ETTest1,
        
        Data:      []byte("test"),
        NextEvent: &actress.Event{Name: ETTest2,
            
    },
    )

    // Wait and receive the result from the ETTest2 process.
    fmt.Printf("The result: %v\n", <-testCh)

    time.Sleep(time.Second * 2)
}

Remote delivery

If the DstNode field of an event is set, the event can be sent to the remote node using the ETRemote process if an ETRemote process has been started, and a etRemoteFunc has been defined for it. If no value is set in the DstNode field, the event will be processed locally.

How this works is that when the routing logic notices that the DstNode field is set, it will create a new event of type ETRemote, and put the original even in the NextEvent field of the new ETRemote event, and the event is added to the queue with the AddEvent method of the Actress. Tip, check the NextEvent section for more information about the NextEvent field.

The ETRemote process will then receive the event, and we can take the original event that we find in the NextEvent field, use that, and send it to the remote node using the for example DstNode field as the topic.

The actress.ETRemote event type is already defined in the actress package, but no etRemoteFunc is defined for it. It is up to the programmer to define an etRemoteFunc and start the ETRemote process.

A high level overview of how registering and starting an ETRemote process works
etRemoteFunc := func(ctx context.Context, p *actress.Process) func() {
		fn := func() {
			for {
				select {
				case ev := <-p.InCh:
					// The event received here came here since an event was processed,
                    // and a value was set in the DstNode field of the event.
                    // Also, when an event is received here, the type of event is ETRemote,
                    // and the NextEvent field holds the original event that was
                    // processed when it was decided to send it to a remote node.
					//
                    // We can now take the NextEvent and choose to do what we want with the event.
                    //
                    // The DstNode field holds the name of the remote node. We can then use
                    // that as the topic if we want send the event to a remote node over MQTT.

                    // ..write some code here that will marshal the event to example JSON,
                    //  and send it via MQTT, and use the value defined
                    // in the DstNode field as the topic.
                    //
                    // NB: If for example MQTT is chosen as the communication protocol, we will
                    // also need to define an MQTT receiver Actress/Process that will be able
                    // to receive the event on the remote node.
                    
				case <-ctx.Done():
					return
				}
			}
		}
		return fn
	}

// Register the event name and event function as a process,
// and start it with the Act() method.
actress.NewProcess(ctx, rootAct, actress.ETRemote, etRemoteFunc).Act()

And then what the general MQTT actreess for the receiving side would look like.


// Define the event name for the MQTT receiver process.
const ETMQTTReceiver actress.Name = "ETMQTTReceiver"

etMQTTReceiverFunc := func(ctx context.Context, p *actress.Process) func() {
		fn := func() {
            go func() {
                // The outline of how an MQTT receiver could be implemented.
                // 1. Connect to MQTT broker.
                // 2. Subscribe to topic.
                // 3. Receive message.
                // 4. Unmarshal message, to get the actress.Event.
                // 5. Use the AddEvent() method to add the event to the queue
                //    of messages to be handled
            }()
			<-ctx.Done():
			return
		}
		return fn
	}

// Register the event name and event function as a process,
// and start it with the Act() method.
actress.NewProcess(ctx, rootAct, ETMQTTReceiver, etMQTTReceiverFunc).Act()

Details

Short intro about the Events.

The events for all processes, both static, dynamic, error, and supervisor uses the same event type and structure. The even type is identified by the firs 2 letters of the event.Name:

  • ET, static events
  • ED, dynamic event
  • ER, error events
  • EC, custom events
  • ES, supervisor events

The reason for splitting them up are for separation and use of mutex'es , for example if the event routing logic hangs on static events, it will not affect the other event kinds, so we are able to for example send errors if any of the other routers are having trouble or have massive load.

A router Actress/Process is defined for each of the event types.

Where to use an actor process of a specific kind ?

Static processes, should be used for processes/actors defined at startup. Dynamic processes, Can be used both for startup and runtime defined actors, but prefer static at startup unless you have a really good reason to not do it :). Error processes For error logging and handling. Supervisor processes For control logic and information about the whole Actress system.

NextEvent

NextEvent makes it possible to define an event as a chain of Events. An example could be that we want to get the content of a web page, and print the result to the screen. We could do that in the following way.

p.AddEvent(Event{Name: Name("ETBleeping"), NextEvent: &Event{Name: ETPrint}})

Dynamic Processes

The purpose of dynamic processes is to have short lived processes that can be quickly started, and removed again when it's job is done. The only difference between a Static process and a Dynamic process are that the dynamic processes have a mutex in the DynamicProcesses map so that we can delete the processes when they are no longer needed at runtime withhout causing a datarace.

A typical example could be that there is a processes that needs to communicate in some other way with another process that cant be done with the current process's event channel. We can then spawn a dynamic process to take care of that. Check out the test and files in the examples directory. A process can spawn as many dynamic processes as it needs.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CheckEnv

func CheckEnv[T any](key string, v T) any

Check if an env variable is set. If found, return the value. Takes the name of the env variable, and the actual variable containing a default value as it's input.

func ETReadFileFn

func ETReadFileFn(ctx context.Context, p *Process) func()

func NewUUID

func NewUUID() string

Will create and return a new UUID prefix with "ED-".

func RegisterProcessesInESProcesses

func RegisterProcessesInESProcesses(p *Process, pi *registerProcessInfo)

Register all the processes in ESProcesses.

Types

type Buffer

type Buffer struct {
	// contains filtered or unexported fields
}

func NewBuffer

func NewBuffer() *Buffer

func (*Buffer) Read

func (bu *Buffer) Read(p []byte) (int, error)

func (*Buffer) Write

func (bu *Buffer) Write(b []byte) (int, error)

type Config

type Config struct {
	CustomEvents     bool
	Metrics          bool
	CustomEventsPath string
	NodeName         Node
	LogLevel         string
}

Config holds all the configuration settings for the actress system.

func NewConfig

func NewConfig(logLevel string) (*Config, *flag.FlagSet)

New config prepare a *Config, and a *flag.FlagSet, and return the resulting actress *Config and *flag.FlagSet. The flags are checked for env variables, and if not found, the default value is used. The flagset needs to be parsed for the flags to be set.

The logLevel is the default log level for the system, and can be provided as an input argument.

type ESProcessesMap

type ESProcessesMap map[EventName]string

type ETFunc

type ETFunc func(context.Context, *Process) func()

Function type describing the signature of a function that is to be used when creating a new process.

func EDSyncFn

func EDSyncFn(syncCh chan struct{}) ETFunc

EtSyncFn is the function that will be used to syncronize events. It takes a channel that will be used to send a signal on when the EDSync event is executed.

In general EDSync is used to syncronize one-off events, so delete the process after it is done with it's sync job.

func ETTestfn

func ETTestfn(testCh chan string) ETFunc

etTestFn accepts an 'chan string' as it's input argument, and it will return the data field of the previous event on that channel. You can then listen on that channel, check the value delivered, and see if it contains the value you expected it to hold.

type Event

type Event struct {
	Nr int
	// Name is a unique name to identify the type of the event.
	Name EventName `json:"name" yaml:"name" cbor:"name"`
	// Cmd is usually used for giving instructions or parameters for
	// what an event shall do.
	Cmd []string `json:"cmd" yaml:"cmd" cbor:"cmd"`
	// Instruction got the underlying type of string. This field can
	// be used to give for example an instruction of a single word.
	// For example in switch statements at the receiving actor, or other.
	Instruction Instruction
	// Data usually carries the data from one process to the next. Example
	// could be a file read on process1 is put in the Data field, and
	// passed on to process2 to be unmarshaled.
	Data []byte `json:"data" yaml:"data" cbor:"data"`
	// Data to be transfered internally. Example is to send config directly via
	// the channel between internal actors.
	InternalCh chan chan []byte `json:"-" yaml:"-" cbor:"-"`
	// Err is used for defining the error message when the event is used
	// as an error event.
	Err error `json:"error" yaml:"error" cbor:"error"`
	// NextEvent defines a series of events to be executed like a workflow.
	// The receiving process should check this field for what kind of event
	// to create as the next step in the workflow.
	NextEvent *Event `json:"nextEvent" yaml:"nextEvent" cbor:"nextEvent"`
	// PreviousEvent allows for keeping information about the previous event if needed.
	PreviousEvent *Event `json:"previousEvent" yaml:"previousEvent" cbor:"previousEvent"`
	// Dst node.
	DstNode Node `json:"dstNode" yaml:"dstNode" cbor:"dstNode"`
	// Src node.
	SrcNode Node `json:"srcNode" yaml:"srcNode" cbor:"srcNode"`
}

Event defines an event. It holds:

  • The Name, which specifies the process are meant for.
  • The Cmd, are meant to but not limited to be a way to give instructions for what a process should do. The receiving process are responsible for parsing the string slice into something useful.
  • The Data field are ment to carry the result from the work done by a process, to the next process.
  • Both Cmd and Data can be used interchangeably if it makes more sense for a given scenario. No strict rules for this exist. Just make sure to document the use of the given Name, so the structure of how to use the fields exist.
  • Err, are used by the error event type (ER).
  • NextEvent are used when we want to define a chain of events to be executed. The processes must make use of the field for this to work. Check out the examples folder for a simple example for how it could be implemented.

func CopyEventFields

func CopyEventFields(ev *Event) *Event

Copy all the descriptive meta data fields of the Event, not channels or Data.

func NewEvent

func NewEvent(et EventName, opts ...EventOpt) *Event

type EventName

type EventName string

Name is a unique name used to identify events. It is used both for creating processes and also for routing messages to the correct process.

const ECGeneralDelivery EventName = "ECGeneralDelivery"

Primarily used for testing to check that the ECRouter properly routes events, and that custom processes start up correctly.

const ECRouter EventName = "ECRouter"

Router for custom events.

const EDRouter EventName = "EDRouter"

Router for normal events.

const EDSync EventName = "EDSync"

EDSync is used to syncronize events. The EDSyncFn that is to be used with this event type takes a signal channel, we can then use this event type to signal that another event is done before we continue by setting this event type as the NextEvent.

const ERLog EventName = "ERLog"

Log errors.

const ERNone EventName = "ERNone"

Will drop the event if it is an error event.

const ERRouter EventName = "ERRouter"

Router for error events.

const ERTest EventName = "ERTest"

Log and exit system.

const ESProcesses EventName = "ESProcesses"

Handles information about the currently running processes in the local Actress system.

const ESRouter EventName = "ESRouter"

Router for supervisor events.

const ETDone EventName = "ETDone"

Done don't currently do anything.

const ETExit EventName = "ETExit"

Will exit and kill all processes.

const ETOsSignal EventName = "ETOsSignal"

Press ctrl+c to exit.

const ETPid EventName = "ETPid"

Handling pids within the system. The structure of the ev.Cmd is a slice of string: []string{"action","pid","process name"}

const ETPidGetAll EventName = "ETPidGetAll"

Get all the current processes running. Will return a json encoded PidVsProcMap.

const ETPrint EventName = "ETPrint"

Print the content of the .Data field of the event to stdout.

const ETReadFile EventName = "ETReadFile"

Read file. The path path to read should be in Event.Cmd[0].

const ETRemote EventName = "ETRemote"

ETRemote is an Name that will be used if an event should be delivered to a remote node.

There are no ETFunc defined for ETRemote in Actress, so it is up to the user to write this function, and attach their own ETFunc when they create the process to handle the ETRemote Name.

ETRemote are for example used in the AddEvent function, and will be prepended to the current event if it should not be handled locally.

const ETRoot EventName = "ETRoot"

The main Root process. By default the root process don't have an ETFunc registered with it to handle the ETRoot eventtype, but one can be created with the normal ETFunc function signature, and defined when creating a new root process.

const ETRouter EventName = "ETRouter"

Router for normal events.

const ETTest EventName = "ETTest"

The ETTest eventype are used for testing.

const ETTestCh EventName = "ETTestCh"

Will forward the incomming event to the builtin .TestCh of the process.

type EventOpt

type EventOpt func(*Event)

func EVData

func EVData(b []byte) EventOpt

func EvCmd

func EvCmd(cmd []string) EventOpt

func EvNext

func EvNext(nev *Event) EventOpt

type EventRW

type EventRW struct {
	P    *Process
	Ev   *Event
	Info string
	Pos  int
}

func NewEventRW

func NewEventRW(p *Process, ev *Event, info string) *EventRW

NewEventRW will return a type that adds Read and Write methods to the Event type.

func (*EventRW) Read

func (m *EventRW) Read(b []byte) (int, error)

Read the data into b.

func (*EventRW) Write

func (m *EventRW) Write(b []byte) (int, error)

Write the data into Event.Data, and put the event into the StaticEventCh to be processed.

type Instruction

type Instruction string
const InstructionCmdEOF Instruction = "InstructionCmdEOF"
const InstructionDebug Instruction = "InstructionDebug"
const InstructionESProcessesAdd Instruction = "InstructionESProcessesAdd"

Will instruct to get all information about all processes.

const InstructionESProcessesDelete Instruction = "InstructionESProcessesDelete"
const InstructionESProcessesGetAll Instruction = "InstructionESProcessesGetAll"
const InstructionError Instruction = "InstructionError"

Instructions for error logging.

const InstructionFatal Instruction = "InstructionFatal"
const InstructionInfo Instruction = "InstructionInfo"

type Node

type Node string

type PidVsProcMap

type PidVsProcMap map[pidnr]*Process

type Process

type Process struct {

	// Channel to receive events into the process function.
	InCh chan Event `json:"-"`
	// Channel to send events to be picked up by other processes.
	StaticEventCh chan Event `json:"-"`
	// Channel to send error events.
	ErrorEventCh chan Event `json:"-"`
	// Channel for getting the result in tests.
	TestCh chan Event `json:"-"`
	// Channel to use for routing events for dynamic processes.
	DynamicEventCh chan Event `json:"-"`
	// Channel to use for routing events for custom processes.
	CustomEventCh chan Event `json:"-"`
	// Channel to use for routing supervisor events
	SupervisorEventCh chan Event `json:"-"`
	// The event type for the process.
	Event EventName
	// Maps for various staticProcess information.
	// NB: Added a Mutex on this structure, though it should really not be needed,
	//	since there is only reads from the static procMap. Decide later if we should
	//	remove it again.
	StaticProcesses *staticProcesses
	// Map of dynamic processes
	DynamicProcesses *dynamicProcesses
	// Map of custom processes
	CustomProcesses *customProcesses
	// Maps for various errProcess information
	ErrorProcesses *errorProcesses

	// The main counter used for assigning Nr to events.
	EventNr *eventNr

	// Holding all configuration settings.
	Config *Config

	// PID of the process
	PID pidnr
	// The context of the process
	Ctx context.Context `json:"-"`
	// Cancel func for context of the process
	Cancel context.CancelFunc `json:"-"`
	// contains filtered or unexported fields
}

Process defines a process.

func NewProcess

func NewProcess(ctx context.Context, parentP *Process, event EventName, fn ETFunc) *Process

NewProcess will prepare and return a *Process. It will copy channels and map structures from the root process.

func NewRootProcess

func NewRootProcess(ctx context.Context, fn ETFunc, conf *Config) *Process

NewRootProcess will prepare and return the root process which holds all the core elements needed, like the main channels for events and errors, and varouis registers or maps holding information about the system. Later created processes will reference these elements when they are created. The root process will also start up all the essential other processes needed, like the event router, and various standard error handling processes.

func (*Process) Act

func (p *Process) Act() error

Will start the ETFunc attached to the process.

If no ETFunc is defined for the process will just return after calling this function. The process can still be used and we can communicate with it via it's channels.

func (*Process) AddEvent

func (p *Process) AddEvent(event Event)

AddEvent will deliver the event to the correct router based on the specified Kind of the Event. If the Kind are missing the event will be handled as a static event. If the event is to be delivered to a remote node, AddEvent will also take care of that and ship the event off to the ETRemote process.

func (*Process) CurrentEventNr

func (p *Process) CurrentEventNr() int

CurrentEventNr returns the current value of the shared event counter without modifying it.

func (*Process) IncrementEventNr

func (p *Process) IncrementEventNr() int

IncrementEventNr atomically increments the shared event counter and returns the new value.

func (*Process) SignalReady

func (p *Process) SignalReady()

Signal that the function is ready.

func (*Process) Stop

func (p *Process) Stop() error

Will Cancel the context attached to the process. Will delete the process from the processes map. Will delete the pid from the pids map.

func (*Process) WaitForReady

func (p *Process) WaitForReady()

Wait for the process function to be ready and started for the specific process.

Directories

Path Synopsis
examples
2actresses command
childprocess command
httpget command
proxy command
reader-writer command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL