Documentation
¶
Index ¶
- Constants
- func ApplyDeclaration(s *Scope, nodeID string, od workflow.OutputDeclaration, val expr.Value) error
- func ApplyOutput(s *Scope, nodeID, slotID string, binding workflow.OutputBinding, ...) error
- func FilterEmitted(raw map[string]workflow.DataType, bindings map[string]workflow.OutputBinding) map[string]workflow.DataType
- func HeartbeatLoop(ctx context.Context, lc Supervisor, address string, cfg RetryConfig)
- func RegisterNodeOutputs(scp *Scope, em Emitter)
- func RegisterWithRetry(ctx context.Context, lc Supervisor, reg AgentRegistration, cfg RetryConfig)
- type ADCConfig
- type AgentRegistration
- type AgentStatus
- type BranchingNode
- type BuildFunc
- type DACConfig
- type DeploymentMapping
- type DeviceManifest
- type Emitter
- type Engine
- type Event
- type Executable
- type ExternalResources
- type Function
- type GPIOConfig
- type HasSetup
- type LLMProviderConfig
- type LinearNode
- type LlmClient
- type MQTTConnection
- type MQTTWill
- type MemorySync
- type MissingFieldError
- type PWMConfig
- type RAGQueryParams
- type RAGQueryResult
- type ResourceBinding
- type Retriever
- type RetryConfig
- type Runner
- type Scope
- type SerialConfig
- type Supervisor
- type ToolNode
- type ToolProvider
- type Transition
- type TransportRegistry
- type Trigger
- type TriggerNode
- type Wirable
Constants ¶
const ( PortCtrl = "ctrl" PortTrue = "true" PortFalse = "false" )
const ( // SrcDeclared is the source ID for user-declared variables. SrcDeclared = "declared" // SrcFnArg is the source ID reserved for function-argument references. // References with this source ID resolve against a function Call's arguments rather than against any scope variable. SrcFnArg = "fnarg" )
Well-known source IDs used in context variable lookups and expression references.
const StateIdle = ""
const SubBufSize = 64
SubBufSize is the buffer size used in subscription channels. Events are dropped when this buffer size is exceeded.
Variables ¶
This section is empty.
Functions ¶
func ApplyDeclaration ¶
ApplyDeclaration stores a value into the scope according to the declaration mode.
func ApplyOutput ¶
func ApplyOutput(s *Scope, nodeID, slotID string, binding workflow.OutputBinding, val expr.Value) error
ApplyOutput stores a value into the scope according to the binding mode.
func FilterEmitted ¶
func FilterEmitted(raw map[string]workflow.DataType, bindings map[string]workflow.OutputBinding) map[string]workflow.DataType
FilterEmitted filters raw declared output slots to only those whose binding is emit-mode (or unbound, which defaults to emit). Used by EmitsVariables implementations to produce the seedable slot map.
func HeartbeatLoop ¶
func HeartbeatLoop(ctx context.Context, lc Supervisor, address string, cfg RetryConfig)
HeartbeatLoop ticks cfg.Interval and posts one heartbeat per tick. Returns when ctx is canceled; failed ticks log at warn and continue.
func RegisterNodeOutputs ¶
RegisterNodeOutputs declares zero values for an emitter node's outputs into the scope.
func RegisterWithRetry ¶
func RegisterWithRetry(ctx context.Context, lc Supervisor, reg AgentRegistration, cfg RetryConfig)
RegisterWithRetry calls lc.Register until success or ctx cancellation. Each attempt runs under its own cfg.AttemptTimeout.
Types ¶
type AgentRegistration ¶
type AgentRegistration struct {
Address string
Status AgentStatus
Manifest *DeviceManifest
Error *string
}
AgentRegistration is the per-boot state passed to Supervisor.Register.
type AgentStatus ¶
type AgentStatus string
AgentStatus is the boot outcome reported through Supervisor.Register.
const ( StatusOnline AgentStatus = "online" StatusBootError AgentStatus = "booterror" )
type BranchingNode ¶
type BranchingNode struct {
// contains filtered or unexported fields
}
BranchingNode is a base for a node that can decide between multiple state transitions per port (e.g. LLM agent)
func NewBranchingNode ¶
func NewBranchingNode(id string) BranchingNode
NewBranchingNode creates a new BranchingNode
func (*BranchingNode) AddTransition ¶
func (b *BranchingNode) AddTransition(port string, tr Transition) error
func (*BranchingNode) ID ¶
func (b *BranchingNode) ID() string
func (*BranchingNode) Transitions ¶
func (b *BranchingNode) Transitions(port string) []Transition
type BuildFunc ¶
type BuildFunc func(ctx context.Context, wf *workflow.Workflow, dm DeploymentMapping, ext *ExternalResources) (*Runner, error)
BuildFunc builds a Runner from a binding-free workflow, the deploy mapping that binds its resources, and the resolved external-resource configs. Injected at Engine construction to avoid import cycle
type DeploymentMapping ¶
type DeploymentMapping map[string]ResourceBinding
DeploymentMapping binds a binding-free workflow's logical resource ids to concrete platform resources for one deploy, keyed by workflow resource id. Mirrors the engineapi wire shape.
type DeviceManifest ¶
type DeviceManifest struct {
GPIOs map[string]GPIOConfig `json:"gpios,omitempty"`
ADCs map[string]ADCConfig `json:"adcs,omitempty"`
DACs map[string]DACConfig `json:"dacs,omitempty"`
Serials map[string]SerialConfig `json:"serials,omitempty"`
PWMs map[string]PWMConfig `json:"pwms,omitempty"`
}
DeviceManifest is the hardware the engine opens drivers for, keyed by driver instance ID. JSON tags match the fh-backend wire shape.
type Emitter ¶
type Emitter interface {
Wirable
// Outputs returns the output a node does actually emit (bindingMode = emit)
Outputs() map[string]workflow.DataType
}
Emitter marks nodes that can emit variables to a scope
type Engine ¶
type Engine struct {
Secret string // shared with backend; used as Authorization bearer for /deploy + /stop
Builder BuildFunc // constructs a Runner in /deploy from a workflow + network manifest
// contains filtered or unexported fields
}
Engine is the long-lived host for one workflow Runner. It owns runner lifecycle (start/stop/swap on /deploy and /stop) and the HTTP surface that drives that lifecycle.
func (*Engine) Deploy ¶
func (e *Engine) Deploy(wf *workflow.Workflow, dm DeploymentMapping, ext *ExternalResources) error
Deploy stops any running workflow and starts the new one.
type Event ¶
type Event struct {
TargetState string // Node ID to transition to
Apply func(*Scope) // Optional function to apply event data into the runner's scope
}
Event is produced by a Trigger and consumed by the runner's state loop.
type Executable ¶
type Executable interface {
Wirable
Execute(ctx context.Context, scope *Scope) (nextState string, err error)
}
Executable is implemented by action nodes that run on the state-runner goroutine.
type ExternalResources ¶
type ExternalResources struct {
MQTTs map[string]MQTTConnection
Providers map[string]LLMProviderConfig
}
ExternalResources holds the resolved, deploy-delivered configs for a workflow's non-device external resources, keyed by the platform resource id the DeploymentMapping points at. The engine builds transports from MQTTs and per-deploy LLM providers from Providers (the connection for each declared custom/self-hosted model).
type Function ¶
type Function struct {
Info workflow.FunctionInfo
DeclaredVars []workflow.Variable // function-local declared variables to seed into the function scope at call time
InitialState string // entry node id (from OnFunctionCall's outgoing edge)
Actions map[string]Executable // action nodes, keyed by node id
OutputAssignments map[string]workflow.Expression // return uid → expression evaluated in callee scope at end
}
Function is a compiled, callable sub-workflow. Synchronous, no triggers.
type GPIOConfig ¶
type GPIOConfig struct {
Chip string `json:"chip"`
}
type LLMProviderConfig ¶ added in v1.0.2
LLMProviderConfig is the resolved connection to a self-hosted/custom LLM endpoint the llmproxy doesn't ship. The declared workflow model supplies the id and capabilities; this supplies how to reach it. Model is the optional upstream model name the endpoint serves (defaults to the workflow model id).
type LinearNode ¶
type LinearNode struct {
// contains filtered or unexported fields
}
LinearNode is embedded by nodes with at most one target per port
func NewLinearNode ¶
func NewLinearNode(id string) LinearNode
NewLinearNode creates a new LinearNode
func (*LinearNode) AddTransition ¶
func (b *LinearNode) AddTransition(port string, tr Transition) error
func (*LinearNode) ID ¶
func (b *LinearNode) ID() string
func (*LinearNode) Next ¶
func (b *LinearNode) Next(port string, scope *Scope) (string, error)
Next applies the outgoing transition's side effects (e.g. AgentTask prompt evaluation) against the scope and returns the target node ID. Returns StateIdle with a nil error when no transition is wired to the port.
type LlmClient ¶
type LlmClient interface {
Chat(ctx context.Context, req *llmproxy.ChatRequest) (*llmproxy.ChatResponse, error)
}
LlmClient is the external service for language model calls.
type MQTTConnection ¶
type MQTTConnection struct {
BrokerURL string `json:"brokerUrl"`
ClientID string `json:"clientId,omitempty"`
Username string `json:"username,omitempty"`
Password string `json:"password,omitempty"`
PublishPrefix string `json:"publishPrefix,omitempty"`
SubscribePrefix string `json:"subscribePrefix,omitempty"`
Will *MQTTWill `json:"will,omitempty"`
}
type MemorySync ¶ added in v1.0.2
type MemorySync interface {
Hydrate(ctx context.Context) ([]workflow.MemoryFile, error)
Push(ctx context.Context, uid, content string) error
}
MemorySync is the OPTIONAL remote mirror for agent memory. The Manager owns local filesystem persistence unconditionally; when a MemorySync is configured it hydrates from the mirror on a cold start (empty local copy) and pushes every local write back. nil → local-only: no hydration, no mirroring. fh-backend adapter: HTTP. Push is best-effort — a mirror failure must not fail the agent's local write.
type MissingFieldError ¶
MissingFieldError signals a required workflow field was absent at build time.
func (*MissingFieldError) Error ¶
func (e *MissingFieldError) Error() string
type RAGQueryParams ¶
RAGQueryParams is a similarity-search request issued through a Retriever.
type RAGQueryResult ¶
RAGQueryResult is one ranked chunk returned by a Retriever.
type ResourceBinding ¶
ResourceBinding is how one workflow resource binds to the environment. Ref is the shared platform resource it points at (driver instance id in the boot DeviceManifest, or external resource id in ExternalResources); the engine picks the pool by the workflow resource's type. Index is the optional per-channel physical sub-address within that resource (GPIO line, or ADC/PWM/ DAC channel number); nil for UART/MQTT/memory/model.
type Retriever ¶
type Retriever interface {
QueryRAG(ctx context.Context, params RAGQueryParams) ([]RAGQueryResult, error)
}
Retriever is the external service for retrieval-augmented generation.
type RetryConfig ¶
RetryConfig tunes RegisterWithRetry and HeartbeatLoop. Interval is the wait between Register retries and the tick cadence for Heartbeat.
type Runner ¶
type Runner struct {
Scope *Scope
Nodes map[string]Executable
Triggers map[string]Trigger
InitialState string
Transports TransportRegistry // released by Run's defer chain on ctx cancellation
}
func (*Runner) Run ¶
Run starts all trigger goroutines and the state-runner loop. One iteration = one node execution or one event consumed. Runs indefinitely until ctx is cancelled by the caller. On exit (ctx cancellation), every owned resource is released — triggers via their individual Close, transports via Registry.CloseAll. The single ctx is the only lifecycle handle the caller needs.
type Scope ¶
type Scope struct {
Vars map[string]expr.Value // Holds variable sof all sources, separated by srcID
Conversation llmproxy.InputItems
// contains filtered or unexported fields
}
Scope holds runtime variables and conversation state for workflow execution. Scope is single-threaded: after Setup, only the state-runner modifies scope. Cross-goroutine delivery to trigger goroutines happens through the channels Subscribe() returns
func NewFunctionScope ¶
NewFunctionScope creates an isolated scope for function execution, with the given arguments pre-seeded under SrcFnArg and initialized declared variables.
func NewMainScope ¶
NewMainScope creates a scope for the main workflow, initialized with the given declared variables.
func (*Scope) GetConversation ¶
func (s *Scope) GetConversation() llmproxy.InputItems
func (*Scope) SetConversation ¶
type SerialConfig ¶
type Supervisor ¶ added in v1.0.2
type Supervisor interface {
Register(ctx context.Context, reg AgentRegistration) error
Heartbeat(ctx context.Context, address string) error
}
Supervisor is the external receiver of lifecycle events from the engine.
type ToolNode ¶
type ToolNode struct {
// contains filtered or unexported fields
}
ToolNode is embedded by tool-only nodes that never participate in the state machine
func (*ToolNode) AddTransition ¶
func (b *ToolNode) AddTransition(port string, _ Transition) error
type ToolProvider ¶
type ToolProvider interface {
Tools() ([]llmproxy.FunctionTool, error)
}
ToolProvider marks nodes that can be exposed as LLM tools to agent nodes. A single ToolProvider may contribute one or more LLM-callable function tools. Descriptions live on the implementing node; either hardcoded, or carried as a node argument.
type Transition ¶
type Transition struct {
TargetID string
EdgeType workflow.EdgeType
Prompt *workflow.Expression
Description *string
}
Transition carries the metadata needed by a branching node to describe one of its possible outgoing transitions to an LLM.
func (Transition) Apply ¶
func (tr Transition) Apply(scope *Scope) error
Apply runs the edge-type-specific side effect against the scope before the state machine moves on.
type TransportRegistry ¶
type TransportRegistry interface {
CloseAll() error
}
Runner is the workflow graph interpreter. One Runner executes one workflow and owns the per-deploy transport connections it was built against. Construct via build/ package. Run releases every owned resource via its defer chain on ctx cancellation TransportRegistry is the per-deploy set of transports the Runner owns and releases on shutdown. *transport.Registry satisfies it; kept as an interface so package engine does not import package transport (which would cycle now that transport depends on engine domain types).
type Trigger ¶
type Trigger interface {
Wirable
// Wait blocks until the trigger fires or ctx cancels, then returns the
// event or error.
Wait(ctx context.Context) (Event, error)
// Close releases resources on shutdown. Runner calls this even if Setup
// failed partway, so implementations should guard nil fields.
Close() error
}
Trigger is the contract for nodes that produce events from their own goroutine. The runner constructs one goroutine per Trigger and drives the lifecycle.
type TriggerNode ¶
type TriggerNode struct {
// contains filtered or unexported fields
}
TriggerNode is the common embed for every trigger
func NewTriggerNode ¶
func NewTriggerNode(id string) TriggerNode
NewTriggerNode creates a new TriggerNode
func (*TriggerNode) AddTransition ¶
func (b *TriggerNode) AddTransition(_ string, tr Transition) error
func (*TriggerNode) ID ¶
func (b *TriggerNode) ID() string
func (*TriggerNode) Target ¶
func (b *TriggerNode) Target() string
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
Package backend is the engine-side HTTP client for everything the engine needs from fh-backend: agent registration, log ingestion, LLM chat, RAG queries.
|
Package backend is the engine-side HTTP client for everything the engine needs from fh-backend: agent registration, log ingestion, LLM chat, RAG queries. |
|
internal/httpclient
Package httpclient is a minimal JSON HTTP client vendored into the engine so the fh-backend capability implementation has no dependency on the closed fh-backend module.
|
Package httpclient is a minimal JSON HTTP client vendored into the engine so the fh-backend capability implementation has no dependency on the closed fh-backend module. |
|
Package channel defines the engine's workflow-level handles to external resources — hardware drivers (GPIO, ADC, UART, ...) and network protocols (MQTT, future HTTP).
|
Package channel defines the engine's workflow-level handles to external resources — hardware drivers (GPIO, ADC, UART, ...) and network protocols (MQTT, future HTTP). |
|
Package driver is the OS-level abstraction for I/O resources.
|
Package driver is the OS-level abstraction for I/O resources. |
|
Package memory manages the engine's local copy of an agent's declared memory files.
|
Package memory manages the engine's local copy of an agent's declared memory files. |
|
Package rag provides a local implementation to satisfy Retriever interface of the engine.
|
Package rag provides a local implementation to satisfy Retriever interface of the engine. |
|
Package transport is the protocol-level abstraction for network resources the engine talks to.
|
Package transport is the protocol-level abstraction for network resources the engine talks to. |
|
Package websearch hosts the web search provider abstraction used by the engine's WebSearchTool node.
|
Package websearch hosts the web search provider abstraction used by the engine's WebSearchTool node. |