Documentation
¶
Overview ¶
Package client provides test utilities for SemStreams E2E tests
Package client provides HTTP clients for SemStreams E2E tests ¶
Package client provides HTTP clients for SemStreams E2E tests ¶
Package client provides test utilities for SemStreams E2E tests ¶
Package client provides HTTP clients for SemStreams E2E tests ¶
Package client provides HTTP clients for SemStreams E2E tests ¶
Package client provides HTTP clients for SemStreams E2E tests ¶
Package client provides HTTP clients for SemStreams E2E tests ¶
Package client provides HTTP and WebSocket clients for SemStreams E2E tests
Index ¶
- Constants
- Variables
- func CountMessageTypes(envelopes []StatusStreamEnvelope) map[string]int
- func CountSourceEntities(events []KVChangeEvent) int
- func CountUniqueKeys(events []KVChangeEvent) int
- type A2AAgentCard
- type A2AClient
- func (c *A2AClient) GetAgentCard(ctx context.Context) (*A2AAgentCard, error)
- func (c *A2AClient) GetTask(ctx context.Context, taskID string) (*A2ATask, error)
- func (c *A2AClient) Health(ctx context.Context) error
- func (c *A2AClient) SubmitTask(ctx context.Context, taskID, prompt string) (*A2ATask, error)
- type A2AMessage
- type A2ATask
- type AGNTCYMockClient
- func (c *AGNTCYMockClient) Health(ctx context.Context) error
- func (c *AGNTCYMockClient) ListRegistrations(ctx context.Context) ([]DirectoryRegistration, error)
- func (c *AGNTCYMockClient) WaitForRegistration(ctx context.Context, agentIDSubstring string, timeout time.Duration) (*DirectoryRegistration, error)
- type Anomaly
- type AnomalyCounts
- type Category
- type ComponentInfo
- type ComponentStatus
- type ContextEntry
- type DirectoryRegistration
- type DisconnectedNode
- type EntityStabilizationResult
- type EntityState
- type FlowExpectation
- type FlowInfo
- type FlowResult
- type FlowSnapshot
- type FlowTracer
- func (t *FlowTracer) CaptureFlowSnapshot(ctx context.Context) (*FlowSnapshot, error)
- func (t *FlowTracer) ValidateFlow(ctx context.Context, preSnapshot *FlowSnapshot, expected FlowExpectation) (*FlowResult, error)
- func (t *FlowTracer) WaitForFlowCompletion(ctx context.Context, expected FlowExpectation) error
- type FlowValidation
- type FlowValidationSummary
- type FlowsResponse
- type IncomingEntry
- type KCoreMetadata
- type KVChangeEvent
- type KVEntry
- type KVQueryResult
- type KVWatchCondition
- func AllMatch(predicate func(KVChangeEvent) bool, minCount int) KVWatchCondition
- func AnyMatch(predicate func(KVChangeEvent) bool) KVWatchCondition
- func CombineAnd(conditions ...KVWatchCondition) KVWatchCondition
- func CombineOr(conditions ...KVWatchCondition) KVWatchCondition
- func CountCreatesReaches(target int) KVWatchCondition
- func CountReaches(target int) KVWatchCondition
- func KeyExists(key string) KVWatchCondition
- func KeyPrefixCount(prefix string, target int) KVWatchCondition
- func KeySuffixCount(suffix string, target int) KVWatchCondition
- func KeysExist(keys []string) KVWatchCondition
- func SourceEntityCountReaches(target int) KVWatchCondition
- func UniqueKeyCountReaches(target int) KVWatchCondition
- func ValueFieldEquals(field string, expectedValue interface{}) KVWatchCondition
- type KVWatchOpts
- type LoggerStats
- type MessageEntry
- type MessageLoggerClient
- func (c *MessageLoggerClient) CountMessagesBySubject(ctx context.Context, pattern string) (int, error)
- func (c *MessageLoggerClient) GetEntries(ctx context.Context, limit int, subjectPattern string) ([]MessageEntry, error)
- func (c *MessageLoggerClient) GetEntriesByTrace(ctx context.Context, traceID string) (*TraceResponse, error)
- func (c *MessageLoggerClient) GetStats(ctx context.Context) (*LoggerStats, error)
- func (c *MessageLoggerClient) GetSubjects(ctx context.Context) ([]string, error)
- func (c *MessageLoggerClient) Health(ctx context.Context) error
- func (c *MessageLoggerClient) QueryKV(ctx context.Context, bucket, pattern string, limit int) (*KVQueryResult, error)
- func (c *MessageLoggerClient) TraceMessage(ctx context.Context, messageID string) (*MessageTrace, error)
- func (c *MessageLoggerClient) WaitForMessages(ctx context.Context, pattern string, count int, timeout time.Duration) ([]MessageEntry, error)
- type MessageTrace
- type Metric
- type MetricsBaseline
- type MetricsClient
- func (c *MetricsClient) CaptureBaseline(ctx context.Context) (*MetricsBaseline, error)
- func (c *MetricsClient) CompareToBaseline(ctx context.Context, baseline *MetricsBaseline) (*MetricsDiff, error)
- func (c *MetricsClient) ExtractRuleMetrics(ctx context.Context) (*RuleMetrics, error)
- func (c *MetricsClient) FetchRaw(ctx context.Context) (string, error)
- func (c *MetricsClient) FetchReport(ctx context.Context, duration time.Duration) (*MetricsReport, error)
- func (c *MetricsClient) FetchSnapshot(ctx context.Context) (*MetricsSnapshot, error)
- func (c *MetricsClient) GetMetricByLabels(ctx context.Context, metricName string, labelFilters map[string]string) ([]Metric, error)
- func (c *MetricsClient) GetMetricValue(ctx context.Context, metricName string) (float64, error)
- func (c *MetricsClient) GetMetricsByPrefix(ctx context.Context, prefix string) (map[string]float64, error)
- func (c *MetricsClient) Health(ctx context.Context) error
- func (c *MetricsClient) SumMetricsByName(ctx context.Context, metricName string) (float64, error)
- func (c *MetricsClient) WaitForMetric(ctx context.Context, metricName string, expected float64, opts WaitOpts) error
- func (c *MetricsClient) WaitForMetricChange(ctx context.Context, metricName string, baseline float64, opts WaitOpts) error
- func (c *MetricsClient) WaitForMetricDelta(ctx context.Context, metricName string, baseline, delta float64, opts WaitOpts) error
- type MetricsDiff
- type MetricsReport
- type MetricsSnapshot
- type NATSValidationClient
- func (c *NATSValidationClient) BucketExists(ctx context.Context, bucketName string) (bool, error)
- func (c *NATSValidationClient) Close(ctx context.Context) error
- func (c *NATSValidationClient) CountBucketKeys(ctx context.Context, bucketName string) (int, error)
- func (c *NATSValidationClient) CountEntities(ctx context.Context) (int, error)
- func (c *NATSValidationClient) CountOASFRecords(ctx context.Context) (int, error)
- func (c *NATSValidationClient) CountSourceEntities(ctx context.Context) (int, error)
- func (c *NATSValidationClient) CountVirtualEdges(ctx context.Context) (*VirtualEdgeCounts, error)
- func (c *NATSValidationClient) DeleteKV(ctx context.Context, bucket, key string) error
- func (c *NATSValidationClient) GetAllCommunities(ctx context.Context) ([]*clustering.Community, error)
- func (c *NATSValidationClient) GetAllComponentStatuses(ctx context.Context) (map[string]*ComponentStatus, error)
- func (c *NATSValidationClient) GetAllContexts(ctx context.Context) ([]string, error)
- func (c *NATSValidationClient) GetAllEntityIDs(ctx context.Context) ([]string, error)
- func (c *NATSValidationClient) GetAllWorkflowExecutions(ctx context.Context) ([]*WorkflowExecution, error)
- func (c *NATSValidationClient) GetAnomalies(ctx context.Context) ([]*Anomaly, error)
- func (c *NATSValidationClient) GetAnomalyCounts(ctx context.Context) (*AnomalyCounts, error)
- func (c *NATSValidationClient) GetAutoAppliedAnomalyCount(ctx context.Context) (int, error)
- func (c *NATSValidationClient) GetBucketKeysSample(ctx context.Context, bucketName string, limit int) ([]string, error)
- func (c *NATSValidationClient) GetComponentStatus(ctx context.Context, componentName string) (*ComponentStatus, error)
- func (c *NATSValidationClient) GetContextEntries(ctx context.Context, contextValue string) ([]ContextEntry, error)
- func (c *NATSValidationClient) GetEntity(ctx context.Context, entityID string) (*EntityState, error)
- func (c *NATSValidationClient) GetEntitySample(ctx context.Context, limit int) ([]*EntityState, error)
- func (c *NATSValidationClient) GetIncomingEntries(ctx context.Context, targetEntityID string) ([]IncomingEntry, error)
- func (c *NATSValidationClient) GetOASFRecord(ctx context.Context, entityID string) (*OASFRecord, error)
- func (c *NATSValidationClient) GetOutgoingEntries(ctx context.Context, sourceEntityID string) ([]OutgoingEntry, error)
- func (c *NATSValidationClient) GetStructuralIndexInfo(ctx context.Context) (*StructuralIndexInfo, error)
- func (c *NATSValidationClient) GetTrajectory(ctx context.Context, loopID string) (*agentic.Trajectory, error)
- func (c *NATSValidationClient) GetWorkflowExecution(ctx context.Context, execID string) (*WorkflowExecution, error)
- func (c *NATSValidationClient) GetWorkflowExecutionsByState(ctx context.Context, state string) ([]*WorkflowExecution, error)
- func (c *NATSValidationClient) ListBuckets(ctx context.Context) ([]string, error)
- func (c *NATSValidationClient) ListOASFRecordIDs(ctx context.Context) ([]string, error)
- func (c *NATSValidationClient) Publish(ctx context.Context, subject string, data []byte) error
- func (c *NATSValidationClient) PutKV(ctx context.Context, bucket, key string, value []byte) error
- func (c *NATSValidationClient) ValidateIndexPopulated(ctx context.Context, indexName string) (bool, error)
- func (c *NATSValidationClient) WaitForAnomalyDetection(ctx context.Context, timeout time.Duration, pollInterval time.Duration) (total int, err error)
- func (c *NATSValidationClient) WaitForCommunityEnhancement(ctx context.Context, timeout time.Duration, pollInterval time.Duration) (enhanced, failed, pending int, err error)
- func (c *NATSValidationClient) WaitForComponentCycleComplete(ctx context.Context, componentName string, timeout time.Duration) (*ComponentStatus, error)
- func (c *NATSValidationClient) WaitForComponentStage(ctx context.Context, componentName string, targetStage string, ...) (*ComponentStatus, error)
- func (c *NATSValidationClient) WaitForContainerGroupsSSE(ctx context.Context, expectedCount int, timeout time.Duration, ...) (count int, usedSSE bool, err error)
- func (c *NATSValidationClient) WaitForEntityCountSSE(ctx context.Context, expectedCount int, timeout time.Duration, ...) EntityStabilizationResult
- func (c *NATSValidationClient) WaitForKeySSE(ctx context.Context, bucket, key string, timeout time.Duration, ...) (found bool, usedSSE bool, err error)
- func (c *NATSValidationClient) WaitForOASFRecord(ctx context.Context, entityID string, timeout time.Duration) (*OASFRecord, error)
- func (c *NATSValidationClient) WaitForSourceEntityCountSSE(ctx context.Context, expectedCount int, timeout time.Duration, ...) EntityStabilizationResult
- func (c *NATSValidationClient) WaitForWorkflowCompletion(ctx context.Context, workflowID string, timeout time.Duration) (*WorkflowExecution, error)
- func (c *NATSValidationClient) WaitForWorkflowState(ctx context.Context, workflowID string, targetStates []string, ...) (*WorkflowExecution, error)
- func (c *NATSValidationClient) WaitForWorkflowTerminal(ctx context.Context, workflowID string, timeout time.Duration) (*WorkflowExecution, error)
- type OASFDomain
- type OASFRecord
- type OASFSkill
- type ObservabilityClient
- func (c *ObservabilityClient) CheckFlowHealth(ctx context.Context) error
- func (c *ObservabilityClient) CountFileOutputLines(ctx context.Context, containerName string, pattern string) (int, error)
- func (c *ObservabilityClient) GetComponents(ctx context.Context) ([]ComponentInfo, error)
- func (c *ObservabilityClient) GetFileOutputLines(ctx context.Context, containerName string, pattern string, maxLines int) ([]string, error)
- func (c *ObservabilityClient) GetFlows(ctx context.Context) ([]FlowInfo, error)
- func (c *ObservabilityClient) GetPlatformHealth(ctx context.Context) (*PlatformHealth, error)
- func (c *ObservabilityClient) ValidateFlowGraph(ctx context.Context) (*FlowValidation, error)
- func (c *ObservabilityClient) WaitForAllComponentsHealthy(ctx context.Context, timeout time.Duration) error
- func (c *ObservabilityClient) WaitForComponentHealthy(ctx context.Context, name string, timeout time.Duration) error
- type OrphanedPort
- type OutgoingEntry
- type PivotMetadata
- type PlatformHealth
- type ProfileClient
- func (c *ProfileClient) CaptureAll(ctx context.Context, prefix string) (map[string]string, error)
- func (c *ProfileClient) CaptureAllocs(ctx context.Context, name string) (string, error)
- func (c *ProfileClient) CaptureBlock(ctx context.Context, name string) (string, error)
- func (c *ProfileClient) CaptureBlockAndMutex(ctx context.Context, prefix string) (map[string]string, error)
- func (c *ProfileClient) CaptureCPU(ctx context.Context, name string, seconds int) (string, error)
- func (c *ProfileClient) CaptureGoroutine(ctx context.Context, name string) (string, error)
- func (c *ProfileClient) CaptureHeap(ctx context.Context, name string) (string, error)
- func (c *ProfileClient) CaptureMutex(ctx context.Context, name string) (string, error)
- func (c *ProfileClient) CaptureTrace(ctx context.Context, name string, seconds int) (string, error)
- func (c *ProfileClient) IsAvailable(ctx context.Context) bool
- type RuleMetrics
- type SSEClient
- type SSEEvent
- type StageMetrics
- type StatusStreamEnvelope
- type StepResult
- type StreamWarning
- type StructuralIndexInfo
- type SubscribeAck
- type SubscribeCommand
- type TraceResponse
- type Triple
- type VirtualEdgeCounts
- type WaitOpts
- type WatchStatusStreamCondition
- func CombineWSConditionsAnd(conditions ...WatchStatusStreamCondition) WatchStatusStreamCondition
- func CombineWSConditionsOr(conditions ...WatchStatusStreamCondition) WatchStatusStreamCondition
- func EnvelopeCountReaches(target int) WatchStatusStreamCondition
- func HasAllMessageTypes(types []string) WatchStatusStreamCondition
- func HasMessageType(msgType string) WatchStatusStreamCondition
- func LogMessageContains(substr string) WatchStatusStreamCondition
- func MessageTypeCountReaches(msgType string, target int) WatchStatusStreamCondition
- type WatchStatusStreamOpts
- type WebSocketClient
- type WorkflowExecution
Constants ¶
const BucketComponentStatus = "COMPONENT_STATUS"
BucketComponentStatus is the KV bucket for component lifecycle status
const BucketEntityStates = "ENTITY_STATES"
BucketEntityStates is the KV bucket name for entity states
const BucketOASFRecords = "OASF_RECORDS"
BucketOASFRecords is the KV bucket name for OASF records
const BucketWorkflowDefinitions = "WORKFLOW_DEFINITIONS"
BucketWorkflowDefinitions is the KV bucket for workflow definitions
const BucketWorkflowExecutions = "WORKFLOW_EXECUTIONS"
BucketWorkflowExecutions is the KV bucket for workflow executions
const StructuralIndexBucket = "STRUCTURAL_INDEX"
StructuralIndexBucket is the KV bucket for structural indices
Variables ¶
var IndexBuckets = struct { EntityStates string Predicate string Incoming string Outgoing string Alias string Spatial string Temporal string Embedding string EmbeddingDedp string Community string Structural string Context string }{ EntityStates: "ENTITY_STATES", Predicate: "PREDICATE_INDEX", Incoming: "INCOMING_INDEX", Outgoing: "OUTGOING_INDEX", Alias: "ALIAS_INDEX", Spatial: "SPATIAL_INDEX", Temporal: "TEMPORAL_INDEX", Embedding: "EMBEDDING_INDEX", EmbeddingDedp: "EMBEDDING_DEDUP", Community: "COMMUNITY_INDEX", Structural: "STRUCTURAL_INDEX", Context: "CONTEXT_INDEX", }
IndexBuckets defines the standard index bucket names
Functions ¶
func CountMessageTypes ¶
func CountMessageTypes(envelopes []StatusStreamEnvelope) map[string]int
CountMessageTypes returns a count of each message type in the envelopes
func CountSourceEntities ¶
func CountSourceEntities(events []KVChangeEvent) int
CountSourceEntities returns the count of source entities (non-container) from events. Container entities (ending in .group, .group.container, .group.container.level) are excluded.
func CountUniqueKeys ¶
func CountUniqueKeys(events []KVChangeEvent) int
CountUniqueKeys returns the count of unique non-deleted keys from events. It tracks creates/updates and removes deletes to give an accurate count of keys that currently exist. This is essential for SSE-based counting since NATS KV watch sends all existing keys first before streaming updates.
Types ¶
type A2AAgentCard ¶
type A2AAgentCard struct {
Name string `json:"name"`
Description string `json:"description"`
URL string `json:"url"`
Capabilities struct {
Streaming bool `json:"streaming"`
PushNotifications bool `json:"pushNotifications"`
} `json:"capabilities"`
Skills []struct {
ID string `json:"id"`
Name string `json:"name"`
Description string `json:"description,omitempty"`
} `json:"skills,omitempty"`
}
A2AAgentCard represents an A2A agent card response.
type A2AClient ¶
type A2AClient struct {
// contains filtered or unexported fields
}
A2AClient provides HTTP client for A2A adapter testing.
func NewA2AClient ¶
NewA2AClient creates a new A2A test client.
func (*A2AClient) GetAgentCard ¶
func (c *A2AClient) GetAgentCard(ctx context.Context) (*A2AAgentCard, error)
GetAgentCard retrieves the agent card from the A2A adapter.
type A2AMessage ¶
type A2AMessage struct {
Role string `json:"role"`
Parts []struct {
Text string `json:"text,omitempty"`
} `json:"parts"`
}
A2AMessage represents an A2A message.
type A2ATask ¶
type A2ATask struct {
ID string `json:"id"`
Status struct {
State string `json:"state"`
} `json:"status"`
Messages []A2AMessage `json:"messages,omitempty"`
}
A2ATask represents an A2A task for testing.
type AGNTCYMockClient ¶
type AGNTCYMockClient struct {
// contains filtered or unexported fields
}
AGNTCYMockClient provides HTTP client for testing the AGNTCY mock server.
func NewAGNTCYMockClient ¶
func NewAGNTCYMockClient(baseURL string) *AGNTCYMockClient
NewAGNTCYMockClient creates a new client for the AGNTCY mock server.
func (*AGNTCYMockClient) Health ¶
func (c *AGNTCYMockClient) Health(ctx context.Context) error
Health checks if the AGNTCY mock server is healthy.
func (*AGNTCYMockClient) ListRegistrations ¶
func (c *AGNTCYMockClient) ListRegistrations(ctx context.Context) ([]DirectoryRegistration, error)
ListRegistrations returns all agent registrations from the mock directory.
func (*AGNTCYMockClient) WaitForRegistration ¶
func (c *AGNTCYMockClient) WaitForRegistration( ctx context.Context, agentIDSubstring string, timeout time.Duration, ) (*DirectoryRegistration, error)
WaitForRegistration waits for an agent to be registered in the directory.
type Anomaly ¶
type Anomaly struct {
ID string `json:"id"`
Type string `json:"type"`
EntityA string `json:"entity_a"`
EntityB string `json:"entity_b,omitempty"`
Confidence float64 `json:"confidence"`
Status string `json:"status"`
Evidence map[string]interface{} `json:"evidence,omitempty"`
DetectedAt string `json:"detected_at,omitempty"`
}
Anomaly represents a structural anomaly detected by the inference system
type AnomalyCounts ¶
type AnomalyCounts struct {
ByType map[string]int `json:"by_type"`
ByStatus map[string]int `json:"by_status"`
Total int `json:"total"`
}
AnomalyCounts holds counts of anomalies by type and status
type ComponentInfo ¶
type ComponentInfo struct {
Name string `json:"name"`
Component string `json:"component"` // Component factory name (e.g., "udp", "graph-processor")
Type string `json:"type"` // Component category (input/processor/output/storage/gateway)
Enabled bool `json:"enabled"`
State string `json:"state"`
Healthy bool `json:"healthy"`
LastError string `json:"last_error,omitempty"`
}
ComponentInfo represents a single component's information. It matches the SemStreams /components/list API response format.
type ComponentStatus ¶
type ComponentStatus struct {
Component string `json:"component"`
Stage string `json:"stage"`
CycleID string `json:"cycle_id,omitempty"`
CycleStartedAt string `json:"cycle_started_at,omitempty"`
StageStartedAt string `json:"stage_started_at"`
LastCompletedAt string `json:"last_completed_at,omitempty"`
LastResult string `json:"last_result,omitempty"` // "success" or "error"
LastError string `json:"last_error,omitempty"`
}
ComponentStatus represents the current processing state of a component. Matches component.Status from the core package.
type ContextEntry ¶
ContextEntry matches the indexmanager.ContextEntry structure. Phase 5: Added to verify ContextIndex stores entity+predicate pairs.
type DirectoryRegistration ¶
type DirectoryRegistration struct {
AgentID string `json:"agent_id"`
OASFRecord map[string]any `json:"oasf_record"`
RegisteredAt string `json:"registered_at"`
LastHeartbeat string `json:"last_heartbeat"`
TTL string `json:"ttl"`
}
DirectoryRegistration represents an agent registration in the mock directory.
type DisconnectedNode ¶
type DisconnectedNode struct {
ComponentName string `json:"component_name"`
Issue string `json:"issue"`
Suggestions []string `json:"suggestions,omitempty"`
}
DisconnectedNode represents a component with no connections
type EntityStabilizationResult ¶
type EntityStabilizationResult struct {
FinalCount int
WaitDuration time.Duration
Stabilized bool
TimedOut bool
UsedSSE bool
}
EntityStabilizationResult contains the result of waiting for entity count to stabilize.
type EntityState ¶
type EntityState struct {
ID string `json:"id"`
Type string `json:"type"`
Properties map[string]any `json:"properties"`
Triples []Triple `json:"triples,omitempty"`
Version int `json:"version"`
UpdatedAt string `json:"updated_at,omitempty"`
}
EntityState represents an entity stored in NATS KV
type FlowExpectation ¶
type FlowExpectation struct {
InputSubject string // Expected input subject pattern (e.g., "input.udp")
ProcessingStages []string // Expected processing stages (e.g., ["process.rule", "process.graph"])
MinMessages int // Minimum number of messages expected
MaxLatencyMs int // Maximum acceptable p99 latency in ms
Timeout time.Duration // How long to wait for flow completion
}
FlowExpectation defines what we expect to see in a flow
type FlowInfo ¶
type FlowInfo struct {
ID string `json:"id"`
Name string `json:"name"`
RuntimeState string `json:"runtime_state"`
}
FlowInfo represents a flow from the flowbuilder API
type FlowResult ¶
type FlowResult struct {
Valid bool `json:"valid"`
Messages int `json:"messages"`
AvgLatency time.Duration `json:"avg_latency"`
P99Latency time.Duration `json:"p99_latency,omitempty"`
StageMetrics map[string]StageMetrics `json:"stage_metrics,omitempty"`
Errors []string `json:"errors,omitempty"`
Warnings []string `json:"warnings,omitempty"`
}
FlowResult contains the results of flow validation
type FlowSnapshot ¶
type FlowSnapshot struct {
Timestamp time.Time `json:"timestamp"`
MetricsBaseline *MetricsBaseline `json:"metrics_baseline"`
MessageCount int `json:"message_count"`
LastMessageID string `json:"last_message_id,omitempty"`
}
FlowSnapshot captures the state of metrics and messages at a point in time
type FlowTracer ¶
type FlowTracer struct {
// contains filtered or unexported fields
}
FlowTracer correlates metrics and message logs to trace data flow through the system
func NewFlowTracer ¶
func NewFlowTracer(metrics *MetricsClient, logger *MessageLoggerClient) *FlowTracer
NewFlowTracer creates a new flow tracer with metrics and message logger clients
func (*FlowTracer) CaptureFlowSnapshot ¶
func (t *FlowTracer) CaptureFlowSnapshot(ctx context.Context) (*FlowSnapshot, error)
CaptureFlowSnapshot captures the current state of metrics and messages
func (*FlowTracer) ValidateFlow ¶
func (t *FlowTracer) ValidateFlow(ctx context.Context, preSnapshot *FlowSnapshot, expected FlowExpectation) (*FlowResult, error)
ValidateFlow validates that data flowed through expected stages
func (*FlowTracer) WaitForFlowCompletion ¶
func (t *FlowTracer) WaitForFlowCompletion(ctx context.Context, expected FlowExpectation) error
WaitForFlowCompletion waits for a complete data flow from input to output
type FlowValidation ¶
type FlowValidation struct {
Timestamp string `json:"timestamp"`
ValidationStatus string `json:"validation_status"`
ConnectedComponents [][]string `json:"connected_components"`
ConnectedEdges []map[string]interface{} `json:"connected_edges"`
DisconnectedNodes []DisconnectedNode `json:"disconnected_nodes"`
OrphanedPorts []OrphanedPort `json:"orphaned_ports"`
StreamWarnings []StreamWarning `json:"stream_warnings"`
Summary FlowValidationSummary `json:"summary"`
}
FlowValidation represents the result of flowgraph validation from /components/validate
type FlowValidationSummary ¶
type FlowValidationSummary struct {
TotalComponents int `json:"total_components"`
TotalConnections int `json:"total_connections"`
ComponentGroups int `json:"component_groups"`
OrphanedPortCount int `json:"orphaned_port_count"`
DisconnectedNodeCount int `json:"disconnected_node_count"`
StreamWarningCount int `json:"stream_warning_count"`
HasStreamIssues bool `json:"has_stream_issues"`
}
FlowValidationSummary contains summary statistics from flow validation
type FlowsResponse ¶
type FlowsResponse struct {
Flows []FlowInfo `json:"flows"`
}
FlowsResponse represents the response from GET /flowbuilder/flows
type IncomingEntry ¶
type IncomingEntry struct {
Predicate string `json:"predicate"`
FromEntityID string `json:"from_entity_id"`
}
IncomingEntry matches the indexmanager.IncomingEntry structure. Phase 5: Added to verify IncomingIndex predicate storage.
type KCoreMetadata ¶
type KCoreMetadata struct {
MaxCore int `json:"max_core"`
EntityCount int `json:"entity_count"`
ComputedAt string `json:"computed_at"`
CoreBuckets map[int]int `json:"core_buckets"` // core number -> count of entities
}
KCoreMetadata contains k-core index metadata
type KVChangeEvent ¶
type KVChangeEvent struct {
Bucket string `json:"bucket"`
Key string `json:"key"`
Operation string `json:"operation"` // "create", "update", "delete"
Value json.RawMessage `json:"value,omitempty"`
Revision uint64 `json:"revision"`
Timestamp time.Time `json:"timestamp"`
}
KVChangeEvent represents a KV bucket change from SSE
type KVEntry ¶
type KVEntry struct {
Key string `json:"key"`
Value any `json:"value"`
Revision uint64 `json:"revision"`
Created time.Time `json:"created"`
}
KVEntry represents a single KV bucket entry
type KVQueryResult ¶
type KVQueryResult struct {
Bucket string `json:"bucket"`
Pattern string `json:"pattern"`
Count int `json:"count"`
Entries []KVEntry `json:"entries"`
}
KVQueryResult represents the result of a KV bucket query
type KVWatchCondition ¶
type KVWatchCondition func(events []KVChangeEvent) (satisfied bool, err error)
KVWatchCondition is a function that evaluates KV events and returns true when satisfied
func AllMatch ¶
func AllMatch(predicate func(KVChangeEvent) bool, minCount int) KVWatchCondition
AllMatch returns a condition satisfied when all collected events match predicate
func AnyMatch ¶
func AnyMatch(predicate func(KVChangeEvent) bool) KVWatchCondition
AnyMatch returns a condition satisfied when any event matches predicate
func CombineAnd ¶
func CombineAnd(conditions ...KVWatchCondition) KVWatchCondition
CombineAnd returns a condition satisfied when all conditions are satisfied
func CombineOr ¶
func CombineOr(conditions ...KVWatchCondition) KVWatchCondition
CombineOr returns a condition satisfied when any condition is satisfied
func CountCreatesReaches ¶
func CountCreatesReaches(target int) KVWatchCondition
CountCreatesReaches returns a condition satisfied when the number of create operations is >= target.
func CountReaches ¶
func CountReaches(target int) KVWatchCondition
CountReaches returns a condition that is satisfied when the event count is >= target.
func KeyExists ¶
func KeyExists(key string) KVWatchCondition
KeyExists returns a condition satisfied when a specific key appears
func KeyPrefixCount ¶
func KeyPrefixCount(prefix string, target int) KVWatchCondition
KeyPrefixCount returns a condition satisfied when the number of keys with the given prefix is >= target.
func KeySuffixCount ¶
func KeySuffixCount(suffix string, target int) KVWatchCondition
KeySuffixCount returns a condition satisfied when the number of keys with the given suffix is >= target.
func KeysExist ¶
func KeysExist(keys []string) KVWatchCondition
KeysExist returns a condition satisfied when all specified keys appear
func SourceEntityCountReaches ¶
func SourceEntityCountReaches(target int) KVWatchCondition
SourceEntityCountReaches returns a condition satisfied when source (non-container) entity count >= target. Container entities have suffixes like .group, .group.container, or .group.container.level and are excluded from the count.
func UniqueKeyCountReaches ¶
func UniqueKeyCountReaches(target int) KVWatchCondition
UniqueKeyCountReaches returns a condition satisfied when the number of unique non-deleted keys is >= target. This properly counts entities by tracking creates/updates and removing deletes, giving an accurate count of keys that currently exist in the bucket.
func ValueFieldEquals ¶
func ValueFieldEquals(field string, expectedValue interface{}) KVWatchCondition
ValueFieldEquals returns a condition for checking a JSON field in event values
type KVWatchOpts ¶
type KVWatchOpts struct {
Timeout time.Duration // Max wait time (default 60s)
Pattern string // Key pattern filter (default "*")
}
KVWatchOpts configures KV watching behavior
func DefaultKVWatchOpts ¶
func DefaultKVWatchOpts() KVWatchOpts
DefaultKVWatchOpts returns sensible defaults
type LoggerStats ¶
type LoggerStats struct {
TotalMessages int64 `json:"total_messages"`
ValidMessages int64 `json:"valid_messages"`
InvalidMessages int64 `json:"invalid_messages"`
StartTime time.Time `json:"start_time"`
LastMessageTime time.Time `json:"last_message_time"`
UptimeSeconds float64 `json:"uptime_seconds"`
MonitoredSubjects []string `json:"monitored_subjects"`
MaxEntries int `json:"max_entries"`
}
LoggerStats represents statistics from the MessageLogger service
type MessageEntry ¶
type MessageEntry struct {
Sequence uint64 `json:"sequence"`
Timestamp time.Time `json:"timestamp"`
Subject string `json:"subject"`
MessageType string `json:"message_type,omitempty"`
MessageID string `json:"message_id,omitempty"`
TraceID string `json:"trace_id,omitempty"`
SpanID string `json:"span_id,omitempty"`
Summary string `json:"summary"`
RawData json.RawMessage `json:"raw_data,omitempty"`
Metadata map[string]any `json:"metadata,omitempty"`
}
MessageEntry represents a logged message from the MessageLogger service
type MessageLoggerClient ¶
type MessageLoggerClient struct {
// contains filtered or unexported fields
}
MessageLoggerClient provides HTTP client access to the MessageLogger service
func NewMessageLoggerClient ¶
func NewMessageLoggerClient(baseURL string) *MessageLoggerClient
NewMessageLoggerClient creates a new client for MessageLogger HTTP endpoints
func (*MessageLoggerClient) CountMessagesBySubject ¶
func (c *MessageLoggerClient) CountMessagesBySubject(ctx context.Context, pattern string) (int, error)
CountMessagesBySubject returns count of messages matching subject pattern
func (*MessageLoggerClient) GetEntries ¶
func (c *MessageLoggerClient) GetEntries(ctx context.Context, limit int, subjectPattern string) ([]MessageEntry, error)
GetEntries fetches logged messages with optional subject filter
func (*MessageLoggerClient) GetEntriesByTrace ¶
func (c *MessageLoggerClient) GetEntriesByTrace(ctx context.Context, traceID string) (*TraceResponse, error)
GetEntriesByTrace fetches all entries for a specific W3C trace ID
func (*MessageLoggerClient) GetStats ¶
func (c *MessageLoggerClient) GetStats(ctx context.Context) (*LoggerStats, error)
GetStats returns message logger statistics
func (*MessageLoggerClient) GetSubjects ¶
func (c *MessageLoggerClient) GetSubjects(ctx context.Context) ([]string, error)
GetSubjects returns list of monitored NATS subjects
func (*MessageLoggerClient) Health ¶
func (c *MessageLoggerClient) Health(ctx context.Context) error
Health checks if the message logger endpoint is reachable
func (*MessageLoggerClient) QueryKV ¶
func (c *MessageLoggerClient) QueryKV(ctx context.Context, bucket, pattern string, limit int) (*KVQueryResult, error)
QueryKV queries a NATS KV bucket
func (*MessageLoggerClient) TraceMessage ¶
func (c *MessageLoggerClient) TraceMessage(ctx context.Context, messageID string) (*MessageTrace, error)
TraceMessage finds all log entries related to a specific message ID
func (*MessageLoggerClient) WaitForMessages ¶
func (c *MessageLoggerClient) WaitForMessages(ctx context.Context, pattern string, count int, timeout time.Duration) ([]MessageEntry, error)
WaitForMessages waits until N messages matching pattern are logged
type MessageTrace ¶
type MessageTrace struct {
MessageID string // Original message ID
Entries []MessageEntry // All log entries related to this message
Flow []string // Subject flow path: ["input.udp", "process.rule", "process.graph"]
Duration time.Duration // Time from first to last entry
}
MessageTrace tracks a single message through the processing pipeline
type Metric ¶
type Metric struct {
Name string `json:"name"`
Labels map[string]string `json:"labels,omitempty"`
Value float64 `json:"value"`
}
Metric represents a single parsed Prometheus metric
type MetricsBaseline ¶
type MetricsBaseline struct {
Timestamp time.Time `json:"timestamp"`
Metrics map[string]float64 `json:"metrics"`
}
MetricsBaseline captures metrics at a point in time for comparison
type MetricsClient ¶
type MetricsClient struct {
// contains filtered or unexported fields
}
MetricsClient fetches and parses Prometheus metrics from SemStreams
func NewMetricsClient ¶
func NewMetricsClient(baseURL string) *MetricsClient
NewMetricsClient creates a new client for Prometheus metrics endpoints
func (*MetricsClient) CaptureBaseline ¶
func (c *MetricsClient) CaptureBaseline(ctx context.Context) (*MetricsBaseline, error)
CaptureBaseline takes a snapshot of all counter metrics for later comparison
func (*MetricsClient) CompareToBaseline ¶
func (c *MetricsClient) CompareToBaseline(ctx context.Context, baseline *MetricsBaseline) (*MetricsDiff, error)
CompareToBaseline compares current metrics to a baseline and returns deltas
func (*MetricsClient) ExtractRuleMetrics ¶
func (c *MetricsClient) ExtractRuleMetrics(ctx context.Context) (*RuleMetrics, error)
ExtractRuleMetrics gets all rule engine metrics in a single call. This enables consistent rule validation across E2E scenarios.
func (*MetricsClient) FetchRaw ¶
func (c *MetricsClient) FetchRaw(ctx context.Context) (string, error)
FetchRaw retrieves raw metrics text from the Prometheus endpoint
func (*MetricsClient) FetchReport ¶
func (c *MetricsClient) FetchReport(ctx context.Context, duration time.Duration) (*MetricsReport, error)
FetchReport retrieves metrics and organizes them into a categorized report
func (*MetricsClient) FetchSnapshot ¶
func (c *MetricsClient) FetchSnapshot(ctx context.Context) (*MetricsSnapshot, error)
FetchSnapshot retrieves and parses all metrics into a snapshot
func (*MetricsClient) GetMetricByLabels ¶
func (c *MetricsClient) GetMetricByLabels(ctx context.Context, metricName string, labelFilters map[string]string) ([]Metric, error)
GetMetricByLabels retrieves metrics matching the given name and optional label filters
func (*MetricsClient) GetMetricValue ¶
GetMetricValue retrieves a specific metric value by name
func (*MetricsClient) GetMetricsByPrefix ¶
func (c *MetricsClient) GetMetricsByPrefix(ctx context.Context, prefix string) (map[string]float64, error)
GetMetricsByPrefix retrieves all metrics matching a prefix
func (*MetricsClient) Health ¶
func (c *MetricsClient) Health(ctx context.Context) error
Health checks if the metrics endpoint is reachable
func (*MetricsClient) SumMetricsByName ¶
SumMetricsByName sums all metrics with the given name (across all label combinations)
func (*MetricsClient) WaitForMetric ¶
func (c *MetricsClient) WaitForMetric(ctx context.Context, metricName string, expected float64, opts WaitOpts) error
WaitForMetric polls until a metric reaches the expected value or timeout
func (*MetricsClient) WaitForMetricChange ¶
func (c *MetricsClient) WaitForMetricChange(ctx context.Context, metricName string, baseline float64, opts WaitOpts) error
WaitForMetricChange waits for any change in a metric from its baseline value
func (*MetricsClient) WaitForMetricDelta ¶
func (c *MetricsClient) WaitForMetricDelta(ctx context.Context, metricName string, baseline, delta float64, opts WaitOpts) error
WaitForMetricDelta waits for a metric to increase by at least delta from baseline
type MetricsDiff ¶
type MetricsDiff struct {
BaselineTime time.Time `json:"baseline_time"`
CurrentTime time.Time `json:"current_time"`
Duration time.Duration `json:"duration"`
Deltas map[string]float64 `json:"deltas"` // current - baseline
RatePerSec map[string]float64 `json:"rate_per_sec"` // delta / duration.Seconds()
}
MetricsDiff represents the difference between two metric snapshots
type MetricsReport ¶
type MetricsReport struct {
Timestamp time.Time `json:"timestamp"`
Duration time.Duration `json:"duration_ns"`
DurationText string `json:"duration"`
Counters map[string]float64 `json:"counters"`
Gauges map[string]float64 `json:"gauges"`
Histograms map[string]float64 `json:"histograms,omitempty"`
Categories map[string]Category `json:"categories"`
}
MetricsReport is a structured summary of key metrics for comparison
type MetricsSnapshot ¶
type MetricsSnapshot struct {
Timestamp time.Time `json:"timestamp"`
Metrics map[string]Metric `json:"metrics"`
Raw string `json:"raw,omitempty"`
}
MetricsSnapshot represents a collection of metrics at a point in time
type NATSValidationClient ¶
type NATSValidationClient struct {
// contains filtered or unexported fields
}
NATSValidationClient wraps natsclient.Client for E2E test validation
func NewNATSValidationClient ¶
func NewNATSValidationClient(ctx context.Context, natsURL string) (*NATSValidationClient, error)
NewNATSValidationClient creates a new NATS validation client
func (*NATSValidationClient) BucketExists ¶
BucketExists checks if a KV bucket exists
func (*NATSValidationClient) Close ¶
func (c *NATSValidationClient) Close(ctx context.Context) error
Close closes the NATS connection
func (*NATSValidationClient) CountBucketKeys ¶
CountBucketKeys counts the number of keys in a specific KV bucket. Returns 0, nil if the bucket doesn't exist (graceful degradation).
func (*NATSValidationClient) CountEntities ¶
func (c *NATSValidationClient) CountEntities(ctx context.Context) (int, error)
CountEntities counts the number of entities in the ENTITY_STATES bucket. Returns 0, nil if the bucket doesn't exist (graceful degradation).
func (*NATSValidationClient) CountOASFRecords ¶
func (c *NATSValidationClient) CountOASFRecords(ctx context.Context) (int, error)
CountOASFRecords counts the number of OASF records in the bucket.
func (*NATSValidationClient) CountSourceEntities ¶
func (c *NATSValidationClient) CountSourceEntities(ctx context.Context) (int, error)
CountSourceEntities counts non-container entities in ENTITY_STATES bucket.
func (*NATSValidationClient) CountVirtualEdges ¶
func (c *NATSValidationClient) CountVirtualEdges(ctx context.Context) (*VirtualEdgeCounts, error)
CountVirtualEdges counts virtual edges (inferred relationships) by querying the PREDICATE_INDEX. Virtual edges use predicates starting with "inferred." prefix.
func (*NATSValidationClient) DeleteKV ¶
func (c *NATSValidationClient) DeleteKV(ctx context.Context, bucket, key string) error
DeleteKV deletes a key from a KV bucket (for test cleanup)
func (*NATSValidationClient) GetAllCommunities ¶
func (c *NATSValidationClient) GetAllCommunities(ctx context.Context) ([]*clustering.Community, error)
GetAllCommunities retrieves all communities from the COMMUNITY_INDEX bucket. Used for comparing statistical vs LLM-enhanced summaries in E2E tests.
func (*NATSValidationClient) GetAllComponentStatuses ¶
func (c *NATSValidationClient) GetAllComponentStatuses(ctx context.Context) (map[string]*ComponentStatus, error)
GetAllComponentStatuses retrieves status of all components in COMPONENT_STATUS bucket.
func (*NATSValidationClient) GetAllContexts ¶
func (c *NATSValidationClient) GetAllContexts(ctx context.Context) ([]string, error)
GetAllContexts lists all context values in the CONTEXT_INDEX bucket. Phase 6: Added for provenance audit scenario - demonstrates querying all inference contexts.
func (*NATSValidationClient) GetAllEntityIDs ¶
func (c *NATSValidationClient) GetAllEntityIDs(ctx context.Context) ([]string, error)
GetAllEntityIDs returns all entity IDs from ENTITY_STATES bucket. Used for hierarchy inference validation in E2E tests (Phase 4).
func (*NATSValidationClient) GetAllWorkflowExecutions ¶
func (c *NATSValidationClient) GetAllWorkflowExecutions(ctx context.Context) ([]*WorkflowExecution, error)
GetAllWorkflowExecutions retrieves all workflow executions
func (*NATSValidationClient) GetAnomalies ¶
func (c *NATSValidationClient) GetAnomalies(ctx context.Context) ([]*Anomaly, error)
GetAnomalies retrieves all anomalies from ANOMALY_INDEX bucket
func (*NATSValidationClient) GetAnomalyCounts ¶
func (c *NATSValidationClient) GetAnomalyCounts(ctx context.Context) (*AnomalyCounts, error)
GetAnomalyCounts retrieves counts of anomalies by type and status from ANOMALY_INDEX bucket
func (*NATSValidationClient) GetAutoAppliedAnomalyCount ¶
func (c *NATSValidationClient) GetAutoAppliedAnomalyCount(ctx context.Context) (int, error)
GetAutoAppliedAnomalyCount returns the count of anomalies with status "auto_applied".
func (*NATSValidationClient) GetBucketKeysSample ¶
func (c *NATSValidationClient) GetBucketKeysSample(ctx context.Context, bucketName string, limit int) ([]string, error)
GetBucketKeysSample returns a sample of keys from a bucket (the first limit keys). Useful for verifying key patterns without loading all data.
func (*NATSValidationClient) GetComponentStatus ¶
func (c *NATSValidationClient) GetComponentStatus(ctx context.Context, componentName string) (*ComponentStatus, error)
GetComponentStatus retrieves the current status of a component from COMPONENT_STATUS bucket.
func (*NATSValidationClient) GetContextEntries ¶
func (c *NATSValidationClient) GetContextEntries(ctx context.Context, contextValue string) ([]ContextEntry, error)
GetContextEntries retrieves all entries for a specific context value. Phase 5: Added to verify ContextIndex is populated by hierarchy inference.
func (*NATSValidationClient) GetEntity ¶
func (c *NATSValidationClient) GetEntity(ctx context.Context, entityID string) (*EntityState, error)
GetEntity retrieves an entity by ID from the ENTITY_STATES bucket
func (*NATSValidationClient) GetEntitySample ¶
func (c *NATSValidationClient) GetEntitySample(ctx context.Context, limit int) ([]*EntityState, error)
GetEntitySample returns a sample of entities from the ENTITY_STATES bucket. Used for entity structure validation in E2E tests.
func (*NATSValidationClient) GetIncomingEntries ¶
func (c *NATSValidationClient) GetIncomingEntries(ctx context.Context, targetEntityID string) ([]IncomingEntry, error)
GetIncomingEntries retrieves incoming relationship entries for a target entity. Phase 5: Added to verify IncomingIndex stores predicates (not just entity IDs).
func (*NATSValidationClient) GetOASFRecord ¶
func (c *NATSValidationClient) GetOASFRecord(ctx context.Context, entityID string) (*OASFRecord, error)
GetOASFRecord retrieves an OASF record by entity ID from the OASF_RECORDS bucket.
func (*NATSValidationClient) GetOutgoingEntries ¶
func (c *NATSValidationClient) GetOutgoingEntries(ctx context.Context, sourceEntityID string) ([]OutgoingEntry, error)
GetOutgoingEntries retrieves outgoing relationship entries for a source entity. Phase 6: Added for inverse edges scenario - verifies containers have outgoing 'contains' edges.
func (*NATSValidationClient) GetStructuralIndexInfo ¶
func (c *NATSValidationClient) GetStructuralIndexInfo(ctx context.Context) (*StructuralIndexInfo, error)
GetStructuralIndexInfo retrieves information about structural indices
func (*NATSValidationClient) GetTrajectory ¶
func (c *NATSValidationClient) GetTrajectory(ctx context.Context, loopID string) (*agentic.Trajectory, error)
GetTrajectory retrieves a trajectory via the agentic.query.trajectory NATS request/reply handler. The trajectory is served from the agentic-loop's in-memory cache.
func (*NATSValidationClient) GetWorkflowExecution ¶
func (c *NATSValidationClient) GetWorkflowExecution(ctx context.Context, execID string) (*WorkflowExecution, error)
GetWorkflowExecution retrieves a workflow execution by ID
func (*NATSValidationClient) GetWorkflowExecutionsByState ¶
func (c *NATSValidationClient) GetWorkflowExecutionsByState(ctx context.Context, state string) ([]*WorkflowExecution, error)
GetWorkflowExecutionsByState returns executions filtered by state
func (*NATSValidationClient) ListBuckets ¶
func (c *NATSValidationClient) ListBuckets(ctx context.Context) ([]string, error)
ListBuckets lists all KV buckets
func (*NATSValidationClient) ListOASFRecordIDs ¶
func (c *NATSValidationClient) ListOASFRecordIDs(ctx context.Context) ([]string, error)
ListOASFRecordIDs returns all OASF record entity IDs.
func (*NATSValidationClient) Publish ¶
Publish publishes a message to a NATS subject via JetStream. Used for injecting test messages into the system.
func (*NATSValidationClient) PutKV ¶
PutKV writes a key-value entry to a KV bucket. Used for test setup (e.g., registering workflow definitions).
func (*NATSValidationClient) ValidateIndexPopulated ¶
func (c *NATSValidationClient) ValidateIndexPopulated(ctx context.Context, indexName string) (bool, error)
ValidateIndexPopulated checks if an index bucket has entries. Returns false, nil if the bucket doesn't exist (graceful degradation).
func (*NATSValidationClient) WaitForAnomalyDetection ¶
func (c *NATSValidationClient) WaitForAnomalyDetection( ctx context.Context, timeout time.Duration, pollInterval time.Duration, ) (total int, err error)
WaitForAnomalyDetection waits for anomaly detection to complete by polling until the anomaly count stabilizes or timeout is reached. Returns the final total count and any error encountered.
func (*NATSValidationClient) WaitForCommunityEnhancement ¶
func (c *NATSValidationClient) WaitForCommunityEnhancement( ctx context.Context, timeout time.Duration, pollInterval time.Duration, ) (enhanced, failed, pending int, err error)
WaitForCommunityEnhancement polls communities until all reach a terminal status. Terminal statuses: "llm-enhanced" or "llm-failed". Returns counts of enhanced, failed, and pending communities.
func (*NATSValidationClient) WaitForComponentCycleComplete ¶
func (c *NATSValidationClient) WaitForComponentCycleComplete( ctx context.Context, componentName string, timeout time.Duration, ) (*ComponentStatus, error)
WaitForComponentCycleComplete waits for a component to complete at least one processing cycle. Returns the component status when a cycle completes successfully.
func (*NATSValidationClient) WaitForComponentStage ¶
func (c *NATSValidationClient) WaitForComponentStage( ctx context.Context, componentName string, targetStage string, timeout time.Duration, ) (*ComponentStatus, error)
WaitForComponentStage waits for a component to reach a specific stage. Returns the component status when the stage is reached, or nil on timeout.
func (*NATSValidationClient) WaitForContainerGroupsSSE ¶
func (c *NATSValidationClient) WaitForContainerGroupsSSE( ctx context.Context, expectedCount int, timeout time.Duration, sseClient *SSEClient, ) (count int, usedSSE bool, err error)
WaitForContainerGroupsSSE waits for container groups (keys ending in ".group") using SSE. Falls back to polling if SSE is unavailable.
func (*NATSValidationClient) WaitForEntityCountSSE ¶
func (c *NATSValidationClient) WaitForEntityCountSSE( ctx context.Context, expectedCount int, timeout time.Duration, sseClient *SSEClient, ) EntityStabilizationResult
WaitForEntityCountSSE waits for entity count to reach target and stabilize using SSE. NATS KV watch sends all existing keys first (initial sync), then streams updates. We use UniqueKeyCountReaches to count unique non-deleted keys for accurate counting. Falls back to polling if SSE is unavailable.
func (*NATSValidationClient) WaitForKeySSE ¶
func (c *NATSValidationClient) WaitForKeySSE( ctx context.Context, bucket, key string, timeout time.Duration, sseClient *SSEClient, ) (found bool, usedSSE bool, err error)
WaitForKeySSE waits for a specific key to appear in a bucket using SSE streaming. Falls back to polling if SSE is unavailable.
func (*NATSValidationClient) WaitForOASFRecord ¶
func (c *NATSValidationClient) WaitForOASFRecord( ctx context.Context, entityID string, timeout time.Duration, ) (*OASFRecord, error)
WaitForOASFRecord waits for an OASF record to appear for an entity.
func (*NATSValidationClient) WaitForSourceEntityCountSSE ¶
func (c *NATSValidationClient) WaitForSourceEntityCountSSE( ctx context.Context, expectedCount int, timeout time.Duration, sseClient *SSEClient, ) EntityStabilizationResult
WaitForSourceEntityCountSSE waits for SOURCE entity count (excluding containers) to reach target. Container entities (ending in .group, .group.container, .group.container.level) are excluded. This is used to wait for testdata to fully load before validation.
func (*NATSValidationClient) WaitForWorkflowCompletion ¶
func (c *NATSValidationClient) WaitForWorkflowCompletion( ctx context.Context, workflowID string, timeout time.Duration, ) (*WorkflowExecution, error)
WaitForWorkflowCompletion waits for a workflow to complete successfully
func (*NATSValidationClient) WaitForWorkflowState ¶
func (c *NATSValidationClient) WaitForWorkflowState( ctx context.Context, workflowID string, targetStates []string, timeout time.Duration, ) (*WorkflowExecution, error)
WaitForWorkflowState waits for a workflow to reach one of the given target states (for example, the terminal states completed or failed). Returns the execution that reached a target state, or nil on timeout.
func (*NATSValidationClient) WaitForWorkflowTerminal ¶
func (c *NATSValidationClient) WaitForWorkflowTerminal( ctx context.Context, workflowID string, timeout time.Duration, ) (*WorkflowExecution, error)
WaitForWorkflowTerminal waits for a workflow to reach any terminal state
type OASFDomain ¶
type OASFDomain struct {
Name string `json:"name"`
Description string `json:"description,omitempty"`
}
OASFDomain represents a domain in an OASF record.
type OASFRecord ¶
type OASFRecord struct {
Name string `json:"name"`
Version string `json:"version"`
SchemaVersion string `json:"schema_version"`
Authors []string `json:"authors"`
CreatedAt string `json:"created_at"`
Description string `json:"description"`
Skills []OASFSkill `json:"skills"`
Domains []OASFDomain `json:"domains,omitempty"`
Extensions map[string]any `json:"extensions,omitempty"`
}
OASFRecord represents an OASF (Open Agentic Schema Framework) record for E2E testing validation.
type OASFSkill ¶
type OASFSkill struct {
ID string `json:"id"`
Name string `json:"name"`
Description string `json:"description,omitempty"`
Confidence float64 `json:"confidence,omitempty"`
Permissions []string `json:"permissions,omitempty"`
}
OASFSkill represents a skill in an OASF record.
type ObservabilityClient ¶
type ObservabilityClient struct {
// contains filtered or unexported fields
}
ObservabilityClient interacts with SemStreams component management endpoints
func NewObservabilityClient ¶
func NewObservabilityClient(baseURL string) *ObservabilityClient
NewObservabilityClient creates a new client for SemStreams observability endpoints
func (*ObservabilityClient) CheckFlowHealth ¶
func (c *ObservabilityClient) CheckFlowHealth(ctx context.Context) error
CheckFlowHealth performs flow validation and returns an error if there are critical issues. This is a convenience method for pre-flight checks in e2e test setup.
func (*ObservabilityClient) CountFileOutputLines ¶
func (c *ObservabilityClient) CountFileOutputLines( ctx context.Context, containerName string, pattern string, ) (int, error)
CountFileOutputLines counts lines in file output inside a container using docker exec. The containerName should match the container running the file output component. The pattern is the file glob pattern (e.g., "/tmp/streamkit-test*.jsonl"). Returns 0 if files don't exist (not an error - just means no output yet).
func (*ObservabilityClient) GetComponents ¶
func (c *ObservabilityClient) GetComponents(ctx context.Context) ([]ComponentInfo, error)
GetComponents retrieves information about all managed components
func (*ObservabilityClient) GetFileOutputLines ¶
func (c *ObservabilityClient) GetFileOutputLines( ctx context.Context, containerName string, pattern string, maxLines int, ) ([]string, error)
GetFileOutputLines retrieves the actual content lines from file output inside a container. Returns the lines as a slice of strings for content validation.
func (*ObservabilityClient) GetFlows ¶
func (c *ObservabilityClient) GetFlows(ctx context.Context) ([]FlowInfo, error)
GetFlows retrieves all flows from the flowbuilder API
func (*ObservabilityClient) GetPlatformHealth ¶
func (c *ObservabilityClient) GetPlatformHealth(ctx context.Context) (*PlatformHealth, error)
GetPlatformHealth retrieves overall platform health
func (*ObservabilityClient) ValidateFlowGraph ¶
func (c *ObservabilityClient) ValidateFlowGraph(ctx context.Context) (*FlowValidation, error)
ValidateFlowGraph calls /components/validate and returns the flow validation result. This performs pre-flight validation to catch configuration issues before running tests.
func (*ObservabilityClient) WaitForAllComponentsHealthy ¶
func (c *ObservabilityClient) WaitForAllComponentsHealthy(ctx context.Context, timeout time.Duration) error
WaitForAllComponentsHealthy waits until all components report healthy status.
func (*ObservabilityClient) WaitForComponentHealthy ¶
func (c *ObservabilityClient) WaitForComponentHealthy(ctx context.Context, name string, timeout time.Duration) error
WaitForComponentHealthy waits until a specific component reports healthy status. This is useful after Docker compose --wait passes (which only checks /health endpoint) but before individual components like graph processor have finished initialization.
type OrphanedPort ¶
type OrphanedPort struct {
ComponentName string `json:"component_name"`
PortName string `json:"port_name"`
Direction string `json:"direction"`
ConnectionID string `json:"connection_id"`
Pattern string `json:"pattern"`
Issue string `json:"issue"`
Required bool `json:"required"`
}
OrphanedPort represents a port with no connections
type OutgoingEntry ¶
type OutgoingEntry struct {
Predicate string `json:"predicate"`
ToEntityID string `json:"to_entity_id"`
}
OutgoingEntry matches the indexmanager.OutgoingEntry structure. Phase 6: Added to verify inverse edges are materialized in container's outgoing relationships.
type PivotMetadata ¶
type PivotMetadata struct {
Pivots []string `json:"pivots"`
EntityCount int `json:"entity_count"`
ComputedAt string `json:"computed_at"`
}
PivotMetadata contains pivot distance index metadata
type PlatformHealth ¶
type PlatformHealth struct {
Healthy bool `json:"healthy"`
Status string `json:"status"`
Message string `json:"message,omitempty"`
}
PlatformHealth represents overall platform health status
type ProfileClient ¶
type ProfileClient struct {
// contains filtered or unexported fields
}
ProfileClient captures pprof profiles from a running SemStreams instance. Profiles are saved to disk for analysis with `go tool pprof`.
func NewProfileClient ¶
func NewProfileClient(baseURL, outputDir string) *ProfileClient
NewProfileClient creates a new client for pprof endpoints. baseURL should be the pprof server address (e.g., "http://localhost:6060"). outputDir is where profile files will be saved.
func (*ProfileClient) CaptureAll ¶
CaptureAll captures heap, goroutine, and allocs profiles with a common prefix. Useful for getting a baseline or final snapshot.
func (*ProfileClient) CaptureAllocs ¶
CaptureAllocs saves a memory allocation profile. Returns the path to the saved profile file.
func (*ProfileClient) CaptureBlock ¶
CaptureBlock saves a goroutine blocking profile. Returns the path to the saved profile file.
func (*ProfileClient) CaptureBlockAndMutex ¶
func (c *ProfileClient) CaptureBlockAndMutex(ctx context.Context, prefix string) (map[string]string, error)
CaptureBlockAndMutex captures blocking and mutex contention profiles with a common prefix. Useful for profiling lock contention under load.
func (*ProfileClient) CaptureCPU ¶
CaptureCPU captures a CPU profile for the specified duration. Returns the path to the saved profile file.
func (*ProfileClient) CaptureGoroutine ¶
CaptureGoroutine saves goroutine stack traces. Returns the path to the saved profile file.
func (*ProfileClient) CaptureHeap ¶
CaptureHeap saves a heap profile snapshot. Returns the path to the saved profile file.
func (*ProfileClient) CaptureMutex ¶
CaptureMutex saves a mutex contention profile. Returns the path to the saved profile file.
func (*ProfileClient) CaptureTrace ¶
CaptureTrace captures an execution trace for the specified duration. Returns the path to the saved trace file.
func (*ProfileClient) IsAvailable ¶
func (c *ProfileClient) IsAvailable(ctx context.Context) bool
IsAvailable checks if the pprof endpoint is accessible.
type RuleMetrics ¶
type RuleMetrics struct {
// Rule evaluation metrics
Evaluations float64 `json:"evaluations"` // Total rule evaluations
Firings float64 `json:"firings"` // Rules that fired (conditions met)
// Action metrics
ActionsDispatched float64 `json:"actions_dispatched"` // Total actions dispatched
// Execution lifecycle metrics (legacy — kept for backwards compat, always 0)
ExecutionsCreated float64 `json:"executions_created"`
ExecutionsCompleted float64 `json:"executions_completed"`
ExecutionsFailed float64 `json:"executions_failed"`
// Callback metrics (legacy — kept for backwards compat, always 0)
CallbacksReceived float64 `json:"callbacks_received"`
}
RuleMetrics holds rule engine metrics for E2E tests. These map to the semstreams_rule_* Prometheus metrics from the rule processor.
type SSEClient ¶
type SSEClient struct {
// contains filtered or unexported fields
}
SSEClient handles Server-Sent Events connections for KV bucket watching
func NewSSEClient ¶
NewSSEClient creates a new SSE client
func (*SSEClient) WatchKVBucket ¶
func (c *SSEClient) WatchKVBucket( ctx context.Context, bucket string, condition KVWatchCondition, opts KVWatchOpts, ) ([]KVChangeEvent, error)
WatchKVBucket streams KV changes until the condition is satisfied or timeout
type SSEEvent ¶
type SSEEvent struct {
Event string // Event type (e.g., "connected", "kv_change", "error")
ID string // Event ID for reconnection
Data json.RawMessage // Event payload
}
SSEEvent represents a parsed SSE event
type StageMetrics ¶
type StageMetrics struct {
Stage string `json:"stage"`
MessagesIn int `json:"messages_in"`
MessagesOut int `json:"messages_out"`
Errors int `json:"errors"`
AvgLatency time.Duration `json:"avg_latency"`
MetricDeltas map[string]float64 `json:"metric_deltas,omitempty"`
}
StageMetrics tracks metrics for a specific processing stage
type StatusStreamEnvelope ¶
type StatusStreamEnvelope struct {
Type string `json:"type"`
ID string `json:"id"`
Timestamp int64 `json:"timestamp"`
FlowID string `json:"flow_id"`
Payload json.RawMessage `json:"payload,omitempty"`
}
StatusStreamEnvelope matches service.StatusStreamEnvelope
type StepResult ¶
type StepResult struct {
StepName string `json:"step_name"`
Status string `json:"status"`
Output json.RawMessage `json:"output,omitempty"`
Error string `json:"error,omitempty"`
Iteration int `json:"iteration"`
}
StepResult represents a workflow step result
type StreamWarning ¶
type StreamWarning struct {
Severity string `json:"severity"`
SubscriberComp string `json:"subscriber_component"`
SubscriberPort string `json:"subscriber_port"`
Subjects []string `json:"subjects"`
PublisherComps []string `json:"publisher_components"`
Issue string `json:"issue"`
}
StreamWarning represents an issue where a JetStream subscriber is connected to a NATS publisher
type StructuralIndexInfo ¶
type StructuralIndexInfo struct {
BucketExists bool `json:"bucket_exists"`
KeyCount int `json:"key_count"`
KCore *KCoreMetadata `json:"kcore,omitempty"`
Pivot *PivotMetadata `json:"pivot,omitempty"`
SampleKeys []string `json:"sample_keys,omitempty"`
}
StructuralIndexInfo contains information about structural indices
type SubscribeAck ¶
type SubscribeAck struct {
Type string `json:"type"` // Always "subscribe_ack"
Subscribed []string `json:"subscribed"` // Message types now subscribed
LogLevel string `json:"log_level"` // Current log level filter
Sources []string `json:"sources"` // Current source filters
}
SubscribeAck matches service.SubscribeAck (Server → Client acknowledgment)
type SubscribeCommand ¶
type SubscribeCommand struct {
Command string `json:"command"`
MessageTypes []string `json:"message_types,omitempty"`
LogLevel string `json:"log_level,omitempty"`
Sources []string `json:"sources,omitempty"`
}
SubscribeCommand matches service.SubscribeCommand
type TraceResponse ¶
type TraceResponse struct {
TraceID string `json:"trace_id"`
Count int `json:"count"`
Entries []MessageEntry `json:"entries"`
}
TraceResponse represents the response from the trace query endpoint
type Triple ¶
type Triple struct {
Subject string `json:"subject"`
Predicate string `json:"predicate"`
Object any `json:"object"`
}
Triple represents a semantic triple (subject, predicate, object)
type VirtualEdgeCounts ¶
type VirtualEdgeCounts struct {
Total int // Total virtual edges found
ByBand map[string]int // Counts by similarity band (high, medium, related)
AutoApplied int // Edges that were auto-applied
}
VirtualEdgeCounts holds counts of virtual edges: the total, a breakdown by similarity band, and the number that were auto-applied.
type WaitOpts ¶
type WaitOpts struct {
Timeout time.Duration // Max wait time (default 30s)
PollInterval time.Duration // Check frequency (default 100ms)
Comparator string // ">=", "==", ">", "<", "<=" (default ">=")
}
WaitOpts configures metric waiting behavior
func DefaultWaitOpts ¶
func DefaultWaitOpts() WaitOpts
DefaultWaitOpts returns sensible defaults for metric waiting
type WatchStatusStreamCondition ¶
type WatchStatusStreamCondition func(envelopes []StatusStreamEnvelope) (satisfied bool, err error)
WatchStatusStreamCondition evaluates received envelopes
func CombineWSConditionsAnd ¶
func CombineWSConditionsAnd(conditions ...WatchStatusStreamCondition) WatchStatusStreamCondition
CombineWSConditionsAnd returns condition satisfied when all conditions are satisfied
func CombineWSConditionsOr ¶
func CombineWSConditionsOr(conditions ...WatchStatusStreamCondition) WatchStatusStreamCondition
CombineWSConditionsOr returns condition satisfied when any condition is satisfied
func EnvelopeCountReaches ¶
func EnvelopeCountReaches(target int) WatchStatusStreamCondition
EnvelopeCountReaches returns a condition satisfied when the total envelope count >= target
func HasAllMessageTypes ¶
func HasAllMessageTypes(types []string) WatchStatusStreamCondition
HasAllMessageTypes returns condition satisfied when all types appear
func HasMessageType ¶
func HasMessageType(msgType string) WatchStatusStreamCondition
HasMessageType returns condition satisfied when message type appears
func LogMessageContains ¶
func LogMessageContains(substr string) WatchStatusStreamCondition
LogMessageContains returns a condition satisfied when a log message contains the substring
func MessageTypeCountReaches ¶
func MessageTypeCountReaches(msgType string, target int) WatchStatusStreamCondition
MessageTypeCountReaches returns a condition satisfied when the count of messages of the given type >= target
type WatchStatusStreamOpts ¶
type WatchStatusStreamOpts struct {
Timeout time.Duration
MessageTypes []string // Filter: flow_status, component_health, component_metrics, log_entry
LogLevel string // Minimum: DEBUG, INFO, WARN, ERROR
DrainDuration time.Duration // Continue collecting after condition met to capture burst
}
WatchStatusStreamOpts configures status stream watching
func DefaultWatchStatusStreamOpts ¶
func DefaultWatchStatusStreamOpts() WatchStatusStreamOpts
DefaultWatchStatusStreamOpts returns sensible defaults
type WebSocketClient ¶
type WebSocketClient struct {
// contains filtered or unexported fields
}
WebSocketClient handles WebSocket connections for status streaming
func NewWebSocketClient ¶
func NewWebSocketClient(baseURL string) *WebSocketClient
NewWebSocketClient creates a new WebSocket client
func (*WebSocketClient) Health ¶
func (c *WebSocketClient) Health(ctx context.Context, flowID string) error
Health checks if WebSocket endpoint is available by attempting a connection
func (*WebSocketClient) WatchStatusStream ¶
func (c *WebSocketClient) WatchStatusStream( ctx context.Context, flowID string, condition WatchStatusStreamCondition, opts WatchStatusStreamOpts, ) ([]StatusStreamEnvelope, error)
WatchStatusStream connects to WebSocket and collects envelopes until condition is met
type WorkflowExecution ¶
type WorkflowExecution struct {
ID string `json:"id"`
WorkflowID string `json:"workflow_id"`
WorkflowName string `json:"workflow_name"`
State string `json:"state"`
CurrentStep int `json:"current_step"`
CurrentName string `json:"current_name"`
Iteration int `json:"iteration"`
StepResults map[string]StepResult `json:"step_results,omitempty"`
Error string `json:"error,omitempty"`
Trigger map[string]interface{} `json:"trigger,omitempty"`
CreatedAt string `json:"created_at"`
UpdatedAt string `json:"updated_at"`
}
WorkflowExecution represents a workflow execution state from KV