integrations

package
v0.18.0
Published: Feb 21, 2026 License: MIT Imports: 6 Imported by: 0

README

🦾 /pkg/integrations

[!NOTE] asyncmachine-go is a batteries-included graph control flow library (AOP, actor model, state-machine).

/pkg/integrations exposes state machines over JSON transports. NATS is currently the only implemented transport; email, Kafka, or HTTP may follow in the future.

JSON

JSON types cover mutations, subscriptions, and data getters. Each is split into request and response objects, which have a schema (/docs/jsonschema). Their usage depends on the specific implementation, e.g. in NATS each machine has a dedicated subtopic for mutation requests.

import (
	"encoding/json"

	"github.com/pancsta/asyncmachine-go/pkg/integrations"
	am "github.com/pancsta/asyncmachine-go/pkg/machine"
)

// create a subscription to Foo
reqSub := integrations.NewWaitingReq()
reqSub.States = am.S{"Foo"}
j, err := json.Marshal(reqSub)
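
To make the wire format concrete, here is a minimal, stdlib-only sketch of what such a request could look like once marshalled. The waitingReq type and encodeWaitFor helper are hypothetical local stand-ins that only mirror the documented JSON tags of WaitingReq; Kind is simplified to a plain string, the Time field is left out, and it is assumed that the kind is pre-filled with "am_req_waiting".

```go
package main

import (
	"encoding/json"
	"fmt"
)

// waitingReq is a hypothetical local mirror of the documented WaitingReq
// JSON tags. Kind is simplified to a plain string and Time is omitted.
type waitingReq struct {
	Kind      string   `json:"kind"`
	States    []string `json:"states,omitempty"`
	StatesNot []string `json:"states_not,omitempty"`
}

// encodeWaitFor builds a waiting request for the given states and
// returns its JSON form.
func encodeWaitFor(states ...string) (string, error) {
	req := waitingReq{Kind: "am_req_waiting", States: states}
	b, err := json.Marshal(req)
	return string(b), err
}

func main() {
	j, err := encodeWaitFor("Foo")
	if err != nil {
		panic(err)
	}
	fmt.Println(j)
}
```

With these stand-in types, the program prints {"kind":"am_req_waiting","states":["Foo"]}. The real package may encode Kind differently.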

NATS

NATS is a popular, high-performance messaging system written in Go. State machines are exposed under a topic, and each state machine also subscribes to a dedicated subtopic "[topic].[machineID]" for mutation requests. An optional [queue] load-balances requests across multiple subscribers.

import (
	nats "github.com/pancsta/asyncmachine-go/pkg/integrations/nats"
	am "github.com/pancsta/asyncmachine-go/pkg/machine"
)

// ...

// var mach *am.Machine
// var ctx context.Context
// var nc *nats.Conn (the connection type from github.com/nats-io/nats.go)

// expose mach under mytopic
_ = nats.ExposeMachine(ctx, mach, nc, "mytopic", "")
// mutate - add Foo
res, _ := nats.Add(ctx, nc, "mytopic", mach.Id(), am.S{"Foo"}, nil)
if res == am.Executed {
    print("Foo added to mach")
}
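
The "[topic].[machineID]" naming can be sketched as a small helper. mutationSubject is a hypothetical function, not part of this package. It also rejects machine IDs that would not form a single NATS subject token, because NATS uses "." as the token separator and "*" and ">" as wildcards. Whether the package performs a similar check is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// mutationSubject returns the per-machine subtopic "[topic].[machineID]".
// Hypothetical helper: it refuses IDs containing the NATS token separator,
// wildcards, or whitespace, as those would change the subject hierarchy.
func mutationSubject(topic, machID string) (string, error) {
	if machID == "" || strings.ContainsAny(machID, ".*> \t\r\n") {
		return "", fmt.Errorf("machine ID %q is not a single subject token", machID)
	}
	return topic + "." + machID, nil
}

func main() {
	subj, err := mutationSubject("mytopic", "mach1")
	if err != nil {
		panic(err)
	}
	fmt.Println(subj)
}
```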

TODO

  • recipient matching (filters similar to the REPL ones)
  • better error handling (avoid overreporting)

Status

Alpha, work in progress, not semantically versioned.

Documentation

Constants

This section is empty.

Variables

var (
	KindReqGetter    = Kind{"am_req_getter"}
	KindReqMutation  = Kind{"am_req_mutation"}
	KindReqWaiting   = Kind{"am_req_waiting"}
	KindRespGetter   = Kind{"am_resp_getter"}
	KindRespMutation = Kind{"am_resp_mutation"}
	KindRespWaiting  = Kind{"am_resp_waiting"}

	KindEnum = enum.New(KindReqGetter, KindReqMutation, KindReqWaiting,
		KindRespGetter, KindRespMutation, KindRespWaiting)
)

Functions

This section is empty.

Types

type GetterReq

type GetterReq struct {
	// The kind of the request.
	Kind Kind `json:"kind" jsonschema:"required,enum=am_req_getter"`
	// Request ticks of the passed states
	Time am.S `json:"time,omitempty"`
	// Request the sum of ticks of the passed states
	TimeSum am.S `json:"time_sum,omitempty"`
	// Request named clocks of the passed states
	Clocks am.S `json:"clocks,omitempty"`
	// Request the tags of the state machine
	Tags bool `json:"tags,omitempty"`
	// Request an importable version of the state machine
	Export bool `json:"export,omitempty"`
	// Request the ID of the state machine
	Id bool `json:"id,omitempty"`
	// Request the ID of the parent state machine
	ParentId bool `json:"parent_id,omitempty"`
}

GetterReq is a generic request, which results in GetterResp with respective fields filled out.

func NewGetterReq

func NewGetterReq() *GetterReq

NewGetterReq creates a new getter request.

type GetterResp

type GetterResp struct {
	// The kind of the response.
	Kind Kind `json:"kind" jsonschema:"required,enum=am_resp_getter"`
	// The ID of the state machine.
	MachId string `json:"mach_id,omitempty"`
	// The ticks of the passed states
	Time am.Time `json:"time,omitempty"`
	// The sum of ticks of the passed states
	TimeSum int `json:"time_sum,omitempty"`
	// The named clocks of the passed states
	Clocks am.Clock `json:"clocks,omitempty"`
	// The tags of the state machine
	Tags []string `json:"tags,omitempty"`
	// The importable version of the state machine
	Export *am.Serialized `json:"export,omitempty"`
	// The ID of the state machine
	Id string `json:"id,omitempty"`
	// The ID of the parent state machine
	ParentId string `json:"parent_id,omitempty"`
}

GetterResp is a response to GetterReq.
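
As an illustration of how request fields map to response fields, the following stdlib-only sketch answers a getter request from a plain map of state ticks. All names here (getterReq, getterResp, answer) are hypothetical local stand-ins mirroring a subset of the documented JSON tags. It assumes that am.Time behaves like a slice of ticks and am.Clock like a map from state name to tick. This is not the package's HandlerGetter.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// getterReq mirrors a subset of the documented GetterReq JSON tags.
type getterReq struct {
	Kind    string   `json:"kind"`
	Time    []string `json:"time,omitempty"`
	TimeSum []string `json:"time_sum,omitempty"`
	Clocks  []string `json:"clocks,omitempty"`
	Id      bool     `json:"id,omitempty"`
}

// getterResp mirrors a subset of the documented GetterResp JSON tags.
type getterResp struct {
	Kind    string            `json:"kind"`
	Time    []uint64          `json:"time,omitempty"`
	TimeSum int               `json:"time_sum,omitempty"`
	Clocks  map[string]uint64 `json:"clocks,omitempty"`
	Id      string            `json:"id,omitempty"`
}

// answer fills only the response fields that the request asked for.
func answer(id string, ticks map[string]uint64, req getterReq) getterResp {
	resp := getterResp{Kind: "am_resp_getter"}
	for _, s := range req.Time {
		resp.Time = append(resp.Time, ticks[s])
	}
	for _, s := range req.TimeSum {
		resp.TimeSum += int(ticks[s])
	}
	if len(req.Clocks) > 0 {
		resp.Clocks = make(map[string]uint64, len(req.Clocks))
		for _, s := range req.Clocks {
			resp.Clocks[s] = ticks[s]
		}
	}
	if req.Id {
		resp.Id = id
	}
	return resp
}

func main() {
	ticks := map[string]uint64{"Foo": 3, "Bar": 2}
	req := getterReq{
		Kind:    "am_req_getter",
		Time:    []string{"Foo"},
		TimeSum: []string{"Foo", "Bar"},
		Id:      true,
	}
	b, err := json.Marshal(answer("mach1", ticks, req))
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b))
}
```

Fields that were not requested stay at their zero value and are dropped from the JSON by omitempty.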

func HandlerGetter

func HandlerGetter(
	ctx context.Context, mach am.Api, req *GetterReq,
) (*GetterResp, error)

type Kind

type Kind enum.Member[string]

func (*Kind) MarshalJSON

func (k *Kind) MarshalJSON() ([]byte, error)

func (*Kind) UnmarshalJSON

func (k *Kind) UnmarshalJSON(b []byte) error

type MsgKindReq

type MsgKindReq struct {
	// The kind of the request.
	Kind Kind `json:"kind" jsonschema:"required,enum=am_req_getter,enum=am_req_mutation,enum=am_req_waiting"`
}

MsgKindReq is a decoding helper.

type MsgKindResp

type MsgKindResp struct {
	// The kind of the response.
	Kind Kind `json:"kind" jsonschema:"required,enum=am_resp_waiting,enum=am_resp_mutation,enum=am_resp_getter"`
}

MsgKindResp is a decoding helper.
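
A decoding helper like this is typically used for two-stage decoding: read only the kind first, then decode the same payload into the matching concrete type. The sketch below shows that pattern with the standard library only. The types msgKind, mutationReq, and getterReq and the decodeReq function are hypothetical local stand-ins, and Kind is simplified to a plain string.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// msgKind is a stand-in for the decoding helper: it reads only "kind".
type msgKind struct {
	Kind string `json:"kind"`
}

// mutationReq and getterReq mirror a subset of the documented JSON tags.
type mutationReq struct {
	Kind   string         `json:"kind"`
	Add    []string       `json:"add,omitempty"`
	Remove []string       `json:"remove,omitempty"`
	Args   map[string]any `json:"args,omitempty"`
}

type getterReq struct {
	Kind string   `json:"kind"`
	Time []string `json:"time,omitempty"`
	Id   bool     `json:"id,omitempty"`
}

// decodeReq peeks at the kind, then decodes into the matching type.
func decodeReq(raw []byte) (any, error) {
	var head msgKind
	if err := json.Unmarshal(raw, &head); err != nil {
		return nil, err
	}
	switch head.Kind {
	case "am_req_mutation":
		req := &mutationReq{}
		if err := json.Unmarshal(raw, req); err != nil {
			return nil, err
		}
		return req, nil
	case "am_req_getter":
		req := &getterReq{}
		if err := json.Unmarshal(raw, req); err != nil {
			return nil, err
		}
		return req, nil
	default:
		return nil, fmt.Errorf("unsupported kind %q", head.Kind)
	}
}

func main() {
	v, err := decodeReq([]byte(`{"kind":"am_req_mutation","add":["Foo"]}`))
	if err != nil {
		panic(err)
	}
	fmt.Printf("%T\n", v)
}
```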

type MutationReq

type MutationReq struct {
	// The kind of the request.
	Kind Kind `json:"kind" jsonschema:"required,enum=am_req_mutation"`
	// The states to add to the state machine.
	Add am.S `json:"add,omitempty" jsonschema:"oneof_required=add"`
	// The states to remove from the state machine.
	Remove am.S `json:"remove,omitempty" jsonschema:"oneof_required=remove"`
	// Arguments passed to transition handlers.
	Args map[string]any `json:"args,omitempty"`
}

func NewMutationReq

func NewMutationReq() *MutationReq

NewMutationReq creates a new mutation request. TODO sugar for NewAddReq and NewRemoveReq
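
The oneof_required tags on Add and Remove suggest that a request carries exactly one of the two fields. A minimal validation sketch under that reading follows. mutationReq and validateMutation are hypothetical local stand-ins, and the package itself may validate differently or only through the JSON schema.

```go
package main

import (
	"errors"
	"fmt"
)

// mutationReq mirrors the documented MutationReq JSON tags,
// with Kind simplified to a plain string.
type mutationReq struct {
	Kind   string         `json:"kind"`
	Add    []string       `json:"add,omitempty"`
	Remove []string       `json:"remove,omitempty"`
	Args   map[string]any `json:"args,omitempty"`
}

// validateMutation accepts a request only when the kind matches and
// exactly one of Add or Remove is set (an assumed reading of the tags).
func validateMutation(req mutationReq) error {
	if req.Kind != "am_req_mutation" {
		return fmt.Errorf("unexpected kind %q", req.Kind)
	}
	hasAdd, hasRemove := len(req.Add) > 0, len(req.Remove) > 0
	if hasAdd == hasRemove {
		return errors.New("exactly one of add or remove must be set")
	}
	return nil
}

func main() {
	ok := mutationReq{Kind: "am_req_mutation", Add: []string{"Foo"}}
	both := mutationReq{
		Kind:   "am_req_mutation",
		Add:    []string{"Foo"},
		Remove: []string{"Bar"},
	}
	fmt.Println(validateMutation(ok) == nil, validateMutation(both) == nil)
}
```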

type MutationResp

type MutationResp struct {
	// The kind of the request.
	Kind Kind `json:"kind" jsonschema:"required,enum=am_req_mutation"`
	// The result of the mutation request.
	Result am.Result `json:"result"`
}

func HandlerMutation

func HandlerMutation(
	ctx context.Context, mach am.Api, req *MutationReq,
) (*MutationResp, error)

type WaitingReq

type WaitingReq struct {
	// The kind of the request.
	Kind Kind `json:"kind" jsonschema:"required,enum=am_req_waiting"`
	// The states to wait for, the default is to all states being active simultaneously (if no time passed).
	States am.S `json:"states,omitempty" jsonschema:"oneof_required=states"`
	// The states names to wait for to be inactive. Ignores the Time field.
	StatesNot am.S `json:"states_not,omitempty" jsonschema:"oneof_required=statesNot"`
	// The specific (minimal) time to wait for.
	Time am.Time `json:"time,omitempty"`
}

func NewWaitingReq

func NewWaitingReq() *WaitingReq

NewWaitingReq creates a new waiting request.

type WaitingResp

type WaitingResp struct {
	WaitingRespUnsafe
}

func HandlerWaiting

func HandlerWaiting(
	ctx context.Context, mach am.Api, req *WaitingReq,
) (*WaitingResp, error)

func (*WaitingResp) UnmarshalJSON

func (w *WaitingResp) UnmarshalJSON(b []byte) error

type WaitingRespUnsafe

type WaitingRespUnsafe struct {
	// The kind of the response.
	Kind Kind `json:"kind" jsonschema:"required,enum=am_resp_waiting"`
	// The ID of the state machine.
	MachId string `json:"mach_id"`
	// The active states waited for. If time is empty, all these states are active simultaneously.
	States am.S `json:"states,omitempty" jsonschema:"oneof_required=states"`
	// The inactive states waited for.
	StatesNot am.S `json:"states_not,omitempty" jsonschema:"oneof_required=states"`
	// The requested machine time (the current one may be higher).
	Time am.Time `json:"time,omitempty"`
}
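
The split between WaitingResp and WaitingRespUnsafe resembles a common Go pattern: the outer type has a custom UnmarshalJSON that decodes into the embedded type (which has no custom method, so there is no recursion) and then validates the result. The sketch below illustrates that pattern only. The types are hypothetical local stand-ins, the validation rules are assumptions, and the package's actual UnmarshalJSON may behave differently.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// waitingRespUnsafe mirrors a subset of the documented JSON tags and is
// decoded without any validation.
type waitingRespUnsafe struct {
	Kind      string   `json:"kind"`
	MachId    string   `json:"mach_id"`
	States    []string `json:"states,omitempty"`
	StatesNot []string `json:"states_not,omitempty"`
}

// waitingResp wraps the unsafe type and validates while decoding.
type waitingResp struct {
	waitingRespUnsafe
}

// UnmarshalJSON decodes into the embedded type, then applies checks.
func (w *waitingResp) UnmarshalJSON(b []byte) error {
	var raw waitingRespUnsafe
	if err := json.Unmarshal(b, &raw); err != nil {
		return err
	}
	if raw.Kind != "am_resp_waiting" {
		return fmt.Errorf("unexpected kind %q", raw.Kind)
	}
	// Assumed rule: a response names at least one waited-for state.
	if len(raw.States) == 0 && len(raw.StatesNot) == 0 {
		return errors.New("either states or states_not must be set")
	}
	w.waitingRespUnsafe = raw
	return nil
}

// decodeWaitingResp is a small convenience wrapper for the example.
func decodeWaitingResp(raw string) (*waitingResp, error) {
	resp := &waitingResp{}
	if err := json.Unmarshal([]byte(raw), resp); err != nil {
		return nil, err
	}
	return resp, nil
}

func main() {
	resp, err := decodeWaitingResp(
		`{"kind":"am_resp_waiting","mach_id":"mach1","states":["Foo"]}`)
	if err != nil {
		panic(err)
	}
	fmt.Println(resp.MachId, resp.States)
}
```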
