Documentation
¶
Overview ¶
Package network defines the AGH Network v0 protocol surface shared by the transport, router, and delivery layers.
Index ¶
- Constants
- Variables
- func BroadcastSubject(workspaceID string, channel string) (string, error)
- func DirectRoomIdentity(workspaceID string, channel string, localPeer string, remotePeer string) (string, string, string, error)
- func DirectSubject(workspaceID string, channel string, peerID string) (string, error)
- func IsTerminalState(state WorkState) bool
- func NormalizeAuditEntry(sessionID string, direction string, envelope Envelope, reason string, ...) (store.NetworkAuditEntry, error)
- func NormalizeDirectRoomPeers(localPeer string, remotePeer string) (string, string, error)
- func PreviewTextForRawBody(kind Kind, raw json.RawMessage) string
- func ResolveGreetSummary(card PeerCard, summary string) string
- func RouteToken(peerID string) (string, error)
- func ValidateChannel(channel string) error
- func ValidateConversationID(id string, field string) error
- func ValidateConversationRef(ref ConversationRef) error
- func ValidateDirectRoomBinding(workspaceID string, channel string, directID string, peerA string, ...) error
- func ValidateDirectRoomPeers(peerA string, peerB string) error
- func ValidateEnvelope(env Envelope, opts ValidateOptions) error
- func ValidateEnvelopeConversation(env Envelope) error
- func ValidatePeerID(peerID string) error
- func ValidateSurface(surface Surface) error
- func ValidateWorkID(id string) error
- func ValidateWorkState(state WorkState) error
- func ValidateWorkTransition(from WorkState, to WorkState) error
- func ValidateWorkspaceID(workspaceID string) error
- type AuditStore
- type AuditWriter
- type AuditWriterOption
- type Body
- type CapabilityBody
- type CapabilityEnvelopePayload
- type ChannelInfo
- type ConversationRef
- type Delivery
- type Envelope
- type ExtensionMap
- type FileAuditWriter
- func (w *FileAuditWriter) RecordDelivered(ctx context.Context, sessionID string, envelope Envelope) error
- func (w *FileAuditWriter) RecordReceived(ctx context.Context, sessionID string, envelope Envelope) error
- func (w *FileAuditWriter) RecordRejected(ctx context.Context, sessionID string, envelope Envelope, reason string) error
- func (w *FileAuditWriter) RecordSent(ctx context.Context, sessionID string, envelope Envelope) error
- func (w *FileAuditWriter) RecordTaskIngress(ctx context.Context, audit TaskIngressAudit) error
- type GreetBody
- type Heartbeat
- type HookDispatcher
- type Kind
- type KindMetric
- type LifecycleAction
- type LifecycleResult
- type LocalPeer
- type Manager
- func (m *Manager) CancelTaskFromPeer(ctx context.Context, ingress TaskIngressContext, taskID string, ...) (*taskpkg.Task, error)
- func (m *Manager) CreateTaskFromPeer(ctx context.Context, ingress TaskIngressContext, spec taskpkg.CreateTask) (*taskpkg.Task, error)
- func (m *Manager) EnqueueRunFromPeer(ctx context.Context, ingress TaskIngressContext, spec taskpkg.EnqueueRun) (*taskpkg.Run, error)
- func (m *Manager) Inbox(ctx context.Context, sessionID string) ([]Envelope, error)
- func (m *Manager) JoinChannel(ctx context.Context, join sessionpkg.NetworkPeerJoin) error
- func (m *Manager) LeaveChannel(ctx context.Context, sessionID string) error
- func (m *Manager) ListChannels(ctx context.Context, workspaceID string) ([]ChannelInfo, error)
- func (m *Manager) ListPeers(ctx context.Context, workspaceID string, channel string) ([]PeerInfo, error)
- func (m *Manager) OnTurnEnd(sessionID string)
- func (m *Manager) Send(ctx context.Context, req SendRequest) (string, error)
- func (m *Manager) Shutdown(ctx context.Context) error
- func (m *Manager) Status(ctx context.Context) (*Status, error)
- func (m *Manager) UpdateTaskFromPeer(ctx context.Context, ingress TaskIngressContext, taskID string, ...) (*taskpkg.Task, error)
- func (m *Manager) WaitInbox(ctx context.Context, sessionID string, channel string) ([]Envelope, error)
- type ManagerOption
- func WithManagerAuditWriter(auditor AuditWriter) ManagerOption
- func WithManagerClock(now func() time.Time) ManagerOption
- func WithManagerConversationStore(conversations store.NetworkConversationStore) ManagerOption
- func WithManagerHookDispatcher(dispatcher HookDispatcher) ManagerOption
- func WithManagerLogger(logger *slog.Logger) ManagerOption
- func WithManagerTaskService(tasks TaskService) ManagerOption
- type MetricSample
- type PeerCard
- type PeerInfo
- type PeerLifecycleEvent
- type PeerLifecycleKind
- type PeerRegistry
- func (r *PeerRegistry) ExpireRemotes(at time.Time) []RemotePeerEntry
- func (r *PeerRegistry) GreetInterval() time.Duration
- func (r *PeerRegistry) HasPresence(workspaceID string, channel string, peerID string, at time.Time) bool
- func (r *PeerRegistry) LeaveLocal(sessionID string) (LocalPeer, bool)
- func (r *PeerRegistry) ListChannels(workspaceID string, at time.Time) []ChannelInfo
- func (r *PeerRegistry) ListPeers(workspaceID string, channel string, at time.Time) []PeerInfo
- func (r *PeerRegistry) LocalByPeer(workspaceID string, channel string, peerID string) (LocalPeer, bool)
- func (r *PeerRegistry) LocalBySession(sessionID string) (LocalPeer, bool)
- func (r *PeerRegistry) LocalPeers(workspaceID string, channel string) []LocalPeer
- func (r *PeerRegistry) LookupPresence(workspaceID string, channel string, peerID string, at time.Time) (PeerInfo, bool)
- func (r *PeerRegistry) MatchLocalPeers(workspaceID string, channel string, query string) []LocalPeer
- func (r *PeerRegistry) RefreshRemote(workspaceID string, channel string, card PeerCard, seenAt time.Time) (RemotePeerEntry, bool, error)
- func (r *PeerRegistry) RefreshRemoteDetailed(workspaceID string, channel string, card PeerCard, ...) (RemoteRefreshResult, error)
- func (r *PeerRegistry) RefreshRemoteWithCapabilityCatalog(workspaceID string, channel string, card PeerCard, ...) (RemotePeerEntry, bool, error)
- func (r *PeerRegistry) RegisterLocal(sessionID string, workspaceID string, channel string, card PeerCard, ...) (LocalPeer, error)
- func (r *PeerRegistry) RegisterLocalWithCapabilityCatalog(sessionID string, workspaceID string, channel string, card PeerCard, ...) (LocalPeer, error)
- func (r *PeerRegistry) RemoteByPeer(workspaceID string, channel string, peerID string, at time.Time) (RemotePeerEntry, bool)
- type PeerRegistryOption
- type Presence
- type PresenceState
- type Proof
- type ReasonCode
- type ReceiptBody
- type ReceiptStatus
- type RemotePeerEntry
- type RemoteRefreshResult
- type RouteResult
- type Router
- func (r *Router) Leave(sessionID string) (LocalPeer, bool)
- func (r *Router) PrepareSend(ctx context.Context, req SendRequest) (SendResult, error)
- func (r *Router) PublishGreet(ctx context.Context, sessionID string, summary string) (SendResult, error)
- func (r *Router) PublishPrepared(ctx context.Context, prepared SendResult) (SendResult, error)
- func (r *Router) Receive(ctx context.Context, payload []byte) (RouteResult, error)
- func (r *Router) Send(ctx context.Context, req SendRequest) (SendResult, error)
- func (r *Router) StartHeartbeat(ctx context.Context, sessionID string, summary string) (*Heartbeat, error)
- type RouterOption
- type RouterTransport
- type SayBody
- type SendRequest
- type SendResult
- type Status
- type Surface
- type TaskIngressAudit
- type TaskIngressAuditWriter
- type TaskIngressContext
- type TaskService
- type TraceBody
- type Transport
- func (t *Transport) ClientURL() string
- func (t *Transport) Drain(ctx context.Context) error
- func (t *Transport) Port() int
- func (t *Transport) Publish(ctx context.Context, subject string, payload []byte) error
- func (t *Transport) Shutdown(ctx context.Context) error
- func (t *Transport) Subscribe(subject string, handler func(*nats.Msg)) (*nats.Subscription, error)
- type TransportOption
- func WithTransportDisconnectHandler(handler func(error)) TransportOption
- func WithTransportLogger(logger *slog.Logger) TransportOption
- func WithTransportPublishTimeout(timeout time.Duration) TransportOption
- func WithTransportReadyTimeout(timeout time.Duration) TransportOption
- func WithTransportReconnectHandler(handler func()) TransportOption
- type ValidateOptions
- type WhoisBody
- type WhoisType
- type Work
- type WorkState
Constants ¶
const ( // AuditDirectionSent records a successful outbound publish. AuditDirectionSent = "sent" // AuditDirectionReceived records an accepted inbound delivery. AuditDirectionReceived = "received" // AuditDirectionRejected records a rejected envelope. AuditDirectionRejected = "rejected" // AuditDirectionDelivered records a completed local delivery. AuditDirectionDelivered = "delivered" )
const ( // StatusDisabled reports that the network runtime is intentionally disabled. StatusDisabled = "disabled" // StatusRunning reports a connected network runtime. StatusRunning = "running" // StatusDisconnected reports a network runtime whose transport lost its connection. StatusDisconnected = "disconnected" )
const DefaultMaxReplayAge = 5 * time.Minute
DefaultMaxReplayAge is the RFC-recommended maximum receiver replay age when `expires_at` is not present.
const ProtocolV0 = "agh-network/v0"
ProtocolV0 is the workspace-qualified wire protocol identifier.
Variables ¶
var ( // ErrWorkNotFound reports that no current work matched the // lifecycle message being applied. ErrWorkNotFound = errors.New("network: work not found") // ErrWorkActorNotAllowed reports a lifecycle actor outside the // initiator/target pair. ErrWorkActorNotAllowed = errors.New("network: work actor not allowed") // ErrInvalidStateTransition reports an impossible lifecycle transition. ErrInvalidStateTransition = errors.New("network: invalid work state transition") // ErrWorkContainerMismatch reports that a work_id was continued from a // different conversation container than the one that opened it. ErrWorkContainerMismatch = errors.New("network: work container mismatch") // ErrWorkClosed reports a message for a terminal work that must // be rejected instead of reopening the work. ErrWorkClosed = errors.New("network: work closed") )
var ( // ErrLocalPeerNotFound reports an unknown local session sender. ErrLocalPeerNotFound = errors.New("network: local peer not found") // ErrTargetPeerNotFound reports a directed send target missing from presence. ErrTargetPeerNotFound = errors.New("network: target peer not found") // ErrDuplicateEnvelope reports a replay-window duplicate. ErrDuplicateEnvelope = errors.New("network: duplicate envelope") // ErrEnvelopeNotTarget reports a directed envelope for a peer this daemon does not own. ErrEnvelopeNotTarget = errors.New("network: envelope not targeted to a local peer") )
var ( // ErrTaskIngressUnavailable reports that the manager was not configured // with a task service. ErrTaskIngressUnavailable = errors.New("network: task ingress is not configured") // ErrTaskIngressPeerNotFound reports that the supplied peer is not currently // authenticated in the requested channel. ErrTaskIngressPeerNotFound = errors.New("network: task ingress peer not found") // ErrTaskIngressCapabilityDenied reports that the peer lacks the capability // needed for task ingress. ErrTaskIngressCapabilityDenied = errors.New("network: task ingress capability denied") // ErrTaskChannelMismatch reports a request whose bound or requested task // channel does not match the authenticated ingress channel. ErrTaskChannelMismatch = errors.New("network: task channel mismatch") // ErrTaskChannelStale reports a stored task binding that no longer validates // under the current channel grammar. ErrTaskChannelStale = errors.New("network: stale task channel") )
var ( // ErrInvalidEnvelope reports a structurally invalid envelope. ErrInvalidEnvelope = errors.New("network: invalid envelope") // ErrMissingField reports a required protocol field is absent. ErrMissingField = errors.New("network: missing field") // ErrInvalidField reports a present field violates protocol rules. ErrInvalidField = errors.New("network: invalid field") // ErrInvalidKind reports an unknown or unsupported message kind. ErrInvalidKind = errors.New("network: invalid kind") // ErrInvalidBody reports a malformed or invalid kind-specific body. ErrInvalidBody = errors.New("network: invalid body") // ErrEnvelopeTooLarge reports an envelope exceeding the protocol size limit. ErrEnvelopeTooLarge = errors.New("network: envelope too large") // ErrExpired reports an envelope that is already expired. ErrExpired = errors.New("network: expired") // ErrReplayTooOld reports an envelope outside the receiver replay window. ErrReplayTooOld = errors.New("network: replay window exceeded") // ErrVerificationFailed reports a syntactically valid envelope whose // integrity checks failed. ErrVerificationFailed = errors.New("network: verification failed") // ErrLegacyFieldRejected reports an obsolete hard-cut wire field. ErrLegacyFieldRejected = errors.New("network: legacy field rejected") // ErrDirectRoomCollision reports that a direct_id is bound to a different // channel peer pair than its deterministic identity permits. ErrDirectRoomCollision = errors.New("network: direct room collision") )
Functions ¶
func BroadcastSubject ¶
BroadcastSubject builds the workspace-qualified broadcast subject for one channel.
func DirectRoomIdentity ¶
func DirectRoomIdentity( workspaceID string, channel string, localPeer string, remotePeer string, ) (string, string, string, error)
DirectRoomIdentity derives the stable two-party direct room identity scoped to one workspace channel.
func DirectSubject ¶
DirectSubject builds the workspace-qualified direct subject for one target peer.
func IsTerminalState ¶
IsTerminalState reports whether the state is terminal under the RFC.
func NormalizeAuditEntry ¶
func NormalizeAuditEntry( sessionID string, direction string, envelope Envelope, reason string, at time.Time, ) (store.NetworkAuditEntry, error)
NormalizeAuditEntry derives a consistent audit row from envelope metadata.
func NormalizeDirectRoomPeers ¶
NormalizeDirectRoomPeers validates both peer IDs, rejects same-peer rooms, and returns the peers in their stable direct-room storage order.
func PreviewTextForRawBody ¶
func PreviewTextForRawBody(kind Kind, raw json.RawMessage) string
PreviewTextForRawBody derives operator-facing preview text from one raw persisted message body. Invalid bodies return an empty preview.
func ResolveGreetSummary ¶
ResolveGreetSummary returns a deterministic operator-facing summary for one greet advertisement.
func RouteToken ¶
RouteToken derives the deterministic NATS route token for one peer.
func ValidateChannel ¶
ValidateChannel reports whether the channel matches the RFC grammar.
func ValidateConversationID ¶
ValidateConversationID reports whether a container identifier matches its field grammar.
func ValidateConversationRef ¶
func ValidateConversationRef(ref ConversationRef) error
ValidateConversationRef reports whether a conversation reference identifies exactly one container.
func ValidateDirectRoomBinding ¶
func ValidateDirectRoomBinding(workspaceID string, channel string, directID string, peerA string, peerB string) error
ValidateDirectRoomBinding proves that an existing direct room row matches the deterministic identity for its workspace/channel-scoped peer pair.
func ValidateDirectRoomPeers ¶
ValidateDirectRoomPeers reports whether two peer IDs form a valid two-party direct room.
func ValidateEnvelope ¶
func ValidateEnvelope(env Envelope, opts ValidateOptions) error
ValidateEnvelope validates one envelope without returning a normalized copy.
func ValidateEnvelopeConversation ¶
ValidateEnvelopeConversation enforces kind-specific container and work fields.
func ValidatePeerID ¶
ValidatePeerID reports whether the peer identifier matches the RFC grammar.
func ValidateSurface ¶
ValidateSurface reports whether the surface matches the RFC conversation values.
func ValidateWorkID ¶
ValidateWorkID reports whether a work id can safely cross the network boundary.
func ValidateWorkState ¶
ValidateWorkState reports whether the state is a known work lifecycle state.
func ValidateWorkTransition ¶
ValidateWorkTransition reports whether a trace may advance work from one state to another.
func ValidateWorkspaceID ¶
ValidateWorkspaceID reports whether the workspace identity can safely occupy one NATS subject token and one protocol envelope field.
Types ¶
type AuditStore ¶
type AuditStore interface {
WriteNetworkAudit(ctx context.Context, entry store.NetworkAuditEntry) error
}
AuditStore is the persistence surface consumed by the network audit writer.
type AuditWriter ¶
type AuditWriter interface {
RecordSent(ctx context.Context, sessionID string, envelope Envelope) error
RecordReceived(ctx context.Context, sessionID string, envelope Envelope) error
RecordRejected(ctx context.Context, sessionID string, envelope Envelope, reason string) error
RecordDelivered(ctx context.Context, sessionID string, envelope Envelope) error
}
AuditWriter records network activity into the configured sinks.
type AuditWriterOption ¶
type AuditWriterOption func(*FileAuditWriter)
AuditWriterOption customizes FileAuditWriter construction.
type Body ¶
type Body interface {
Kind() Kind
}
Body is the typed representation of one envelope body.
func DecodeBody ¶
func DecodeBody(kind Kind, raw json.RawMessage) (Body, error)
DecodeBody parses and validates one kind-specific envelope body.
type CapabilityBody ¶
type CapabilityBody struct {
Capability CapabilityEnvelopePayload `json:"capability"`
}
CapabilityBody carries or advertises one transferable capability artifact.
func (CapabilityBody) Kind ¶
func (CapabilityBody) Kind() Kind
Kind returns the wire kind for the body.
type CapabilityEnvelopePayload ¶
type CapabilityEnvelopePayload struct {
ID string `json:"id"`
Summary string `json:"summary"`
Outcome string `json:"outcome"`
Version string `json:"version,omitempty"`
Digest string `json:"digest"`
ContextNeeded []string `json:"context_needed,omitempty"`
ArtifactsExpected []string `json:"artifacts_expected,omitempty"`
ExecutionOutline []string `json:"execution_outline,omitempty"`
Constraints []string `json:"constraints,omitempty"`
Examples []string `json:"examples,omitempty"`
Requirements []string `json:"requirements,omitempty"`
}
CapabilityEnvelopePayload is the transferable unified capability document.
type ChannelInfo ¶
ChannelInfo summarizes one active runtime channel.
type ConversationRef ¶
type ConversationRef struct {
WorkspaceID string
Channel string
Surface Surface
ThreadID string
DirectID string
}
ConversationRef identifies exactly one conversation container.
func ConversationRefFromEnvelope ¶
func ConversationRefFromEnvelope(env Envelope) (ConversationRef, error)
ConversationRefFromEnvelope returns the validated container reference for a conversation envelope.
func (ConversationRef) ContainerKey ¶
func (r ConversationRef) ContainerKey() string
ContainerKey returns a stable workspace/channel/surface/container key.
func (ConversationRef) IsDirect ¶
func (r ConversationRef) IsDirect() bool
IsDirect reports whether the reference targets a direct room.
func (ConversationRef) IsThread ¶
func (r ConversationRef) IsThread() bool
IsThread reports whether the reference targets a public thread.
func (ConversationRef) Validate ¶
func (r ConversationRef) Validate() error
Validate reports whether the reference identifies exactly one container.
type Envelope ¶
type Envelope struct {
Protocol string `json:"protocol"`
ID string `json:"id"`
WorkspaceID string `json:"workspace_id"`
Kind Kind `json:"kind"`
Channel string `json:"channel"`
Surface *Surface `json:"surface,omitempty"`
ThreadID *string `json:"thread_id,omitempty"`
DirectID *string `json:"direct_id,omitempty"`
From string `json:"from"`
To *string `json:"to,omitempty"`
WorkID *string `json:"work_id,omitempty"`
ReplyTo *string `json:"reply_to,omitempty"`
TraceID *string `json:"trace_id,omitempty"`
CausationID *string `json:"causation_id,omitempty"`
TS int64 `json:"ts"`
ExpiresAt *int64 `json:"expires_at,omitempty"`
Body json.RawMessage `json:"body"`
Proof *Proof `json:"proof"`
Ext ExtensionMap `json:"ext,omitempty"`
}
Envelope is the shared AGH Network v0 wire envelope.
func NormalizeEnvelope ¶
func NormalizeEnvelope(env Envelope, opts ValidateOptions) (Envelope, error)
NormalizeEnvelope trims identifier fields, validates the envelope, and returns a safe cloned copy for downstream use.
func ParseEnvelope ¶
func ParseEnvelope(data []byte, opts ValidateOptions) (Envelope, error)
ParseEnvelope decodes, validates, and normalizes one raw envelope.
func (Envelope) DecodeBody ¶
DecodeBody parses and validates the envelope body using the envelope kind.
func (Envelope) IsBroadcast ¶
IsBroadcast reports whether the envelope is channel-broadcast.
func (Envelope) IsDirected ¶
IsDirected reports whether the envelope targets a specific peer.
func (*Envelope) UnmarshalJSON ¶
UnmarshalJSON rejects obsolete hard-cut wire fields before decoding.
type ExtensionMap ¶
type ExtensionMap map[string]json.RawMessage
ExtensionMap preserves opaque extension payloads without interpreting them.
type FileAuditWriter ¶
type FileAuditWriter struct {
// contains filtered or unexported fields
}
FileAuditWriter writes normalized network audit records to a JSONL file and optionally mirrors them into a persistent store.
func NewAuditWriter ¶
func NewAuditWriter(path string, auditStore AuditStore, opts ...AuditWriterOption) (*FileAuditWriter, error)
NewAuditWriter constructs the dual-path network audit writer.
func (*FileAuditWriter) RecordDelivered ¶
func (w *FileAuditWriter) RecordDelivered(ctx context.Context, sessionID string, envelope Envelope) error
RecordDelivered stores a delivered network audit record.
func (*FileAuditWriter) RecordReceived ¶
func (w *FileAuditWriter) RecordReceived(ctx context.Context, sessionID string, envelope Envelope) error
RecordReceived stores a received network audit record.
func (*FileAuditWriter) RecordRejected ¶
func (w *FileAuditWriter) RecordRejected( ctx context.Context, sessionID string, envelope Envelope, reason string, ) error
RecordRejected stores a rejected network audit record.
func (*FileAuditWriter) RecordSent ¶
func (w *FileAuditWriter) RecordSent(ctx context.Context, sessionID string, envelope Envelope) error
RecordSent stores a sent network audit record.
func (*FileAuditWriter) RecordTaskIngress ¶
func (w *FileAuditWriter) RecordTaskIngress(ctx context.Context, audit TaskIngressAudit) error
RecordTaskIngress stores one accepted or rejected task-ingress audit record using the existing network audit sinks.
type GreetBody ¶
type GreetBody struct {
PeerCard PeerCard `json:"peer_card"`
Summary string `json:"summary,omitempty"`
}
GreetBody advertises peer presence and capabilities in a channel.
type Heartbeat ¶
type Heartbeat struct {
// contains filtered or unexported fields
}
Heartbeat owns one periodic greet publisher.
type HookDispatcher ¶
type HookDispatcher interface {
DispatchNetworkPeerJoined(
context.Context,
hookspkg.NetworkPeerJoinedPayload,
) (hookspkg.NetworkPeerJoinedPayload, error)
DispatchNetworkPeerLeft(
context.Context,
hookspkg.NetworkPeerLeftPayload,
) (hookspkg.NetworkPeerLeftPayload, error)
DispatchNetworkThreadOpened(
context.Context,
hookspkg.NetworkThreadOpenedPayload,
) (hookspkg.NetworkThreadOpenedPayload, error)
DispatchNetworkDirectRoomOpened(
context.Context,
hookspkg.NetworkDirectRoomOpenedPayload,
) (hookspkg.NetworkDirectRoomOpenedPayload, error)
DispatchNetworkMessagePersisted(
context.Context,
hookspkg.NetworkMessagePersistedPayload,
) (hookspkg.NetworkMessagePersistedPayload, error)
DispatchNetworkWorkOpened(
context.Context,
hookspkg.NetworkWorkOpenedPayload,
) (hookspkg.NetworkWorkOpenedPayload, error)
DispatchNetworkWorkTransitioned(
context.Context,
hookspkg.NetworkWorkTransitionedPayload,
) (hookspkg.NetworkWorkTransitionedPayload, error)
DispatchNetworkWorkClosed(
context.Context,
hookspkg.NetworkWorkClosedPayload,
) (hookspkg.NetworkWorkClosedPayload, error)
}
HookDispatcher observes committed network conversation state changes.
type KindMetric ¶
KindMetric is the runtime per-kind network activity snapshot surfaced by status APIs.
type LifecycleAction ¶
type LifecycleAction string
LifecycleAction explains how a lifecycle helper handled one message.
const ( LifecycleActionOpened LifecycleAction = "opened" LifecycleActionAdvanced LifecycleAction = "advanced" LifecycleActionUnchanged LifecycleAction = "unchanged" LifecycleActionIgnored LifecycleAction = "ignored" LifecycleActionRejectWork LifecycleAction = "reject_work" )
type LifecycleResult ¶
type LifecycleResult struct {
Work Work
Action LifecycleAction
ReasonCode *ReasonCode
}
LifecycleResult is the reusable lifecycle decision surface for router code.
func ApplyWorkEnvelope ¶
ApplyWorkEnvelope applies one validated lifecycle envelope to the current work state and returns the router-facing decision.
type LocalPeer ¶
type LocalPeer struct {
SessionID string
PeerID string
WorkspaceID string
Channel string
PeerCard PeerCard
CapabilityCatalog []sessionpkg.NetworkPeerCapability
JoinedAt time.Time
}
LocalPeer is one daemon-local peer joined to one runtime channel.
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager owns transport, routing, presence, delivery, and the late-bound session lifecycle callbacks required by daemon boot integration.
func NewManager ¶
func NewManager( ctx context.Context, cfg aghconfig.NetworkConfig, prompter deliveryPrompter, auditPath string, auditStore AuditStore, opts ...ManagerOption, ) (*Manager, error)
NewManager constructs the top-level network runtime and starts the embedded transport it owns.
func (*Manager) CancelTaskFromPeer ¶
func (m *Manager) CancelTaskFromPeer( ctx context.Context, ingress TaskIngressContext, taskID string, req taskpkg.CancelTask, ) (*taskpkg.Task, error)
CancelTaskFromPeer requests manager-owned task cancellation after validating the authenticated peer context and task channel binding.
func (*Manager) CreateTaskFromPeer ¶
func (m *Manager) CreateTaskFromPeer( ctx context.Context, ingress TaskIngressContext, spec taskpkg.CreateTask, ) (*taskpkg.Task, error)
CreateTaskFromPeer creates one task on behalf of an authenticated network peer after channel and capability validation succeed.
func (*Manager) EnqueueRunFromPeer ¶
func (m *Manager) EnqueueRunFromPeer( ctx context.Context, ingress TaskIngressContext, spec taskpkg.EnqueueRun, ) (*taskpkg.Run, error)
EnqueueRunFromPeer enqueues one task run from an authenticated network peer while preserving origin-scoped idempotency inside the task manager.
func (*Manager) JoinChannel ¶
func (m *Manager) JoinChannel(ctx context.Context, join sessionpkg.NetworkPeerJoin) error
JoinChannel registers one daemon-local session as a visible network peer.
func (*Manager) LeaveChannel ¶
LeaveChannel removes one daemon-local session from the active network runtime.
func (*Manager) ListChannels ¶
ListChannels returns the currently active runtime channels.
func (*Manager) ListPeers ¶
func (m *Manager) ListPeers(ctx context.Context, workspaceID string, channel string) ([]PeerInfo, error)
ListPeers returns a snapshot of the currently visible local and remote peers.
func (*Manager) OnTurnEnd ¶
OnTurnEnd wakes the per-session delivery worker after a prompt turn finishes.
func (*Manager) Status ¶
Status returns a safe diagnostics snapshot without exposing transport credentials.
func (*Manager) UpdateTaskFromPeer ¶
func (m *Manager) UpdateTaskFromPeer( ctx context.Context, ingress TaskIngressContext, taskID string, patch taskpkg.Patch, ) (*taskpkg.Task, error)
UpdateTaskFromPeer applies one mutable task patch through the task manager after enforcing channel-bound ingress rules.
type ManagerOption ¶
type ManagerOption func(*managerOptions)
ManagerOption customizes network manager construction.
func WithManagerAuditWriter ¶
func WithManagerAuditWriter(auditor AuditWriter) ManagerOption
WithManagerAuditWriter injects a custom audit sink, primarily for tests.
func WithManagerClock ¶
func WithManagerClock(now func() time.Time) ManagerOption
WithManagerClock overrides the manager clock, primarily for tests.
func WithManagerConversationStore ¶
func WithManagerConversationStore(conversations store.NetworkConversationStore) ManagerOption
WithManagerConversationStore injects the durable conversation repository used as the commit boundary before runtime delivery side effects.
func WithManagerHookDispatcher ¶
func WithManagerHookDispatcher(dispatcher HookDispatcher) ManagerOption
WithManagerHookDispatcher injects the network hook dispatcher.
func WithManagerLogger ¶
func WithManagerLogger(logger *slog.Logger) ManagerOption
WithManagerLogger overrides the logger used by the network manager.
func WithManagerTaskService ¶
func WithManagerTaskService(tasks TaskService) ManagerOption
WithManagerTaskService injects the daemon-owned task manager used for authenticated network task ingress.
type MetricSample ¶
MetricSample is one low-cardinality runtime network metric sample.
type PeerCard ¶
type PeerCard struct {
PeerID string `json:"peer_id"`
DisplayName *string `json:"display_name,omitempty"`
ProfilesSupported []string `json:"profiles_supported"`
Capabilities []string `json:"capabilities"`
ArtifactsSupported []string `json:"artifacts_supported"`
TrustModesSupported []string `json:"trust_modes_supported"`
Ext ExtensionMap `json:"ext,omitempty"`
}
PeerCard advertises one peer's identity and capabilities.
func DefaultPeerCard ¶
DefaultPeerCard returns the minimal protocol peer card for one peer identifier.
type PeerInfo ¶
type PeerInfo struct {
SessionID *string
PeerID string
WorkspaceID string
Channel string
Local bool
PeerCard PeerCard
CapabilityCatalog []sessionpkg.NetworkPeerCapability
CapabilityCatalogKnown bool
JoinedAt *time.Time
LastSeen *time.Time
ExpiresAt *time.Time
PresenceState PresenceState
LastSeenAgeSeconds *int64
}
PeerInfo is the API-facing snapshot for one visible peer.
type PeerLifecycleEvent ¶
type PeerLifecycleEvent struct {
Kind PeerLifecycleKind
Peer PeerInfo
Timestamp time.Time
}
PeerLifecycleEvent is the runtime-local representation of a peer presence transition. Transports convert it to hooks or event summaries; it is not part of the wire protocol.
type PeerLifecycleKind ¶
type PeerLifecycleKind string
PeerLifecycleKind describes one peer membership transition observed by the router or manager lifecycle.
const ( PeerLifecycleJoined PeerLifecycleKind = "joined" PeerLifecycleLeft PeerLifecycleKind = "left" )
type PeerRegistry ¶
type PeerRegistry struct {
// contains filtered or unexported fields
}
PeerRegistry tracks local session peers plus the remote peer cache.
func NewPeerRegistry ¶
func NewPeerRegistry(greetInterval time.Duration, opts ...PeerRegistryOption) (*PeerRegistry, error)
NewPeerRegistry constructs the in-memory presence registry.
func (*PeerRegistry) ExpireRemotes ¶
func (r *PeerRegistry) ExpireRemotes(at time.Time) []RemotePeerEntry
ExpireRemotes removes expired remotes and reports each peer removed by the same captured time.
func (*PeerRegistry) GreetInterval ¶
func (r *PeerRegistry) GreetInterval() time.Duration
GreetInterval reports the configured presence heartbeat interval.
func (*PeerRegistry) HasPresence ¶
func (r *PeerRegistry) HasPresence(workspaceID string, channel string, peerID string, at time.Time) bool
HasPresence reports whether the peer is visible and unexpired in the given channel.
func (*PeerRegistry) LeaveLocal ¶
func (r *PeerRegistry) LeaveLocal(sessionID string) (LocalPeer, bool)
LeaveLocal removes one local session peer from the registry.
func (*PeerRegistry) ListChannels ¶
func (r *PeerRegistry) ListChannels(workspaceID string, at time.Time) []ChannelInfo
ListChannels returns active runtime channels plus current peer counts.
func (*PeerRegistry) ListPeers ¶
ListPeers returns visible peers, optionally filtered to one workspace channel.
func (*PeerRegistry) LocalByPeer ¶
func (r *PeerRegistry) LocalByPeer(workspaceID string, channel string, peerID string) (LocalPeer, bool)
LocalByPeer resolves one local peer by workspace, channel, and peer ID.
func (*PeerRegistry) LocalBySession ¶
func (r *PeerRegistry) LocalBySession(sessionID string) (LocalPeer, bool)
LocalBySession resolves one local peer by session ID.
func (*PeerRegistry) LocalPeers ¶
func (r *PeerRegistry) LocalPeers(workspaceID string, channel string) []LocalPeer
LocalPeers returns the local peers currently joined to one workspace channel.
func (*PeerRegistry) LookupPresence ¶
func (r *PeerRegistry) LookupPresence( workspaceID string, channel string, peerID string, at time.Time, ) (PeerInfo, bool)
LookupPresence resolves one peer from the local registry first, then the remote cache.
func (*PeerRegistry) MatchLocalPeers ¶
func (r *PeerRegistry) MatchLocalPeers(workspaceID string, channel string, query string) []LocalPeer
MatchLocalPeers returns local peers matching one whois query.
func (*PeerRegistry) RefreshRemote ¶
func (r *PeerRegistry) RefreshRemote( workspaceID string, channel string, card PeerCard, seenAt time.Time, ) (RemotePeerEntry, bool, error)
RefreshRemote stores or refreshes one remote peer advertisement.
func (*PeerRegistry) RefreshRemoteDetailed ¶
func (r *PeerRegistry) RefreshRemoteDetailed( workspaceID string, channel string, card PeerCard, capabilityCatalog []sessionpkg.NetworkPeerCapability, capabilityCatalogKnown bool, seenAt time.Time, ) (RemoteRefreshResult, error)
RefreshRemoteDetailed stores or refreshes one remote peer advertisement and reports lifecycle changes caused by the same registry mutation.
func (*PeerRegistry) RefreshRemoteWithCapabilityCatalog ¶
func (r *PeerRegistry) RefreshRemoteWithCapabilityCatalog( workspaceID string, channel string, card PeerCard, capabilityCatalog []sessionpkg.NetworkPeerCapability, capabilityCatalogKnown bool, seenAt time.Time, ) (RemotePeerEntry, bool, error)
RefreshRemoteWithCapabilityCatalog stores or refreshes one remote peer advertisement plus optional rich capability discovery state learned via explicit whois responses.
func (*PeerRegistry) RegisterLocal ¶
func (r *PeerRegistry) RegisterLocal( sessionID string, workspaceID string, channel string, card PeerCard, joinedAt time.Time, ) (LocalPeer, error)
RegisterLocal upserts one local peer membership keyed by session ID.
func (*PeerRegistry) RegisterLocalWithCapabilityCatalog ¶
func (r *PeerRegistry) RegisterLocalWithCapabilityCatalog( sessionID string, workspaceID string, channel string, card PeerCard, capabilityCatalog []sessionpkg.NetworkPeerCapability, joinedAt time.Time, ) (LocalPeer, error)
RegisterLocalWithCapabilityCatalog upserts one local peer membership keyed by session ID, optionally retaining the runtime-owned rich capability catalog for explicit whois discovery.
func (*PeerRegistry) RemoteByPeer ¶
func (r *PeerRegistry) RemoteByPeer( workspaceID string, channel string, peerID string, at time.Time, ) (RemotePeerEntry, bool)
RemoteByPeer resolves one active remote peer entry.
type PeerRegistryOption ¶
type PeerRegistryOption func(*PeerRegistry)
PeerRegistryOption customizes the registry runtime.
func WithPeerRegistryClock ¶
func WithPeerRegistryClock(now func() time.Time) PeerRegistryOption
WithPeerRegistryClock overrides the time source used by the registry.
type Presence ¶
type Presence struct {
State PresenceState
LastSeenAgeSeconds *int64
}
Presence captures derived activity state without introducing a second source of truth beyond PeerRegistry timestamps.
type PresenceState ¶
type PresenceState string
PresenceState is the daemon-derived activity state for one network peer.
const (
	PresenceStateLocal    PresenceState = "local"
	PresenceStateActive   PresenceState = "active"
	PresenceStateInactive PresenceState = "inactive"
	PresenceStateExpired  PresenceState = "expired"
	PresenceStateUnknown  PresenceState = "unknown"
)
type Proof ¶
type Proof map[string]json.RawMessage
Proof preserves the opaque protocol proof payload for forward compatibility.
type ReasonCode ¶
type ReasonCode string
ReasonCode identifies one registered protocol rejection reason.
const (
	ReasonCodeMalformed             ReasonCode = "malformed"
	ReasonCodeExpired               ReasonCode = "expired"
	ReasonCodeDuplicate             ReasonCode = "duplicate"
	ReasonCodeUnsupportedKind       ReasonCode = "unsupported_kind"
	ReasonCodeUnsupportedProfile    ReasonCode = "unsupported_profile"
	ReasonCodeVerificationFailed    ReasonCode = "verification_failed"
	ReasonCodeNotTarget             ReasonCode = "not_target"
	ReasonCodeNotFound              ReasonCode = "not_found"
	ReasonCodeBusy                  ReasonCode = "busy"
	ReasonCodeInternal              ReasonCode = "internal"
	ReasonCodeInvalidSurface        ReasonCode = "invalid_surface"
	ReasonCodeConversationNotFound  ReasonCode = "conversation_not_found"
	ReasonCodeWorkClosed            ReasonCode = "work_closed"
	ReasonCodeWorkContainerMismatch ReasonCode = "work_container_mismatch"
	ReasonCodeLegacyFieldRejected   ReasonCode = "legacy_field_rejected"
)
func (ReasonCode) Validate ¶
func (r ReasonCode) Validate() error
Validate reports whether the reason code belongs to the v0 registry.
type ReceiptBody ¶
type ReceiptBody struct {
ForID string `json:"for_id"`
Status ReceiptStatus `json:"status"`
ReasonCode *ReasonCode `json:"reason_code,omitempty"`
Detail *string `json:"detail,omitempty"`
}
ReceiptBody acknowledges or rejects protocol-level admission.
type ReceiptStatus ¶
type ReceiptStatus string
ReceiptStatus identifies one receipt admission status.
const (
	ReceiptStatusAccepted    ReceiptStatus = "accepted"
	ReceiptStatusRejected    ReceiptStatus = "rejected"
	ReceiptStatusDuplicate   ReceiptStatus = "duplicate"
	ReceiptStatusExpired     ReceiptStatus = "expired"
	ReceiptStatusUnsupported ReceiptStatus = "unsupported"
	ReceiptStatusCanceled    ReceiptStatus = "canceled"
)
func (ReceiptStatus) Validate ¶
func (s ReceiptStatus) Validate() error
Validate reports whether the receipt status is documented by the RFC.
type RemotePeerEntry ¶
type RemotePeerEntry struct {
PeerID string
PeerCard PeerCard
WorkspaceID string
Channel string
CapabilityCatalog []sessionpkg.NetworkPeerCapability
CapabilityCatalogKnown bool
LastSeen time.Time
ExpiresAt time.Time
}
RemotePeerEntry is one cached remote peer advertisement.
type RemoteRefreshResult ¶
type RemoteRefreshResult struct {
Entry RemotePeerEntry
Stored bool
Joined bool
Expired []RemotePeerEntry
}
RemoteRefreshResult reports one remote greet cache mutation plus any peers expired while applying the same captured time.
type RouteResult ¶
type RouteResult struct {
Envelope *Envelope
Deliveries []Delivery
Generated []Envelope
PeerEvents []PeerLifecycleEvent
Duplicate bool
Ignored bool
Rejected bool
ReasonCode *ReasonCode
}
RouteResult is the router decision for one inbound envelope.
type Router ¶
type Router struct {
// contains filtered or unexported fields
}
Router handles outbound subject selection plus inbound receiver policy.
func NewRouter ¶
func NewRouter( peers *PeerRegistry, transport RouterTransport, maxReplayAge time.Duration, opts ...RouterOption, ) (*Router, error)
NewRouter constructs the routing runtime on top of a peer registry.
func (*Router) PrepareSend ¶
func (r *Router) PrepareSend(ctx context.Context, req SendRequest) (SendResult, error)
PrepareSend validates one outbound request and computes the publish subject without performing transport side effects.
func (*Router) PublishGreet ¶
func (r *Router) PublishGreet(ctx context.Context, sessionID string, summary string) (SendResult, error)
PublishGreet advertises one local peer card to its joined channel.
func (*Router) PublishPrepared ¶
func (r *Router) PublishPrepared(ctx context.Context, prepared SendResult) (SendResult, error)
PublishPrepared publishes a previously prepared envelope and syncs the local lifecycle cache only after the transport accepts the publish.
func (*Router) Receive ¶
Receive validates one inbound envelope, updates presence, and returns delivery decisions.
func (*Router) Send ¶
func (r *Router) Send(ctx context.Context, req SendRequest) (SendResult, error)
Send validates one outbound request, enforces presence preflight, and publishes it.
type RouterOption ¶
type RouterOption func(*Router)
RouterOption customizes router construction.
func WithRouterClock ¶
func WithRouterClock(now func() time.Time) RouterOption
WithRouterClock overrides the clock used for send and receive decisions.
type RouterTransport ¶
type RouterTransport interface {
Publish(ctx context.Context, subject string, payload []byte) error
}
RouterTransport is the narrow publish surface consumed by the router.
type SayBody ¶
type SayBody struct {
Text string `json:"text"`
Artifacts []json.RawMessage `json:"artifacts,omitempty"`
Intent string `json:"intent,omitempty"`
}
SayBody carries broadcast chat-first communication.
type SendRequest ¶
type SendRequest struct {
SessionID string
WorkspaceID string
Channel string
Surface *Surface
ThreadID *string
DirectID *string
Kind Kind
To *string
Body json.RawMessage
WorkID *string
ReplyTo *string
TraceID *string
CausationID *string
ExpiresAt *int64
ID *string
Ext ExtensionMap
}
SendRequest carries one caller-supplied outbound envelope request.
type SendResult ¶
SendResult summarizes one outbound publish.
type Status ¶
type Status struct {
Enabled bool
Status string
ListenerHost string
ListenerPort int
LocalPeers int
RemotePeers int
Channels int
QueuedMessages int
QueuedSessions int
DeliveryWorkers int
DeliveryQueueDepth int
MessagesSent int64
MessagesReceived int64
MessagesRejected int64
MessagesDelivered int64
WorkflowTaggedEvents int64
HandoffTaggedEvents int64
OpenThreads int64
OpenDirectRooms int64
OpenWorkItems int64
ConversationMessages int64
WorkTransitions int64
DirectResolves int64
LastDisconnect string
KindMetrics []KindMetric
Metrics []MetricSample
}
Status is the manager-facing diagnostics snapshot consumed by daemon status and later transport surfaces.
type Surface ¶
type Surface string
Surface identifies the conversation container class for one message.
type TaskIngressAudit ¶
type TaskIngressAudit struct {
Action string
Direction string
PeerID string
WorkspaceID string
Channel string
RequestID string
Reason string
Payload any
}
TaskIngressAudit captures one task-domain ingress decision originating from a validated network peer.
type TaskIngressAuditWriter ¶
type TaskIngressAuditWriter interface {
RecordTaskIngress(ctx context.Context, audit TaskIngressAudit) error
}
TaskIngressAuditWriter is the optional audit extension used by task-aware network ingress. Existing protocol-message auditing remains unchanged.
type TaskIngressContext ¶
type TaskIngressContext struct {
WorkspaceID string
PeerID string
Channel string
RequestID string
Surface Surface
ThreadID string
DirectID string
WorkID string
ReplyTo string
TraceID string
CausationID string
}
TaskIngressContext captures the trusted peer identity and delivery metadata that network ingress derives from the live runtime rather than the payload.
func (TaskIngressContext) Validate ¶
func (c TaskIngressContext) Validate() error
Validate reports whether the ingress context contains the mandatory peer and delivery identifiers.
type TaskService ¶
type TaskService interface {
GetTask(ctx context.Context, id string, actor taskpkg.ActorContext) (*taskpkg.View, error)
CreateTask(ctx context.Context, spec taskpkg.CreateTask, actor taskpkg.ActorContext) (*taskpkg.Task, error)
UpdateTask(
ctx context.Context,
id string,
patch taskpkg.Patch,
actor taskpkg.ActorContext,
) (*taskpkg.Task, error)
CancelTask(
ctx context.Context,
id string,
req taskpkg.CancelTask,
actor taskpkg.ActorContext,
) (*taskpkg.Task, error)
EnqueueRun(ctx context.Context, spec taskpkg.EnqueueRun, actor taskpkg.ActorContext) (*taskpkg.Run, error)
}
TaskService is the narrowed task-domain surface consumed by network ingress.
type TraceBody ¶
type TraceBody struct {
State WorkState `json:"state"`
Message string `json:"message,omitempty"`
Result json.RawMessage `json:"result,omitempty"`
ArtifactRefs []json.RawMessage `json:"artifact_refs,omitempty"`
}
TraceBody reports progress or terminal outcome for work.
type Transport ¶
type Transport struct {
// contains filtered or unexported fields
}
Transport owns the embedded NATS server plus the daemon's in-process connection.
func NewTransport ¶
func NewTransport(ctx context.Context, cfg aghconfig.NetworkConfig, opts ...TransportOption) (*Transport, error)
NewTransport starts an embedded NATS server and connects the daemon to it using an in-process, token-authenticated connection.
func (*Transport) Drain ¶
Drain closes the daemon connection cleanly before the server is shut down.
func (*Transport) Port ¶
Port reports the resolved listener port. Random-port servers return the actual chosen port after startup.
type TransportOption ¶
type TransportOption func(*transportOptions)
TransportOption customizes embedded transport startup behavior.
func WithTransportDisconnectHandler ¶
func WithTransportDisconnectHandler(handler func(error)) TransportOption
WithTransportDisconnectHandler registers a disconnect callback for later manager wiring.
func WithTransportLogger ¶
func WithTransportLogger(logger *slog.Logger) TransportOption
WithTransportLogger overrides the logger used by the transport.
func WithTransportPublishTimeout ¶
func WithTransportPublishTimeout(timeout time.Duration) TransportOption
WithTransportPublishTimeout overrides the publish flush timeout when the caller does not provide a deadline.
func WithTransportReadyTimeout ¶
func WithTransportReadyTimeout(timeout time.Duration) TransportOption
WithTransportReadyTimeout overrides the server readiness timeout.
func WithTransportReconnectHandler ¶
func WithTransportReconnectHandler(handler func()) TransportOption
WithTransportReconnectHandler registers a reconnect callback for later manager wiring.
type ValidateOptions ¶
ValidateOptions configures envelope validation and normalization.
type WhoisBody ¶
type WhoisBody struct {
Type WhoisType `json:"type"`
Query string `json:"query,omitempty"`
PeerCard *PeerCard `json:"peer_card,omitempty"`
}
WhoisBody requests or returns peer card information.
type WhoisType ¶
type WhoisType string
WhoisType identifies the request or response shape for `whois`.
type Work ¶
type Work struct {
ID string
Ref ConversationRef
Initiator string
Target string
State WorkState
CreatedAt time.Time
UpdatedAt time.Time
TerminalAt *time.Time
}
Work tracks one directed work item inside one channel.
func (Work) IsParticipant ¶
IsParticipant reports whether the peer owns the work lifecycle.