researchexecute

package
v1.0.0-beta.96

Note: this package is not in the latest version of its module.
Published: Jun 3, 2026 | License: MIT | Imports: 23 | Imported by: 0

Documentation

Overview

Package researchexecute implements the execute_subqueries component from ADR-045 Phase 1 (PR 4 of six per docs/operations/22-adr045-phase1-plan.md).

execute_subqueries is the code-heavy stage of the graph-search rule chain. It receives a publish trigger on component.execute_subqueries.<loop_id> and reads the upstream research.Intent and research.RouteDecision payloads from AGENT_LOOPS. It then materializes one or more typed sub-queries from the routing intent, executes them in parallel across multiple retrieval tiers, normalizes and deduplicates the results, and enforces the caller's token budget. Finally, it writes a research.Evidence array envelope plus an execute.complete.<loop_id> trigger key, which R3 watches to dispatch the assess_sufficiency stage.
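The dedup and budget steps can be pictured with a minimal sketch. Everything here is an assumption for illustration: the local Evidence type stands in for research.Evidence, the Text field and the four-bytes-per-token estimate are hypothetical, and dedupAndTrim is not a function of this package.

```go
package main

import "fmt"

// Evidence is a local stand-in for research.Evidence. Only the provenance
// fields named in the overview are taken from the docs; Text is hypothetical.
type Evidence struct {
	Tier     string
	Source   string
	EntityID string
	Text     string // hypothetical payload field, used only for sizing
}

// estimateTokens is a crude assumption: roughly four bytes per token.
func estimateTokens(e Evidence) int { return (len(e.Text) + 3) / 4 }

// dedupAndTrim keeps the first Evidence seen per entity ID, then stops once
// adding another item would exceed the token budget.
func dedupAndTrim(in []Evidence, budget int) []Evidence {
	seen := make(map[string]bool, len(in))
	var out []Evidence
	used := 0
	for _, e := range in {
		if seen[e.EntityID] {
			continue // duplicate entity: keep the earlier, higher-ranked hit
		}
		seen[e.EntityID] = true
		cost := estimateTokens(e)
		if used+cost > budget {
			break // budget exhausted
		}
		used += cost
		out = append(out, e)
	}
	return out
}

func main() {
	in := []Evidence{
		{Tier: "0", Source: "walk_seeds:drone-001", EntityID: "drone-001", Text: "battery at 40 percent"},
		{Tier: "1", Source: "bm25", EntityID: "drone-001", Text: "battery at 40 percent"},
		{Tier: "1", Source: "bm25", EntityID: "drone-002", Text: "grounded for maintenance"},
	}
	for _, e := range dedupAndTrim(in, 4000) {
		fmt.Println(e.Tier, e.Source, e.EntityID)
	}
}
```

The sketch assumes the input is already ranked, so that trimming drops the lowest-ranked items first.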

Two input shapes — same component:

  • walk_seeds args: model emits seed references (name/partial_id/candidate_index); component resolves to full 6-part federated entity IDs via the entity index, then dispatches multi-hop expansion as predicate_walk sub-queries.

  • decompose args: model emits decomposition intent (axes/focus/scope); component materializes typed sub-queries from MINIMAL TEMPLATES — entity_type → entity_state, time → temporal_range, predicate → predicate_walk (the fallback for axes that don't map to a dedicated primitive). spatial is deferred to Phase 2 (graph-index-spatial wire).
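The axis-to-primitive mapping for decompose args can be sketched as a small switch. typeForAxis is a hypothetical helper, not part of this package, and treating spatial as "skipped" (rather than, say, an error) is an assumption.

```go
package main

import "fmt"

// SubQueryType mirrors the string-typed discriminator documented below.
type SubQueryType string

const (
	SubQueryTypeEntityState   SubQueryType = "entity_state"
	SubQueryTypePredicateWalk SubQueryType = "predicate_walk"
	SubQueryTypeTemporalRange SubQueryType = "temporal_range"
)

// typeForAxis is a hypothetical helper applying the minimal-template
// mapping. The bool is false for axes deferred to Phase 2.
func typeForAxis(axis string) (SubQueryType, bool) {
	switch axis {
	case "entity_type":
		return SubQueryTypeEntityState, true
	case "time":
		return SubQueryTypeTemporalRange, true
	case "spatial":
		return "", false // deferred to Phase 2
	default:
		// "predicate" and any axis without a dedicated primitive.
		return SubQueryTypePredicateWalk, true
	}
}

func main() {
	for _, axis := range []string{"entity_type", "time", "predicate", "spatial"} {
		t, ok := typeForAxis(axis)
		fmt.Printf("%s -> %q (supported=%v)\n", axis, t, ok)
	}
}
```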

Architectural notes:

  • Pure-code component (no LLM calls). All work is graph + index retrieval against existing gateways.

  • Tier 0 (predicate queries) executes via graph-query NATS-direct subjects (graph.query.entitiesByPrefix etc.). Tier 1 (BM25) executes via graph.query.searchGraph (same surface research-graph-classify uses for initial candidate retrieval). Tier 2 (neural) is deferred to Phase 2.

  • Sub-query types are component-internal — defined here, not in agentic/research/. Reshapeable without cross-package churn if Phase 2 learns better primitives.

  • Parallel fan-out via errgroup with a bounded concurrency cap (config-driven). Per-tier ordering with recency tie-break; learned ranker deferred to Phase 2.

  • Provenance preserved end-to-end — every Evidence carries {tier, source, entity_id} the agent can quote back. No fabricated refs.

  • All public methods are safe for concurrent use across loops; the component holds no per-call mutable state. This is the same pattern as research-graph-classify and research-graph-route.
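The bounded fan-out noted above can be illustrated with the standard library alone. This sketch deliberately swaps errgroup for a channel semaphore plus sync.WaitGroup so it runs without external modules; fanOut and its job signature are hypothetical, and unlike an errgroup created with a context, it does not cancel remaining jobs on the first error.

```go
package main

import (
	"fmt"
	"sync"
)

// fanOut runs every job with at most maxParallel in flight at once. Results
// come back in input order; the returned error is the first non-nil error
// by job index.
func fanOut(jobs []func() (string, error), maxParallel int) ([]string, error) {
	if maxParallel < 1 {
		maxParallel = 1
	}
	results := make([]string, len(jobs))
	errs := make([]error, len(jobs))
	sem := make(chan struct{}, maxParallel) // counting semaphore
	var wg sync.WaitGroup
	for i, job := range jobs {
		wg.Add(1)
		sem <- struct{}{} // blocks while maxParallel jobs are running
		go func(i int, job func() (string, error)) {
			defer wg.Done()
			defer func() { <-sem }()
			// Each goroutine writes only its own index, so no lock is needed.
			results[i], errs[i] = job()
		}(i, job)
	}
	wg.Wait()
	for _, err := range errs {
		if err != nil {
			return results, err
		}
	}
	return results, nil
}

func main() {
	jobs := make([]func() (string, error), 0, 20)
	for i := 0; i < 20; i++ {
		i := i
		jobs = append(jobs, func() (string, error) {
			return fmt.Sprintf("subquery-%d", i), nil
		})
	}
	results, err := fanOut(jobs, 8) // 8 matches DefaultMaxParallelism
	fmt.Println(len(results), err) // prints "20 <nil>"
}
```

With golang.org/x/sync/errgroup, the semaphore would typically be replaced by the group's SetLimit method and the error scan by the value returned from Wait.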

Constants

const (
	// DefaultExecuteTimeout caps the wall-clock for the full
	// fan-out (all sub-queries × all tiers). Generous default; per-
	// tier deadlines are derived as fractions of this cap.
	DefaultExecuteTimeout = 60 * time.Second

	// DefaultMaxParallelism caps concurrent sub-query execution.
	// Keeps fan-out from saturating the responding gateways under
	// a wide-decompose scenario; operators can raise for staging
	// environments with more capacity.
	DefaultMaxParallelism = 8

	// DefaultMaxResultsPerSubquery caps per-sub-query result count
	// before ranking + budget enforcement run. Prevents a single
	// runaway sub-query from dominating the evidence array.
	DefaultMaxResultsPerSubquery = 50

	// DefaultBudgetTokens is the fallback evidence-budget when the
	// upstream Intent doesn't supply one. Should never normally
	// fire (Intent.ResolvedBudgetTokens supplies a default of 4000
	// per agentic/research) but acts as a safety net.
	DefaultBudgetTokens = 4000

	// DefaultPerSubqueryTimeoutFraction is the fraction of the
	// overall ExecuteTimeout each sub-query gets as its per-call
	// deadline. 0.5 leaves slack for fan-out overhead and ranking;
	// operators can tighten.
	DefaultPerSubqueryTimeoutFraction = 0.5
)

Default knobs surfaced as exported constants so the materializer + fan-out tests and operator docs can reference them by name rather than duplicating literals.
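As a worked example of how the timeout knobs combine, the per-sub-query deadline is the overall cap multiplied by the fraction, so the defaults give 60s × 0.5 = 30s. perSubqueryTimeout is a hypothetical helper, and its fallback behaviour for out-of-range inputs is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

const (
	DefaultExecuteTimeout             = 60 * time.Second
	DefaultPerSubqueryTimeoutFraction = 0.5
)

// perSubqueryTimeout is a hypothetical helper that derives the per-call
// deadline. Falling back to the defaults for non-positive or >1 inputs is
// an assumption of this sketch.
func perSubqueryTimeout(total time.Duration, fraction float64) time.Duration {
	if total <= 0 {
		total = DefaultExecuteTimeout
	}
	if fraction <= 0 || fraction > 1 {
		fraction = DefaultPerSubqueryTimeoutFraction
	}
	return time.Duration(float64(total) * fraction)
}

func main() {
	d := perSubqueryTimeout(DefaultExecuteTimeout, DefaultPerSubqueryTimeoutFraction)
	fmt.Println(d) // prints "30s"
}
```

The resulting duration would typically be handed to context.WithTimeout around each sub-query call.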

const ComponentName = "research-graph-execute"

ComponentName is the canonical registry name + log subsystem.

Variables

This section is empty.

Functions

func NewProcessor

func NewProcessor(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)

NewProcessor is the component-factory shape registered with the component registry.

func Register

func Register(registry *component.Registry) error

Register registers the execute_subqueries processor with the supplied component registry. Called from componentregistry.Register at process bootstrap so production binaries pick the component up without extra wiring.

Types

type BM25Args

type BM25Args struct {
	Query string `json:"query"`
	Limit int    `json:"limit,omitempty"`
}

BM25Args carries the text query + result cap.

type Component

type Component struct {
	// contains filtered or unexported fields
}

Component implements the execute_subqueries processor. The struct's field set is intentionally small; lifecycle methods own the NATS plumbing, and the per-message handler hands off to the pure executeAll function in handler.go.

func (*Component) ConfigSchema

func (c *Component) ConfigSchema() component.ConfigSchema

ConfigSchema implements Discoverable.

func (*Component) DataFlow

func (c *Component) DataFlow() component.FlowMetrics

DataFlow implements Discoverable.

func (*Component) Health

func (c *Component) Health() component.HealthStatus

Health implements Discoverable.

func (*Component) Initialize

func (c *Component) Initialize() error

Initialize is part of the LifecycleComponent contract; no pre-Start work is needed.

func (*Component) InputPorts

func (c *Component) InputPorts() []component.Port

InputPorts implements Discoverable.

func (*Component) Meta

func (c *Component) Meta() component.Metadata

Meta implements Discoverable.

func (*Component) OutputPorts

func (c *Component) OutputPorts() []component.Port

OutputPorts implements Discoverable. execute_subqueries has no NATS-publishing outputs; it emits results via KV writes to AGENT_LOOPS.

func (*Component) Start

func (c *Component) Start(ctx context.Context) error

Start opens the AGENT_LOOPS bucket, wires the GraphQueryClient adapter, subscribes to inputs, and reports idle.

func (*Component) Stop

func (c *Component) Stop(timeout time.Duration) error

Stop drains subscriptions and flips the started flag while holding c.mu.

type Config

type Config struct {
	Ports *component.PortConfig `` /* 179-byte string literal not displayed */

	LoopsBucket string `` /* 175-byte string literal not displayed */

	ExecuteTimeout time.Duration `` /* 163-byte string literal not displayed */

	MaxParallelism int `` /* 193-byte string literal not displayed */

	MaxResultsPerSubquery int `` /* 252-byte string literal not displayed */
}

Config holds operator-tunable knobs for the execute_subqueries component.

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns a default Config skeleton with the standard execute_subqueries input port.

func (*Config) ApplyDefaults

func (c *Config) ApplyDefaults()

ApplyDefaults fills in defaults for unset fields.

func (*Config) Validate

func (c *Config) Validate() error

Validate validates the configuration. Negative caps are rejected; zero values fall through to ApplyDefaults.
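The interplay between Validate and ApplyDefaults can be sketched on a reduced struct. miniConfig is a hypothetical stand-in for Config (the real struct tags are not shown on this page), and calling Validate before ApplyDefaults is an assumed order.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// miniConfig is a hypothetical stand-in holding only the numeric knobs.
type miniConfig struct {
	ExecuteTimeout        time.Duration
	MaxParallelism        int
	MaxResultsPerSubquery int
}

// Validate rejects negative caps; zero is allowed and means "unset".
func (c *miniConfig) Validate() error {
	if c.ExecuteTimeout < 0 {
		return errors.New("execute timeout must not be negative")
	}
	if c.MaxParallelism < 0 {
		return errors.New("max parallelism must not be negative")
	}
	if c.MaxResultsPerSubquery < 0 {
		return errors.New("max results per sub-query must not be negative")
	}
	return nil
}

// ApplyDefaults fills unset (zero) fields with the documented defaults.
func (c *miniConfig) ApplyDefaults() {
	if c.ExecuteTimeout == 0 {
		c.ExecuteTimeout = 60 * time.Second // DefaultExecuteTimeout
	}
	if c.MaxParallelism == 0 {
		c.MaxParallelism = 8 // DefaultMaxParallelism
	}
	if c.MaxResultsPerSubquery == 0 {
		c.MaxResultsPerSubquery = 50 // DefaultMaxResultsPerSubquery
	}
}

func main() {
	cfg := miniConfig{MaxParallelism: 16}
	if err := cfg.Validate(); err != nil {
		fmt.Println("invalid:", err)
		return
	}
	cfg.ApplyDefaults()
	fmt.Println(cfg.ExecuteTimeout, cfg.MaxParallelism, cfg.MaxResultsPerSubquery) // prints "1m0s 16 50"
}
```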

type EntityStateArgs

type EntityStateArgs struct {
	EntityIDs []string `json:"entity_ids"`
}

EntityStateArgs carries IDs to fetch.

type GraphQueryClient

type GraphQueryClient interface {
	EntityState(ctx context.Context, args EntityStateArgs, tier, source string, limit int) ([]research.Evidence, error)
	PredicateWalk(ctx context.Context, args PredicateWalkArgs, tier, source string, limit int) ([]research.Evidence, error)
	TemporalRange(ctx context.Context, args TemporalRangeArgs, tier, source string, limit int) ([]research.Evidence, error)
	BM25(ctx context.Context, args BM25Args, tier, source string, limit int) ([]research.Evidence, error)
}

GraphQueryClient is the narrow surface this component consumes from graph-query / graph-index. Production wraps NATS-direct subjects (graph.query.entitiesByPrefix, graph.query.searchGraph, etc.); tests substitute an in-memory fake so the test matrix doesn't need a live graph stack.

Per-method semantics:

  • EntityState: returns the current Triples for the named entities, projected into Evidence (one per entity).
  • PredicateWalk: from each seed, returns Evidence for entities reachable within MaxHops via Predicates (empty Predicates = all).
  • TemporalRange: returns Evidence for entities within the time window, optionally filtered by topic.
  • BM25: text search via graph-query's existing BM25 surface.

All methods MUST stamp Tier + Source on every returned Evidence (taken from the SubQuery they were dispatched for) so provenance stays honest end-to-end. The orchestrator does NOT re-stamp.
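A minimal sketch of the stamping contract, using an in-memory fake that covers only EntityState. The local Evidence type stands in for research.Evidence, fakeClient and lookup are hypothetical, and treating a non-positive limit as "no cap" is an assumption.

```go
package main

import (
	"context"
	"fmt"
)

// Evidence is a local stand-in for research.Evidence, reduced to the
// provenance fields named in the overview.
type Evidence struct {
	Tier, Source, EntityID string
}

type EntityStateArgs struct {
	EntityIDs []string `json:"entity_ids"`
}

// fakeClient is a hypothetical in-memory test double.
type fakeClient struct {
	known map[string]bool
}

// EntityState returns one Evidence per known entity and stamps the tier and
// source it was dispatched with, since the orchestrator does not re-stamp.
func (f *fakeClient) EntityState(ctx context.Context, args EntityStateArgs, tier, source string, limit int) ([]Evidence, error) {
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	var out []Evidence
	for _, id := range args.EntityIDs {
		if limit > 0 && len(out) >= limit {
			break
		}
		if !f.known[id] {
			continue // unknown IDs yield nothing: no fabricated refs
		}
		out = append(out, Evidence{Tier: tier, Source: source, EntityID: id})
	}
	return out, nil
}

// lookup is a convenience wrapper for the demo below.
func lookup(ids ...string) []Evidence {
	f := &fakeClient{known: map[string]bool{"drone-001": true, "drone-002": true}}
	ev, _ := f.EntityState(context.Background(), EntityStateArgs{EntityIDs: ids}, "0", "walk_seeds:drone-001", 50)
	return ev
}

func main() {
	for _, e := range lookup("drone-001", "missing", "drone-002") {
		fmt.Printf("tier=%s source=%s entity=%s\n", e.Tier, e.Source, e.EntityID)
	}
}
```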

type LoopStore

type LoopStore interface {
	// GetIntent loads the research_intent payload from the
	// research.requested.<loopID> key.
	GetIntent(ctx context.Context, loopID string) (*research.Intent, error)

	// GetClassifierOutput loads the upstream ClassifierOutput from
	// the classify.complete.<loopID> trigger key. Needed for
	// walk_seeds candidate_index resolution + decompose entity_type
	// anchoring.
	GetClassifierOutput(ctx context.Context, loopID string) (*research.ClassifierOutput, error)

	// GetRouteDecision loads the upstream RouteDecision from the
	// route.complete.<loopID> trigger key. Drives sub-query
	// materialization.
	GetRouteDecision(ctx context.Context, loopID string) (*research.RouteDecision, error)

	// PutExecutionOutput writes the ExecutionOutput envelope at
	// R3's trigger key execute.complete.<loopID>.
	PutExecutionOutput(ctx context.Context, loopID string, envelope []byte) error

	// PutSnapshot writes the envelope at the stable non-trigger
	// key execute.snapshot.<loopID> so operators / downstream
	// queryability can read without racing R3's wildcard watcher.
	PutSnapshot(ctx context.Context, loopID string, envelope []byte) error
}

LoopStore is the AGENT_LOOPS read/write surface this component consumes. Production wraps natsclient.KVStore; tests substitute an in-memory map.
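The two write keys can be illustrated with an in-memory map. memStore, triggerKey, snapshotKey, and writeBoth are hypothetical names, only the two Put methods are covered, and the order in which the real component issues the two writes is not stated on this page.

```go
package main

import (
	"context"
	"fmt"
	"sort"
	"sync"
)

// Key layouts taken from the LoopStore method comments.
func triggerKey(loopID string) string  { return "execute.complete." + loopID }
func snapshotKey(loopID string) string { return "execute.snapshot." + loopID }

// memStore is a hypothetical in-memory double for the write half of LoopStore.
type memStore struct {
	mu sync.Mutex
	kv map[string][]byte
}

func (m *memStore) put(ctx context.Context, key string, envelope []byte) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.kv == nil {
		m.kv = make(map[string][]byte)
	}
	m.kv[key] = append([]byte(nil), envelope...) // copy so callers can reuse the slice
	return nil
}

func (m *memStore) PutExecutionOutput(ctx context.Context, loopID string, envelope []byte) error {
	return m.put(ctx, triggerKey(loopID), envelope)
}

func (m *memStore) PutSnapshot(ctx context.Context, loopID string, envelope []byte) error {
	return m.put(ctx, snapshotKey(loopID), envelope)
}

// keys returns the stored keys in sorted order.
func (m *memStore) keys() []string {
	m.mu.Lock()
	defer m.mu.Unlock()
	out := make([]string, 0, len(m.kv))
	for k := range m.kv {
		out = append(out, k)
	}
	sort.Strings(out)
	return out
}

// writeBoth stores the same envelope under both keys and lists the result.
func writeBoth(loopID string, envelope []byte) []string {
	m := &memStore{}
	ctx := context.Background()
	if err := m.PutSnapshot(ctx, loopID, envelope); err != nil {
		return nil
	}
	if err := m.PutExecutionOutput(ctx, loopID, envelope); err != nil {
		return nil
	}
	return m.keys()
}

func main() {
	fmt.Println(writeBoth("loop-42", []byte(`{"evidence":[]}`)))
	// prints "[execute.complete.loop-42 execute.snapshot.loop-42]"
}
```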

type PredicateWalkArgs

type PredicateWalkArgs struct {
	Seeds      []string `json:"seeds"`
	Predicates []string `json:"predicates,omitempty"`
	MaxHops    int      `json:"max_hops,omitempty"`
}

PredicateWalkArgs carries seed IDs + optional predicate filter. MaxHops 0 resolves to 1 at the executor.
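A short sketch of the MaxHops rule and of how the omitempty tags shape the wire form. effectiveHops and encode are hypothetical helpers; treating negative values like zero is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type PredicateWalkArgs struct {
	Seeds      []string `json:"seeds"`
	Predicates []string `json:"predicates,omitempty"`
	MaxHops    int      `json:"max_hops,omitempty"`
}

// effectiveHops is a hypothetical helper: an unset MaxHops resolves to a
// single hop. Handling negatives the same way is an assumption.
func effectiveHops(args PredicateWalkArgs) int {
	if args.MaxHops <= 0 {
		return 1
	}
	return args.MaxHops
}

// encode returns the JSON form, or "" if marshalling fails.
func encode(args PredicateWalkArgs) string {
	b, err := json.Marshal(args)
	if err != nil {
		return ""
	}
	return string(b)
}

func main() {
	args := PredicateWalkArgs{Seeds: []string{"drone-001"}}
	fmt.Println(encode(args))        // prints {"seeds":["drone-001"]}
	fmt.Println(effectiveHops(args)) // prints 1
}
```

Because of omitempty, an empty Predicates slice and a zero MaxHops disappear from the encoded form, which is why the executor has to supply the defaults on its side.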

type SubQuery

type SubQuery struct {
	Type   SubQueryType `json:"type"`
	Tier   string       `json:"tier"`   // "0" (predicate) or "1" (BM25)
	Source string       `json:"source"` // e.g. "walk_seeds:drone-001"; surfaces on Evidence.Source

	// Per-type args. Exactly one is populated based on Type — the
	// executor switch enforces. Optional fields allow tests + future
	// templates to omit per-type primitives that don't apply.
	EntityState   *EntityStateArgs   `json:"entity_state,omitempty"`
	PredicateWalk *PredicateWalkArgs `json:"predicate_walk,omitempty"`
	TemporalRange *TemporalRangeArgs `json:"temporal_range,omitempty"`
	BM25          *BM25Args          `json:"bm25,omitempty"`
}

SubQuery is a typed retrieval request. Component-internal — reshapeable without cross-package churn if Phase 2 learns better primitives. Each variant carries the minimum fields its tier executor needs; the Type discriminator drives dispatch in executeSubQuery.

Tier and Source travel with the SubQuery so the per-tier executor can stamp them onto every research.Evidence it produces without re-deriving — keeps provenance honest end-to-end.

func (SubQuery) Validate

func (q SubQuery) Validate() error

Validate returns a non-nil error when required per-type fields are missing. Called by the materializer before fan-out so a malformed sub-query surfaces before any retrieval work runs.
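One plausible shape for such a check is sketched below. It is not the package's actual implementation: the types are trimmed local copies, validate is a free function standing in for the method, and the specific rules (exactly one args block, matching the Type, with its key field non-empty) are assumptions drawn from the field comments.

```go
package main

import (
	"errors"
	"fmt"
)

type SubQueryType string

const (
	TypeEntityState   SubQueryType = "entity_state"
	TypePredicateWalk SubQueryType = "predicate_walk"
	TypeTemporalRange SubQueryType = "temporal_range"
	TypeBM25          SubQueryType = "bm25"
)

// Trimmed local copies of the per-type args.
type (
	EntityStateArgs   struct{ EntityIDs []string }
	PredicateWalkArgs struct{ Seeds []string }
	TemporalRangeArgs struct{ Start, End string }
	BM25Args          struct{ Query string }
)

type SubQuery struct {
	Type          SubQueryType
	EntityState   *EntityStateArgs
	PredicateWalk *PredicateWalkArgs
	TemporalRange *TemporalRangeArgs
	BM25          *BM25Args
}

// validate is a hypothetical check: exactly one args block must be set, and
// it must be the one that matches Type.
func validate(q SubQuery) error {
	set := 0
	for _, present := range []bool{q.EntityState != nil, q.PredicateWalk != nil, q.TemporalRange != nil, q.BM25 != nil} {
		if present {
			set++
		}
	}
	if set != 1 {
		return fmt.Errorf("want exactly one args block, got %d", set)
	}
	switch q.Type {
	case TypeEntityState:
		if q.EntityState == nil || len(q.EntityState.EntityIDs) == 0 {
			return errors.New("entity_state requires at least one entity ID")
		}
	case TypePredicateWalk:
		if q.PredicateWalk == nil || len(q.PredicateWalk.Seeds) == 0 {
			return errors.New("predicate_walk requires at least one seed")
		}
	case TypeTemporalRange:
		if q.TemporalRange == nil || q.TemporalRange.Start == "" || q.TemporalRange.End == "" {
			return errors.New("temporal_range requires start and end")
		}
	case TypeBM25:
		if q.BM25 == nil || q.BM25.Query == "" {
			return errors.New("bm25 requires a query")
		}
	default:
		return fmt.Errorf("unknown sub-query type %q", q.Type)
	}
	return nil
}

func main() {
	ok := SubQuery{Type: TypeBM25, BM25: &BM25Args{Query: "battery fault"}}
	bad := SubQuery{Type: TypeBM25, EntityState: &EntityStateArgs{EntityIDs: []string{"drone-001"}}}
	fmt.Println(validate(ok))
	fmt.Println(validate(bad))
}
```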

type SubQueryType

type SubQueryType string

SubQueryType is the closed set of Tier 0+1 primitives PR 4 ships. Phase 2 adds spatial_polygon + neural; Phase 1 stays minimal.

const (
	// SubQueryTypeEntityState fetches current state of named
	// entities. Tier 0; used for walk_seeds anchoring + decompose's
	// entity_type axis when classifier candidates are present.
	SubQueryTypeEntityState SubQueryType = "entity_state"

	// SubQueryTypePredicateWalk traverses predicate(s) from seed
	// entities. Tier 0; used for walk_seeds neighborhood expansion +
	// decompose's free-form axis fallback.
	SubQueryTypePredicateWalk SubQueryType = "predicate_walk"

	// SubQueryTypeTemporalRange queries entities within a time
	// window. Tier 0; used for decompose's time axis. Phase 1 ships
	// the type but the executor falls through to BM25 on topic when
	// graph-index-temporal isn't wired — operators see the
	// "temporal degrade" hint on the produced Evidence.
	SubQueryTypeTemporalRange SubQueryType = "temporal_range"

	// SubQueryTypeBM25 text-searches via graph-query's existing
	// BM25 surface. Tier 1; always added to widen coverage beyond
	// purely-structural retrieval (intent-shaped routing can miss
	// keyword matches the classifier surfaced).
	SubQueryTypeBM25 SubQueryType = "bm25"
)

type TemporalRangeArgs

type TemporalRangeArgs struct {
	// Start / End are RFC3339 strings; component-internal so
	// timezone handling stays in the executor where the upstream
	// graph-index-temporal surface lives.
	Start string `json:"start"`
	End   string `json:"end"`
	Topic string `json:"topic,omitempty"`
}

TemporalRangeArgs carries start/end + an optional topic filter. Empty Topic widens to all entities in the window (may be heavy; callers should always pass a topic in practice).
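Since Start and End travel as RFC3339 strings, an executor has to parse them before use. A minimal sketch with the standard time package follows; parseWindow is a hypothetical helper, and rejecting a window whose end precedes its start is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// parseWindow is a hypothetical helper that parses both bounds as RFC3339
// and rejects windows whose end precedes their start.
func parseWindow(start, end string) (time.Time, time.Time, error) {
	s, err := time.Parse(time.RFC3339, start)
	if err != nil {
		return time.Time{}, time.Time{}, fmt.Errorf("start: %w", err)
	}
	e, err := time.Parse(time.RFC3339, end)
	if err != nil {
		return time.Time{}, time.Time{}, fmt.Errorf("end: %w", err)
	}
	if e.Before(s) {
		return time.Time{}, time.Time{}, fmt.Errorf("end %s precedes start %s", end, start)
	}
	return s, e, nil
}

func main() {
	// The offsets differ, but comparison is on the instant, not the text.
	s, e, err := parseWindow("2026-06-01T00:00:00Z", "2026-06-02T12:00:00+02:00")
	fmt.Println(e.Sub(s), err) // prints "34h0m0s <nil>"
}
```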
