boid

package
v1.0.0-alpha.17 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 8, 2026 License: MIT Imports: 14 Imported by: 0

Documentation

Overview

Package boid implements Boids-inspired local coordination rules for multi-agent teams.

This package provides three local rules based on Craig Reynolds' Boids simulation (1986):

  • Cohesion: Steer toward high-centrality nodes in the active subgraph matching the role's objective
  • Alignment: Match traversal direction of same-role agents
  • Separation: Avoid overlapping k-hop neighborhoods with other agents

Architecture

Agent positions are tracked in the AGENT_POSITIONS KV bucket. Rules watch this bucket and evaluate local coordination signals based on graph topology. Signals are published to agent.boid.<loopID> subjects for consumption by the agentic-loop.

Graph Topology as Coordination Substrate

Unlike spatial Boids, which use Euclidean distance, these rules use the knowledge graph topology directly:

  • Proximity: k-hop distance via PivotIndex.IsWithinHops()
  • Center of mass: PageRank centrality over active subgraph
  • Heading: Traversal direction (relationship types being followed)
  • Flock boundaries: Community membership

Configuration

Rules are configured via JSON with role-specific thresholds:

{
  "type": "boid",
  "metadata": {
    "boid_rule": "separation",
    "role_thresholds": {
      "general": 2,
      "architect": 3
    }
  }
}

Research Context

This is an experimental implementation for validating the hypothesis that explicit workflow choreography can be replaced by intent-weighted local rules for certain agent team configurations. See docs/research/boids-hypothesis.md for details.

Index

Constants

View Source
const (
	// Domain identifies boid-related messages.
	Domain = "boid"

	// CategoryPosition is the category for agent position updates.
	CategoryPosition = "position"

	// CategorySignal is the category for steering signals.
	CategorySignal = "signal"

	// SchemaVersion is the current schema version.
	SchemaVersion = "v1"

	// KVBucketAgentPositions is the KV bucket name for agent positions.
	KVBucketAgentPositions = "AGENT_POSITIONS"
)

Constants for the boid domain and message types.

View Source
const (
	// RuleTypeSeparation identifies separation rules.
	RuleTypeSeparation = "separation"

	// RuleTypeCohesion identifies cohesion rules.
	RuleTypeCohesion = "cohesion"

	// RuleTypeAlignment identifies alignment rules.
	RuleTypeAlignment = "alignment"
)

Boid rule type identifiers.

View Source
const (
	SignalTypeSeparation = "separation"
	SignalTypeCohesion   = "cohesion"
	SignalTypeAlignment  = "alignment"
)

Signal type identifiers.

View Source
const (
	// DefaultSeparationThreshold is the default k-hop distance for separation rules.
	DefaultSeparationThreshold = 2

	// DefaultSteeringStrength is the default influence strength (0.0-1.0).
	DefaultSteeringStrength = 0.5

	// DefaultAlignmentWindow is the default number of recent traversals to consider.
	DefaultAlignmentWindow = 5

	// DefaultCentralityWeight is the default weight for centrality attraction.
	DefaultCentralityWeight = 0.7
)

Default configuration values.

Variables

This section is empty.

Functions

func CalculateVelocity

func CalculateVelocity(oldFocus, newFocus []string) float64

CalculateVelocity computes velocity based on position changes. Velocity is a normalized measure of how much the focus entities changed.

Types

type AgentPosition

type AgentPosition struct {
	// LoopID is the unique identifier for the agentic loop.
	LoopID string `json:"loop_id"`

	// Role is the agent's role (general, architect, editor, reviewer).
	Role string `json:"role"`

	// FocusEntities are the entity IDs the agent is currently working on.
	FocusEntities []string `json:"focus_entities"`

	// TraversalVector contains the relationship predicates being followed.
	// This represents the agent's "heading" in graph space.
	TraversalVector []string `json:"traversal_vector"`

	// Velocity represents the rate of position change (0.0-1.0).
	// Higher values indicate more active exploration.
	Velocity float64 `json:"velocity"`

	// Iteration is the current loop iteration number.
	Iteration int `json:"iteration"`

	// LastUpdate is when this position was last updated.
	LastUpdate time.Time `json:"last_update"`
}

AgentPosition tracks an agent's current focus and traversal direction in the graph. This is stored in the AGENT_POSITIONS KV bucket, keyed by LoopID.

func (*AgentPosition) MarshalJSON

func (p *AgentPosition) MarshalJSON() ([]byte, error)

MarshalJSON implements json.Marshaler.

func (*AgentPosition) Schema

func (p *AgentPosition) Schema() message.Type

Schema returns the message type schema for AgentPosition.

func (*AgentPosition) UnmarshalJSON

func (p *AgentPosition) UnmarshalJSON(data []byte) error

UnmarshalJSON implements json.Unmarshaler.

func (*AgentPosition) Validate

func (p *AgentPosition) Validate() error

Validate checks that the AgentPosition has required fields and valid values.

type AlignmentRule

type AlignmentRule struct {
	// contains filtered or unexported fields
}

AlignmentRule implements the Boids alignment behavior. It steers agents to match the traversal direction of same-role agents by suggesting common predicate patterns to follow.

func NewAlignmentRule

func NewAlignmentRule(id string, def rule.Definition, config *Config, cooldown time.Duration, logger *slog.Logger) *AlignmentRule

NewAlignmentRule creates a new alignment rule.

func (*AlignmentRule) Evaluate

func (r *AlignmentRule) Evaluate(_ []message.Message) bool

Evaluate is not used for boid rules (they use EvaluateEntityState).

func (*AlignmentRule) EvaluateEntityState

func (r *AlignmentRule) EvaluateEntityState(entityState *gtypes.EntityState) bool

EvaluateEntityState evaluates the alignment rule against an agent's position. Implements the rule.EntityStateEvaluator interface.

func (*AlignmentRule) ExecuteEvents

func (r *AlignmentRule) ExecuteEvents(_ []message.Message) ([]rule.Event, error)

ExecuteEvents is not used for boid rules (signals are generated in EvaluateEntityState).

func (*AlignmentRule) GetPendingSignals

func (r *AlignmentRule) GetPendingSignals() []*SteeringSignal

GetPendingSignals returns and clears the pending signals.

func (*AlignmentRule) Name

func (r *AlignmentRule) Name() string

Name returns the rule name.

func (*AlignmentRule) SetPositionProvider

func (r *AlignmentRule) SetPositionProvider(provider PositionProvider)

SetPositionProvider sets the provider for retrieving other agent positions.

func (*AlignmentRule) Subscribe

func (r *AlignmentRule) Subscribe() []string

Subscribe returns subjects this rule subscribes to. Boid rules watch the AGENT_POSITIONS KV bucket, not NATS subjects.

type CentralityProvider

type CentralityProvider interface {
	// GetPageRankScores returns PageRank scores for a set of entities.
	// Returns a map of entity ID to score (0.0-1.0).
	GetPageRankScores(ctx context.Context, entityIDs []string) (map[string]float64, error)
}

CentralityProvider provides centrality scores for entities.

type CohesionRule

type CohesionRule struct {
	// contains filtered or unexported fields
}

CohesionRule implements the Boids cohesion behavior. It steers agents toward high-centrality nodes in their active subgraph that match their role's objective function.

func NewCohesionRule

func NewCohesionRule(id string, def rule.Definition, config *Config, cooldown time.Duration, logger *slog.Logger) *CohesionRule

NewCohesionRule creates a new cohesion rule.

func (*CohesionRule) Evaluate

func (r *CohesionRule) Evaluate(_ []message.Message) bool

Evaluate is not used for boid rules (they use EvaluateEntityState).

func (*CohesionRule) EvaluateEntityState

func (r *CohesionRule) EvaluateEntityState(entityState *gtypes.EntityState) bool

EvaluateEntityState evaluates the cohesion rule against an agent's position. Implements the rule.EntityStateEvaluator interface.

func (*CohesionRule) ExecuteEvents

func (r *CohesionRule) ExecuteEvents(_ []message.Message) ([]rule.Event, error)

ExecuteEvents is not used for boid rules (signals are generated in EvaluateEntityState).

func (*CohesionRule) GetPendingSignals

func (r *CohesionRule) GetPendingSignals() []*SteeringSignal

GetPendingSignals returns and clears the pending signals.

func (*CohesionRule) Name

func (r *CohesionRule) Name() string

Name returns the rule name.

func (*CohesionRule) SetCentralityProvider

func (r *CohesionRule) SetCentralityProvider(provider CentralityProvider)

SetCentralityProvider sets the provider for centrality scores.

func (*CohesionRule) SetPivotIndex

func (r *CohesionRule) SetPivotIndex(index *structural.PivotIndex)

SetPivotIndex sets the pivot index for finding reachable candidates.

func (*CohesionRule) SetPositionProvider

func (r *CohesionRule) SetPositionProvider(provider PositionProvider)

SetPositionProvider sets the provider for retrieving agent positions.

func (*CohesionRule) Subscribe

func (r *CohesionRule) Subscribe() []string

Subscribe returns subjects this rule subscribes to. Boid rules watch the AGENT_POSITIONS KV bucket, not NATS subjects.

type Config

type Config struct {
	// BoidRule specifies the rule type: separation, cohesion, or alignment.
	BoidRule string `json:"boid_rule"`

	// RoleFilter limits the rule to agents with this role (empty = all roles).
	RoleFilter string `json:"role_filter,omitempty"`

	// RoleThresholds maps role names to k-hop separation thresholds.
	// Used by separation rules to allow different thresholds per role.
	RoleThresholds map[string]int `json:"role_thresholds,omitempty"`

	// SeparationThreshold is the default k-hop distance for separation (if RoleThresholds not set).
	SeparationThreshold int `json:"separation_threshold,omitempty"`

	// CentralityWeight is the weight for centrality attraction in cohesion rules (0.0-1.0).
	CentralityWeight float64 `json:"centrality_weight,omitempty"`

	// AlignmentWindow is the number of recent traversals to consider for alignment.
	AlignmentWindow int `json:"alignment_window,omitempty"`

	// SteeringStrength is how much to influence agent behavior (0.0-1.0).
	SteeringStrength float64 `json:"steering_strength"`

	// Cooldown specifies minimum time between signal emissions.
	Cooldown string `json:"cooldown,omitempty"`
}

Config contains configuration extracted from rule metadata.

func ParseConfig

func ParseConfig(metadata map[string]any) (*Config, error)

ParseConfig extracts Config from rule metadata.

func (*Config) GetSeparationThreshold

func (c *Config) GetSeparationThreshold(role string) int

GetSeparationThreshold returns the separation threshold for a given role.

type PageRankCentralityProvider

type PageRankCentralityProvider struct {
	// contains filtered or unexported fields
}

PageRankCentralityProvider implements CentralityProvider using cached PageRank scores. It computes PageRank periodically based on the configured TTL and caches the results.

func NewPageRankCentralityProvider

func NewPageRankCentralityProvider(provider clustering.Provider, ttl time.Duration, logger *slog.Logger) *PageRankCentralityProvider

NewPageRankCentralityProvider creates a new PageRank-based centrality provider. The provider parameter must implement the clustering.Provider interface for graph access. TTL controls how often PageRank is recomputed (0 means compute on every request).

func (*PageRankCentralityProvider) GetPageRankScores

func (p *PageRankCentralityProvider) GetPageRankScores(ctx context.Context, entityIDs []string) (map[string]float64, error)

GetPageRankScores returns PageRank scores for the specified entities. If the cached scores are stale (older than TTL), they are recomputed first. Returns a map of entity ID to normalized score (0.0-1.0).

func (*PageRankCentralityProvider) Invalidate

func (p *PageRankCentralityProvider) Invalidate()

Invalidate clears the cached scores, forcing a recomputation on next request.

type PositionProvider

type PositionProvider interface {
	Get(ctx context.Context, loopID string) (*AgentPosition, error)
	ListOthers(ctx context.Context, excludeLoopID string) ([]*AgentPosition, error)
}

PositionProvider retrieves agent positions for rule evaluation.

type PositionTracker

type PositionTracker struct {
	// contains filtered or unexported fields
}

PositionTracker manages agent positions in the AGENT_POSITIONS KV bucket.

func NewPositionTracker

func NewPositionTracker(kv jetstream.KeyValue, logger *slog.Logger) *PositionTracker

NewPositionTracker creates a new position tracker.

func (*PositionTracker) Delete

func (t *PositionTracker) Delete(ctx context.Context, loopID string) error

Delete removes an agent's position.

func (*PositionTracker) Get

func (t *PositionTracker) Get(ctx context.Context, loopID string) (*AgentPosition, error)

Get retrieves an agent's position by loop ID.

func (*PositionTracker) ListAll

func (t *PositionTracker) ListAll(ctx context.Context) ([]*AgentPosition, error)

ListAll returns all agent positions.

func (*PositionTracker) ListByRole

func (t *PositionTracker) ListByRole(ctx context.Context, role string) ([]*AgentPosition, error)

ListByRole returns all agent positions with a specific role.

func (*PositionTracker) ListOthers

func (t *PositionTracker) ListOthers(ctx context.Context, excludeLoopID string) ([]*AgentPosition, error)

ListOthers returns all agent positions except the specified loop ID.

func (*PositionTracker) Put

Put stores an agent's position.

func (*PositionTracker) UpdateFocusEntities

func (t *PositionTracker) UpdateFocusEntities(ctx context.Context, loopID string, entities []string) error

UpdateFocusEntities updates the focus entities for an agent.

func (*PositionTracker) UpdateTraversalVector

func (t *PositionTracker) UpdateTraversalVector(ctx context.Context, loopID string, predicates []string) error

UpdateTraversalVector updates the traversal vector for an agent.

type RuleFactory

type RuleFactory struct {
	// contains filtered or unexported fields
}

RuleFactory creates boid coordination rules.

func NewRuleFactory

func NewRuleFactory() *RuleFactory

NewRuleFactory creates a new boid rule factory.

func (*RuleFactory) Create

func (f *RuleFactory) Create(id string, def rule.Definition, deps rule.Dependencies) (rule.Rule, error)

Create creates a boid rule from the definition.

func (*RuleFactory) Schema

func (f *RuleFactory) Schema() rule.Schema

Schema returns the boid rule schema for documentation.

func (*RuleFactory) Type

func (f *RuleFactory) Type() string

Type returns the factory type identifier.

func (*RuleFactory) Validate

func (f *RuleFactory) Validate(def rule.Definition) error

Validate validates the boid rule definition.

type SeparationRule

type SeparationRule struct {
	// contains filtered or unexported fields
}

SeparationRule implements the Boids separation behavior. It prevents agents from working on overlapping graph neighborhoods by generating avoid signals when agents are within k-hop distance.

func NewSeparationRule

func NewSeparationRule(id string, def rule.Definition, config *Config, cooldown time.Duration, logger *slog.Logger) *SeparationRule

NewSeparationRule creates a new separation rule.

func (*SeparationRule) Evaluate

func (r *SeparationRule) Evaluate(_ []message.Message) bool

Evaluate is not used for boid rules (they use EvaluateEntityState).

func (*SeparationRule) EvaluateEntityState

func (r *SeparationRule) EvaluateEntityState(entityState *gtypes.EntityState) bool

EvaluateEntityState evaluates the separation rule against an agent's position. Implements the rule.EntityStateEvaluator interface.

func (*SeparationRule) ExecuteEvents

func (r *SeparationRule) ExecuteEvents(_ []message.Message) ([]rule.Event, error)

ExecuteEvents is not used for boid rules (signals are generated in EvaluateEntityState).

func (*SeparationRule) GetPendingSignals

func (r *SeparationRule) GetPendingSignals() []*SteeringSignal

GetPendingSignals returns and clears the pending signals. This is used by the rule processor to retrieve generated signals.

func (*SeparationRule) Name

func (r *SeparationRule) Name() string

Name returns the rule name.

func (*SeparationRule) SetPivotIndex

func (r *SeparationRule) SetPivotIndex(index *structural.PivotIndex)

SetPivotIndex sets the pivot index for k-hop distance estimation.

func (*SeparationRule) SetPositionProvider

func (r *SeparationRule) SetPositionProvider(provider PositionProvider)

SetPositionProvider sets the provider for retrieving other agent positions.

func (*SeparationRule) Subscribe

func (r *SeparationRule) Subscribe() []string

Subscribe returns subjects this rule subscribes to. Boid rules watch the AGENT_POSITIONS KV bucket, not NATS subjects.

type SignalGenerator

type SignalGenerator interface {
	GetPendingSignals() []*SteeringSignal
}

SignalGenerator interface for rules that generate boid signals.

type SteeringSignal

type SteeringSignal struct {
	// LoopID identifies which agent loop should receive this signal.
	LoopID string `json:"loop_id"`

	// SignalType identifies the rule type: separation, cohesion, or alignment.
	SignalType string `json:"signal_type"`

	// SuggestedFocus contains entity IDs to prioritize (cohesion signals).
	SuggestedFocus []string `json:"suggested_focus,omitempty"`

	// AvoidEntities contains entity IDs to deprioritize (separation signals).
	AvoidEntities []string `json:"avoid_entities,omitempty"`

	// AlignWith contains predicate patterns to favor (alignment signals).
	AlignWith []string `json:"align_with,omitempty"`

	// Strength indicates how strongly to apply the signal (0.0-1.0).
	Strength float64 `json:"strength"`

	// SourceRule identifies which rule generated this signal.
	SourceRule string `json:"source_rule,omitempty"`

	// Timestamp is when this signal was generated.
	Timestamp time.Time `json:"timestamp"`

	// Metadata contains additional context about the signal.
	Metadata map[string]any `json:"metadata,omitempty"`
}

SteeringSignal is published when a Boid rule fires. It provides guidance to the agentic-loop for adjusting agent behavior.

func (*SteeringSignal) MarshalJSON

func (s *SteeringSignal) MarshalJSON() ([]byte, error)

MarshalJSON implements json.Marshaler.

func (*SteeringSignal) Schema

func (s *SteeringSignal) Schema() message.Type

Schema returns the message type schema for SteeringSignal.

func (*SteeringSignal) UnmarshalJSON

func (s *SteeringSignal) UnmarshalJSON(data []byte) error

UnmarshalJSON implements json.Unmarshaler.

func (*SteeringSignal) Validate

func (s *SteeringSignal) Validate() error

Validate checks that the SteeringSignal has required fields.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL