Documentation ¶
Overview ¶
Package researchexecute implements the execute_subqueries component from ADR-045 Phase 1 (PR 4 of six per docs/operations/22-adr045-phase1-plan.md).
execute_subqueries is the code-heavy stage of the graph-search rule chain. It receives a publish trigger on component.execute_subqueries.<loop_id> and reads the upstream research.Intent and research.RouteDecision payloads from AGENT_LOOPS. From the routing intent it materializes one or more typed sub-queries, executes them in parallel across multiple retrieval tiers, normalises and deduplicates the results, and enforces the caller's token budget. It then writes a research.Evidence array envelope plus an execute.complete.<loop_id> trigger key, which R3 watches to dispatch the assess_sufficiency stage.
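The dedup and budget steps can be pictured with a minimal sketch. Everything here is an assumption made for illustration: the Evidence struct is a stand-in for research.Evidence, the Tokens field is a hypothetical per-item token estimate, deduplication keys on the entity ID, and the budget cut is a simple in-order greedy stop. The package's real rules may differ.

```go
package main

import "fmt"

// Evidence is a hypothetical stand-in for research.Evidence; the
// Tokens field is an assumed per-item token estimate.
type Evidence struct {
	Tier     string
	Source   string
	EntityID string
	Tokens   int
}

// dedupByEntity keeps the first Evidence seen for each entity ID and
// preserves input order.
func dedupByEntity(in []Evidence) []Evidence {
	seen := make(map[string]bool, len(in))
	out := make([]Evidence, 0, len(in))
	for _, ev := range in {
		if seen[ev.EntityID] {
			continue
		}
		seen[ev.EntityID] = true
		out = append(out, ev)
	}
	return out
}

// enforceBudget keeps items in order and stops at the first item that
// would push the running total past budgetTokens.
func enforceBudget(in []Evidence, budgetTokens int) []Evidence {
	used := 0
	out := make([]Evidence, 0, len(in))
	for _, ev := range in {
		if used+ev.Tokens > budgetTokens {
			break
		}
		used += ev.Tokens
		out = append(out, ev)
	}
	return out
}

func main() {
	raw := []Evidence{
		{Tier: "0", Source: "walk_seeds:drone-001", EntityID: "a", Tokens: 1500},
		{Tier: "1", Source: "bm25", EntityID: "a", Tokens: 1500},
		{Tier: "0", Source: "walk_seeds:drone-001", EntityID: "b", Tokens: 2000},
		{Tier: "1", Source: "bm25", EntityID: "c", Tokens: 1000},
	}
	for _, ev := range enforceBudget(dedupByEntity(raw), 4000) {
		fmt.Println(ev.EntityID, ev.Tier, ev.Source)
	}
}
```

Running dedup before the budget cut means a duplicate never consumes budget that a distinct entity could have used.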
Two input shapes, same component:
- walk_seeds args: the model emits seed references (name/partial_id/candidate_index); the component resolves them to full 6-part federated entity IDs via the entity index, then dispatches multi-hop expansion as predicate_walk sub-queries.
- decompose args: the model emits decomposition intent (axes/focus/scope); the component materializes typed sub-queries from minimal templates: entity_type → entity_state, time → temporal_range, predicate → predicate_walk (the fallback for axes that don't map to a dedicated primitive). spatial is deferred to Phase 2 (graph-index-spatial wire).
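The decompose template mapping can be sketched as a single switch. This is only an illustration: the axis names are assumed to arrive as plain strings, and templateForAxis is a hypothetical helper, not a function this package is known to export.

```go
package main

import "fmt"

type SubQueryType string

const (
	SubQueryTypeEntityState   SubQueryType = "entity_state"
	SubQueryTypePredicateWalk SubQueryType = "predicate_walk"
	SubQueryTypeTemporalRange SubQueryType = "temporal_range"
)

// templateForAxis (hypothetical) maps a decompose axis to the
// sub-query type its template produces. ok is false for axes that
// Phase 1 defers, so the caller can skip them.
func templateForAxis(axis string) (t SubQueryType, ok bool) {
	switch axis {
	case "entity_type":
		return SubQueryTypeEntityState, true
	case "time":
		return SubQueryTypeTemporalRange, true
	case "spatial":
		// Deferred to Phase 2; no sub-query is materialized.
		return "", false
	default:
		// "predicate" and any axis without a dedicated primitive
		// fall back to a predicate walk.
		return SubQueryTypePredicateWalk, true
	}
}

func main() {
	for _, axis := range []string{"entity_type", "time", "predicate", "owner", "spatial"} {
		t, ok := templateForAxis(axis)
		fmt.Printf("%s -> %q (materialized=%v)\n", axis, t, ok)
	}
}
```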
Architectural notes:
- Pure-code component (no LLM calls). All work is graph + index retrieval against existing gateways.
- Tier 0 (predicate queries) executes via graph-query NATS-direct subjects (graph.query.entitiesByPrefix etc.). Tier 1 (BM25) executes via graph.query.searchGraph (the same surface research-graph-classify uses for initial candidate retrieval). Tier 2 (neural) is deferred to Phase 2.
- Sub-query types are component-internal: defined here, not in agentic/research/. They can be reshaped without cross-package churn if Phase 2 learns better primitives.
- Parallel fan-out via errgroup with a bounded, config-driven concurrency cap. Per-tier ordering with recency tie-break; a learned ranker is deferred to Phase 2.
- Provenance is preserved end-to-end: every Evidence carries {tier, source, entity_id} that the agent can quote back. No fabricated refs.
- All public methods are safe for concurrent use across loops; the component holds no per-call mutable state. Same pattern as research-graph-classify and research-graph-route.
Index ¶
- Constants
- func NewProcessor(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)
- func Register(registry *component.Registry) error
- type BM25Args
- type Component
- func (c *Component) ConfigSchema() component.ConfigSchema
- func (c *Component) DataFlow() component.FlowMetrics
- func (c *Component) Health() component.HealthStatus
- func (c *Component) Initialize() error
- func (c *Component) InputPorts() []component.Port
- func (c *Component) Meta() component.Metadata
- func (c *Component) OutputPorts() []component.Port
- func (c *Component) Start(ctx context.Context) error
- func (c *Component) Stop(timeout time.Duration) error
- type Config
- func DefaultConfig() Config
- func (c *Config) ApplyDefaults()
- type EntityStateArgs
- type GraphQueryClient
- type LoopStore
- type PredicateWalkArgs
- type SubQuery
- type SubQueryType
- type TemporalRangeArgs
Constants ¶
const (
	// DefaultExecuteTimeout caps the wall-clock for the full
	// fan-out (all sub-queries × all tiers). Generous default;
	// per-tier deadlines are derived as fractions of this cap.
	DefaultExecuteTimeout = 60 * time.Second

	// DefaultMaxParallelism caps concurrent sub-query execution.
	// Keeps fan-out from saturating the responding gateways under
	// a wide-decompose scenario; operators can raise for staging
	// environments with more capacity.
	DefaultMaxParallelism = 8

	// DefaultMaxResultsPerSubquery caps per-sub-query result count
	// before ranking + budget enforcement run. Prevents a single
	// runaway sub-query from dominating the evidence array.
	DefaultMaxResultsPerSubquery = 50

	// DefaultBudgetTokens is the fallback evidence-budget when the
	// upstream Intent doesn't supply one. Should never normally
	// fire (Intent.ResolvedBudgetTokens supplies a default of 4000
	// per agentic/research) but acts as a safety net.
	DefaultBudgetTokens = 4000

	// DefaultPerSubqueryTimeoutFraction is the fraction of the
	// overall ExecuteTimeout each sub-query gets as its per-call
	// deadline. 0.5 leaves slack for fan-out overhead and ranking;
	// operators can tighten.
	DefaultPerSubqueryTimeoutFraction = 0.5
)
Default knobs surfaced as exported constants so the materializer + fan-out tests and operator docs can reference them by name rather than duplicating literals.
const ComponentName = "research-graph-execute"
ComponentName is the canonical registry name + log subsystem.
Variables ¶
This section is empty.
Functions ¶
func NewProcessor ¶
func NewProcessor(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)
NewProcessor is the component-factory shape registered with the component registry.
Types ¶
type Component ¶
type Component struct {
// contains filtered or unexported fields
}
Component implements the execute_subqueries processor. Struct field set is intentionally small; lifecycle methods own the NATS plumbing and the per-message handler hands off to the pure executeAll function in handler.go.
func (*Component) ConfigSchema ¶
func (c *Component) ConfigSchema() component.ConfigSchema
ConfigSchema implements Discoverable.
func (*Component) DataFlow ¶
func (c *Component) DataFlow() component.FlowMetrics
DataFlow implements Discoverable.
func (*Component) Health ¶
func (c *Component) Health() component.HealthStatus
Health implements Discoverable.
func (*Component) Initialize ¶
func (c *Component) Initialize() error
Initialize is part of the LifecycleComponent contract; there is no pre-Start work.
func (*Component) InputPorts ¶
func (c *Component) InputPorts() []component.Port
InputPorts implements Discoverable.
func (*Component) OutputPorts ¶
func (c *Component) OutputPorts() []component.Port
OutputPorts implements Discoverable. execute_subqueries has no NATS-publishing outputs; it emits via KV writes to AGENT_LOOPS.
type Config ¶
type Config struct {
Ports *component.PortConfig `` /* 179-byte string literal not displayed */
LoopsBucket string `` /* 175-byte string literal not displayed */
ExecuteTimeout time.Duration `` /* 163-byte string literal not displayed */
MaxParallelism int `` /* 193-byte string literal not displayed */
MaxResultsPerSubquery int `` /* 252-byte string literal not displayed */
}
Config holds operator-tunable knobs for the execute_subqueries component.
func DefaultConfig ¶
func DefaultConfig() Config
DefaultConfig returns a default Config skeleton with the standard execute_subqueries input port.
func (*Config) ApplyDefaults ¶
func (c *Config) ApplyDefaults()
ApplyDefaults fills in defaults for unset fields.
type EntityStateArgs ¶
type EntityStateArgs struct {
EntityIDs []string `json:"entity_ids"`
}
EntityStateArgs carries IDs to fetch.
type GraphQueryClient ¶
type GraphQueryClient interface {
EntityState(ctx context.Context, args EntityStateArgs, tier, source string, limit int) ([]research.Evidence, error)
PredicateWalk(ctx context.Context, args PredicateWalkArgs, tier, source string, limit int) ([]research.Evidence, error)
TemporalRange(ctx context.Context, args TemporalRangeArgs, tier, source string, limit int) ([]research.Evidence, error)
BM25(ctx context.Context, args BM25Args, tier, source string, limit int) ([]research.Evidence, error)
}
GraphQueryClient is the narrow surface this component consumes from graph-query / graph-index. Production wraps NATS-direct subjects (graph.query.entitiesByPrefix, graph.query.searchGraph, etc.); tests substitute an in-memory fake so the matrix doesn't need a live graph stack.
Per-method semantics:
- EntityState: returns the current Triples for the named entities, projected into Evidence (one per entity).
- PredicateWalk: from each seed, returns Evidence for entities reachable within MaxHops via Predicates (empty Predicates = all).
- TemporalRange: returns Evidence for entities within the time window, optionally filtered by topic.
- BM25: text search via graph-query's existing BM25 surface.
All methods MUST stamp Tier + Source on every returned Evidence (taken from the SubQuery they were dispatched for) so provenance stays honest end-to-end. The orchestrator does NOT re-stamp.
type LoopStore ¶
type LoopStore interface {
// GetIntent loads the research_intent payload from the
// research.requested.<loopID> key.
GetIntent(ctx context.Context, loopID string) (*research.Intent, error)
// GetClassifierOutput loads the upstream ClassifierOutput from
// the classify.complete.<loopID> trigger key. Needed for
// walk_seeds candidate_index resolution + decompose entity_type
// anchoring.
GetClassifierOutput(ctx context.Context, loopID string) (*research.ClassifierOutput, error)
// GetRouteDecision loads the upstream RouteDecision from the
// route.complete.<loopID> trigger key. Drives sub-query
// materialization.
GetRouteDecision(ctx context.Context, loopID string) (*research.RouteDecision, error)
// PutExecutionOutput writes the ExecutionOutput envelope at
// R3's trigger key execute.complete.<loopID>.
PutExecutionOutput(ctx context.Context, loopID string, envelope []byte) error
// PutSnapshot writes the envelope at the stable non-trigger
// key execute.snapshot.<loopID> so operators / downstream
// queryability can read without racing R3's wildcard watcher.
PutSnapshot(ctx context.Context, loopID string, envelope []byte) error
}
LoopStore is the AGENT_LOOPS read/write surface this component consumes. Production wraps natsclient.KVStore; tests substitute an in-memory map.
type PredicateWalkArgs ¶
type PredicateWalkArgs struct {
Seeds []string `json:"seeds"`
Predicates []string `json:"predicates,omitempty"`
MaxHops int `json:"max_hops,omitempty"`
}
PredicateWalkArgs carries seed IDs + optional predicate filter. MaxHops 0 resolves to 1 at the executor.
type SubQuery ¶
type SubQuery struct {
Type SubQueryType `json:"type"`
Tier string `json:"tier"` // "0" (predicate) or "1" (BM25)
Source string `json:"source"` // e.g. "walk_seeds:drone-001"; surfaces on Evidence.Source
// Per-type args. Exactly one is populated based on Type — the
// executor switch enforces. Optional fields allow tests + future
// templates to omit per-type primitives that don't apply.
EntityState *EntityStateArgs `json:"entity_state,omitempty"`
PredicateWalk *PredicateWalkArgs `json:"predicate_walk,omitempty"`
TemporalRange *TemporalRangeArgs `json:"temporal_range,omitempty"`
BM25 *BM25Args `json:"bm25,omitempty"`
}
SubQuery is a typed retrieval request. Component-internal — reshapeable without cross-package churn if Phase 2 learns better primitives. Each variant carries the minimum fields its tier executor needs; the Type discriminator drives dispatch in executeSubQuery.
Tier and Source travel with the SubQuery so the per-tier executor can stamp them onto every research.Evidence it produces without re-deriving — keeps provenance honest end-to-end.
type SubQueryType ¶
type SubQueryType string
SubQueryType is the closed set of Tier 0+1 primitives PR 4 ships. Phase 2 adds spatial_polygon + neural; Phase 1 stays minimal.
const (
	// SubQueryTypeEntityState fetches current state of named
	// entities. Tier 0; used for walk_seeds anchoring + decompose's
	// entity_type axis when classifier candidates are present.
	SubQueryTypeEntityState SubQueryType = "entity_state"

	// SubQueryTypePredicateWalk traverses predicate(s) from seed
	// entities. Tier 0; used for walk_seeds neighborhood expansion +
	// decompose's free-form axis fallback.
	SubQueryTypePredicateWalk SubQueryType = "predicate_walk"

	// SubQueryTypeTemporalRange queries entities within a time
	// window. Tier 0; used for decompose's time axis. Phase 1 ships
	// the type but the executor falls through to BM25 on topic when
	// graph-index-temporal isn't wired; operators see the
	// "temporal degrade" hint on the produced Evidence.
	SubQueryTypeTemporalRange SubQueryType = "temporal_range"

	// SubQueryTypeBM25 text-searches via graph-query's existing
	// BM25 surface. Tier 1; always added to widen coverage beyond
	// purely-structural retrieval (intent-shaped routing can miss
	// keyword matches the classifier surfaced).
	SubQueryTypeBM25 SubQueryType = "bm25"
)
type TemporalRangeArgs ¶
type TemporalRangeArgs struct {
// Start / End are RFC3339 strings; component-internal so
// timezone handling stays in the executor where the upstream
// graph-index-temporal surface lives.
Start string `json:"start"`
End string `json:"end"`
Topic string `json:"topic,omitempty"`
}
TemporalRangeArgs carries start/end + an optional topic filter. Empty Topic widens to all entities in the window (may be heavy; callers should always pass a topic in practice).