Documentation
¶
Overview ¶
Package boid implements Boids-inspired local coordination rules for multi-agent teams.
This package provides three local rules based on Craig Reynolds' Boids simulation (1986):
- Cohesion: Steer toward high-centrality nodes in the active subgraph matching the role's objective
- Alignment: Match traversal direction of same-role agents
- Separation: Avoid overlapping k-hop neighborhoods with other agents
Architecture ¶
Agent positions are tracked in the AGENT_POSITIONS KV bucket. Rules watch this bucket and evaluate local coordination signals based on graph topology. Signals are published to agent.boid.<loopID> subjects for consumption by the agentic-loop.
Graph Topology as Coordination Substrate ¶
Unlike spatial Boids, which use Euclidean distance, these rules use the knowledge graph topology directly:
- Proximity: k-hop distance via PivotIndex.IsWithinHops()
- Center of mass: PageRank centrality over active subgraph
- Heading: Traversal direction (relationship types being followed)
- Flock boundaries: Community membership
Configuration ¶
Rules are configured via JSON with role-specific thresholds:
{
"type": "boid",
"metadata": {
"boid_rule": "separation",
"role_thresholds": {
"general": 2,
"architect": 3
}
}
}
Research Context ¶
This is an experimental implementation for validating the hypothesis that explicit workflow choreography can be replaced by intent-weighted local rules for certain agent team configurations. See docs/research/boids-hypothesis.md for details.
Index ¶
- Constants
- func CalculateVelocity(oldFocus, newFocus []string) float64
- type AgentPosition
- type AlignmentRule
- func (r *AlignmentRule) Evaluate(_ []message.Message) bool
- func (r *AlignmentRule) EvaluateEntityState(entityState *gtypes.EntityState) bool
- func (r *AlignmentRule) ExecuteEvents(_ []message.Message) ([]rule.Event, error)
- func (r *AlignmentRule) GetPendingSignals() []*SteeringSignal
- func (r *AlignmentRule) Name() string
- func (r *AlignmentRule) SetPositionProvider(provider PositionProvider)
- func (r *AlignmentRule) Subscribe() []string
- type CentralityProvider
- type CohesionRule
- func (r *CohesionRule) Evaluate(_ []message.Message) bool
- func (r *CohesionRule) EvaluateEntityState(entityState *gtypes.EntityState) bool
- func (r *CohesionRule) ExecuteEvents(_ []message.Message) ([]rule.Event, error)
- func (r *CohesionRule) GetPendingSignals() []*SteeringSignal
- func (r *CohesionRule) Name() string
- func (r *CohesionRule) SetCentralityProvider(provider CentralityProvider)
- func (r *CohesionRule) SetPivotIndex(index *structural.PivotIndex)
- func (r *CohesionRule) SetPositionProvider(provider PositionProvider)
- func (r *CohesionRule) Subscribe() []string
- type Config
- type PageRankCentralityProvider
- type PositionProvider
- type PositionTracker
- func (t *PositionTracker) Delete(ctx context.Context, loopID string) error
- func (t *PositionTracker) Get(ctx context.Context, loopID string) (*AgentPosition, error)
- func (t *PositionTracker) ListAll(ctx context.Context) ([]*AgentPosition, error)
- func (t *PositionTracker) ListByRole(ctx context.Context, role string) ([]*AgentPosition, error)
- func (t *PositionTracker) ListOthers(ctx context.Context, excludeLoopID string) ([]*AgentPosition, error)
- func (t *PositionTracker) Put(ctx context.Context, pos *AgentPosition) (uint64, error)
- func (t *PositionTracker) UpdateFocusEntities(ctx context.Context, loopID string, entities []string) error
- func (t *PositionTracker) UpdateTraversalVector(ctx context.Context, loopID string, predicates []string) error
- type RuleFactory
- type SeparationRule
- func (r *SeparationRule) Evaluate(_ []message.Message) bool
- func (r *SeparationRule) EvaluateEntityState(entityState *gtypes.EntityState) bool
- func (r *SeparationRule) ExecuteEvents(_ []message.Message) ([]rule.Event, error)
- func (r *SeparationRule) GetPendingSignals() []*SteeringSignal
- func (r *SeparationRule) Name() string
- func (r *SeparationRule) SetPivotIndex(index *structural.PivotIndex)
- func (r *SeparationRule) SetPositionProvider(provider PositionProvider)
- func (r *SeparationRule) Subscribe() []string
- type SignalGenerator
- type SteeringSignal
Constants ¶
const ( // Domain identifies boid-related messages. Domain = "boid" // CategoryPosition is the category for agent position updates. CategoryPosition = "position" // CategorySignal is the category for steering signals. CategorySignal = "signal" // SchemaVersion is the current schema version. SchemaVersion = "v1" // KVBucketAgentPositions is the KV bucket name for agent positions. KVBucketAgentPositions = "AGENT_POSITIONS" )
Constants for the boid domain and message types.
const ( // RuleTypeSeparation identifies separation rules. RuleTypeSeparation = "separation" // RuleTypeCohesion identifies cohesion rules. RuleTypeCohesion = "cohesion" // RuleTypeAlignment identifies alignment rules. RuleTypeAlignment = "alignment" )
Boid rule type identifiers.
const ( SignalTypeSeparation = "separation" SignalTypeCohesion = "cohesion" SignalTypeAlignment = "alignment" )
Signal type identifiers.
const ( // DefaultSeparationThreshold is the default k-hop distance for separation rules. DefaultSeparationThreshold = 2 // DefaultSteeringStrength is the default influence strength (0.0-1.0). DefaultSteeringStrength = 0.5 // DefaultAlignmentWindow is the default number of recent traversals to consider. DefaultAlignmentWindow = 5 // DefaultCentralityWeight is the default weight for centrality attraction. DefaultCentralityWeight = 0.7 )
Default configuration values.
Variables ¶
This section is empty.
Functions ¶
func CalculateVelocity ¶
CalculateVelocity computes velocity based on position changes. Velocity is a normalized measure of how much the focus entities changed.
Types ¶
type AgentPosition ¶
type AgentPosition struct {
// LoopID is the unique identifier for the agentic loop.
LoopID string `json:"loop_id"`
// Role is the agent's role (general, architect, editor, reviewer).
Role string `json:"role"`
// FocusEntities are the entity IDs the agent is currently working on.
FocusEntities []string `json:"focus_entities"`
// TraversalVector contains the relationship predicates being followed.
// This represents the agent's "heading" in graph space.
TraversalVector []string `json:"traversal_vector"`
// Velocity represents the rate of position change (0.0-1.0).
// Higher values indicate more active exploration.
Velocity float64 `json:"velocity"`
// Iteration is the current loop iteration number.
Iteration int `json:"iteration"`
// LastUpdate is when this position was last updated.
LastUpdate time.Time `json:"last_update"`
}
AgentPosition tracks an agent's current focus and traversal direction in the graph. This is stored in the AGENT_POSITIONS KV bucket, keyed by LoopID.
func (*AgentPosition) MarshalJSON ¶
func (p *AgentPosition) MarshalJSON() ([]byte, error)
MarshalJSON implements json.Marshaler.
func (*AgentPosition) Schema ¶
func (p *AgentPosition) Schema() message.Type
Schema returns the message type schema for AgentPosition.
func (*AgentPosition) UnmarshalJSON ¶
func (p *AgentPosition) UnmarshalJSON(data []byte) error
UnmarshalJSON implements json.Unmarshaler.
func (*AgentPosition) Validate ¶
func (p *AgentPosition) Validate() error
Validate checks that the AgentPosition has required fields and valid values.
type AlignmentRule ¶
type AlignmentRule struct {
// contains filtered or unexported fields
}
AlignmentRule implements the Boids alignment behavior. It steers agents to match the traversal direction of same-role agents by suggesting common predicate patterns to follow.
func NewAlignmentRule ¶
func NewAlignmentRule(id string, def rule.Definition, config *Config, cooldown time.Duration, logger *slog.Logger) *AlignmentRule
NewAlignmentRule creates a new alignment rule.
func (*AlignmentRule) Evaluate ¶
Evaluate is not used for boid rules (they use EvaluateEntityState).
func (*AlignmentRule) EvaluateEntityState ¶
func (r *AlignmentRule) EvaluateEntityState(entityState *gtypes.EntityState) bool
EvaluateEntityState evaluates the alignment rule against an agent's position. Implements the rule.EntityStateEvaluator interface.
func (*AlignmentRule) ExecuteEvents ¶
ExecuteEvents is not used for boid rules (signals are generated in EvaluateEntityState).
func (*AlignmentRule) GetPendingSignals ¶
func (r *AlignmentRule) GetPendingSignals() []*SteeringSignal
GetPendingSignals returns and clears the pending signals.
func (*AlignmentRule) SetPositionProvider ¶
func (r *AlignmentRule) SetPositionProvider(provider PositionProvider)
SetPositionProvider sets the provider for retrieving other agent positions.
type CentralityProvider ¶
type CentralityProvider interface {
// GetPageRankScores returns PageRank scores for a set of entities.
// Returns a map of entity ID to score (0.0-1.0).
GetPageRankScores(ctx context.Context, entityIDs []string) (map[string]float64, error)
}
CentralityProvider provides centrality scores for entities.
type CohesionRule ¶
type CohesionRule struct {
// contains filtered or unexported fields
}
CohesionRule implements the Boids cohesion behavior. It steers agents toward high-centrality nodes in their active subgraph that match their role's objective function.
func NewCohesionRule ¶
func NewCohesionRule(id string, def rule.Definition, config *Config, cooldown time.Duration, logger *slog.Logger) *CohesionRule
NewCohesionRule creates a new cohesion rule.
func (*CohesionRule) EvaluateEntityState ¶
func (r *CohesionRule) EvaluateEntityState(entityState *gtypes.EntityState) bool
EvaluateEntityState evaluates the cohesion rule against an agent's position. Implements the rule.EntityStateEvaluator interface.
func (*CohesionRule) ExecuteEvents ¶
ExecuteEvents is not used for boid rules (signals are generated in EvaluateEntityState).
func (*CohesionRule) GetPendingSignals ¶
func (r *CohesionRule) GetPendingSignals() []*SteeringSignal
GetPendingSignals returns and clears the pending signals.
func (*CohesionRule) SetCentralityProvider ¶
func (r *CohesionRule) SetCentralityProvider(provider CentralityProvider)
SetCentralityProvider sets the provider for centrality scores.
func (*CohesionRule) SetPivotIndex ¶
func (r *CohesionRule) SetPivotIndex(index *structural.PivotIndex)
SetPivotIndex sets the pivot index for finding reachable candidates.
func (*CohesionRule) SetPositionProvider ¶
func (r *CohesionRule) SetPositionProvider(provider PositionProvider)
SetPositionProvider sets the provider for retrieving agent positions.
type Config ¶
type Config struct {
// BoidRule specifies the rule type: separation, cohesion, or alignment.
BoidRule string `json:"boid_rule"`
// RoleFilter limits the rule to agents with this role (empty = all roles).
RoleFilter string `json:"role_filter,omitempty"`
// RoleThresholds maps role names to k-hop separation thresholds.
// Used by separation rules to allow different thresholds per role.
RoleThresholds map[string]int `json:"role_thresholds,omitempty"`
// SeparationThreshold is the default k-hop distance for separation (if RoleThresholds not set).
SeparationThreshold int `json:"separation_threshold,omitempty"`
// CentralityWeight is the weight for centrality attraction in cohesion rules (0.0-1.0).
CentralityWeight float64 `json:"centrality_weight,omitempty"`
// AlignmentWindow is the number of recent traversals to consider for alignment.
AlignmentWindow int `json:"alignment_window,omitempty"`
// SteeringStrength is how much to influence agent behavior (0.0-1.0).
SteeringStrength float64 `json:"steering_strength"`
// Cooldown specifies minimum time between signal emissions.
Cooldown string `json:"cooldown,omitempty"`
}
Config contains configuration extracted from rule metadata.
func ParseConfig ¶
ParseConfig extracts Config from rule metadata.
func (*Config) GetSeparationThreshold ¶
GetSeparationThreshold returns the separation threshold for a given role.
type PageRankCentralityProvider ¶
type PageRankCentralityProvider struct {
// contains filtered or unexported fields
}
PageRankCentralityProvider implements CentralityProvider using cached PageRank scores. It computes PageRank periodically based on the configured TTL and caches the results.
func NewPageRankCentralityProvider ¶
func NewPageRankCentralityProvider(provider clustering.Provider, ttl time.Duration, logger *slog.Logger) *PageRankCentralityProvider
NewPageRankCentralityProvider creates a new PageRank-based centrality provider. The provider parameter must implement the clustering.Provider interface for graph access. TTL controls how often PageRank is recomputed (0 means compute on every request).
func (*PageRankCentralityProvider) GetPageRankScores ¶
func (p *PageRankCentralityProvider) GetPageRankScores(ctx context.Context, entityIDs []string) (map[string]float64, error)
GetPageRankScores returns PageRank scores for the specified entities. If the cached scores are stale (older than TTL), they are recomputed first. Returns a map of entity ID to normalized score (0.0-1.0).
func (*PageRankCentralityProvider) Invalidate ¶
func (p *PageRankCentralityProvider) Invalidate()
Invalidate clears the cached scores, forcing a recomputation on next request.
type PositionProvider ¶
type PositionProvider interface {
Get(ctx context.Context, loopID string) (*AgentPosition, error)
ListOthers(ctx context.Context, excludeLoopID string) ([]*AgentPosition, error)
}
PositionProvider retrieves agent positions for rule evaluation.
type PositionTracker ¶
type PositionTracker struct {
// contains filtered or unexported fields
}
PositionTracker manages agent positions in the AGENT_POSITIONS KV bucket.
func NewPositionTracker ¶
func NewPositionTracker(kv jetstream.KeyValue, logger *slog.Logger) *PositionTracker
NewPositionTracker creates a new position tracker.
func (*PositionTracker) Delete ¶
func (t *PositionTracker) Delete(ctx context.Context, loopID string) error
Delete removes an agent's position.
func (*PositionTracker) Get ¶
func (t *PositionTracker) Get(ctx context.Context, loopID string) (*AgentPosition, error)
Get retrieves an agent's position by loop ID.
func (*PositionTracker) ListAll ¶
func (t *PositionTracker) ListAll(ctx context.Context) ([]*AgentPosition, error)
ListAll returns all agent positions.
func (*PositionTracker) ListByRole ¶
func (t *PositionTracker) ListByRole(ctx context.Context, role string) ([]*AgentPosition, error)
ListByRole returns all agent positions with a specific role.
func (*PositionTracker) ListOthers ¶
func (t *PositionTracker) ListOthers(ctx context.Context, excludeLoopID string) ([]*AgentPosition, error)
ListOthers returns all agent positions except the specified loop ID.
func (*PositionTracker) Put ¶
func (t *PositionTracker) Put(ctx context.Context, pos *AgentPosition) (uint64, error)
Put stores an agent's position.
func (*PositionTracker) UpdateFocusEntities ¶
func (t *PositionTracker) UpdateFocusEntities(ctx context.Context, loopID string, entities []string) error
UpdateFocusEntities updates the focus entities for an agent.
func (*PositionTracker) UpdateTraversalVector ¶
func (t *PositionTracker) UpdateTraversalVector(ctx context.Context, loopID string, predicates []string) error
UpdateTraversalVector updates the traversal vector for an agent.
type RuleFactory ¶
type RuleFactory struct {
// contains filtered or unexported fields
}
RuleFactory creates boid coordination rules.
func NewRuleFactory ¶
func NewRuleFactory() *RuleFactory
NewRuleFactory creates a new boid rule factory.
func (*RuleFactory) Create ¶
func (f *RuleFactory) Create(id string, def rule.Definition, deps rule.Dependencies) (rule.Rule, error)
Create creates a boid rule from the definition.
func (*RuleFactory) Schema ¶
func (f *RuleFactory) Schema() rule.Schema
Schema returns the boid rule schema for documentation.
func (*RuleFactory) Type ¶
func (f *RuleFactory) Type() string
Type returns the factory type identifier.
func (*RuleFactory) Validate ¶
func (f *RuleFactory) Validate(def rule.Definition) error
Validate validates the boid rule definition.
type SeparationRule ¶
type SeparationRule struct {
// contains filtered or unexported fields
}
SeparationRule implements the Boids separation behavior. It prevents agents from working on overlapping graph neighborhoods by generating avoid signals when agents are within k-hop distance.
func NewSeparationRule ¶
func NewSeparationRule(id string, def rule.Definition, config *Config, cooldown time.Duration, logger *slog.Logger) *SeparationRule
NewSeparationRule creates a new separation rule.
func (*SeparationRule) Evaluate ¶
Evaluate is not used for boid rules (they use EvaluateEntityState).
func (*SeparationRule) EvaluateEntityState ¶
func (r *SeparationRule) EvaluateEntityState(entityState *gtypes.EntityState) bool
EvaluateEntityState evaluates the separation rule against an agent's position. Implements the rule.EntityStateEvaluator interface.
func (*SeparationRule) ExecuteEvents ¶
ExecuteEvents is not used for boid rules (signals are generated in EvaluateEntityState).
func (*SeparationRule) GetPendingSignals ¶
func (r *SeparationRule) GetPendingSignals() []*SteeringSignal
GetPendingSignals returns and clears the pending signals. This is used by the rule processor to retrieve generated signals.
func (*SeparationRule) SetPivotIndex ¶
func (r *SeparationRule) SetPivotIndex(index *structural.PivotIndex)
SetPivotIndex sets the pivot index for k-hop distance estimation.
func (*SeparationRule) SetPositionProvider ¶
func (r *SeparationRule) SetPositionProvider(provider PositionProvider)
SetPositionProvider sets the provider for retrieving other agent positions.
type SignalGenerator ¶
type SignalGenerator interface {
GetPendingSignals() []*SteeringSignal
}
SignalGenerator interface for rules that generate boid signals.
type SteeringSignal ¶
type SteeringSignal struct {
// LoopID identifies which agent loop should receive this signal.
LoopID string `json:"loop_id"`
// SignalType identifies the rule type: separation, cohesion, or alignment.
SignalType string `json:"signal_type"`
// SuggestedFocus contains entity IDs to prioritize (cohesion signals).
SuggestedFocus []string `json:"suggested_focus,omitempty"`
// AvoidEntities contains entity IDs to deprioritize (separation signals).
AvoidEntities []string `json:"avoid_entities,omitempty"`
// AlignWith contains predicate patterns to favor (alignment signals).
AlignWith []string `json:"align_with,omitempty"`
// Strength indicates how strongly to apply the signal (0.0-1.0).
Strength float64 `json:"strength"`
// SourceRule identifies which rule generated this signal.
SourceRule string `json:"source_rule,omitempty"`
// Timestamp is when this signal was generated.
Timestamp time.Time `json:"timestamp"`
// Metadata contains additional context about the signal.
Metadata map[string]any `json:"metadata,omitempty"`
}
SteeringSignal is published when a Boid rule fires. It provides guidance to the agentic-loop for adjusting agent behavior.
func (*SteeringSignal) MarshalJSON ¶
func (s *SteeringSignal) MarshalJSON() ([]byte, error)
MarshalJSON implements json.Marshaler.
func (*SteeringSignal) Schema ¶
func (s *SteeringSignal) Schema() message.Type
Schema returns the message type schema for SteeringSignal.
func (*SteeringSignal) UnmarshalJSON ¶
func (s *SteeringSignal) UnmarshalJSON(data []byte) error
UnmarshalJSON implements json.Unmarshaler.
func (*SteeringSignal) Validate ¶
func (s *SteeringSignal) Validate() error
Validate checks that the SteeringSignal has required fields.