client

package
v1.0.0-alpha.36 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 12, 2026 License: MIT Imports: 24 Imported by: 0

Documentation

Overview

Package client provides HTTP and WebSocket clients and test utilities for SemStreams E2E tests.

Index

Constants

View Source
// BucketComponentStatus is the name of the KV bucket for component lifecycle
// status.
const BucketComponentStatus = "COMPONENT_STATUS"

BucketComponentStatus is the KV bucket for component lifecycle status

View Source
// BucketEntityStates is the KV bucket name for entity states.
// IndexBuckets.EntityStates is initialized with the same literal.
const BucketEntityStates = "ENTITY_STATES"

BucketEntityStates is the KV bucket name for entity states

View Source
// BucketOASFRecords is the KV bucket name for OASF records.
const BucketOASFRecords = "OASF_RECORDS"

BucketOASFRecords is the KV bucket name for OASF records

View Source
// BucketWorkflowDefinitions is the name of the KV bucket for workflow
// definitions.
const BucketWorkflowDefinitions = "WORKFLOW_DEFINITIONS"

BucketWorkflowDefinitions is the KV bucket for workflow definitions

View Source
// BucketWorkflowExecutions is the name of the KV bucket for workflow
// executions.
const BucketWorkflowExecutions = "WORKFLOW_EXECUTIONS"

BucketWorkflowExecutions is the KV bucket for workflow executions

View Source
// StructuralIndexBucket is the name of the KV bucket for structural indices.
// IndexBuckets.Structural is initialized with the same literal.
//
// NOTE(review): the sibling constants use a "Bucket" prefix (BucketEntityStates,
// BucketOASFRecords, ...); this one uses a "Bucket" suffix. The name is part of
// the public API, so it is left unchanged.
const StructuralIndexBucket = "STRUCTURAL_INDEX"

StructuralIndexBucket is the KV bucket for structural indices

Variables

View Source
// IndexBuckets defines the standard index bucket names, one string field per
// bucket.
//
// NOTE(review): EmbeddingDedp holds "EMBEDDING_DEDUP" and looks like a
// truncated spelling of "Dedup". The field name is exported API, so it is
// kept exactly as declared.
var IndexBuckets = struct {
	EntityStates, Predicate, Incoming, Outgoing   string
	Alias, Spatial, Temporal, Embedding           string
	EmbeddingDedp, Community, Structural, Context string
}{
	EntityStates: "ENTITY_STATES",

	Predicate: "PREDICATE_INDEX",
	Incoming:  "INCOMING_INDEX",
	Outgoing:  "OUTGOING_INDEX",
	Alias:     "ALIAS_INDEX",
	Spatial:   "SPATIAL_INDEX",
	Temporal:  "TEMPORAL_INDEX",

	Embedding:     "EMBEDDING_INDEX",
	EmbeddingDedp: "EMBEDDING_DEDUP",

	Community:  "COMMUNITY_INDEX",
	Structural: "STRUCTURAL_INDEX",
	Context:    "CONTEXT_INDEX",
}

IndexBuckets defines the standard index bucket names

Functions

func CountMessageTypes

func CountMessageTypes(envelopes []StatusStreamEnvelope) map[string]int

CountMessageTypes returns a count of each message type in the envelopes

func CountSourceEntities

func CountSourceEntities(events []KVChangeEvent) int

CountSourceEntities returns the count of source entities (non-container) from events. Container entities (ending in .group, .group.container, .group.container.level) are excluded.

func CountUniqueKeys

func CountUniqueKeys(events []KVChangeEvent) int

CountUniqueKeys returns the count of unique non-deleted keys from events. It tracks creates/updates and removes deletes to give an accurate count of keys that currently exist. This is essential for SSE-based counting since NATS KV watch sends all existing keys first before streaming updates.

Types

type A2AAgentCard

// A2AAgentCard represents an A2A agent card response.
type A2AAgentCard struct {
	Name         string `json:"name"`
	Description  string `json:"description"`
	URL          string `json:"url"`
	// Capabilities is an inline struct of two boolean flags nested under the
	// "capabilities" key. Note the camelCase "pushNotifications" key.
	Capabilities struct {
		Streaming         bool `json:"streaming"`
		PushNotifications bool `json:"pushNotifications"`
	} `json:"capabilities"`
	// Skills is an inline slice type; it is omitted from encoded JSON when
	// empty, as is each skill's description.
	Skills []struct {
		ID          string `json:"id"`
		Name        string `json:"name"`
		Description string `json:"description,omitempty"`
	} `json:"skills,omitempty"`
}

A2AAgentCard represents an A2A agent card response.

type A2AClient

type A2AClient struct {
	// contains filtered or unexported fields
}

A2AClient provides HTTP client for A2A adapter testing.

func NewA2AClient

func NewA2AClient(baseURL string) *A2AClient

NewA2AClient creates a new A2A test client.

func (*A2AClient) GetAgentCard

func (c *A2AClient) GetAgentCard(ctx context.Context) (*A2AAgentCard, error)

GetAgentCard retrieves the agent card from the A2A adapter.

func (*A2AClient) GetTask

func (c *A2AClient) GetTask(ctx context.Context, taskID string) (*A2ATask, error)

GetTask retrieves task status from the A2A adapter.

func (*A2AClient) Health

func (c *A2AClient) Health(ctx context.Context) error

Health checks if the A2A adapter is healthy.

func (*A2AClient) SubmitTask

func (c *A2AClient) SubmitTask(ctx context.Context, taskID, prompt string) (*A2ATask, error)

SubmitTask submits a task to the A2A adapter.

type A2AMessage

// A2AMessage represents an A2A message.
type A2AMessage struct {
	Role  string `json:"role"`
	// Parts is an inline slice type; the only field declared per part is Text,
	// which is omitted from encoded JSON when empty.
	Parts []struct {
		Text string `json:"text,omitempty"`
	} `json:"parts"`
}

A2AMessage represents an A2A message.

type A2ATask

// A2ATask represents an A2A task for testing.
type A2ATask struct {
	ID     string `json:"id"`
	// Status is an inline struct, so the state is read as task.Status.State
	// and encoded as {"status": {"state": ...}}.
	Status struct {
		State string `json:"state"`
	} `json:"status"`
	// Messages is omitted from encoded JSON when empty.
	Messages []A2AMessage `json:"messages,omitempty"`
}

A2ATask represents an A2A task for testing.

type AGNTCYMockClient

type AGNTCYMockClient struct {
	// contains filtered or unexported fields
}

AGNTCYMockClient provides HTTP client for testing the AGNTCY mock server.

func NewAGNTCYMockClient

func NewAGNTCYMockClient(baseURL string) *AGNTCYMockClient

NewAGNTCYMockClient creates a new client for the AGNTCY mock server.

func (*AGNTCYMockClient) Health

func (c *AGNTCYMockClient) Health(ctx context.Context) error

Health checks if the AGNTCY mock server is healthy.

func (*AGNTCYMockClient) ListRegistrations

func (c *AGNTCYMockClient) ListRegistrations(ctx context.Context) ([]DirectoryRegistration, error)

ListRegistrations returns all agent registrations from the mock directory.

func (*AGNTCYMockClient) WaitForRegistration

func (c *AGNTCYMockClient) WaitForRegistration(
	ctx context.Context,
	agentIDSubstring string,
	timeout time.Duration,
) (*DirectoryRegistration, error)

WaitForRegistration waits for an agent to be registered in the directory.

type Anomaly

// Anomaly represents a structural anomaly detected by the inference system.
//
// EntityB, Evidence and DetectedAt are tagged omitempty and are left out of
// the encoded JSON when empty. Evidence is a free-form map; its keys are not
// defined by this declaration. DetectedAt is carried as a string.
// NOTE(review): the timestamp format of DetectedAt is not visible here.
type Anomaly struct {
	ID         string         `json:"id"`
	Type       string         `json:"type"`
	EntityA    string         `json:"entity_a"`
	EntityB    string         `json:"entity_b,omitempty"`
	Confidence float64        `json:"confidence"`
	Status     string         `json:"status"`
	Evidence   map[string]any `json:"evidence,omitempty"` // `any` matches the other map-valued fields in this package; identical type to interface{}
	DetectedAt string         `json:"detected_at,omitempty"`
}

Anomaly represents a structural anomaly detected by the inference system

type AnomalyCounts

// AnomalyCounts holds counts of anomalies by type and status.
// NOTE(review): the set of type and status keys is not visible in this
// declaration.
type AnomalyCounts struct {
	ByType   map[string]int `json:"by_type"`
	ByStatus map[string]int `json:"by_status"`
	Total    int            `json:"total"`
}

AnomalyCounts holds counts of anomalies by type and status

type Category

// Category groups related metrics under a name. Metrics maps a string key to
// a float64 value.
type Category struct {
	Name    string             `json:"name"`
	Metrics map[string]float64 `json:"metrics"`
}

Category groups related metrics

type ComponentInfo

// ComponentInfo represents a single component's information.
// It matches the SemStreams /components/list API response format.
// LastError is omitted from encoded JSON when empty.
type ComponentInfo struct {
	Name      string `json:"name"`
	Component string `json:"component"` // Component factory name (e.g., "udp", "graph-processor")
	Type      string `json:"type"`      // Component category (input/processor/output/storage/gateway)
	Enabled   bool   `json:"enabled"`
	State     string `json:"state"`
	Healthy   bool   `json:"healthy"`
	LastError string `json:"last_error,omitempty"`
}

ComponentInfo represents a single component's information. It matches the SemStreams /components/list API response format.

type ComponentStatus

// ComponentStatus represents the current processing state of a component.
// It matches component.Status from the core package.
//
// All timestamps are carried as strings. Of the four, only StageStartedAt is
// always present in encoded JSON; the others are tagged omitempty.
// NOTE(review): the timestamp format is not visible here.
type ComponentStatus struct {
	Component       string `json:"component"`
	Stage           string `json:"stage"`
	CycleID         string `json:"cycle_id,omitempty"`
	CycleStartedAt  string `json:"cycle_started_at,omitempty"`
	StageStartedAt  string `json:"stage_started_at"`
	LastCompletedAt string `json:"last_completed_at,omitempty"`
	LastResult      string `json:"last_result,omitempty"` // "success" or "error"
	LastError       string `json:"last_error,omitempty"`
}

ComponentStatus represents the current processing state of a component. Matches component.Status from the core package.

type ContextEntry

// ContextEntry matches the indexmanager.ContextEntry structure: one
// entity ID plus predicate pair.
type ContextEntry struct {
	EntityID  string `json:"entity_id"`
	Predicate string `json:"predicate"`
}

ContextEntry matches the indexmanager.ContextEntry structure. Phase 5: Added to verify ContextIndex stores entity+predicate pairs.

type DirectoryRegistration

// DirectoryRegistration represents an agent registration in the mock
// directory.
//
// OASFRecord is decoded into a generic map rather than a typed record.
// RegisteredAt, LastHeartbeat and TTL are carried as strings.
// NOTE(review): their formats are not visible here.
type DirectoryRegistration struct {
	AgentID       string         `json:"agent_id"`
	OASFRecord    map[string]any `json:"oasf_record"`
	RegisteredAt  string         `json:"registered_at"`
	LastHeartbeat string         `json:"last_heartbeat"`
	TTL           string         `json:"ttl"`
}

DirectoryRegistration represents an agent registration in the mock directory.

type DisconnectedNode

// DisconnectedNode represents a component with no connections.
// Suggestions is omitted from encoded JSON when empty.
type DisconnectedNode struct {
	ComponentName string   `json:"component_name"`
	Issue         string   `json:"issue"`
	Suggestions   []string `json:"suggestions,omitempty"`
}

DisconnectedNode represents a component with no connections

type EntityStabilizationResult

// EntityStabilizationResult contains the result of waiting for the entity
// count to stabilize. It carries no JSON tags.
type EntityStabilizationResult struct {
	FinalCount   int
	WaitDuration time.Duration

	// Outcome flags, declared in their original order.
	Stabilized, TimedOut, UsedSSE bool
}

EntityStabilizationResult contains the result of waiting for entity count to stabilize.

type EntityState

// EntityState represents an entity stored in NATS KV.
//
// Properties is a free-form map. Triples and UpdatedAt are omitted from
// encoded JSON when empty. UpdatedAt is carried as a string.
// NOTE(review): the timestamp format is not visible here.
type EntityState struct {
	ID         string         `json:"id"`
	Type       string         `json:"type"`
	Properties map[string]any `json:"properties"`
	Triples    []Triple       `json:"triples,omitempty"`
	Version    int            `json:"version"`
	UpdatedAt  string         `json:"updated_at,omitempty"`
}

EntityState represents an entity stored in NATS KV

type FlowExpectation

// FlowExpectation defines what we expect to see in a flow. It carries no JSON
// tags. It is the expectation argument taken by FlowTracer.ValidateFlow and
// FlowTracer.WaitForFlowCompletion.
type FlowExpectation struct {
	InputSubject     string        // Expected input subject pattern (e.g., "input.udp")
	ProcessingStages []string      // Expected processing stages (e.g., ["process.rule", "process.graph"])
	MinMessages      int           // Minimum number of messages expected
	MaxLatencyMs     int           // Maximum acceptable p99 latency in ms
	Timeout          time.Duration // How long to wait for flow completion
}

FlowExpectation defines what we expect to see in a flow

type FlowInfo

// FlowInfo represents a flow from the flowbuilder API.
// NOTE(review): the set of RuntimeState values is not visible here.
type FlowInfo struct {
	ID           string `json:"id"`
	Name         string `json:"name"`
	RuntimeState string `json:"runtime_state"`
}

FlowInfo represents a flow from the flowbuilder API

type FlowResult

// FlowResult contains the results of flow validation.
//
// AvgLatency and P99Latency are time.Duration values; time.Duration has no
// custom JSON encoding, so they appear in JSON as integer nanoseconds.
// P99Latency, StageMetrics, Errors and Warnings are omitted when empty.
type FlowResult struct {
	Valid        bool                    `json:"valid"`
	Messages     int                     `json:"messages"`
	AvgLatency   time.Duration           `json:"avg_latency"`
	P99Latency   time.Duration           `json:"p99_latency,omitempty"`
	StageMetrics map[string]StageMetrics `json:"stage_metrics,omitempty"`
	Errors       []string                `json:"errors,omitempty"`
	Warnings     []string                `json:"warnings,omitempty"`
}

FlowResult contains the results of flow validation

type FlowSnapshot

// FlowSnapshot captures the state of metrics and messages at a point in time.
//
// MetricsBaseline is a pointer, so it may be nil. LastMessageID is omitted
// from encoded JSON when empty.
type FlowSnapshot struct {
	Timestamp       time.Time        `json:"timestamp"`
	MetricsBaseline *MetricsBaseline `json:"metrics_baseline"`
	MessageCount    int              `json:"message_count"`
	LastMessageID   string           `json:"last_message_id,omitempty"`
}

FlowSnapshot captures the state of metrics and messages at a point in time

type FlowTracer

type FlowTracer struct {
	// contains filtered or unexported fields
}

FlowTracer correlates metrics and message logs to trace data flow through the system

func NewFlowTracer

func NewFlowTracer(metrics *MetricsClient, logger *MessageLoggerClient) *FlowTracer

NewFlowTracer creates a new flow tracer with metrics and message logger clients

func (*FlowTracer) CaptureFlowSnapshot

func (t *FlowTracer) CaptureFlowSnapshot(ctx context.Context) (*FlowSnapshot, error)

CaptureFlowSnapshot captures the current state of metrics and messages

func (*FlowTracer) ValidateFlow

func (t *FlowTracer) ValidateFlow(ctx context.Context, preSnapshot *FlowSnapshot, expected FlowExpectation) (*FlowResult, error)

ValidateFlow validates that data flowed through expected stages

func (*FlowTracer) WaitForFlowCompletion

func (t *FlowTracer) WaitForFlowCompletion(ctx context.Context, expected FlowExpectation) error

WaitForFlowCompletion waits for a complete data flow from input to output

type FlowValidation

// FlowValidation represents the result of flowgraph validation from
// /components/validate.
//
// ConnectedComponents is a list of string lists. ConnectedEdges is decoded
// into generic maps rather than a typed edge. Timestamp is carried as a string.
// NOTE(review): the timestamp format and the edge keys are not visible here.
type FlowValidation struct {
	Timestamp           string                `json:"timestamp"`
	ValidationStatus    string                `json:"validation_status"`
	ConnectedComponents [][]string            `json:"connected_components"`
	ConnectedEdges      []map[string]any      `json:"connected_edges"` // `any` matches the other map-valued fields in this package; identical type to interface{}
	DisconnectedNodes   []DisconnectedNode    `json:"disconnected_nodes"`
	OrphanedPorts       []OrphanedPort        `json:"orphaned_ports"`
	StreamWarnings      []StreamWarning       `json:"stream_warnings"`
	Summary             FlowValidationSummary `json:"summary"`
}

FlowValidation represents the result of flowgraph validation from /components/validate

type FlowValidationSummary

// FlowValidationSummary contains summary statistics from flow validation:
// six integer counts and one boolean flag. It is the type of
// FlowValidation.Summary.
type FlowValidationSummary struct {
	TotalComponents       int  `json:"total_components"`
	TotalConnections      int  `json:"total_connections"`
	ComponentGroups       int  `json:"component_groups"`
	OrphanedPortCount     int  `json:"orphaned_port_count"`
	DisconnectedNodeCount int  `json:"disconnected_node_count"`
	StreamWarningCount    int  `json:"stream_warning_count"`
	HasStreamIssues       bool `json:"has_stream_issues"`
}

FlowValidationSummary contains summary statistics from flow validation

type FlowsResponse

// FlowsResponse represents the response from GET /flowbuilder/flows: a
// wrapper object whose "flows" key holds the list of flows.
type FlowsResponse struct {
	Flows []FlowInfo `json:"flows"`
}

FlowsResponse represents the response from GET /flowbuilder/flows

type IncomingEntry

// IncomingEntry matches the indexmanager.IncomingEntry structure: a predicate
// together with the ID of the entity the relationship comes from.
type IncomingEntry struct {
	Predicate    string `json:"predicate"`
	FromEntityID string `json:"from_entity_id"`
}

IncomingEntry matches the indexmanager.IncomingEntry structure. Phase 5: Added to verify IncomingIndex predicate storage.

type KCoreMetadata

// KCoreMetadata contains k-core index metadata.
//
// CoreBuckets has integer keys; encoding/json writes integer map keys as
// JSON strings and parses them back on decode. ComputedAt is carried as a
// string.
// NOTE(review): the timestamp format is not visible here.
type KCoreMetadata struct {
	MaxCore     int         `json:"max_core"`
	EntityCount int         `json:"entity_count"`
	ComputedAt  string      `json:"computed_at"`
	CoreBuckets map[int]int `json:"core_buckets"` // core number -> count of entities
}

KCoreMetadata contains k-core index metadata

type KVChangeEvent

// KVChangeEvent represents a KV bucket change from SSE.
//
// Value is kept as json.RawMessage, i.e. the undecoded JSON bytes, and is
// omitted from encoded JSON when empty.
type KVChangeEvent struct {
	Bucket    string          `json:"bucket"`
	Key       string          `json:"key"`
	Operation string          `json:"operation"` // "create", "update", "delete"
	Value     json.RawMessage `json:"value,omitempty"`
	Revision  uint64          `json:"revision"`
	Timestamp time.Time       `json:"timestamp"`
}

KVChangeEvent represents a KV bucket change from SSE

type KVEntry

// KVEntry represents a single KV bucket entry.
//
// Value is typed any, so on decode it holds whatever generic Go value
// encoding/json produces for the stored JSON.
type KVEntry struct {
	Key      string    `json:"key"`
	Value    any       `json:"value"`
	Revision uint64    `json:"revision"`
	Created  time.Time `json:"created"`
}

KVEntry represents a single KV bucket entry

type KVQueryResult

// KVQueryResult represents the result of a KV bucket query. It is the value
// returned by MessageLoggerClient.QueryKV.
// NOTE(review): whether Count always equals len(Entries) is not visible here.
type KVQueryResult struct {
	Bucket  string    `json:"bucket"`
	Pattern string    `json:"pattern"`
	Count   int       `json:"count"`
	Entries []KVEntry `json:"entries"`
}

KVQueryResult represents the result of a KV bucket query

type KVWatchCondition

// KVWatchCondition is a function that evaluates a slice of KV events and
// returns satisfied=true when the condition holds. It may also return an error.
// NOTE(review): how callers treat a non-nil err is not visible here.
type KVWatchCondition func(events []KVChangeEvent) (satisfied bool, err error)

KVWatchCondition is a function that evaluates KV events and returns true when satisfied

func AllMatch

func AllMatch(predicate func(KVChangeEvent) bool, minCount int) KVWatchCondition

AllMatch returns a condition satisfied when all collected events match predicate

func AnyMatch

func AnyMatch(predicate func(KVChangeEvent) bool) KVWatchCondition

AnyMatch returns a condition satisfied when any event matches predicate

func CombineAnd

func CombineAnd(conditions ...KVWatchCondition) KVWatchCondition

CombineAnd returns a condition satisfied when all conditions are satisfied

func CombineOr

func CombineOr(conditions ...KVWatchCondition) KVWatchCondition

CombineOr returns a condition satisfied when any condition is satisfied

func CountCreatesReaches

func CountCreatesReaches(target int) KVWatchCondition

CountCreatesReaches returns a condition satisfied when the number of create operations is >= target

func CountReaches

func CountReaches(target int) KVWatchCondition

CountReaches returns a condition that is satisfied when event count >= target

func KeyExists

func KeyExists(key string) KVWatchCondition

KeyExists returns a condition satisfied when a specific key appears

func KeyPrefixCount

func KeyPrefixCount(prefix string, target int) KVWatchCondition

KeyPrefixCount returns a condition satisfied when the number of keys with the given prefix is >= target

func KeySuffixCount

func KeySuffixCount(suffix string, target int) KVWatchCondition

KeySuffixCount returns a condition satisfied when the number of keys with the given suffix is >= target

func KeysExist

func KeysExist(keys []string) KVWatchCondition

KeysExist returns a condition satisfied when all specified keys appear

func SourceEntityCountReaches

func SourceEntityCountReaches(target int) KVWatchCondition

SourceEntityCountReaches returns a condition satisfied when source (non-container) entity count >= target. Container entities have suffixes like .group, .group.container, or .group.container.level and are excluded from the count.

func UniqueKeyCountReaches

func UniqueKeyCountReaches(target int) KVWatchCondition

UniqueKeyCountReaches returns a condition satisfied when unique non-deleted keys >= target. This properly counts entities by tracking creates/updates and removing deletes, giving an accurate count of keys that currently exist in the bucket.

func ValueFieldEquals

func ValueFieldEquals(field string, expectedValue interface{}) KVWatchCondition

ValueFieldEquals returns a condition for checking a JSON field in event values

type KVWatchOpts

// KVWatchOpts configures KV watching behavior.
// NOTE(review): the defaults named in the field comments are presumably the
// ones DefaultKVWatchOpts returns; confirm against its implementation.
type KVWatchOpts struct {
	Timeout time.Duration // Max wait time (default 60s)
	Pattern string        // Key pattern filter (default "*")
}

KVWatchOpts configures KV watching behavior

func DefaultKVWatchOpts

func DefaultKVWatchOpts() KVWatchOpts

DefaultKVWatchOpts returns sensible defaults

type LoggerStats

// LoggerStats represents statistics from the MessageLogger service. It is the
// value returned by MessageLoggerClient.GetStats.
//
// The three message counters are int64; UptimeSeconds is a float64.
type LoggerStats struct {
	TotalMessages     int64     `json:"total_messages"`
	ValidMessages     int64     `json:"valid_messages"`
	InvalidMessages   int64     `json:"invalid_messages"`
	StartTime         time.Time `json:"start_time"`
	LastMessageTime   time.Time `json:"last_message_time"`
	UptimeSeconds     float64   `json:"uptime_seconds"`
	MonitoredSubjects []string  `json:"monitored_subjects"`
	MaxEntries        int       `json:"max_entries"`
}

LoggerStats represents statistics from the MessageLogger service

type MessageEntry

// MessageEntry represents a logged message from the MessageLogger service.
//
// MessageType, MessageID, TraceID, SpanID, RawData and Metadata are omitted
// from encoded JSON when empty. RawData is kept as json.RawMessage, i.e. the
// undecoded JSON bytes.
type MessageEntry struct {
	Sequence    uint64          `json:"sequence"`
	Timestamp   time.Time       `json:"timestamp"`
	Subject     string          `json:"subject"`
	MessageType string          `json:"message_type,omitempty"`
	MessageID   string          `json:"message_id,omitempty"`
	TraceID     string          `json:"trace_id,omitempty"`
	SpanID      string          `json:"span_id,omitempty"`
	Summary     string          `json:"summary"`
	RawData     json.RawMessage `json:"raw_data,omitempty"`
	Metadata    map[string]any  `json:"metadata,omitempty"`
}

MessageEntry represents a logged message from the MessageLogger service

type MessageLoggerClient

type MessageLoggerClient struct {
	// contains filtered or unexported fields
}

MessageLoggerClient provides HTTP client access to the MessageLogger service

func NewMessageLoggerClient

func NewMessageLoggerClient(baseURL string) *MessageLoggerClient

NewMessageLoggerClient creates a new client for MessageLogger HTTP endpoints

func (*MessageLoggerClient) CountMessagesBySubject

func (c *MessageLoggerClient) CountMessagesBySubject(ctx context.Context, pattern string) (int, error)

CountMessagesBySubject returns count of messages matching subject pattern

func (*MessageLoggerClient) GetEntries

func (c *MessageLoggerClient) GetEntries(ctx context.Context, limit int, subjectPattern string) ([]MessageEntry, error)

GetEntries fetches logged messages with optional subject filter

func (*MessageLoggerClient) GetEntriesByTrace

func (c *MessageLoggerClient) GetEntriesByTrace(ctx context.Context, traceID string) (*TraceResponse, error)

GetEntriesByTrace fetches all entries for a specific W3C trace ID

func (*MessageLoggerClient) GetStats

func (c *MessageLoggerClient) GetStats(ctx context.Context) (*LoggerStats, error)

GetStats returns message logger statistics

func (*MessageLoggerClient) GetSubjects

func (c *MessageLoggerClient) GetSubjects(ctx context.Context) ([]string, error)

GetSubjects returns list of monitored NATS subjects

func (*MessageLoggerClient) Health

func (c *MessageLoggerClient) Health(ctx context.Context) error

Health checks if the message logger endpoint is reachable

func (*MessageLoggerClient) QueryKV

func (c *MessageLoggerClient) QueryKV(ctx context.Context, bucket, pattern string, limit int) (*KVQueryResult, error)

QueryKV queries a NATS KV bucket

func (*MessageLoggerClient) TraceMessage

func (c *MessageLoggerClient) TraceMessage(ctx context.Context, messageID string) (*MessageTrace, error)

TraceMessage finds all log entries related to a specific message ID

func (*MessageLoggerClient) WaitForMessages

func (c *MessageLoggerClient) WaitForMessages(ctx context.Context, pattern string, count int, timeout time.Duration) ([]MessageEntry, error)

WaitForMessages waits until N messages matching pattern are logged

type MessageTrace

// MessageTrace tracks a single message through the processing pipeline. It is
// the value returned by MessageLoggerClient.TraceMessage and carries no JSON
// tags.
type MessageTrace struct {
	MessageID string         // Original message ID
	Entries   []MessageEntry // All log entries related to this message
	Flow      []string       // Subject flow path: ["input.udp", "process.rule", "process.graph"]
	Duration  time.Duration  // Time from first to last entry
}

MessageTrace tracks a single message through the processing pipeline

type Metric

// Metric represents a single parsed Prometheus metric: a name, an optional
// label set and a float64 value. Labels is omitted from encoded JSON when
// empty.
type Metric struct {
	Name   string            `json:"name"`
	Labels map[string]string `json:"labels,omitempty"`
	Value  float64           `json:"value"`
}

Metric represents a single parsed Prometheus metric

type MetricsBaseline

// MetricsBaseline captures metrics at a point in time for comparison. It is
// returned by MetricsClient.CaptureBaseline and taken by
// MetricsClient.CompareToBaseline.
// NOTE(review): how label sets are folded into the Metrics map keys is not
// visible here.
type MetricsBaseline struct {
	Timestamp time.Time          `json:"timestamp"`
	Metrics   map[string]float64 `json:"metrics"`
}

MetricsBaseline captures metrics at a point in time for comparison

type MetricsClient

type MetricsClient struct {
	// contains filtered or unexported fields
}

MetricsClient fetches and parses Prometheus metrics from SemStreams

func NewMetricsClient

func NewMetricsClient(baseURL string) *MetricsClient

NewMetricsClient creates a new client for Prometheus metrics endpoints

func (*MetricsClient) CaptureBaseline

func (c *MetricsClient) CaptureBaseline(ctx context.Context) (*MetricsBaseline, error)

CaptureBaseline takes a snapshot of all counter metrics for later comparison

func (*MetricsClient) CompareToBaseline

func (c *MetricsClient) CompareToBaseline(ctx context.Context, baseline *MetricsBaseline) (*MetricsDiff, error)

CompareToBaseline compares current metrics to a baseline and returns deltas

func (*MetricsClient) ExtractRuleMetrics

func (c *MetricsClient) ExtractRuleMetrics(ctx context.Context) (*RuleMetrics, error)

ExtractRuleMetrics gets all rule engine metrics in a single call. This enables consistent rule validation across E2E scenarios.

func (*MetricsClient) FetchRaw

func (c *MetricsClient) FetchRaw(ctx context.Context) (string, error)

FetchRaw retrieves raw metrics text from the Prometheus endpoint

func (*MetricsClient) FetchReport

func (c *MetricsClient) FetchReport(ctx context.Context, duration time.Duration) (*MetricsReport, error)

FetchReport retrieves metrics and organizes them into a categorized report

func (*MetricsClient) FetchSnapshot

func (c *MetricsClient) FetchSnapshot(ctx context.Context) (*MetricsSnapshot, error)

FetchSnapshot retrieves and parses all metrics into a snapshot

func (*MetricsClient) GetMetricByLabels

func (c *MetricsClient) GetMetricByLabels(ctx context.Context, metricName string, labelFilters map[string]string) ([]Metric, error)

GetMetricByLabels retrieves metrics matching the given name and optional label filters

func (*MetricsClient) GetMetricValue

func (c *MetricsClient) GetMetricValue(ctx context.Context, metricName string) (float64, error)

GetMetricValue retrieves a specific metric value by name

func (*MetricsClient) GetMetricsByPrefix

func (c *MetricsClient) GetMetricsByPrefix(ctx context.Context, prefix string) (map[string]float64, error)

GetMetricsByPrefix retrieves all metrics matching a prefix

func (*MetricsClient) Health

func (c *MetricsClient) Health(ctx context.Context) error

Health checks if the metrics endpoint is reachable

func (*MetricsClient) SumMetricsByName

func (c *MetricsClient) SumMetricsByName(ctx context.Context, metricName string) (float64, error)

SumMetricsByName sums all metrics with the given name (across all label combinations)

func (*MetricsClient) WaitForMetric

func (c *MetricsClient) WaitForMetric(ctx context.Context, metricName string, expected float64, opts WaitOpts) error

WaitForMetric polls until a metric reaches the expected value or timeout

func (*MetricsClient) WaitForMetricChange

func (c *MetricsClient) WaitForMetricChange(ctx context.Context, metricName string, baseline float64, opts WaitOpts) error

WaitForMetricChange waits for any change in a metric from its baseline value

func (*MetricsClient) WaitForMetricDelta

func (c *MetricsClient) WaitForMetricDelta(ctx context.Context, metricName string, baseline, delta float64, opts WaitOpts) error

WaitForMetricDelta waits for a metric to increase by at least delta from baseline

type MetricsDiff

// MetricsDiff represents the difference between two metric snapshots.
//
// Duration is a time.Duration; time.Duration has no custom JSON encoding, so
// it appears in JSON as integer nanoseconds.
type MetricsDiff struct {
	BaselineTime time.Time          `json:"baseline_time"`
	CurrentTime  time.Time          `json:"current_time"`
	Duration     time.Duration      `json:"duration"`
	Deltas       map[string]float64 `json:"deltas"`       // current - baseline
	RatePerSec   map[string]float64 `json:"rate_per_sec"` // delta / duration.Seconds()
}

MetricsDiff represents the difference between two metric snapshots

type MetricsReport

// MetricsReport is a structured summary of key metrics for comparison.
//
// Mind the JSON keys: the time.Duration value is written under "duration_ns"
// (as integer nanoseconds), while the "duration" key carries the DurationText
// string. Histograms is omitted from encoded JSON when empty.
type MetricsReport struct {
	Timestamp    time.Time           `json:"timestamp"`
	Duration     time.Duration       `json:"duration_ns"`
	DurationText string              `json:"duration"`
	Counters     map[string]float64  `json:"counters"`
	Gauges       map[string]float64  `json:"gauges"`
	Histograms   map[string]float64  `json:"histograms,omitempty"`
	Categories   map[string]Category `json:"categories"`
}

MetricsReport is a structured summary of key metrics for comparison

type MetricsSnapshot

// MetricsSnapshot represents a collection of metrics at a point in time. It
// is the value returned by MetricsClient.FetchSnapshot.
//
// Raw is omitted from encoded JSON when empty.
// NOTE(review): presumably Raw holds the unparsed metrics text; the key format
// of the Metrics map is not visible here.
type MetricsSnapshot struct {
	Timestamp time.Time         `json:"timestamp"`
	Metrics   map[string]Metric `json:"metrics"`
	Raw       string            `json:"raw,omitempty"`
}

MetricsSnapshot represents a collection of metrics at a point in time

type NATSValidationClient

type NATSValidationClient struct {
	// contains filtered or unexported fields
}

NATSValidationClient wraps natsclient.Client for E2E test validation

func NewNATSValidationClient

func NewNATSValidationClient(ctx context.Context, natsURL string) (*NATSValidationClient, error)

NewNATSValidationClient creates a new NATS validation client

func (*NATSValidationClient) BucketExists

func (c *NATSValidationClient) BucketExists(ctx context.Context, bucketName string) (bool, error)

BucketExists checks if a KV bucket exists

func (*NATSValidationClient) Close

func (c *NATSValidationClient) Close(ctx context.Context) error

Close closes the NATS connection

func (*NATSValidationClient) CountBucketKeys

func (c *NATSValidationClient) CountBucketKeys(ctx context.Context, bucketName string) (int, error)

CountBucketKeys counts the number of keys in a specific KV bucket. Returns 0, nil if the bucket doesn't exist (graceful degradation).

func (*NATSValidationClient) CountEntities

func (c *NATSValidationClient) CountEntities(ctx context.Context) (int, error)

CountEntities counts the number of entities in the ENTITY_STATES bucket. Returns 0, nil if the bucket doesn't exist (graceful degradation).

func (*NATSValidationClient) CountOASFRecords

func (c *NATSValidationClient) CountOASFRecords(ctx context.Context) (int, error)

CountOASFRecords counts the number of OASF records in the bucket.

func (*NATSValidationClient) CountSourceEntities

func (c *NATSValidationClient) CountSourceEntities(ctx context.Context) (int, error)

CountSourceEntities counts non-container entities in ENTITY_STATES bucket.

func (*NATSValidationClient) CountVirtualEdges

func (c *NATSValidationClient) CountVirtualEdges(ctx context.Context) (*VirtualEdgeCounts, error)

CountVirtualEdges counts virtual edges (inferred relationships) by querying the PREDICATE_INDEX. Virtual edges use predicates starting with "inferred." prefix.

func (*NATSValidationClient) DeleteKV

func (c *NATSValidationClient) DeleteKV(ctx context.Context, bucket, key string) error

DeleteKV deletes a key from a KV bucket (for test cleanup)

func (*NATSValidationClient) GetAllCommunities

func (c *NATSValidationClient) GetAllCommunities(ctx context.Context) ([]*clustering.Community, error)

GetAllCommunities retrieves all communities from the COMMUNITY_INDEX bucket. Used for comparing statistical vs LLM-enhanced summaries in E2E tests.

func (*NATSValidationClient) GetAllComponentStatuses

func (c *NATSValidationClient) GetAllComponentStatuses(ctx context.Context) (map[string]*ComponentStatus, error)

GetAllComponentStatuses retrieves status of all components in COMPONENT_STATUS bucket.

func (*NATSValidationClient) GetAllContexts

func (c *NATSValidationClient) GetAllContexts(ctx context.Context) ([]string, error)

GetAllContexts lists all context values in the CONTEXT_INDEX bucket. Phase 6: Added for provenance audit scenario - demonstrates querying all inference contexts.

func (*NATSValidationClient) GetAllEntityIDs

func (c *NATSValidationClient) GetAllEntityIDs(ctx context.Context) ([]string, error)

GetAllEntityIDs returns all entity IDs from ENTITY_STATES bucket. Used for hierarchy inference validation in E2E tests (Phase 4).

func (*NATSValidationClient) GetAllWorkflowExecutions

func (c *NATSValidationClient) GetAllWorkflowExecutions(ctx context.Context) ([]*WorkflowExecution, error)

GetAllWorkflowExecutions retrieves all workflow executions

func (*NATSValidationClient) GetAnomalies

func (c *NATSValidationClient) GetAnomalies(ctx context.Context) ([]*Anomaly, error)

GetAnomalies retrieves all anomalies from ANOMALY_INDEX bucket

func (*NATSValidationClient) GetAnomalyCounts

func (c *NATSValidationClient) GetAnomalyCounts(ctx context.Context) (*AnomalyCounts, error)

GetAnomalyCounts retrieves counts of anomalies by type and status from ANOMALY_INDEX bucket

func (*NATSValidationClient) GetAutoAppliedAnomalyCount

func (c *NATSValidationClient) GetAutoAppliedAnomalyCount(ctx context.Context) (int, error)

GetAutoAppliedAnomalyCount returns the count of anomalies with status "auto_applied".

func (*NATSValidationClient) GetBucketKeysSample

func (c *NATSValidationClient) GetBucketKeysSample(ctx context.Context, bucketName string, limit int) ([]string, error)

GetBucketKeysSample returns a sample of keys from a bucket (the first limit keys). Useful for verifying key patterns without loading all data.

func (*NATSValidationClient) GetComponentStatus

func (c *NATSValidationClient) GetComponentStatus(ctx context.Context, componentName string) (*ComponentStatus, error)

GetComponentStatus retrieves the current status of a component from COMPONENT_STATUS bucket.

func (*NATSValidationClient) GetContextEntries

func (c *NATSValidationClient) GetContextEntries(ctx context.Context, contextValue string) ([]ContextEntry, error)

GetContextEntries retrieves all entries for a specific context value. Phase 5: Added to verify ContextIndex is populated by hierarchy inference.

func (*NATSValidationClient) GetEntity

func (c *NATSValidationClient) GetEntity(ctx context.Context, entityID string) (*EntityState, error)

GetEntity retrieves an entity by ID from the ENTITY_STATES bucket

func (*NATSValidationClient) GetEntitySample

func (c *NATSValidationClient) GetEntitySample(ctx context.Context, limit int) ([]*EntityState, error)

GetEntitySample returns a sample of entities from the ENTITY_STATES bucket. Used for entity structure validation in E2E tests.

func (*NATSValidationClient) GetIncomingEntries

func (c *NATSValidationClient) GetIncomingEntries(ctx context.Context, targetEntityID string) ([]IncomingEntry, error)

GetIncomingEntries retrieves incoming relationship entries for a target entity. Phase 5: Added to verify IncomingIndex stores predicates (not just entity IDs).

func (*NATSValidationClient) GetOASFRecord

func (c *NATSValidationClient) GetOASFRecord(ctx context.Context, entityID string) (*OASFRecord, error)

GetOASFRecord retrieves an OASF record by entity ID from the OASF_RECORDS bucket.

func (*NATSValidationClient) GetOutgoingEntries

func (c *NATSValidationClient) GetOutgoingEntries(ctx context.Context, sourceEntityID string) ([]OutgoingEntry, error)

GetOutgoingEntries retrieves outgoing relationship entries for a source entity. Phase 6: Added for inverse edges scenario - verifies containers have outgoing 'contains' edges.

func (*NATSValidationClient) GetStructuralIndexInfo

func (c *NATSValidationClient) GetStructuralIndexInfo(ctx context.Context) (*StructuralIndexInfo, error)

GetStructuralIndexInfo retrieves information about structural indices

func (*NATSValidationClient) GetTrajectory

func (c *NATSValidationClient) GetTrajectory(ctx context.Context, loopID string) (*agentic.Trajectory, error)

GetTrajectory retrieves a trajectory from the AGENT_TRAJECTORIES KV bucket.

func (*NATSValidationClient) GetWorkflowExecution

func (c *NATSValidationClient) GetWorkflowExecution(ctx context.Context, execID string) (*WorkflowExecution, error)

GetWorkflowExecution retrieves a workflow execution by ID

func (*NATSValidationClient) GetWorkflowExecutionsByState

func (c *NATSValidationClient) GetWorkflowExecutionsByState(ctx context.Context, state string) ([]*WorkflowExecution, error)

GetWorkflowExecutionsByState returns executions filtered by state

func (*NATSValidationClient) ListBuckets

func (c *NATSValidationClient) ListBuckets(ctx context.Context) ([]string, error)

ListBuckets lists all KV buckets

func (*NATSValidationClient) ListOASFRecordIDs

func (c *NATSValidationClient) ListOASFRecordIDs(ctx context.Context) ([]string, error)

ListOASFRecordIDs returns all OASF record entity IDs.

func (*NATSValidationClient) Publish

func (c *NATSValidationClient) Publish(ctx context.Context, subject string, data []byte) error

Publish publishes a message to a NATS subject via JetStream. Used for injecting test messages into the system.

func (*NATSValidationClient) PutKV

func (c *NATSValidationClient) PutKV(ctx context.Context, bucket, key string, value []byte) error

PutKV writes a key-value entry to a KV bucket. Used for test setup (e.g., registering workflow definitions).

func (*NATSValidationClient) ValidateIndexPopulated

func (c *NATSValidationClient) ValidateIndexPopulated(ctx context.Context, indexName string) (bool, error)

ValidateIndexPopulated checks if an index bucket has entries. Returns false, nil if the bucket doesn't exist (graceful degradation).

func (*NATSValidationClient) WaitForAnomalyDetection

func (c *NATSValidationClient) WaitForAnomalyDetection(
	ctx context.Context,
	timeout time.Duration,
	pollInterval time.Duration,
) (total int, err error)

WaitForAnomalyDetection waits for anomaly detection to complete by polling until the anomaly count stabilizes or timeout is reached. Returns the final total count and any error encountered.

func (*NATSValidationClient) WaitForCommunityEnhancement

func (c *NATSValidationClient) WaitForCommunityEnhancement(
	ctx context.Context,
	timeout time.Duration,
	pollInterval time.Duration,
) (enhanced, failed, pending int, err error)

WaitForCommunityEnhancement polls communities until all reach a terminal status. Terminal statuses: "llm-enhanced" or "llm-failed". Returns counts of enhanced, failed, and pending communities.

func (*NATSValidationClient) WaitForComponentCycleComplete

func (c *NATSValidationClient) WaitForComponentCycleComplete(
	ctx context.Context,
	componentName string,
	timeout time.Duration,
) (*ComponentStatus, error)

WaitForComponentCycleComplete waits for a component to complete at least one processing cycle. Returns the component status when a cycle completes successfully.

func (*NATSValidationClient) WaitForComponentStage

func (c *NATSValidationClient) WaitForComponentStage(
	ctx context.Context,
	componentName string,
	targetStage string,
	timeout time.Duration,
) (*ComponentStatus, error)

WaitForComponentStage waits for a component to reach a specific stage. Returns the component status when the stage is reached, or nil on timeout.

func (*NATSValidationClient) WaitForContainerGroupsSSE

func (c *NATSValidationClient) WaitForContainerGroupsSSE(
	ctx context.Context,
	expectedCount int,
	timeout time.Duration,
	sseClient *SSEClient,
) (count int, usedSSE bool, err error)

WaitForContainerGroupsSSE waits for container groups (keys ending in ".group") using SSE. Falls back to polling if SSE is unavailable.

func (*NATSValidationClient) WaitForEntityCountSSE

func (c *NATSValidationClient) WaitForEntityCountSSE(
	ctx context.Context,
	expectedCount int,
	timeout time.Duration,
	sseClient *SSEClient,
) EntityStabilizationResult

WaitForEntityCountSSE waits for entity count to reach target and stabilize using SSE. NATS KV watch sends all existing keys first (initial sync), then streams updates. We use UniqueKeyCountReaches to count unique non-deleted keys for accurate counting. Falls back to polling if SSE is unavailable.

func (*NATSValidationClient) WaitForKeySSE

func (c *NATSValidationClient) WaitForKeySSE(
	ctx context.Context,
	bucket, key string,
	timeout time.Duration,
	sseClient *SSEClient,
) (found bool, usedSSE bool, err error)

WaitForKeySSE waits for a specific key to appear in a bucket using SSE streaming. Falls back to polling if SSE is unavailable.

func (*NATSValidationClient) WaitForOASFRecord

func (c *NATSValidationClient) WaitForOASFRecord(
	ctx context.Context,
	entityID string,
	timeout time.Duration,
) (*OASFRecord, error)

WaitForOASFRecord waits for an OASF record to appear for an entity.

func (*NATSValidationClient) WaitForSourceEntityCountSSE

func (c *NATSValidationClient) WaitForSourceEntityCountSSE(
	ctx context.Context,
	expectedCount int,
	timeout time.Duration,
	sseClient *SSEClient,
) EntityStabilizationResult

WaitForSourceEntityCountSSE waits for SOURCE entity count (excluding containers) to reach target. Container entities (ending in .group, .group.container, .group.container.level) are excluded. This is used to wait for testdata to fully load before validation.

func (*NATSValidationClient) WaitForWorkflowCompletion

func (c *NATSValidationClient) WaitForWorkflowCompletion(
	ctx context.Context,
	workflowID string,
	timeout time.Duration,
) (*WorkflowExecution, error)

WaitForWorkflowCompletion waits for a workflow to complete successfully

func (*NATSValidationClient) WaitForWorkflowState

func (c *NATSValidationClient) WaitForWorkflowState(
	ctx context.Context,
	workflowID string,
	targetStates []string,
	timeout time.Duration,
) (*WorkflowExecution, error)

WaitForWorkflowState waits for the workflow to reach one of the target states (for example, completed or failed). Returns the execution that reached such a state, or nil on timeout.

func (*NATSValidationClient) WaitForWorkflowTerminal

func (c *NATSValidationClient) WaitForWorkflowTerminal(
	ctx context.Context,
	workflowID string,
	timeout time.Duration,
) (*WorkflowExecution, error)

WaitForWorkflowTerminal waits for a workflow to reach any terminal state

type OASFDomain

type OASFDomain struct {
	Name        string `json:"name"`
	Description string `json:"description,omitempty"`
}

OASFDomain represents a domain in an OASF record.

type OASFRecord

type OASFRecord struct {
	Name          string         `json:"name"`
	Version       string         `json:"version"`
	SchemaVersion string         `json:"schema_version"`
	Authors       []string       `json:"authors"`
	CreatedAt     string         `json:"created_at"`
	Description   string         `json:"description"`
	Skills        []OASFSkill    `json:"skills"`
	Domains       []OASFDomain   `json:"domains,omitempty"`
	Extensions    map[string]any `json:"extensions,omitempty"`
}

OASFRecord represents an OASF (Open Agentic Schema Framework) record for E2E testing validation.

type OASFSkill

type OASFSkill struct {
	ID          string   `json:"id"`
	Name        string   `json:"name"`
	Description string   `json:"description,omitempty"`
	Confidence  float64  `json:"confidence,omitempty"`
	Permissions []string `json:"permissions,omitempty"`
}

OASFSkill represents a skill in an OASF record.

type ObservabilityClient

type ObservabilityClient struct {
	// contains filtered or unexported fields
}

ObservabilityClient interacts with SemStreams component management endpoints

func NewObservabilityClient

func NewObservabilityClient(baseURL string) *ObservabilityClient

NewObservabilityClient creates a new client for SemStreams observability endpoints

func (*ObservabilityClient) CheckFlowHealth

func (c *ObservabilityClient) CheckFlowHealth(ctx context.Context) error

CheckFlowHealth performs flow validation and returns an error if there are critical issues. This is a convenience method for pre-flight checks in e2e test setup.

func (*ObservabilityClient) CountFileOutputLines

func (c *ObservabilityClient) CountFileOutputLines(
	ctx context.Context,
	containerName string,
	pattern string,
) (int, error)

CountFileOutputLines counts lines in file output inside a container using docker exec. The containerName should match the container running the file output component. The pattern is the file glob pattern (e.g., "/tmp/streamkit-test*.jsonl"). Returns 0 if files don't exist (not an error - just means no output yet).

func (*ObservabilityClient) GetComponents

func (c *ObservabilityClient) GetComponents(ctx context.Context) ([]ComponentInfo, error)

GetComponents retrieves information about all managed components

func (*ObservabilityClient) GetFileOutputLines

func (c *ObservabilityClient) GetFileOutputLines(
	ctx context.Context,
	containerName string,
	pattern string,
	maxLines int,
) ([]string, error)

GetFileOutputLines retrieves the actual content lines from file output inside a container. Returns the lines as a slice of strings for content validation.

func (*ObservabilityClient) GetFlows

func (c *ObservabilityClient) GetFlows(ctx context.Context) ([]FlowInfo, error)

GetFlows retrieves all flows from the flowbuilder API

func (*ObservabilityClient) GetPlatformHealth

func (c *ObservabilityClient) GetPlatformHealth(ctx context.Context) (*PlatformHealth, error)

GetPlatformHealth retrieves overall platform health

func (*ObservabilityClient) ValidateFlowGraph

func (c *ObservabilityClient) ValidateFlowGraph(ctx context.Context) (*FlowValidation, error)

ValidateFlowGraph calls /components/validate and returns the flow validation result. This performs pre-flight validation to catch configuration issues before running tests.

func (*ObservabilityClient) WaitForAllComponentsHealthy

func (c *ObservabilityClient) WaitForAllComponentsHealthy(ctx context.Context, timeout time.Duration) error

WaitForAllComponentsHealthy waits until all components report healthy status.

func (*ObservabilityClient) WaitForComponentHealthy

func (c *ObservabilityClient) WaitForComponentHealthy(ctx context.Context, name string, timeout time.Duration) error

WaitForComponentHealthy waits until a specific component reports healthy status. This is useful after Docker compose --wait passes (which only checks /health endpoint) but before individual components like graph processor have finished initialization.

type OrphanedPort

type OrphanedPort struct {
	ComponentName string `json:"component_name"`
	PortName      string `json:"port_name"`
	Direction     string `json:"direction"`
	ConnectionID  string `json:"connection_id"`
	Pattern       string `json:"pattern"`
	Issue         string `json:"issue"`
	Required      bool   `json:"required"`
}

OrphanedPort represents a port with no connections

type OutgoingEntry

type OutgoingEntry struct {
	Predicate  string `json:"predicate"`
	ToEntityID string `json:"to_entity_id"`
}

OutgoingEntry matches the indexmanager.OutgoingEntry structure. Phase 6: Added to verify inverse edges are materialized in container's outgoing relationships.

type PivotMetadata

type PivotMetadata struct {
	Pivots      []string `json:"pivots"`
	EntityCount int      `json:"entity_count"`
	ComputedAt  string   `json:"computed_at"`
}

PivotMetadata contains pivot distance index metadata

type PlatformHealth

type PlatformHealth struct {
	Healthy bool   `json:"healthy"`
	Status  string `json:"status"`
	Message string `json:"message,omitempty"`
}

PlatformHealth represents overall platform health status

type ProfileClient

type ProfileClient struct {
	// contains filtered or unexported fields
}

ProfileClient captures pprof profiles from a running SemStreams instance. Profiles are saved to disk for analysis with `go tool pprof`.

func NewProfileClient

func NewProfileClient(baseURL, outputDir string) *ProfileClient

NewProfileClient creates a new client for pprof endpoints. baseURL should be the pprof server address (e.g., "http://localhost:6060"). outputDir is where profile files will be saved.

func (*ProfileClient) CaptureAll

func (c *ProfileClient) CaptureAll(ctx context.Context, prefix string) (map[string]string, error)

CaptureAll captures heap, goroutine, and allocs profiles with a common prefix. Useful for getting a baseline or final snapshot.

func (*ProfileClient) CaptureAllocs

func (c *ProfileClient) CaptureAllocs(ctx context.Context, name string) (string, error)

CaptureAllocs saves a memory allocation profile. Returns the path to the saved profile file.

func (*ProfileClient) CaptureBlock

func (c *ProfileClient) CaptureBlock(ctx context.Context, name string) (string, error)

CaptureBlock saves a goroutine blocking profile. Returns the path to the saved profile file.

func (*ProfileClient) CaptureBlockAndMutex

func (c *ProfileClient) CaptureBlockAndMutex(ctx context.Context, prefix string) (map[string]string, error)

CaptureBlockAndMutex captures blocking and mutex contention profiles with a common prefix. Useful for profiling lock contention under load.

func (*ProfileClient) CaptureCPU

func (c *ProfileClient) CaptureCPU(ctx context.Context, name string, seconds int) (string, error)

CaptureCPU captures a CPU profile for the specified duration. Returns the path to the saved profile file.

func (*ProfileClient) CaptureGoroutine

func (c *ProfileClient) CaptureGoroutine(ctx context.Context, name string) (string, error)

CaptureGoroutine saves goroutine stack traces. Returns the path to the saved profile file.

func (*ProfileClient) CaptureHeap

func (c *ProfileClient) CaptureHeap(ctx context.Context, name string) (string, error)

CaptureHeap saves a heap profile snapshot. Returns the path to the saved profile file.

func (*ProfileClient) CaptureMutex

func (c *ProfileClient) CaptureMutex(ctx context.Context, name string) (string, error)

CaptureMutex saves a mutex contention profile. Returns the path to the saved profile file.

func (*ProfileClient) CaptureTrace

func (c *ProfileClient) CaptureTrace(ctx context.Context, name string, seconds int) (string, error)

CaptureTrace captures an execution trace for the specified duration. Returns the path to the saved trace file.

func (*ProfileClient) IsAvailable

func (c *ProfileClient) IsAvailable(ctx context.Context) bool

IsAvailable checks if the pprof endpoint is accessible.

type RuleMetrics

type RuleMetrics struct {
	// Rule evaluation metrics
	Evaluations float64 `json:"evaluations"` // Total rule evaluations
	Firings     float64 `json:"firings"`     // Rules that fired (conditions met)

	// Action metrics
	ActionsDispatched float64 `json:"actions_dispatched"` // Total actions dispatched

	// Execution lifecycle metrics (legacy — kept for backwards compat, always 0)
	ExecutionsCreated   float64 `json:"executions_created"`
	ExecutionsCompleted float64 `json:"executions_completed"`
	ExecutionsFailed    float64 `json:"executions_failed"`

	// Callback metrics (legacy — kept for backwards compat, always 0)
	CallbacksReceived float64 `json:"callbacks_received"`
}

RuleMetrics holds rule engine metrics for E2E tests. These map to the semstreams_rule_* Prometheus metrics from the rule processor.

type SSEClient

type SSEClient struct {
	// contains filtered or unexported fields
}

SSEClient handles Server-Sent Events connections for KV bucket watching

func NewSSEClient

func NewSSEClient(baseURL string) *SSEClient

NewSSEClient creates a new SSE client

func (*SSEClient) Health

func (c *SSEClient) Health(ctx context.Context) error

Health checks if SSE endpoint is available

func (*SSEClient) WatchKVBucket

func (c *SSEClient) WatchKVBucket(
	ctx context.Context,
	bucket string,
	condition KVWatchCondition,
	opts KVWatchOpts,
) ([]KVChangeEvent, error)

WatchKVBucket streams KV changes until the condition is satisfied or timeout

type SSEEvent

type SSEEvent struct {
	Event string          // Event type (e.g., "connected", "kv_change", "error")
	ID    string          // Event ID for reconnection
	Data  json.RawMessage // Event payload
}

SSEEvent represents a parsed SSE event

type StageMetrics

type StageMetrics struct {
	Stage        string             `json:"stage"`
	MessagesIn   int                `json:"messages_in"`
	MessagesOut  int                `json:"messages_out"`
	Errors       int                `json:"errors"`
	AvgLatency   time.Duration      `json:"avg_latency"`
	MetricDeltas map[string]float64 `json:"metric_deltas,omitempty"`
}

StageMetrics tracks metrics for a specific processing stage

type StatusStreamEnvelope

type StatusStreamEnvelope struct {
	Type      string          `json:"type"`
	ID        string          `json:"id"`
	Timestamp int64           `json:"timestamp"`
	FlowID    string          `json:"flow_id"`
	Payload   json.RawMessage `json:"payload,omitempty"`
}

StatusStreamEnvelope matches service.StatusStreamEnvelope

type StepResult

type StepResult struct {
	StepName  string          `json:"step_name"`
	Status    string          `json:"status"`
	Output    json.RawMessage `json:"output,omitempty"`
	Error     string          `json:"error,omitempty"`
	Iteration int             `json:"iteration"`
}

StepResult represents a workflow step result

type StreamWarning

type StreamWarning struct {
	Severity       string   `json:"severity"`
	SubscriberComp string   `json:"subscriber_component"`
	SubscriberPort string   `json:"subscriber_port"`
	Subjects       []string `json:"subjects"`
	PublisherComps []string `json:"publisher_components"`
	Issue          string   `json:"issue"`
}

StreamWarning represents an issue where a JetStream subscriber is connected to a NATS publisher.

type StructuralIndexInfo

type StructuralIndexInfo struct {
	BucketExists bool           `json:"bucket_exists"`
	KeyCount     int            `json:"key_count"`
	KCore        *KCoreMetadata `json:"kcore,omitempty"`
	Pivot        *PivotMetadata `json:"pivot,omitempty"`
	SampleKeys   []string       `json:"sample_keys,omitempty"`
}

StructuralIndexInfo contains information about structural indices

type SubscribeAck

type SubscribeAck struct {
	Type       string   `json:"type"`       // Always "subscribe_ack"
	Subscribed []string `json:"subscribed"` // Message types now subscribed
	LogLevel   string   `json:"log_level"`  // Current log level filter
	Sources    []string `json:"sources"`    // Current source filters
}

SubscribeAck matches service.SubscribeAck (Server → Client acknowledgment)

type SubscribeCommand

type SubscribeCommand struct {
	Command      string   `json:"command"`
	MessageTypes []string `json:"message_types,omitempty"`
	LogLevel     string   `json:"log_level,omitempty"`
	Sources      []string `json:"sources,omitempty"`
}

SubscribeCommand matches service.SubscribeCommand

type TraceResponse

type TraceResponse struct {
	TraceID string         `json:"trace_id"`
	Count   int            `json:"count"`
	Entries []MessageEntry `json:"entries"`
}

TraceResponse represents the response from the trace query endpoint

type Triple

type Triple struct {
	Subject   string `json:"subject"`
	Predicate string `json:"predicate"`
	Object    any    `json:"object"`
}

Triple represents a semantic triple (subject, predicate, object)

type VirtualEdgeCounts

type VirtualEdgeCounts struct {
	Total       int            // Total virtual edges found
	ByBand      map[string]int // Counts by similarity band (high, medium, related)
	AutoApplied int            // Edges that were auto-applied
}

VirtualEdgeCounts holds counts of virtual edges: the total, a breakdown by similarity band, and the number that were auto-applied.

type WaitOpts

type WaitOpts struct {
	Timeout      time.Duration // Max wait time (default 30s)
	PollInterval time.Duration // Check frequency (default 100ms)
	Comparator   string        // ">=", "==", ">", "<", "<=" (default ">=")
}

WaitOpts configures metric waiting behavior

func DefaultWaitOpts

func DefaultWaitOpts() WaitOpts

DefaultWaitOpts returns sensible defaults for metric waiting

type WatchStatusStreamCondition

type WatchStatusStreamCondition func(envelopes []StatusStreamEnvelope) (satisfied bool, err error)

WatchStatusStreamCondition evaluates received envelopes

func CombineWSConditionsAnd

func CombineWSConditionsAnd(conditions ...WatchStatusStreamCondition) WatchStatusStreamCondition

CombineWSConditionsAnd returns condition satisfied when all conditions are satisfied

func CombineWSConditionsOr

func CombineWSConditionsOr(conditions ...WatchStatusStreamCondition) WatchStatusStreamCondition

CombineWSConditionsOr returns condition satisfied when any condition is satisfied

func EnvelopeCountReaches

func EnvelopeCountReaches(target int) WatchStatusStreamCondition

EnvelopeCountReaches returns a condition satisfied when the total envelope count >= target

func HasAllMessageTypes

func HasAllMessageTypes(types []string) WatchStatusStreamCondition

HasAllMessageTypes returns condition satisfied when all types appear

func HasMessageType

func HasMessageType(msgType string) WatchStatusStreamCondition

HasMessageType returns condition satisfied when message type appears

func LogMessageContains

func LogMessageContains(substr string) WatchStatusStreamCondition

LogMessageContains returns a condition satisfied when a log message contains the substring

func MessageTypeCountReaches

func MessageTypeCountReaches(msgType string, target int) WatchStatusStreamCondition

MessageTypeCountReaches returns a condition satisfied when the count of envelopes of the given type >= target

type WatchStatusStreamOpts

type WatchStatusStreamOpts struct {
	Timeout       time.Duration
	MessageTypes  []string      // Filter: flow_status, component_health, component_metrics, log_entry
	LogLevel      string        // Minimum: DEBUG, INFO, WARN, ERROR
	DrainDuration time.Duration // Continue collecting after condition met to capture burst
}

WatchStatusStreamOpts configures status stream watching

func DefaultWatchStatusStreamOpts

func DefaultWatchStatusStreamOpts() WatchStatusStreamOpts

DefaultWatchStatusStreamOpts returns sensible defaults

type WebSocketClient

type WebSocketClient struct {
	// contains filtered or unexported fields
}

WebSocketClient handles WebSocket connections for status streaming

func NewWebSocketClient

func NewWebSocketClient(baseURL string) *WebSocketClient

NewWebSocketClient creates a new WebSocket client

func (*WebSocketClient) Health

func (c *WebSocketClient) Health(ctx context.Context, flowID string) error

Health checks if WebSocket endpoint is available by attempting a connection

func (*WebSocketClient) WatchStatusStream

func (c *WebSocketClient) WatchStatusStream(
	ctx context.Context,
	flowID string,
	condition WatchStatusStreamCondition,
	opts WatchStatusStreamOpts,
) ([]StatusStreamEnvelope, error)

WatchStatusStream connects to WebSocket and collects envelopes until condition is met

type WorkflowExecution

type WorkflowExecution struct {
	ID           string                 `json:"id"`
	WorkflowID   string                 `json:"workflow_id"`
	WorkflowName string                 `json:"workflow_name"`
	State        string                 `json:"state"`
	CurrentStep  int                    `json:"current_step"`
	CurrentName  string                 `json:"current_name"`
	Iteration    int                    `json:"iteration"`
	StepResults  map[string]StepResult  `json:"step_results,omitempty"`
	Error        string                 `json:"error,omitempty"`
	Trigger      map[string]interface{} `json:"trigger,omitempty"`
	CreatedAt    string                 `json:"created_at"`
	UpdatedAt    string                 `json:"updated_at"`
}

WorkflowExecution represents a workflow execution state from KV

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL