team

package
v0.7.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 13, 2026 License: MIT Imports: 17 Imported by: 0

Documentation

Overview

Package team defines the types and coordination primitives for P2P agent teams. A team is a dynamic, task-scoped group of agents that collaborate on a goal.

Index

Constants

View Source
const DefaultPostPayThreshold = 0.7

DefaultPostPayThreshold is the minimum trust score for post-pay eligibility. Matches paygate.DefaultPostPayThreshold to keep both layers consistent.

Variables

View Source
var (
	ErrTeamNotFound    = errors.New("team not found")
	ErrInsufficientAck = errors.New("insufficient acknowledgments from team members")
)

Sentinel errors for coordination.

View Source
var (
	ErrPriceRejected   = errors.New("proposed price was rejected")
	ErrNegotiationFail = errors.New("payment negotiation failed")
)

Sentinel errors for payment negotiation.

View Source
var (
	ErrTeamFull         = errors.New("team is at maximum capacity")
	ErrBudgetExceeded   = errors.New("team budget exceeded")
	ErrAlreadyMember    = errors.New("agent is already a team member")
	ErrNotMember        = errors.New("agent is not a team member")
	ErrTeamDisbanded    = errors.New("team has been disbanded")
	ErrConflict         = errors.New("conflicting results from team members")
	ErrTeamShuttingDown = errors.New("team is shutting down")
)

Sentinel errors for team operations.

Functions

func BuildEscrowTools added in v0.7.0

func BuildEscrowTools(coord *Coordinator, escrowEngine *escrow.Engine, budgetEngine *budget.Engine) []*agent.Tool

BuildEscrowTools creates high-level workflow tools that combine team + escrow + budget.

func BuildTools added in v0.7.0

func BuildTools(coord *Coordinator) []*agent.Tool

BuildTools creates team coordination tools from a Coordinator.

func FastestResolver

func FastestResolver(results []TaskResult) (map[string]interface{}, error)

FastestResolver picks the first successful result.

func MajorityResolver

func MajorityResolver(results []TaskResult) (map[string]interface{}, error)

MajorityResolver picks the most common result by simple majority.

func WithScopedContext

func WithScopedContext(ctx context.Context, sc ScopedContext) context.Context

WithScopedContext returns a context carrying the team scope.

Types

type AssignmentStrategy

type AssignmentStrategy string

AssignmentStrategy determines how tasks are assigned to members.

const (
	AssignBestMatch    AssignmentStrategy = "best_match"
	AssignRoundRobin   AssignmentStrategy = "round_robin"
	AssignLoadBalanced AssignmentStrategy = "load_balanced"
)

type BoltStore added in v0.6.0

type BoltStore struct {
	// contains filtered or unexported fields
}

BoltStore is a BoltDB-backed TeamStore.

func NewBoltStore added in v0.6.0

func NewBoltStore(db *bolt.DB, logger *zap.SugaredLogger) (*BoltStore, error)

NewBoltStore creates a BoltStore and ensures the teams bucket exists.

func (*BoltStore) Delete added in v0.6.0

func (s *BoltStore) Delete(teamID string) error

Delete removes a team from BoltDB.

func (*BoltStore) Load added in v0.6.0

func (s *BoltStore) Load(teamID string) (*Team, error)

Load retrieves a team by ID from BoltDB.

func (*BoltStore) LoadAll added in v0.6.0

func (s *BoltStore) LoadAll() ([]*Team, error)

LoadAll retrieves all persisted teams.

func (*BoltStore) Save added in v0.6.0

func (s *BoltStore) Save(t *Team) error

Save persists a team to BoltDB.

type ConflictResolver

type ConflictResolver func(results []TaskResult) (map[string]interface{}, error)

ConflictResolver decides the final result when members produce conflicting outputs.

type ConflictStrategy

type ConflictStrategy string

ConflictStrategy defines how to resolve conflicting results from multiple agents.

const (
	StrategyTrustWeighted  ConflictStrategy = "trust_weighted"
	StrategyMajorityVote   ConflictStrategy = "majority_vote"
	StrategyLeaderDecides  ConflictStrategy = "leader_decides"
	StrategyFailOnConflict ConflictStrategy = "fail_on_conflict"
)

type ContextFilter

type ContextFilter struct {
	// AllowedKeys restricts shared metadata to these keys. Empty means allow all.
	AllowedKeys []string
	// ExcludeKeys removes these keys from shared metadata.
	ExcludeKeys []string
}

ContextFilter determines which context data is shared with a team member.

func (*ContextFilter) Filter

func (f *ContextFilter) Filter(metadata map[string]string) map[string]string

Filter applies the filter to a metadata map and returns a new filtered copy.

type Coordinator

type Coordinator struct {
	// contains filtered or unexported fields
}

Coordinator manages the lifecycle of agent teams — forming, delegating, collecting, and disbanding.

func NewCoordinator

func NewCoordinator(cfg CoordinatorConfig) *Coordinator

NewCoordinator creates a team coordinator.

func (*Coordinator) ActiveTeams

func (c *Coordinator) ActiveTeams() []*Team

ActiveTeams returns all currently managed teams (alias for ListTeams).

func (*Coordinator) CollectResults

func (c *Coordinator) CollectResults(teamID, toolName string, results []TaskResult) (map[string]interface{}, error)

CollectResults resolves conflicts from delegated task results using the configured resolver.

func (*Coordinator) DelegateTask

func (c *Coordinator) DelegateTask(ctx context.Context, teamID, toolName string, params map[string]interface{}) ([]TaskResult, error)

DelegateTask sends a task to all workers in the team and collects results.

func (*Coordinator) DisbandTeam

func (c *Coordinator) DisbandTeam(teamID string) error

DisbandTeam marks a team as disbanded and removes it from the coordinator.

func (*Coordinator) FormTeam

func (c *Coordinator) FormTeam(ctx context.Context, req FormTeamRequest) (*Team, error)

FormTeam creates a new team by selecting agents from the pool.

func (*Coordinator) GetTeam

func (c *Coordinator) GetTeam(teamID string) (*Team, error)

GetTeam returns a team by ID.

func (*Coordinator) GracefulShutdown added in v0.6.0

func (c *Coordinator) GracefulShutdown(ctx context.Context, teamID string, reason string) error

GracefulShutdown performs an ordered team shutdown:

  1. Set team status to StatusShuttingDown (blocks new task delegation)
  2. Calculate proportional milestone settlement for completed work
  3. Publish TeamGracefulShutdownEvent
  4. Disband team with reason

func (*Coordinator) KickMember added in v0.6.0

func (c *Coordinator) KickMember(_ context.Context, teamID string, memberDID string, reason string) error

KickMember removes a member from a team with a reason and publishes a TeamMemberLeftEvent.

func (*Coordinator) ListTeams

func (c *Coordinator) ListTeams() []*Team

ListTeams returns all active teams.

func (*Coordinator) LoadPersistedTeams added in v0.6.0

func (c *Coordinator) LoadPersistedTeams() error

LoadPersistedTeams loads all teams from the persistent store into memory. This should be called during startup to restore teams from a previous session.

func (*Coordinator) TeamsForMember added in v0.6.0

func (c *Coordinator) TeamsForMember(did string) []string

TeamsForMember returns all active team IDs that include the given DID.

type CoordinatorConfig

type CoordinatorConfig struct {
	Pool             *agentpool.Pool
	Selector         *agentpool.Selector
	InvokeFn         InvokeFunc
	ConflictResolver ConflictResolver
	Conflict         ConflictStrategy
	Assignment       AssignmentStrategy
	Bus              *eventbus.Bus
	Store            TeamStore
	Logger           *zap.SugaredLogger
}

CoordinatorConfig configures the team coordinator.

type FormTeamRequest

type FormTeamRequest struct {
	TeamID      string
	Name        string
	Goal        string
	LeaderDID   string
	Capability  string
	MemberCount int
	MaxMembers  int
}

FormTeamRequest describes how to form a new team.

type GitDivergence added in v0.6.0

type GitDivergence struct {
	WorkspaceID  string
	MemberDID    string
	MemberHead   string
	MajorityHead string
}

GitDivergence describes when a member's HEAD differs from the majority.

type GitStateProvider added in v0.6.0

type GitStateProvider func(ctx context.Context, peerID, workspaceID string) (string, error)

GitStateProvider returns the HEAD commit hash for a workspace from a given member.

type HealthMonitor added in v0.6.0

type HealthMonitor struct {
	// contains filtered or unexported fields
}

HealthMonitor periodically pings team members and publishes unhealthy events when a member misses too many consecutive health checks.

func NewHealthMonitor added in v0.6.0

func NewHealthMonitor(cfg HealthMonitorConfig) *HealthMonitor

NewHealthMonitor creates a health monitor with the given configuration.

func (*HealthMonitor) DetectDivergence added in v0.6.0

func (h *HealthMonitor) DetectDivergence(workspaceID string) []GitDivergence

DetectDivergence checks if members have different HEAD commits for a workspace. It returns a list of members whose HEAD differs from the majority.

func (*HealthMonitor) Name added in v0.6.0

func (h *HealthMonitor) Name() string

Name implements lifecycle.Component.

func (*HealthMonitor) Start added in v0.6.0

func (h *HealthMonitor) Start(_ context.Context, wg *sync.WaitGroup) error

Start implements lifecycle.Component. It launches the periodic health check loop and subscribes to task completion events for counter resets.

func (*HealthMonitor) Stop added in v0.6.0

func (h *HealthMonitor) Stop(_ context.Context) error

Stop implements lifecycle.Component.

type HealthMonitorConfig added in v0.6.0

type HealthMonitorConfig struct {
	Coordinator      *Coordinator
	Bus              *eventbus.Bus
	Logger           *zap.SugaredLogger
	Interval         time.Duration
	MaxMissed        int
	InvokeFn         InvokeFunc
	GitStateProvider GitStateProvider
	WorkspaceIDsFn   func() []string
}

HealthMonitorConfig configures the health monitor.

type InvokeFunc

type InvokeFunc func(ctx context.Context, peerID, toolName string, params map[string]interface{}) (map[string]interface{}, error)

InvokeFunc is the callback used by the coordinator to send a task to a remote agent.

type Member

type Member struct {
	DID          string            `json:"did"`
	Name         string            `json:"name"`
	PeerID       string            `json:"peerId"`
	Role         Role              `json:"role"`
	Status       MemberStatus      `json:"status"`
	Capabilities []string          `json:"capabilities"`
	TrustScore   float64           `json:"trustScore"`
	JoinedAt     time.Time         `json:"joinedAt"`
	Metadata     map[string]string `json:"metadata,omitempty"`
}

Member represents an agent participating in a team.

func (*Member) Clone

func (m *Member) Clone() *Member

Clone returns a deep copy of the Member.

type MemberGitState added in v0.6.0

type MemberGitState struct {
	MemberDID string
	HeadHash  string
	UpdatedAt time.Time
}

MemberGitState records a member's known HEAD hash for a workspace.

type MemberStatus

type MemberStatus string

MemberStatus represents the operational state of a team member.

const (
	MemberIdle   MemberStatus = "idle"
	MemberBusy   MemberStatus = "busy"
	MemberFailed MemberStatus = "failed"
	MemberLeft   MemberStatus = "left"
)

type Negotiator

type Negotiator struct {
	// contains filtered or unexported fields
}

Negotiator handles payment negotiation between team leader and members.

func NewNegotiator

func NewNegotiator(cfg NegotiatorConfig) *Negotiator

NewNegotiator creates a payment negotiator.

func (*Negotiator) NegotiatePayment

func (n *Negotiator) NegotiatePayment(ctx context.Context, teamID string, member *Member, toolName string) (*PaymentAgreement, error)

NegotiatePayment determines the payment terms for a team member. It queries the member's price and the leader's trust in the member to decide the mode.

type NegotiatorConfig

type NegotiatorConfig struct {
	PriceQueryFn     PriceQueryFunc
	TrustScoreFn     TrustScoreFunc
	PostPayThreshold float64 // min trust score for post-pay (default: 0.8)
	DefaultValidity  time.Duration
}

NegotiatorConfig configures the payment negotiator.

type PaymentAgreement

type PaymentAgreement struct {
	TeamID      string      `json:"teamId"`
	MemberDID   string      `json:"memberDid"`
	Mode        PaymentMode `json:"mode"`
	PricePerUse string      `json:"pricePerUse"` // decimal string, e.g. "0.50"
	Currency    string      `json:"currency"`
	MaxUses     int         `json:"maxUses"` // 0 = unlimited
	ValidUntil  time.Time   `json:"validUntil"`
	AgreedAt    time.Time   `json:"agreedAt"`
}

PaymentAgreement records the negotiated payment terms between a team leader and a member.

func NegotiatePaymentQuick

func NegotiatePaymentQuick(teamID, agentDID string, trustScore, pricePerTask, maxBudget float64) *PaymentAgreement

NegotiatePaymentQuick creates a payment agreement using the simple trust-based mode selection.

func (*PaymentAgreement) IsExpired

func (a *PaymentAgreement) IsExpired() bool

IsExpired reports whether the agreement has passed its validity window.

type PaymentMode

type PaymentMode string

PaymentMode describes how a team member will be compensated.

const (
	PaymentPrepay  PaymentMode = "prepay"
	PaymentPostpay PaymentMode = "postpay"
	PaymentFree    PaymentMode = "free"
)

func SelectPaymentMode

func SelectPaymentMode(trustScore, pricePerTask float64) PaymentMode

SelectPaymentMode chooses payment mode based on trust score and price. High trust (>= DefaultPostPayThreshold) with nonzero price -> PostPay; low trust -> PrePay; zero price -> Free.

type PriceQueryFunc

type PriceQueryFunc func(ctx context.Context, peerID, toolName string) (price string, isFree bool, err error)

PriceQueryFunc queries a remote agent's price for a capability or tool.

type Role

type Role string

Role describes a member's function within a team.

const (
	RoleLeader   Role = "leader"
	RoleWorker   Role = "worker"
	RoleReviewer Role = "reviewer"
	RoleObserver Role = "observer"
)

type ScopedContext

type ScopedContext struct {
	TeamID    string
	MemberDID string
	Role      Role
}

ScopedContext wraps a context with team-specific metadata so that downstream handlers can identify which team and member is executing a request.

func ScopedContextFromContext

func ScopedContextFromContext(ctx context.Context) (ScopedContext, bool)

ScopedContextFromContext extracts the team scope from ctx.

type TaskResult

type TaskResult struct {
	MemberDID string
	Result    map[string]interface{}
	Err       error
	Duration  time.Duration
}

TaskResult holds the output of a delegated task from a single member.

type TaskResultSummary

type TaskResultSummary struct {
	TaskID     string  `json:"taskId"`
	AgentDID   string  `json:"agentDid"`
	AgentName  string  `json:"agentName"`
	Success    bool    `json:"success"`
	Result     string  `json:"result"`
	Error      string  `json:"error,omitempty"`
	DurationMs int64   `json:"durationMs"`
	Cost       float64 `json:"cost"`
}

TaskResultSummary holds the summarized result of a delegated task.

func ResolveConflict

func ResolveConflict(strategy ConflictStrategy, results []TaskResultSummary) (*TaskResultSummary, error)

ResolveConflict picks the best result based on the given strategy.

type Team

type Team struct {
	ID          string     `json:"id"`
	Name        string     `json:"name"`
	Goal        string     `json:"goal"`
	LeaderDID   string     `json:"leaderDid"`
	Status      TeamStatus `json:"status"`
	MaxMembers  int        `json:"maxMembers"`
	Budget      float64    `json:"budget"`
	Spent       float64    `json:"spent"`
	CreatedAt   time.Time  `json:"createdAt"`
	DisbandedAt time.Time  `json:"disbandedAt,omitempty"`
	// contains filtered or unexported fields
}

Team is a task-scoped group of P2P agents coordinating on a shared goal.

func NewTeam

func NewTeam(id, name, goal, leaderDID string, maxMembers int) *Team

NewTeam creates a team in the forming state.

func (*Team) Activate

func (t *Team) Activate()

Activate transitions the team to active status.

func (*Team) ActiveMembers

func (t *Team) ActiveMembers() []*Member

ActiveMembers returns deep copies of members that are not in MemberLeft or MemberFailed state.

func (*Team) AddMember

func (t *Team) AddMember(m *Member) error

AddMember adds an agent to the team.

func (*Team) AddSpend

func (t *Team) AddSpend(amount float64) error

AddSpend adds to the team's spent total. Returns ErrBudgetExceeded if budget is exceeded.

func (*Team) Disband

func (t *Team) Disband()

Disband marks the team as disbanded.

func (*Team) GetMember

func (t *Team) GetMember(did string) *Member

GetMember returns a member by DID.

func (*Team) MarshalJSON added in v0.6.0

func (t *Team) MarshalJSON() ([]byte, error)

MarshalJSON implements json.Marshaler. Converts the internal members map to a slice.

func (*Team) MemberCount

func (t *Team) MemberCount() int

MemberCount returns the number of members.

func (*Team) Members

func (t *Team) Members() []*Member

Members returns copies of all current members (safe for concurrent use).

func (*Team) RemoveMember

func (t *Team) RemoveMember(did string) error

RemoveMember removes an agent from the team.

func (*Team) UnmarshalJSON added in v0.6.0

func (t *Team) UnmarshalJSON(data []byte) error

UnmarshalJSON implements json.Unmarshaler. Converts the members slice back to a map.

type TeamStatus

type TeamStatus string

TeamStatus represents the lifecycle state of a team.

const (
	StatusForming      TeamStatus = "forming"
	StatusActive       TeamStatus = "active"
	StatusCompleted    TeamStatus = "completed"
	StatusShuttingDown TeamStatus = "shutting_down"
	StatusDisbanded    TeamStatus = "disbanded"
)

type TeamStore added in v0.6.0

type TeamStore interface {
	Save(team *Team) error
	Load(teamID string) (*Team, error)
	LoadAll() ([]*Team, error)
	Delete(teamID string) error
}

TeamStore persists team state.

type TrustScoreFunc

type TrustScoreFunc func(ctx context.Context, peerDID string) (float64, error)

TrustScoreFunc retrieves the trust score for a peer.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL