network

package
v0.0.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 4, 2026 License: MIT Imports: 31 Imported by: 0

Documentation

Overview

Package network defines the AGH Network v0 protocol surface shared by the transport, router, and delivery layers.

Index

Constants

View Source
const (
	// AuditDirectionSent records a successful outbound publish.
	AuditDirectionSent = "sent"
	// AuditDirectionReceived records an accepted inbound delivery.
	AuditDirectionReceived = "received"
	// AuditDirectionRejected records a rejected envelope.
	AuditDirectionRejected = "rejected"
	// AuditDirectionDelivered records a completed local delivery.
	AuditDirectionDelivered = "delivered"
)
View Source
const (
	// StatusDisabled reports that the network runtime is intentionally disabled.
	StatusDisabled = "disabled"
	// StatusRunning reports a connected network runtime.
	StatusRunning = "running"
	// StatusDisconnected reports a network runtime whose transport lost its connection.
	StatusDisconnected = "disconnected"
)
View Source
const DefaultMaxReplayAge = 5 * time.Minute

DefaultMaxReplayAge is the RFC-recommended maximum receiver replay age when `expires_at` is not present.

View Source
const ProtocolV0 = "agh-network/v0"

ProtocolV0 is the workspace-qualified wire protocol identifier.

Variables

View Source
var (
	// ErrWorkNotFound reports that no current work matched the
	// lifecycle message being applied.
	ErrWorkNotFound = errors.New("network: work not found")
	// ErrWorkActorNotAllowed reports a lifecycle actor outside the
	// initiator/target pair.
	ErrWorkActorNotAllowed = errors.New("network: work actor not allowed")
	// ErrInvalidStateTransition reports an impossible lifecycle transition.
	ErrInvalidStateTransition = errors.New("network: invalid work state transition")
	// ErrWorkContainerMismatch reports that a work_id was continued from a
	// different conversation container than the one that opened it.
	ErrWorkContainerMismatch = errors.New("network: work container mismatch")
	// ErrWorkClosed reports a message for a terminal work that must
	// be rejected instead of reopening the work.
	ErrWorkClosed = errors.New("network: work closed")
)
View Source
var (
	// ErrLocalPeerNotFound reports an unknown local session sender.
	ErrLocalPeerNotFound = errors.New("network: local peer not found")
	// ErrTargetPeerNotFound reports a directed send target missing from presence.
	ErrTargetPeerNotFound = errors.New("network: target peer not found")
	// ErrDuplicateEnvelope reports a replay-window duplicate.
	ErrDuplicateEnvelope = errors.New("network: duplicate envelope")
	// ErrEnvelopeNotTarget reports a directed envelope for a peer this daemon does not own.
	ErrEnvelopeNotTarget = errors.New("network: envelope not targeted to a local peer")
)
View Source
var (
	// ErrTaskIngressUnavailable reports that the network runtime was not wired
	// with a task service.
	ErrTaskIngressUnavailable = errors.New("network: task ingress is not configured")
	// ErrTaskIngressPeerNotFound reports that the supplied peer is not currently
	// authenticated in the requested channel.
	ErrTaskIngressPeerNotFound = errors.New("network: task ingress peer not found")
	// ErrTaskIngressCapabilityDenied reports that the peer lacks the capability
	// needed for task ingress.
	ErrTaskIngressCapabilityDenied = errors.New("network: task ingress capability denied")
	// ErrTaskChannelMismatch reports a request whose bound or requested task
	// channel does not match the authenticated ingress channel.
	ErrTaskChannelMismatch = errors.New("network: task channel mismatch")
	// ErrTaskChannelStale reports a stored task binding that no longer validates
	// under the current channel grammar.
	ErrTaskChannelStale = errors.New("network: stale task channel")
)
View Source
var (
	// ErrInvalidEnvelope reports a structurally invalid envelope.
	ErrInvalidEnvelope = errors.New("network: invalid envelope")
	// ErrMissingField reports a required protocol field is absent.
	ErrMissingField = errors.New("network: missing field")
	// ErrInvalidField reports a present field violates protocol rules.
	ErrInvalidField = errors.New("network: invalid field")
	// ErrInvalidKind reports an unknown or unsupported message kind.
	ErrInvalidKind = errors.New("network: invalid kind")
	// ErrInvalidBody reports a malformed or invalid kind-specific body.
	ErrInvalidBody = errors.New("network: invalid body")
	// ErrEnvelopeTooLarge reports an envelope exceeding the protocol size limit.
	ErrEnvelopeTooLarge = errors.New("network: envelope too large")
	// ErrExpired reports an envelope that is already expired.
	ErrExpired = errors.New("network: expired")
	// ErrReplayTooOld reports an envelope outside the receiver replay window.
	ErrReplayTooOld = errors.New("network: replay window exceeded")
	// ErrVerificationFailed reports a syntactically valid envelope whose
	// integrity checks failed.
	ErrVerificationFailed = errors.New("network: verification failed")
	// ErrLegacyFieldRejected reports an obsolete hard-cut wire field.
	ErrLegacyFieldRejected = errors.New("network: legacy field rejected")
	// ErrDirectRoomCollision reports that a direct_id is bound to a different
	// channel peer pair than its deterministic identity permits.
	ErrDirectRoomCollision = errors.New("network: direct room collision")
)

Functions

func BroadcastSubject

func BroadcastSubject(workspaceID string, channel string) (string, error)

BroadcastSubject builds the workspace-qualified broadcast subject for one channel.

func DirectRoomIdentity

func DirectRoomIdentity(
	workspaceID string,
	channel string,
	localPeer string,
	remotePeer string,
) (string, string, string, error)

DirectRoomIdentity derives the stable two-party direct room identity scoped to one workspace channel.

func DirectSubject

func DirectSubject(workspaceID string, channel string, peerID string) (string, error)

DirectSubject builds the workspace-qualified direct subject for one target peer.

func IsTerminalState

func IsTerminalState(state WorkState) bool

IsTerminalState reports whether the state is terminal under the RFC.

func NormalizeAuditEntry

func NormalizeAuditEntry(
	sessionID string,
	direction string,
	envelope Envelope,
	reason string,
	at time.Time,
) (store.NetworkAuditEntry, error)

NormalizeAuditEntry derives a consistent audit row from envelope metadata.

func NormalizeDirectRoomPeers

func NormalizeDirectRoomPeers(localPeer string, remotePeer string) (string, string, error)

NormalizeDirectRoomPeers validates, rejects same-peer rooms, and returns peers in their stable direct-room storage order.

func PreviewTextForRawBody

func PreviewTextForRawBody(kind Kind, raw json.RawMessage) string

PreviewTextForRawBody derives operator-facing preview text from one raw persisted message body. Invalid bodies return an empty preview.

func ResolveGreetSummary

func ResolveGreetSummary(card PeerCard, summary string) string

ResolveGreetSummary returns a deterministic operator-facing summary for one greet advertisement.

func RouteToken

func RouteToken(peerID string) (string, error)

RouteToken derives the deterministic NATS route token for one peer.

func ValidateChannel

func ValidateChannel(channel string) error

ValidateChannel reports whether the channel matches the RFC grammar.

func ValidateConversationID

func ValidateConversationID(id string, field string) error

ValidateConversationID reports whether a container identifier matches its field grammar.

func ValidateConversationRef

func ValidateConversationRef(ref ConversationRef) error

ValidateConversationRef reports whether a conversation reference identifies exactly one container.

func ValidateDirectRoomBinding

func ValidateDirectRoomBinding(workspaceID string, channel string, directID string, peerA string, peerB string) error

ValidateDirectRoomBinding proves that an existing direct room row matches the deterministic identity for its workspace/channel-scoped peer pair.

func ValidateDirectRoomPeers

func ValidateDirectRoomPeers(peerA string, peerB string) error

ValidateDirectRoomPeers reports whether two peer IDs form a valid two-party direct room.

func ValidateEnvelope

func ValidateEnvelope(env Envelope, opts ValidateOptions) error

ValidateEnvelope validates one envelope without returning a normalized copy.

func ValidateEnvelopeConversation

func ValidateEnvelopeConversation(env Envelope) error

ValidateEnvelopeConversation enforces kind-specific container and work fields.

func ValidatePeerID

func ValidatePeerID(peerID string) error

ValidatePeerID reports whether the peer identifier matches the RFC grammar.

func ValidateSurface

func ValidateSurface(surface Surface) error

ValidateSurface reports whether the surface matches the RFC conversation values.

func ValidateWorkID

func ValidateWorkID(id string) error

ValidateWorkID reports whether a work id can safely cross the network boundary.

func ValidateWorkState

func ValidateWorkState(state WorkState) error

ValidateWorkState reports whether the state is a known work lifecycle state.

func ValidateWorkTransition

func ValidateWorkTransition(from WorkState, to WorkState) error

ValidateWorkTransition reports whether a trace may advance work from one state to another.

func ValidateWorkspaceID

func ValidateWorkspaceID(workspaceID string) error

ValidateWorkspaceID reports whether the workspace identity can safely occupy one NATS subject token and one protocol envelope field.

Types

type AuditStore

type AuditStore interface {
	WriteNetworkAudit(ctx context.Context, entry store.NetworkAuditEntry) error
}

AuditStore is the persistence surface consumed by the network audit writer.

type AuditWriter

type AuditWriter interface {
	RecordSent(ctx context.Context, sessionID string, envelope Envelope) error
	RecordReceived(ctx context.Context, sessionID string, envelope Envelope) error
	RecordRejected(ctx context.Context, sessionID string, envelope Envelope, reason string) error
	RecordDelivered(ctx context.Context, sessionID string, envelope Envelope) error
}

AuditWriter records network activity into the configured sinks.

type AuditWriterOption

type AuditWriterOption func(*FileAuditWriter)

type Body

type Body interface {
	Kind() Kind
}

Body is the typed representation of one envelope body.

func DecodeBody

func DecodeBody(kind Kind, raw json.RawMessage) (Body, error)

DecodeBody parses and validates one kind-specific envelope body.

type CapabilityBody

type CapabilityBody struct {
	Capability CapabilityEnvelopePayload `json:"capability"`
}

CapabilityBody carries or advertises one transferable capability artifact.

func (CapabilityBody) Kind

func (CapabilityBody) Kind() Kind

Kind returns the wire kind for the body.

type CapabilityEnvelopePayload

type CapabilityEnvelopePayload struct {
	ID                string   `json:"id"`
	Summary           string   `json:"summary"`
	Outcome           string   `json:"outcome"`
	Version           string   `json:"version,omitempty"`
	Digest            string   `json:"digest"`
	ContextNeeded     []string `json:"context_needed,omitempty"`
	ArtifactsExpected []string `json:"artifacts_expected,omitempty"`
	ExecutionOutline  []string `json:"execution_outline,omitempty"`
	Constraints       []string `json:"constraints,omitempty"`
	Examples          []string `json:"examples,omitempty"`
	Requirements      []string `json:"requirements,omitempty"`
}

CapabilityEnvelopePayload is the transferable unified capability document.

type ChannelInfo

type ChannelInfo struct {
	WorkspaceID string
	Channel     string
	PeerCount   int
}

ChannelInfo summarizes one active runtime channel.

type ConversationRef

type ConversationRef struct {
	WorkspaceID string
	Channel     string
	Surface     Surface
	ThreadID    string
	DirectID    string
}

ConversationRef identifies exactly one conversation container.

func ConversationRefFromEnvelope

func ConversationRefFromEnvelope(env Envelope) (ConversationRef, error)

ConversationRefFromEnvelope returns the validated container reference for a conversation envelope.

func (ConversationRef) ContainerKey

func (r ConversationRef) ContainerKey() string

ContainerKey returns a stable workspace/channel/surface/container key.

func (ConversationRef) IsDirect

func (r ConversationRef) IsDirect() bool

IsDirect reports whether the reference targets a direct room.

func (ConversationRef) IsThread

func (r ConversationRef) IsThread() bool

IsThread reports whether the reference targets a public thread.

func (ConversationRef) Validate

func (r ConversationRef) Validate() error

Validate reports whether the reference identifies exactly one container.

type Delivery

type Delivery struct {
	SessionID string
	PeerID    string
	Envelope  Envelope
}

Delivery is one accepted inbound message delivery target.

type Envelope

type Envelope struct {
	Protocol    string          `json:"protocol"`
	ID          string          `json:"id"`
	WorkspaceID string          `json:"workspace_id"`
	Kind        Kind            `json:"kind"`
	Channel     string          `json:"channel"`
	Surface     *Surface        `json:"surface,omitempty"`
	ThreadID    *string         `json:"thread_id,omitempty"`
	DirectID    *string         `json:"direct_id,omitempty"`
	From        string          `json:"from"`
	To          *string         `json:"to,omitempty"`
	WorkID      *string         `json:"work_id,omitempty"`
	ReplyTo     *string         `json:"reply_to,omitempty"`
	TraceID     *string         `json:"trace_id,omitempty"`
	CausationID *string         `json:"causation_id,omitempty"`
	TS          int64           `json:"ts"`
	ExpiresAt   *int64          `json:"expires_at,omitempty"`
	Body        json.RawMessage `json:"body"`
	Proof       *Proof          `json:"proof"`
	Ext         ExtensionMap    `json:"ext,omitempty"`
}

Envelope is the shared AGH Network v0 wire envelope.

func NormalizeEnvelope

func NormalizeEnvelope(env Envelope, opts ValidateOptions) (Envelope, error)

NormalizeEnvelope trims identifier fields, validates the envelope, and returns a safe cloned copy for downstream use.

func ParseEnvelope

func ParseEnvelope(data []byte, opts ValidateOptions) (Envelope, error)

ParseEnvelope decodes, validates, and normalizes one raw envelope.

func (Envelope) DecodeBody

func (e Envelope) DecodeBody() (Body, error)

DecodeBody parses and validates the envelope body using the envelope kind.

func (Envelope) IsBroadcast

func (e Envelope) IsBroadcast() bool

IsBroadcast reports whether the envelope is channel-broadcast.

func (Envelope) IsDirected

func (e Envelope) IsDirected() bool

IsDirected reports whether the envelope targets a specific peer.

func (*Envelope) UnmarshalJSON

func (e *Envelope) UnmarshalJSON(data []byte) error

UnmarshalJSON rejects obsolete hard-cut wire fields before decoding.

type ExtensionMap

type ExtensionMap map[string]json.RawMessage

ExtensionMap preserves opaque extension payloads without interpreting them.

type FileAuditWriter

type FileAuditWriter struct {
	// contains filtered or unexported fields
}

FileAuditWriter writes normalized network audit records to a JSONL file and optionally mirrors them into a persistent store.

func NewAuditWriter

func NewAuditWriter(path string, auditStore AuditStore, opts ...AuditWriterOption) (*FileAuditWriter, error)

NewAuditWriter constructs the dual-path network audit writer.

func (*FileAuditWriter) RecordDelivered

func (w *FileAuditWriter) RecordDelivered(ctx context.Context, sessionID string, envelope Envelope) error

RecordDelivered stores a delivered network audit record.

func (*FileAuditWriter) RecordReceived

func (w *FileAuditWriter) RecordReceived(ctx context.Context, sessionID string, envelope Envelope) error

RecordReceived stores a received network audit record.

func (*FileAuditWriter) RecordRejected

func (w *FileAuditWriter) RecordRejected(
	ctx context.Context,
	sessionID string,
	envelope Envelope,
	reason string,
) error

RecordRejected stores a rejected network audit record.

func (*FileAuditWriter) RecordSent

func (w *FileAuditWriter) RecordSent(ctx context.Context, sessionID string, envelope Envelope) error

RecordSent stores a sent network audit record.

func (*FileAuditWriter) RecordTaskIngress

func (w *FileAuditWriter) RecordTaskIngress(ctx context.Context, audit TaskIngressAudit) error

RecordTaskIngress stores one accepted or rejected task-ingress audit record using the existing network audit sinks.

type GreetBody

type GreetBody struct {
	PeerCard PeerCard `json:"peer_card"`
	Summary  string   `json:"summary,omitempty"`
}

GreetBody advertises peer presence and capabilities in a channel.

func (GreetBody) Kind

func (GreetBody) Kind() Kind

Kind returns the wire kind for the body.

type Heartbeat

type Heartbeat struct {
	// contains filtered or unexported fields
}

Heartbeat owns one periodic greet publisher.

func (*Heartbeat) Done

func (h *Heartbeat) Done() <-chan struct{}

Done returns the heartbeat completion signal.

func (*Heartbeat) Stop

func (h *Heartbeat) Stop()

Stop cancels the heartbeat and waits for its goroutine to exit.

type Kind

type Kind string

Kind identifies one normative AGH Network message kind.

const (
	KindGreet      Kind = "greet"
	KindWhois      Kind = "whois"
	KindSay        Kind = "say"
	KindCapability Kind = "capability"
	KindReceipt    Kind = "receipt"
	KindTrace      Kind = "trace"
)

func (Kind) Validate

func (k Kind) Validate() error

Validate reports whether the kind is one of the documented RFC values.

type KindMetric

type KindMetric struct {
	Kind      Kind
	Sent      int64
	Received  int64
	Rejected  int64
	Delivered int64
}

KindMetric is the runtime per-kind network activity snapshot surfaced by status APIs.

type LifecycleAction

type LifecycleAction string

LifecycleAction explains how a lifecycle helper handled one message.

const (
	LifecycleActionOpened     LifecycleAction = "opened"
	LifecycleActionAdvanced   LifecycleAction = "advanced"
	LifecycleActionUnchanged  LifecycleAction = "unchanged"
	LifecycleActionIgnored    LifecycleAction = "ignored"
	LifecycleActionRejectWork LifecycleAction = "reject_work"
)

type LifecycleResult

type LifecycleResult struct {
	Work       Work
	Action     LifecycleAction
	ReasonCode *ReasonCode
}

LifecycleResult is the reusable lifecycle decision surface for router code.

func ApplyWorkEnvelope

func ApplyWorkEnvelope(current *Work, env Envelope, at time.Time) (LifecycleResult, error)

ApplyWorkEnvelope applies one validated lifecycle envelope to the current work state and returns the router-facing decision.

type LocalPeer

type LocalPeer struct {
	SessionID         string
	PeerID            string
	WorkspaceID       string
	Channel           string
	PeerCard          PeerCard
	CapabilityCatalog []sessionpkg.NetworkPeerCapability
	JoinedAt          time.Time
}

LocalPeer is one daemon-local peer joined to one runtime channel.

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager owns transport, routing, presence, delivery, and the late-bound session lifecycle callbacks required by daemon boot integration.

func NewManager

func NewManager(
	ctx context.Context,
	cfg aghconfig.NetworkConfig,
	prompter deliveryPrompter,
	auditPath string,
	auditStore AuditStore,
	opts ...ManagerOption,
) (*Manager, error)

NewManager constructs the top-level network runtime and starts the embedded transport it owns.

func (*Manager) CancelTaskFromPeer

func (m *Manager) CancelTaskFromPeer(
	ctx context.Context,
	ingress TaskIngressContext,
	taskID string,
	req taskpkg.CancelTask,
) (*taskpkg.Task, error)

CancelTaskFromPeer requests manager-owned task cancellation after validating the authenticated peer context and task channel binding.

func (*Manager) CreateTaskFromPeer

func (m *Manager) CreateTaskFromPeer(
	ctx context.Context,
	ingress TaskIngressContext,
	spec taskpkg.CreateTask,
) (*taskpkg.Task, error)

CreateTaskFromPeer creates one task on behalf of an authenticated network peer after channel and capability validation succeed.

func (*Manager) EnqueueRunFromPeer

func (m *Manager) EnqueueRunFromPeer(
	ctx context.Context,
	ingress TaskIngressContext,
	spec taskpkg.EnqueueRun,
) (*taskpkg.Run, error)

EnqueueRunFromPeer enqueues one task run from an authenticated network peer while preserving origin-scoped idempotency inside the task manager.

func (*Manager) Inbox

func (m *Manager) Inbox(ctx context.Context, sessionID string) ([]Envelope, error)

Inbox returns the queued inbound envelopes for one local session.

func (*Manager) JoinChannel

func (m *Manager) JoinChannel(ctx context.Context, join sessionpkg.NetworkPeerJoin) error

JoinChannel registers one daemon-local session as a visible network peer.

func (*Manager) LeaveChannel

func (m *Manager) LeaveChannel(ctx context.Context, sessionID string) error

LeaveChannel removes one daemon-local session from the active network runtime.

func (*Manager) ListChannels

func (m *Manager) ListChannels(ctx context.Context, workspaceID string) ([]ChannelInfo, error)

ListChannels returns the currently active runtime channels.

func (*Manager) ListPeers

func (m *Manager) ListPeers(ctx context.Context, workspaceID string, channel string) ([]PeerInfo, error)

ListPeers returns the current visible local+remote peer snapshot.

func (*Manager) OnTurnEnd

func (m *Manager) OnTurnEnd(sessionID string)

OnTurnEnd wakes the per-session delivery worker after a prompt turn finishes.

func (*Manager) Send

func (m *Manager) Send(ctx context.Context, req SendRequest) (string, error)

Send publishes one outbound envelope through the owned router/transport.

func (*Manager) Shutdown

func (m *Manager) Shutdown(ctx context.Context) error

Shutdown drains all background work and stops the owned transport.

func (*Manager) Status

func (m *Manager) Status(ctx context.Context) (*Status, error)

Status returns a safe diagnostics snapshot without exposing transport credentials.

func (*Manager) UpdateTaskFromPeer

func (m *Manager) UpdateTaskFromPeer(
	ctx context.Context,
	ingress TaskIngressContext,
	taskID string,
	patch taskpkg.Patch,
) (*taskpkg.Task, error)

UpdateTaskFromPeer applies one mutable task patch through the task manager after enforcing channel-bound ingress rules.

func (*Manager) WaitInbox

func (m *Manager) WaitInbox(ctx context.Context, sessionID string, channel string) ([]Envelope, error)

WaitInbox blocks until one or more queued inbound envelopes are available for the local session, optionally filtered by channel.

type ManagerOption

type ManagerOption func(*managerOptions)

ManagerOption customizes network manager construction.

func WithManagerAuditWriter

func WithManagerAuditWriter(auditor AuditWriter) ManagerOption

WithManagerAuditWriter injects a custom audit sink, primarily for tests.

func WithManagerClock

func WithManagerClock(now func() time.Time) ManagerOption

WithManagerClock overrides the manager clock, primarily for tests.

func WithManagerConversationStore

func WithManagerConversationStore(conversations store.NetworkConversationStore) ManagerOption

WithManagerConversationStore injects the durable conversation repository used as the commit boundary before runtime delivery side effects.

func WithManagerHookDispatcher

func WithManagerHookDispatcher(dispatcher HookDispatcher) ManagerOption

WithManagerHookDispatcher injects the network hook dispatcher.

func WithManagerLogger

func WithManagerLogger(logger *slog.Logger) ManagerOption

WithManagerLogger overrides the logger used by the network manager.

func WithManagerTaskService

func WithManagerTaskService(tasks TaskService) ManagerOption

WithManagerTaskService injects the daemon-owned task manager used for authenticated network task ingress.

type MetricSample

type MetricSample struct {
	Name   string
	Labels map[string]string
	Value  int64
}

MetricSample is one low-cardinality runtime network metric sample.

type PeerCard

type PeerCard struct {
	PeerID              string       `json:"peer_id"`
	DisplayName         *string      `json:"display_name,omitempty"`
	ProfilesSupported   []string     `json:"profiles_supported"`
	Capabilities        []string     `json:"capabilities"`
	ArtifactsSupported  []string     `json:"artifacts_supported"`
	TrustModesSupported []string     `json:"trust_modes_supported"`
	Ext                 ExtensionMap `json:"ext,omitempty"`
}

PeerCard advertises one peer's identity and capabilities.

func DefaultPeerCard

func DefaultPeerCard(peerID string) (PeerCard, error)

DefaultPeerCard returns the minimal protocol peer card for one peer identifier.

type PeerInfo

type PeerInfo struct {
	SessionID              *string
	PeerID                 string
	WorkspaceID            string
	Channel                string
	Local                  bool
	PeerCard               PeerCard
	CapabilityCatalog      []sessionpkg.NetworkPeerCapability
	CapabilityCatalogKnown bool
	JoinedAt               *time.Time
	LastSeen               *time.Time
	ExpiresAt              *time.Time
	PresenceState          PresenceState
	LastSeenAgeSeconds     *int64
}

PeerInfo is the API-facing snapshot for one visible peer.

type PeerLifecycleEvent

type PeerLifecycleEvent struct {
	Kind      PeerLifecycleKind
	Peer      PeerInfo
	Timestamp time.Time
}

PeerLifecycleEvent is the runtime-local representation of a peer presence transition. Transports convert it to hooks or event summaries; it is not part of the wire protocol.

type PeerLifecycleKind

type PeerLifecycleKind string

PeerLifecycleKind describes one peer membership transition observed by the router or manager lifecycle.

const (
	PeerLifecycleJoined PeerLifecycleKind = "joined"
	PeerLifecycleLeft   PeerLifecycleKind = "left"
)

type PeerRegistry

type PeerRegistry struct {
	// contains filtered or unexported fields
}

PeerRegistry tracks local session peers plus the remote peer cache.

func NewPeerRegistry

func NewPeerRegistry(greetInterval time.Duration, opts ...PeerRegistryOption) (*PeerRegistry, error)

NewPeerRegistry constructs the in-memory presence registry.

func (*PeerRegistry) ExpireRemotes

func (r *PeerRegistry) ExpireRemotes(at time.Time) []RemotePeerEntry

ExpireRemotes removes expired remotes and reports each peer removed by the same captured time.

func (*PeerRegistry) GreetInterval

func (r *PeerRegistry) GreetInterval() time.Duration

GreetInterval reports the configured presence heartbeat interval.

func (*PeerRegistry) HasPresence

func (r *PeerRegistry) HasPresence(workspaceID string, channel string, peerID string, at time.Time) bool

HasPresence reports whether the peer is visible and unexpired in the given channel.

func (*PeerRegistry) LeaveLocal

func (r *PeerRegistry) LeaveLocal(sessionID string) (LocalPeer, bool)

LeaveLocal removes one local session peer from the registry.

func (*PeerRegistry) ListChannels

func (r *PeerRegistry) ListChannels(workspaceID string, at time.Time) []ChannelInfo

ListChannels returns active runtime channels plus current peer counts.

func (*PeerRegistry) ListPeers

func (r *PeerRegistry) ListPeers(workspaceID string, channel string, at time.Time) []PeerInfo

ListPeers returns visible peers, optionally filtered to one workspace channel.

func (*PeerRegistry) LocalByPeer

func (r *PeerRegistry) LocalByPeer(workspaceID string, channel string, peerID string) (LocalPeer, bool)

LocalByPeer resolves one local peer by workspace, channel, and peer ID.

func (*PeerRegistry) LocalBySession

func (r *PeerRegistry) LocalBySession(sessionID string) (LocalPeer, bool)

LocalBySession resolves one local peer by session ID.

func (*PeerRegistry) LocalPeers

func (r *PeerRegistry) LocalPeers(workspaceID string, channel string) []LocalPeer

LocalPeers returns the local peers currently joined to one workspace channel.

func (*PeerRegistry) LookupPresence

func (r *PeerRegistry) LookupPresence(
	workspaceID string,
	channel string,
	peerID string,
	at time.Time,
) (PeerInfo, bool)

LookupPresence resolves one peer from the local registry first, then the remote cache.

func (*PeerRegistry) MatchLocalPeers

func (r *PeerRegistry) MatchLocalPeers(workspaceID string, channel string, query string) []LocalPeer

MatchLocalPeers returns local peers matching one whois query.

func (*PeerRegistry) RefreshRemote

func (r *PeerRegistry) RefreshRemote(
	workspaceID string,
	channel string,
	card PeerCard,
	seenAt time.Time,
) (RemotePeerEntry, bool, error)

RefreshRemote stores or refreshes one remote peer advertisement.

func (*PeerRegistry) RefreshRemoteDetailed

func (r *PeerRegistry) RefreshRemoteDetailed(
	workspaceID string,
	channel string,
	card PeerCard,
	capabilityCatalog []sessionpkg.NetworkPeerCapability,
	capabilityCatalogKnown bool,
	seenAt time.Time,
) (RemoteRefreshResult, error)

RefreshRemoteDetailed stores or refreshes one remote peer advertisement and reports lifecycle changes caused by the same registry mutation.

func (*PeerRegistry) RefreshRemoteWithCapabilityCatalog

func (r *PeerRegistry) RefreshRemoteWithCapabilityCatalog(
	workspaceID string,
	channel string,
	card PeerCard,
	capabilityCatalog []sessionpkg.NetworkPeerCapability,
	capabilityCatalogKnown bool,
	seenAt time.Time,
) (RemotePeerEntry, bool, error)

RefreshRemoteWithCapabilityCatalog stores or refreshes one remote peer advertisement plus optional rich capability discovery state learned via explicit whois responses.

func (*PeerRegistry) RegisterLocal

func (r *PeerRegistry) RegisterLocal(
	sessionID string,
	workspaceID string,
	channel string,
	card PeerCard,
	joinedAt time.Time,
) (LocalPeer, error)

RegisterLocal upserts one local peer membership keyed by session ID.

func (*PeerRegistry) RegisterLocalWithCapabilityCatalog

func (r *PeerRegistry) RegisterLocalWithCapabilityCatalog(
	sessionID string,
	workspaceID string,
	channel string,
	card PeerCard,
	capabilityCatalog []sessionpkg.NetworkPeerCapability,
	joinedAt time.Time,
) (LocalPeer, error)

RegisterLocalWithCapabilityCatalog upserts one local peer membership keyed by session ID, optionally retaining the runtime-owned rich capability catalog for explicit whois discovery.

func (*PeerRegistry) RemoteByPeer

func (r *PeerRegistry) RemoteByPeer(
	workspaceID string,
	channel string,
	peerID string,
	at time.Time,
) (RemotePeerEntry, bool)

RemoteByPeer resolves one active remote peer entry.

type PeerRegistryOption

type PeerRegistryOption func(*PeerRegistry)

PeerRegistryOption customizes the registry runtime.

func WithPeerRegistryClock

func WithPeerRegistryClock(now func() time.Time) PeerRegistryOption

WithPeerRegistryClock overrides the time source used by the registry.

type Presence

type Presence struct {
	State              PresenceState
	LastSeenAgeSeconds *int64
}

Presence captures derived activity state without introducing a second source of truth beyond PeerRegistry timestamps.

func DerivePresence

func DerivePresence(peer PeerInfo, now time.Time, greetInterval time.Duration) Presence

DerivePresence derives a peer's activity state from one captured clock value and the network greet interval. It is intentionally pure: callers own snapshots and time capture.

type PresenceState

type PresenceState string

PresenceState is the daemon-derived activity state for one network peer.

const (
	PresenceStateLocal    PresenceState = "local"
	PresenceStateActive   PresenceState = "active"
	PresenceStateInactive PresenceState = "inactive"
	PresenceStateExpired  PresenceState = "expired"
	PresenceStateUnknown  PresenceState = "unknown"
)

type Proof

type Proof map[string]json.RawMessage

Proof preserves the opaque protocol proof payload for forward compatibility.

type ReasonCode

type ReasonCode string

ReasonCode identifies one registered protocol rejection reason.

const (
	ReasonCodeMalformed             ReasonCode = "malformed"
	ReasonCodeExpired               ReasonCode = "expired"
	ReasonCodeDuplicate             ReasonCode = "duplicate"
	ReasonCodeUnsupportedKind       ReasonCode = "unsupported_kind"
	ReasonCodeUnsupportedProfile    ReasonCode = "unsupported_profile"
	ReasonCodeVerificationFailed    ReasonCode = "verification_failed"
	ReasonCodeNotTarget             ReasonCode = "not_target"
	ReasonCodeNotFound              ReasonCode = "not_found"
	ReasonCodeBusy                  ReasonCode = "busy"
	ReasonCodeInternal              ReasonCode = "internal"
	ReasonCodeInvalidSurface        ReasonCode = "invalid_surface"
	ReasonCodeConversationNotFound  ReasonCode = "conversation_not_found"
	ReasonCodeWorkClosed            ReasonCode = "work_closed"
	ReasonCodeWorkContainerMismatch ReasonCode = "work_container_mismatch"
	ReasonCodeLegacyFieldRejected   ReasonCode = "legacy_field_rejected"
)

func (ReasonCode) Validate

func (r ReasonCode) Validate() error

Validate reports whether the reason code belongs to the v0 registry.

type ReceiptBody

type ReceiptBody struct {
	ForID      string        `json:"for_id"`
	Status     ReceiptStatus `json:"status"`
	ReasonCode *ReasonCode   `json:"reason_code,omitempty"`
	Detail     *string       `json:"detail,omitempty"`
}

ReceiptBody acknowledges or rejects protocol-level admission.

func (ReceiptBody) Kind

func (ReceiptBody) Kind() Kind

Kind returns the wire kind for the body.

type ReceiptStatus

type ReceiptStatus string

ReceiptStatus identifies one receipt admission status.

const (
	ReceiptStatusAccepted    ReceiptStatus = "accepted"
	ReceiptStatusRejected    ReceiptStatus = "rejected"
	ReceiptStatusDuplicate   ReceiptStatus = "duplicate"
	ReceiptStatusExpired     ReceiptStatus = "expired"
	ReceiptStatusUnsupported ReceiptStatus = "unsupported"
	ReceiptStatusCanceled    ReceiptStatus = "canceled"
)

func (ReceiptStatus) Validate

func (s ReceiptStatus) Validate() error

Validate reports whether the receipt status is documented by the RFC.

type RemotePeerEntry

type RemotePeerEntry struct {
	PeerID                 string
	PeerCard               PeerCard
	WorkspaceID            string
	Channel                string
	CapabilityCatalog      []sessionpkg.NetworkPeerCapability
	CapabilityCatalogKnown bool
	LastSeen               time.Time
	ExpiresAt              time.Time
}

RemotePeerEntry is one cached remote peer advertisement.

type RemoteRefreshResult

type RemoteRefreshResult struct {
	Entry   RemotePeerEntry
	Stored  bool
	Joined  bool
	Expired []RemotePeerEntry
}

RemoteRefreshResult reports one remote greet cache mutation plus any peers expired while applying the same captured time.

type RouteResult

type RouteResult struct {
	Envelope   *Envelope
	Deliveries []Delivery
	Generated  []Envelope
	PeerEvents []PeerLifecycleEvent
	Duplicate  bool
	Ignored    bool
	Rejected   bool
	ReasonCode *ReasonCode
}

RouteResult is the router decision for one inbound envelope.

type Router

type Router struct {
	// contains filtered or unexported fields
}

Router handles outbound subject selection plus inbound receiver policy.

func NewRouter

func NewRouter(
	peers *PeerRegistry,
	transport RouterTransport,
	maxReplayAge time.Duration,
	opts ...RouterOption,
) (*Router, error)

NewRouter constructs the routing runtime on top of a peer registry.

func (*Router) Leave

func (r *Router) Leave(sessionID string) (LocalPeer, bool)

Leave removes the local sender presence for one session.

func (*Router) PrepareSend

func (r *Router) PrepareSend(ctx context.Context, req SendRequest) (SendResult, error)

PrepareSend validates one outbound request and computes the publish subject without performing transport side effects.

func (*Router) PublishGreet

func (r *Router) PublishGreet(ctx context.Context, sessionID string, summary string) (SendResult, error)

PublishGreet advertises one local peer card to its joined channel.

func (*Router) PublishPrepared

func (r *Router) PublishPrepared(ctx context.Context, prepared SendResult) (SendResult, error)

PublishPrepared publishes a previously prepared envelope and syncs the local lifecycle cache only after the transport accepts the publish.

func (*Router) Receive

func (r *Router) Receive(ctx context.Context, payload []byte) (RouteResult, error)

Receive validates one inbound envelope, updates presence, and returns delivery decisions.

func (*Router) Send

func (r *Router) Send(ctx context.Context, req SendRequest) (SendResult, error)

Send validates one outbound request, enforces presence preflight, and publishes it.

func (*Router) StartHeartbeat

func (r *Router) StartHeartbeat(ctx context.Context, sessionID string, summary string) (*Heartbeat, error)

StartHeartbeat publishes greet immediately, then keeps re-greeting on the configured interval.

type RouterOption

type RouterOption func(*Router)

RouterOption customizes router construction.

func WithRouterClock

func WithRouterClock(now func() time.Time) RouterOption

WithRouterClock overrides the clock used for send and receive decisions.

type RouterTransport

type RouterTransport interface {
	Publish(ctx context.Context, subject string, payload []byte) error
}

RouterTransport is the narrow publish surface consumed by the router.

type SayBody

type SayBody struct {
	Text      string            `json:"text"`
	Artifacts []json.RawMessage `json:"artifacts,omitempty"`
	Intent    string            `json:"intent,omitempty"`
}

SayBody carries broadcast chat-first communication.

func (SayBody) Kind

func (SayBody) Kind() Kind

Kind returns the wire kind for the body.

type SendRequest

type SendRequest struct {
	SessionID   string
	WorkspaceID string
	Channel     string
	Surface     *Surface
	ThreadID    *string
	DirectID    *string
	Kind        Kind
	To          *string
	Body        json.RawMessage
	WorkID      *string
	ReplyTo     *string
	TraceID     *string
	CausationID *string
	ExpiresAt   *int64
	ID          *string
	Ext         ExtensionMap
}

SendRequest carries one caller-supplied outbound envelope request.

type SendResult

type SendResult struct {
	ID       string
	Subject  string
	Envelope Envelope
}

SendResult summarizes one outbound publish.

type Status

type Status struct {
	Enabled              bool
	Status               string
	ListenerHost         string
	ListenerPort         int
	LocalPeers           int
	RemotePeers          int
	Channels             int
	QueuedMessages       int
	QueuedSessions       int
	DeliveryWorkers      int
	DeliveryQueueDepth   int
	MessagesSent         int64
	MessagesReceived     int64
	MessagesRejected     int64
	MessagesDelivered    int64
	WorkflowTaggedEvents int64
	HandoffTaggedEvents  int64
	OpenThreads          int64
	OpenDirectRooms      int64
	OpenWorkItems        int64
	ConversationMessages int64
	WorkTransitions      int64
	DirectResolves       int64
	LastDisconnect       string
	KindMetrics          []KindMetric
	Metrics              []MetricSample
}

Status is the manager-facing diagnostics snapshot consumed by daemon status and later transport surfaces.

type Surface

type Surface string

Surface identifies the conversation container class for one message.

const (
	SurfaceThread Surface = "thread"
	SurfaceDirect Surface = "direct"
)

func (Surface) Validate

func (s Surface) Validate() error

Validate reports whether the surface is one of the documented values.

type TaskIngressAudit

type TaskIngressAudit struct {
	Action      string
	Direction   string
	PeerID      string
	WorkspaceID string
	Channel     string
	RequestID   string
	Reason      string
	Payload     any
}

TaskIngressAudit captures one task-domain ingress decision originating from a validated network peer.

type TaskIngressAuditWriter

type TaskIngressAuditWriter interface {
	RecordTaskIngress(ctx context.Context, audit TaskIngressAudit) error
}

TaskIngressAuditWriter is the optional audit extension used by task-aware network ingress. Existing protocol-message auditing remains unchanged.

type TaskIngressContext

type TaskIngressContext struct {
	WorkspaceID string
	PeerID      string
	Channel     string
	RequestID   string
	Surface     Surface
	ThreadID    string
	DirectID    string
	WorkID      string
	ReplyTo     string
	TraceID     string
	CausationID string
}

TaskIngressContext captures the trusted peer identity and delivery metadata that network ingress derives from the live runtime rather than the payload.

func (TaskIngressContext) Validate

func (c TaskIngressContext) Validate() error

Validate reports whether the ingress context contains the mandatory peer and delivery identifiers.

type TaskService

type TaskService interface {
	GetTask(ctx context.Context, id string, actor taskpkg.ActorContext) (*taskpkg.View, error)
	CreateTask(ctx context.Context, spec taskpkg.CreateTask, actor taskpkg.ActorContext) (*taskpkg.Task, error)
	UpdateTask(
		ctx context.Context,
		id string,
		patch taskpkg.Patch,
		actor taskpkg.ActorContext,
	) (*taskpkg.Task, error)
	CancelTask(
		ctx context.Context,
		id string,
		req taskpkg.CancelTask,
		actor taskpkg.ActorContext,
	) (*taskpkg.Task, error)
	EnqueueRun(ctx context.Context, spec taskpkg.EnqueueRun, actor taskpkg.ActorContext) (*taskpkg.Run, error)
}

TaskService is the narrowed task-domain surface consumed by network ingress.

type TraceBody

type TraceBody struct {
	State        WorkState         `json:"state"`
	Message      string            `json:"message,omitempty"`
	Result       json.RawMessage   `json:"result,omitempty"`
	ArtifactRefs []json.RawMessage `json:"artifact_refs,omitempty"`
}

TraceBody reports progress or terminal outcome for work.

func (TraceBody) Kind

func (TraceBody) Kind() Kind

Kind returns the wire kind for the body.

type Transport

type Transport struct {
	// contains filtered or unexported fields
}

Transport owns the embedded NATS server plus the daemon's in-process connection.

func NewTransport

func NewTransport(ctx context.Context, cfg aghconfig.NetworkConfig, opts ...TransportOption) (*Transport, error)

NewTransport starts an embedded NATS server and connects the daemon to it using an in-process, token-authenticated connection.

func (*Transport) ClientURL

func (t *Transport) ClientURL() string

ClientURL reports the internal NATS client URL used by the transport.

func (*Transport) Drain

func (t *Transport) Drain(ctx context.Context) error

Drain closes the daemon connection cleanly before the server is shut down.

func (*Transport) Port

func (t *Transport) Port() int

Port reports the resolved listener port. Random-port servers return the actual chosen port after startup.

func (*Transport) Publish

func (t *Transport) Publish(ctx context.Context, subject string, payload []byte) error

Publish sends one payload to a NATS subject and flushes the connection.

func (*Transport) Shutdown

func (t *Transport) Shutdown(ctx context.Context) error

Shutdown drains the daemon connection and stops the embedded server.

func (*Transport) Subscribe

func (t *Transport) Subscribe(subject string, handler func(*nats.Msg)) (*nats.Subscription, error)

Subscribe registers a callback for one subject.

type TransportOption

type TransportOption func(*transportOptions)

TransportOption customizes embedded transport startup behavior.

func WithTransportDisconnectHandler

func WithTransportDisconnectHandler(handler func(error)) TransportOption

WithTransportDisconnectHandler registers a disconnect callback for later manager wiring.

func WithTransportLogger

func WithTransportLogger(logger *slog.Logger) TransportOption

WithTransportLogger overrides the logger used by the transport.

func WithTransportPublishTimeout

func WithTransportPublishTimeout(timeout time.Duration) TransportOption

WithTransportPublishTimeout overrides the publish flush timeout when the caller does not provide a deadline.

func WithTransportReadyTimeout

func WithTransportReadyTimeout(timeout time.Duration) TransportOption

WithTransportReadyTimeout overrides the server readiness timeout.

func WithTransportReconnectHandler

func WithTransportReconnectHandler(handler func()) TransportOption

WithTransportReconnectHandler registers a reconnect callback for later manager wiring.

type ValidateOptions

type ValidateOptions struct {
	Now          time.Time
	MaxReplayAge time.Duration
}

ValidateOptions configures envelope validation and normalization.

type WhoisBody

type WhoisBody struct {
	Type     WhoisType `json:"type"`
	Query    string    `json:"query,omitempty"`
	PeerCard *PeerCard `json:"peer_card,omitempty"`
}

WhoisBody requests or returns peer card information.

func (WhoisBody) Kind

func (WhoisBody) Kind() Kind

Kind returns the wire kind for the body.

type WhoisType

type WhoisType string

WhoisType identifies the request or response shape for `whois`.

const (
	WhoisTypeRequest  WhoisType = "request"
	WhoisTypeResponse WhoisType = "response"
)

func (WhoisType) Validate

func (t WhoisType) Validate() error

Validate reports whether the whois type is a documented value.

type Work

type Work struct {
	ID         string
	Ref        ConversationRef
	Initiator  string
	Target     string
	State      WorkState
	CreatedAt  time.Time
	UpdatedAt  time.Time
	TerminalAt *time.Time
}

Work tracks one directed work inside one channel.

func OpenWork

func OpenWork(env Envelope, at time.Time) (Work, error)

OpenWork opens a new work from the first directed message.

func (Work) IsParticipant

func (i Work) IsParticipant(peerID string) bool

IsParticipant reports whether the peer owns the work lifecycle.

func (Work) Validate

func (i Work) Validate() error

Validate reports whether the work carries a usable identity and state.

type WorkState

type WorkState string

WorkState identifies one RFC work lifecycle state.

const (
	WorkStateSubmitted  WorkState = "submitted"
	WorkStateWorking    WorkState = "working"
	WorkStateNeedsInput WorkState = "needs_input"
	WorkStateCompleted  WorkState = "completed"
	WorkStateFailed     WorkState = "failed"
	WorkStateCanceled   WorkState = "canceled"
)

func (WorkState) Validate

func (s WorkState) Validate() error

Validate reports whether the work state is documented by the RFC.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL