package e2e
v0.0.0-...-606ac71
Warning

This package is not in the latest version of its module.

Published: Jun 18, 2026 License: Apache-2.0 Imports: 40 Imported by: 0

Documentation

Overview

Package e2e provides end-to-end test infrastructure for the tarsy pipeline.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AnnotateAPITimelineWithAgent

func AnnotateAPITimelineWithAgent(projected []map[string]interface{}, apiEvents []interface{}, agentIndex map[string]string)

AnnotateAPITimelineWithAgent adds an "agent" field to each projected API timeline map by looking up execution_id → agent_name. Session-level events (those with no execution_id) are left without an agent field.

func AnnotateTimelineWithAgent

func AnnotateTimelineWithAgent(projected []map[string]interface{}, timeline []*ent.TimelineEvent, agentIndex map[string]string)

AnnotateTimelineWithAgent adds an "agent" field to each projected timeline map by looking up execution_id → agent_name. Session-level events (those with a nil execution_id) are left without an agent field.

func AssertAllEventsHaveSessionID

func AssertAllEventsHaveSessionID(t *testing.T, actual []WSEvent, expectedSessionID string)

AssertAllEventsHaveSessionID verifies that every non-infra WS event carries the correct session_id. This is a contract check: the frontend routes events by data.session_id, so any event missing it would be silently dropped.
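The contract check can be sketched as a loop that skips infra events and compares each remaining event's session_id. This is a simplified model, not the package's code: the event type is a hypothetical stand-in for WSEvent, the infra list is borrowed from the AssertEventsInOrder description, and session_id is assumed to be a top-level key of the parsed payload.

```go
package main

import "fmt"

// event is a hypothetical stand-in for WSEvent with only the fields used here.
type event struct {
	Type   string
	Parsed map[string]interface{}
}

// infraTypes are connection-level events that carry no session_id
// (assumed to match the list used by AssertEventsInOrder).
var infraTypes = map[string]bool{
	"connection.established": true,
	"subscription.confirmed": true,
	"pong":                   true,
	"catchup.overflow":       true,
}

// missingSessionID describes every non-infra event whose session_id is
// absent or differs from want. An empty result means the contract holds.
func missingSessionID(events []event, want string) []string {
	var problems []string
	for i, ev := range events {
		if infraTypes[ev.Type] {
			continue
		}
		got, _ := ev.Parsed["session_id"].(string) // "" if missing or not a string
		if got != want {
			problems = append(problems, fmt.Sprintf("event %d (%s): session_id=%q", i, ev.Type, got))
		}
	}
	return problems
}
```

Collecting every offender, rather than stopping at the first, makes a failing test report all events the frontend would have dropped.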

func AssertEventsInOrder

func AssertEventsInOrder(t *testing.T, actual []WSEvent, expected []testdata.ExpectedEvent)

AssertEventsInOrder verifies that each expected event appears in the actual WS events in the correct relative order. Extra and duplicate actual events are tolerated; only the expected sequence must be found in order.

Infra events (connection.established, subscription.confirmed, pong, catchup.overflow) are filtered out before matching.
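The tolerance for extra and duplicate events amounts to a subsequence check, which a single greedy pass over the actual events decides. The sketch below is a hypothetical simplification: it matches on event type only, whereas testdata.ExpectedEvent may carry more fields to compare.

```go
package main

// expected is a hypothetical stand-in for testdata.ExpectedEvent; this sketch
// matches on event type only.
type expected struct {
	Type string
}

var infra = map[string]bool{
	"connection.established": true,
	"subscription.confirmed": true,
	"pong":                   true,
	"catchup.overflow":       true,
}

// firstUnmatched walks the actual event types once, advancing through want
// whenever the next expected type is seen; anything else is skipped. It
// returns the index of the first expected event not found in order, or -1
// if the whole expected sequence matched.
func firstUnmatched(actual []string, want []expected) int {
	next := 0
	for _, typ := range actual {
		if infra[typ] {
			continue // infra events never participate in matching
		}
		if next < len(want) && typ == want[next].Type {
			next++
		}
	}
	if next == len(want) {
		return -1
	}
	return next
}
```

Greedy matching is sufficient here: taking the earliest occurrence of each expected event never prevents a later expected event from being found.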

func AssertGolden

func AssertGolden(t *testing.T, goldenPath string, actual []byte)

AssertGolden compares actual output against a golden file. If the -update flag is set, it writes actual to the golden file instead of comparing.

func AssertGoldenJSON

func AssertGoldenJSON(t *testing.T, goldenPath string, actual interface{}, normalizer *Normalizer)

AssertGoldenJSON normalizes JSON and compares against a golden file. The actual value is marshalled with sorted keys and indentation.
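One way to get sorted keys for any value in Go is to round-trip it through generic JSON, because encoding/json writes map keys in sorted order but emits struct fields in declaration order. The sketch below assumes this approach; canonicalJSON is a hypothetical name, the normalize callback stands in for a Normalizer pass, and the trailing newline is an illustrative choice.

```go
package main

import (
	"bytes"
	"encoding/json"
)

// canonicalJSON renders v as two-space-indented JSON with sorted object keys,
// then applies normalize (if non-nil) to the resulting text.
func canonicalJSON(v interface{}, normalize func(string) string) (string, error) {
	raw, err := json.Marshal(v)
	if err != nil {
		return "", err
	}
	// Decode into generic maps and slices; UseNumber keeps numbers as their
	// original literals instead of converting them to float64.
	dec := json.NewDecoder(bytes.NewReader(raw))
	dec.UseNumber()
	var generic interface{}
	if err := dec.Decode(&generic); err != nil {
		return "", err
	}
	// Every JSON object is now a map, and MarshalIndent sorts map keys.
	pretty, err := json.MarshalIndent(generic, "", "  ")
	if err != nil {
		return "", err
	}
	out := string(pretty) + "\n"
	if normalize != nil {
		out = normalize(out)
	}
	return out, nil
}
```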

func AssertGoldenLLMInteraction

func AssertGoldenLLMInteraction(t *testing.T, goldenPath string, detail map[string]interface{}, normalizer *Normalizer)

AssertGoldenLLMInteraction renders an LLM interaction detail response in a human-readable format: metadata as JSON, then conversation messages as readable text blocks (not JSON-escaped strings).

func AssertGoldenMCPInteraction

func AssertGoldenMCPInteraction(t *testing.T, goldenPath string, detail map[string]interface{}, normalizer *Normalizer)

AssertGoldenMCPInteraction renders an MCP interaction detail response in a human-readable format: fields as pretty-printed JSON with tool_arguments and tool_result expanded for readability.

func AssertSessionTraceGoldens

func AssertSessionTraceGoldens(t *testing.T, app *TestApp, sessionID, goldenScenario string)

AssertSessionTraceGoldens performs the full golden file assertion sequence for a session: normalizer registration, timeline projection, trace list, and per-interaction golden files. It factors out the pattern shared by multiple e2e tests to avoid duplication.

func BuildAgentNameIndex

func BuildAgentNameIndex(execs []*ent.AgentExecution) map[string]string

BuildAgentNameIndex creates a map from execution_id → agent_name for annotating timeline projections with the agent that produced each event.
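The index and the annotation step described for AnnotateTimelineWithAgent and AnnotateAPITimelineWithAgent fit together as sketched below. The types and function names are hypothetical stand-ins, and the sketch assumes the projected maps and the source events are parallel slices in the same order, with an empty string marking a session-level event.

```go
package main

// execution is a hypothetical stand-in for ent.AgentExecution.
type execution struct {
	ID        string
	AgentName string
}

// buildAgentNameIndex maps execution_id -> agent_name.
func buildAgentNameIndex(execs []execution) map[string]string {
	idx := make(map[string]string, len(execs))
	for _, e := range execs {
		idx[e.ID] = e.AgentName
	}
	return idx
}

// annotateWithAgent sets "agent" on projected[i] when executionIDs[i] is
// found in idx. Session-level events ("" execution ID) and IDs missing
// from idx get no "agent" key at all.
func annotateWithAgent(projected []map[string]interface{}, executionIDs []string, idx map[string]string) {
	for i := range projected {
		if i >= len(executionIDs) || executionIDs[i] == "" {
			continue
		}
		if name, ok := idx[executionIDs[i]]; ok {
			projected[i]["agent"] = name
		}
	}
}
```

Leaving the key absent, rather than setting it to an empty string, keeps session-level events distinguishable when the projection is sorted and written to a golden file.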

func ErrorToolHandler

func ErrorToolHandler(err error) mcpsdk.ToolHandler

ErrorToolHandler returns an mcpsdk.ToolHandler that returns the given error.

func GoldenPath

func GoldenPath(scenario, filename string) string

GoldenPath returns the path to a specific golden file for a scenario.

func ProjectAPITimelineForGolden

func ProjectAPITimelineForGolden(event map[string]interface{}) map[string]interface{}

ProjectAPITimelineForGolden extracts the same key fields from a JSON-parsed timeline event (from the API response) as ProjectTimelineForGolden does from an ent object. This enables golden comparison of API and DB results.

func ProjectStageForGolden

func ProjectStageForGolden(s *ent.Stage) map[string]interface{}

ProjectStageForGolden extracts key fields from a stage record for golden comparison.

func ProjectTimelineForGolden

func ProjectTimelineForGolden(te *ent.TimelineEvent) map[string]interface{}

ProjectTimelineForGolden extracts key fields from a timeline event for golden comparison.

func SetupInMemoryMCP

func SetupInMemoryMCP(t *testing.T, servers map[string]map[string]mcpsdk.ToolHandler) *mcp.ClientFactory

SetupInMemoryMCP creates in-memory MCP servers with scripted tool handlers and returns a real *mcp.ClientFactory backed by those servers.

Each call to ClientFactory.CreateClient creates fresh in-memory transports and sessions, so consecutive MCP clients (e.g. investigation → chat) get independent, non-shared connections.

servers maps serverID → (toolName → handler).

func SortTimelineProjection

func SortTimelineProjection(projected []map[string]interface{})

SortTimelineProjection sorts projected timeline maps deterministically. The primary key is agent name, which groups events by agent; ties are broken by sequence, then event_type, then content. Session-level events (no agent) sort last.
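The ordering rules translate into a multi-key comparator. This is a hypothetical sketch: sortProjection and num are illustrative names, and num assumes "sequence" may hold an int (ent projection) or a float64 (JSON-parsed API projection).

```go
package main

import (
	"fmt"
	"sort"
)

// num converts a projected sequence value to float64 so int and float64
// projections compare alike; other types compare as 0.
func num(v interface{}) float64 {
	switch n := v.(type) {
	case int:
		return float64(n)
	case int64:
		return float64(n)
	case float64:
		return n
	}
	return 0
}

// sortProjection orders by agent (events without an agent last), then
// sequence, then event_type, then content.
func sortProjection(p []map[string]interface{}) {
	sort.SliceStable(p, func(i, j int) bool {
		ai, hasI := p[i]["agent"].(string)
		aj, hasJ := p[j]["agent"].(string)
		if hasI != hasJ {
			return hasI // agent-owned events come before session-level ones
		}
		if ai != aj {
			return ai < aj
		}
		if si, sj := num(p[i]["sequence"]), num(p[j]["sequence"]); si != sj {
			return si < sj
		}
		ti, tj := fmt.Sprint(p[i]["event_type"]), fmt.Sprint(p[j]["event_type"])
		if ti != tj {
			return ti < tj
		}
		return fmt.Sprint(p[i]["content"]) < fmt.Sprint(p[j]["content"])
	})
}
```

Grouping by agent first matters for parallel stages, where events from different agents interleave nondeterministically in sequence order.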

func StaticToolHandler

func StaticToolHandler(text string) mcpsdk.ToolHandler

StaticToolHandler returns an mcpsdk.ToolHandler that always returns the given text.

Types

type LLMScriptEntry

type LLMScriptEntry struct {
	// Response content (exactly one must be set)
	Chunks []agent.Chunk // Pre-built chunks to return
	Text   string        // Shorthand: auto-wrapped as TextChunk + UsageChunk
	Error  error         // Return error from Generate()

	// Test control
	BlockUntilCancelled bool            // Block Generate() until ctx is cancelled
	WaitCh              <-chan struct{} // Block Generate() until closed, then return normal response
	OnBlock             chan<- struct{} // Notified when Generate() enters its blocking path (BlockUntilCancelled or WaitCh)

	// RewriteChunks dynamically modifies chunks at Generate-time using the
	// current conversation. Used when tool call arguments depend on prior
	// results (e.g., cancel_agent needs an execution_id from dispatch_agent).
	RewriteChunks func(messages []agent.ConversationMessage, chunks []agent.Chunk) []agent.Chunk
}

LLMScriptEntry defines a single scripted LLM response.

type Normalizer

type Normalizer struct {
	// contains filtered or unexported fields
}

Normalizer replaces dynamic values with stable placeholders for golden comparison. IDs that appear multiple times get the same placeholder (preserving referential integrity).

func NewNormalizer

func NewNormalizer(sessionID string) *Normalizer

NewNormalizer creates a normalizer that knows the session ID to replace.

func (*Normalizer) Normalize

func (n *Normalizer) Normalize(data string) string

Normalize replaces dynamic values in data with stable placeholders.

func (*Normalizer) NormalizeBytes

func (n *Normalizer) NormalizeBytes(data []byte) []byte

NormalizeBytes is a convenience wrapper for Normalize on byte slices.

func (*Normalizer) RegisterChatID

func (n *Normalizer) RegisterChatID(id string)

RegisterChatID registers a chat UUID for stable replacement.

func (*Normalizer) RegisterExecutionID

func (n *Normalizer) RegisterExecutionID(id string)

RegisterExecutionID registers an execution UUID for stable replacement.

func (*Normalizer) RegisterInteractionID

func (n *Normalizer) RegisterInteractionID(id string)

RegisterInteractionID registers an interaction UUID for stable replacement.

func (*Normalizer) RegisterMessageID

func (n *Normalizer) RegisterMessageID(id string)

RegisterMessageID registers a message UUID for stable replacement.

func (*Normalizer) RegisterStageID

func (n *Normalizer) RegisterStageID(id string)

RegisterStageID registers a stage UUID for stable replacement. Call this in order of first appearance.
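The Normalizer's referential-integrity guarantee, and the reason registration order matters, can be modeled with one counter per ID kind. This sketch is hypothetical throughout: the type, method names, and the {KIND_N} placeholder format are assumptions, not the package's actual output.

```go
package main

import (
	"fmt"
	"strings"
)

// normalizer is a simplified, hypothetical model of Normalizer.
type normalizer struct {
	replacements map[string]string // raw ID -> placeholder
	order        []string          // raw IDs in registration order
	counters     map[string]int    // kind -> last number handed out
}

func newNormalizer(sessionID string) *normalizer {
	n := &normalizer{replacements: map[string]string{}, counters: map[string]int{}}
	n.replacements[sessionID] = "{SESSION_ID}"
	n.order = append(n.order, sessionID)
	return n
}

// register gives id the next number for its kind. Re-registering a known ID
// is a no-op, so every occurrence maps to the same placeholder.
func (n *normalizer) register(kind, id string) {
	if _, seen := n.replacements[id]; seen {
		return
	}
	n.counters[kind]++
	n.replacements[id] = fmt.Sprintf("{%s_%d}", kind, n.counters[kind])
	n.order = append(n.order, id)
}

// normalize replaces every registered ID in data with its placeholder.
func (n *normalizer) normalize(data string) string {
	for _, id := range n.order {
		data = strings.ReplaceAll(data, id, n.replacements[id])
	}
	return data
}
```

Because numbers are handed out at registration time, registering stages in order of first appearance keeps {STAGE_ID_1}, {STAGE_ID_2}, ... stable across runs even though the underlying UUIDs change.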

type ScriptedLLMClient

type ScriptedLLMClient struct {
	// contains filtered or unexported fields
}

ScriptedLLMClient implements agent.LLMClient with a dual-dispatch mock: sequential fallback for single-agent stages, plus agent-aware routing for parallel stages where call order is non-deterministic.

func NewScriptedLLMClient

func NewScriptedLLMClient() *ScriptedLLMClient

NewScriptedLLMClient creates a new ScriptedLLMClient.

func (*ScriptedLLMClient) AddRouted

func (c *ScriptedLLMClient) AddRouted(agentName string, entry LLMScriptEntry)

AddRouted adds an entry for a specific agent name (matched against the system prompt). Used for parallel stages where agents need differentiated responses.

func (*ScriptedLLMClient) AddSequential

func (c *ScriptedLLMClient) AddSequential(entry LLMScriptEntry)

AddSequential adds an entry consumed in order for non-routed calls. Used for single-agent stages, synthesis, executive summary, chat, summarization, etc.
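The dual dispatch of AddRouted and AddSequential can be modeled with per-agent queues checked first and one shared sequential queue as the fallback. This is a hypothetical simplification: responses are plain strings, the agent is found by a substring search of the system prompt, and falling back to the sequential queue once an agent's routed entries run out is an assumption.

```go
package main

import (
	"errors"
	"strings"
	"sync"
)

// scripted is a hypothetical model of ScriptedLLMClient's dispatch.
type scripted struct {
	mu         sync.Mutex // parallel stages call concurrently
	routed     map[string][]string
	sequential []string
}

func (c *scripted) addRouted(agent, resp string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.routed == nil {
		c.routed = map[string][]string{}
	}
	c.routed[agent] = append(c.routed[agent], resp)
}

func (c *scripted) addSequential(resp string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.sequential = append(c.sequential, resp)
}

// next returns the response for one call: the first routed entry whose agent
// name appears in the system prompt, otherwise the next sequential entry.
// Agent names should not be substrings of one another, since map iteration
// order is random.
func (c *scripted) next(systemPrompt string) (string, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for agent, queue := range c.routed {
		if len(queue) > 0 && strings.Contains(systemPrompt, agent) {
			c.routed[agent] = queue[1:]
			return queue[0], nil
		}
	}
	if len(c.sequential) == 0 {
		return "", errors.New("script exhausted")
	}
	resp := c.sequential[0]
	c.sequential = c.sequential[1:]
	return resp, nil
}
```

Routing by agent makes parallel stages order-independent, while the sequential queue keeps single-agent scripts as simple as listing responses in call order.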

func (*ScriptedLLMClient) CallCount

func (c *ScriptedLLMClient) CallCount() int

CallCount returns the total number of Generate() calls made.

func (*ScriptedLLMClient) CapturedInputs

func (c *ScriptedLLMClient) CapturedInputs() []*agent.GenerateInput

CapturedInputs returns a copy of all captured GenerateInput values in call order.

func (*ScriptedLLMClient) Close

func (c *ScriptedLLMClient) Close() error

Close implements agent.LLMClient.

func (*ScriptedLLMClient) Generate

func (c *ScriptedLLMClient) Generate(ctx context.Context, input *agent.GenerateInput) (<-chan agent.Chunk, error)

Generate implements agent.LLMClient.

type TestApp

type TestApp struct {
	// Core
	Config    *config.Config
	DBClient  *database.Client
	EntClient *ent.Client

	// Mocks / test wiring
	LLMClient  *ScriptedLLMClient
	MCPFactory *mcp.ClientFactory // real factory backed by in-memory MCP SDK servers

	// Real infrastructure
	EventPublisher  *events.EventPublisher
	ConnManager     *events.ConnectionManager
	NotifyListener  *events.NotifyListener
	WorkerPool      *queue.WorkerPool
	ScoringExecutor *queue.ScoringExecutor
	ChatExecutor    *queue.ChatMessageExecutor
	Server          *api.Server

	// Runtime
	BaseURL string // e.g. "http://127.0.0.1:54321"
	WSURL   string // e.g. "ws://127.0.0.1:54321/ws"
	// contains filtered or unexported fields
}

TestApp boots a complete TARSy instance for e2e testing.

func NewTestApp

func NewTestApp(t *testing.T, opts ...TestAppOption) *TestApp

NewTestApp creates and starts a full TARSy test instance. Shutdown is registered via t.Cleanup automatically.

func (*TestApp) AwaitLLMInteractionIncrease

func (app *TestApp) AwaitLLMInteractionIncrease(sessionID string, baseline int) bool

AwaitLLMInteractionIncrease polls until the LLM interaction count exceeds the given baseline, indicating that the orchestrator has recorded a new response. It returns true on success and false on timeout (10s). For callers running in a goroutine, the primary failsafe is the test's own 30s timeout in WaitForSessionStatus.

func (*TestApp) CancelSession

func (app *TestApp) CancelSession(t *testing.T, sessionID string) map[string]interface{}

CancelSession sends POST /api/v1/sessions/:id/cancel.

func (*TestApp) CountLLMInteractions

func (app *TestApp) CountLLMInteractions(sessionID string) (int, error)

CountLLMInteractions returns the current LLM interaction count for a session.

func (*TestApp) DeleteMemory

func (app *TestApp) DeleteMemory(t *testing.T, memoryID string)

DeleteMemory calls DELETE /api/v1/memories/:id.

func (*TestApp) GetActiveSessions

func (app *TestApp) GetActiveSessions(t *testing.T) map[string]interface{}

GetActiveSessions calls GET /api/v1/sessions/active.

func (*TestApp) GetAlertTypes

func (app *TestApp) GetAlertTypes(t *testing.T) map[string]interface{}

GetAlertTypes calls GET /api/v1/alert-types.

func (*TestApp) GetDefaultTools

func (app *TestApp) GetDefaultTools(t *testing.T) map[string]interface{}

GetDefaultTools calls GET /api/v1/system/default-tools.

func (*TestApp) GetFilterOptions

func (app *TestApp) GetFilterOptions(t *testing.T) map[string]interface{}

GetFilterOptions calls GET /api/v1/sessions/filter-options.

func (*TestApp) GetHealth

func (app *TestApp) GetHealth(t *testing.T) map[string]interface{}

GetHealth calls GET /health.

func (*TestApp) GetInjectedMemories

func (app *TestApp) GetInjectedMemories(t *testing.T, sessionID string) []interface{}

GetInjectedMemories calls GET /api/v1/sessions/:id/injected-memories.

func (*TestApp) GetLLMInteractionDetail

func (app *TestApp) GetLLMInteractionDetail(t *testing.T, sessionID, interactionID string) map[string]interface{}

GetLLMInteractionDetail calls GET /api/v1/sessions/:id/trace/llm/:interaction_id.

func (*TestApp) GetMCPInteractionDetail

func (app *TestApp) GetMCPInteractionDetail(t *testing.T, sessionID, interactionID string) map[string]interface{}

GetMCPInteractionDetail calls GET /api/v1/sessions/:id/trace/mcp/:interaction_id.

func (*TestApp) GetMCPServers

func (app *TestApp) GetMCPServers(t *testing.T) map[string]interface{}

GetMCPServers calls GET /api/v1/system/mcp-servers.

func (*TestApp) GetMemory

func (app *TestApp) GetMemory(t *testing.T, memoryID string) map[string]interface{}

GetMemory calls GET /api/v1/memories/:id.

func (*TestApp) GetMemoryExpectStatus

func (app *TestApp) GetMemoryExpectStatus(t *testing.T, memoryID string, expectedStatus int)

GetMemoryExpectStatus calls GET /api/v1/memories/:id with an expected status code.

func (*TestApp) GetReviewActivity

func (app *TestApp) GetReviewActivity(t *testing.T, sessionID string) map[string]interface{}

GetReviewActivity calls GET /api/v1/sessions/:id/review-activity.

func (*TestApp) GetRunbooks

func (app *TestApp) GetRunbooks(t *testing.T) []interface{}

GetRunbooks calls GET /api/v1/runbooks and returns the parsed JSON array.

func (*TestApp) GetScore

func (app *TestApp) GetScore(t *testing.T, sessionID string) map[string]interface{}

GetScore calls GET /api/v1/sessions/:id/score and returns the response body.

func (*TestApp) GetSession

func (app *TestApp) GetSession(t *testing.T, sessionID string) map[string]interface{}

GetSession retrieves a session by ID.

func (*TestApp) GetSessionList

func (app *TestApp) GetSessionList(t *testing.T, queryParams string) map[string]interface{}

GetSessionList calls GET /api/v1/sessions with optional query params.

func (*TestApp) GetSessionMemories

func (app *TestApp) GetSessionMemories(t *testing.T, sessionID string) []interface{}

GetSessionMemories calls GET /api/v1/sessions/:id/memories.

func (*TestApp) GetSessionStatus

func (app *TestApp) GetSessionStatus(t *testing.T, sessionID string) map[string]interface{}

GetSessionStatus calls GET /api/v1/sessions/:id/status.

func (*TestApp) GetSessionSummary

func (app *TestApp) GetSessionSummary(t *testing.T, sessionID string) map[string]interface{}

GetSessionSummary calls GET /api/v1/sessions/:id/summary.

func (*TestApp) GetSystemWarnings

func (app *TestApp) GetSystemWarnings(t *testing.T) map[string]interface{}

GetSystemWarnings calls GET /api/v1/system/warnings.

func (*TestApp) GetTimeline

func (app *TestApp) GetTimeline(t *testing.T, sessionID string) []interface{}

GetTimeline calls GET /api/v1/sessions/:id/timeline. Returns the parsed JSON array of timeline events.

func (*TestApp) GetTraceList

func (app *TestApp) GetTraceList(t *testing.T, sessionID string) map[string]interface{}

GetTraceList calls GET /api/v1/sessions/:id/trace.

func (*TestApp) GetTriageGroup

func (app *TestApp) GetTriageGroup(t *testing.T, group string, queryParams string) map[string]interface{}

GetTriageGroup calls GET /api/v1/sessions/triage/:group with optional query params.

func (*TestApp) ListMemories

func (app *TestApp) ListMemories(t *testing.T, queryParams string) map[string]interface{}

ListMemories calls GET /api/v1/memories with optional query params.

func (*TestApp) PatchReview

func (app *TestApp) PatchReview(t *testing.T, sessionID string, body map[string]interface{}) map[string]interface{}

PatchReview calls PATCH /api/v1/sessions/review with the given session ID injected into the body as session_ids. Returns the raw response map.

func (*TestApp) QueryExecutions

func (app *TestApp) QueryExecutions(t *testing.T, sessionID string) []*ent.AgentExecution

QueryExecutions returns all agent executions for a session.

func (*TestApp) QuerySessionsByStatus

func (app *TestApp) QuerySessionsByStatus(t *testing.T, status string) []string

QuerySessionsByStatus returns session IDs matching the given status.

func (*TestApp) QueryStages

func (app *TestApp) QueryStages(t *testing.T, sessionID string) []*ent.Stage

QueryStages returns all stages for a session, ordered by index.

func (*TestApp) QueryTimeline

func (app *TestApp) QueryTimeline(t *testing.T, sessionID string) []*ent.TimelineEvent

QueryTimeline returns all timeline events for a session, ordered by sequence.

func (*TestApp) SendChatMessage

func (app *TestApp) SendChatMessage(t *testing.T, sessionID, content string) map[string]interface{}

SendChatMessage sends POST /api/v1/sessions/:id/chat/messages and returns the response map with chat_id, message_id, and stage_id.

func (*TestApp) SubmitAlert

func (app *TestApp) SubmitAlert(t *testing.T, alertType, data string) map[string]interface{}

SubmitAlert posts an alert and returns the parsed response.

func (*TestApp) SubmitAlertWithFingerprint

func (app *TestApp) SubmitAlertWithFingerprint(t *testing.T, alertType, data, fingerprint string) map[string]interface{}

SubmitAlertWithFingerprint posts an alert with a Slack message fingerprint and returns the parsed response.

func (*TestApp) SubmitAlertWithRunbook

func (app *TestApp) SubmitAlertWithRunbook(t *testing.T, alertType, data, runbookURL string) map[string]interface{}

SubmitAlertWithRunbook posts an alert with a runbook URL and returns the parsed response.

func (*TestApp) UpdateMemory

func (app *TestApp) UpdateMemory(t *testing.T, memoryID string, body map[string]interface{}) map[string]interface{}

UpdateMemory calls PATCH /api/v1/memories/:id.

func (*TestApp) WaitForActiveStage

func (app *TestApp) WaitForActiveStage(t *testing.T, sessionID string) *ent.Stage

WaitForActiveStage polls the DB until a stage with "active" status exists for the given session and returns it. Useful for cancellation tests where you need to wait until execution has started before cancelling.

func (*TestApp) WaitForNSessionsInStatus

func (app *TestApp) WaitForNSessionsInStatus(t *testing.T, n int, status string)

WaitForNSessionsInStatus waits until exactly n sessions have the given status. It inlines the DB query (instead of calling QuerySessionsByStatus) so that transient DB errors cause a retry rather than aborting the test via require.NoError.

func (*TestApp) WaitForSessionStatus

func (app *TestApp) WaitForSessionStatus(t *testing.T, sessionID string, expected ...string) string

WaitForSessionStatus polls the DB until the session reaches the expected status.

func (*TestApp) WaitForStageStatus

func (app *TestApp) WaitForStageStatus(t *testing.T, stageID string, expected ...string) string

WaitForStageStatus polls the DB until the stage reaches a terminal status. Returns the terminal status string.

type TestAppOption

type TestAppOption func(*testAppConfig)

TestAppOption configures the test app.

func WithChatTimeout

func WithChatTimeout(d time.Duration) TestAppOption

WithChatTimeout sets the timeout for chat message execution.

func WithConfig

func WithConfig(cfg *config.Config) TestAppOption

WithConfig sets a custom config.

func WithDBClient

func WithDBClient(client *database.Client) TestAppOption

WithDBClient injects a pre-created database client, skipping the default per-test schema creation. Used for multi-replica tests where multiple TestApp instances share the same database schema.

func WithLLMClient

func WithLLMClient(client *ScriptedLLMClient) TestAppOption

WithLLMClient sets a pre-scripted LLM client.

func WithMCPServers

func WithMCPServers(servers map[string]map[string]mcpsdk.ToolHandler) TestAppOption

WithMCPServers sets in-memory MCP SDK servers. Maps serverID → (toolName → handler).

func WithMaxConcurrentSessions

func WithMaxConcurrentSessions(n int) TestAppOption

WithMaxConcurrentSessions sets the maximum number of concurrently executing sessions.

func WithMemoryService

func WithMemoryService(svc *memory.Service, cfg *config.MemoryConfig) TestAppOption

WithMemoryService injects a pre-created memory service for memory injection and recall tests. Must be paired with WithDBClient using the same database.

func WithPodID

func WithPodID(id string) TestAppOption

WithPodID overrides the auto-generated pod ID. Required for multi-replica tests so each replica gets a distinct identity for worker claiming and orphan detection.

func WithSessionTimeout

func WithSessionTimeout(d time.Duration) TestAppOption

WithSessionTimeout sets the timeout for investigation session execution.

func WithSlackService

func WithSlackService(svc *tarsyslack.Service) TestAppOption

WithSlackService injects a Slack notification service into the worker pool. Used for testing Slack integration with a mock API server.

func WithWorkerCount

func WithWorkerCount(n int) TestAppOption

WithWorkerCount sets the number of worker pool goroutines.

type WSClient

type WSClient struct {
	// contains filtered or unexported fields
}

WSClient connects to the TARSy WebSocket endpoint and collects events.

func WSConnect

func WSConnect(ctx context.Context, wsURL string) (*WSClient, error)

WSConnect establishes a WebSocket connection to the test server and starts collecting events in a background goroutine.

func (*WSClient) Close

func (c *WSClient) Close() error

Close closes the WebSocket connection and waits for the read loop to exit.

func (*WSClient) Events

func (c *WSClient) Events() []WSEvent

Events returns a snapshot of all collected events.

func (*WSClient) Subscribe

func (c *WSClient) Subscribe(channel string) error

Subscribe sends a subscribe action for the given channel and waits for the server to confirm. This ensures the LISTEN + auto-catchup has completed before the caller proceeds, avoiding a race where events are checked before the server has delivered catchup events.

func (*WSClient) WaitForEvent

func (c *WSClient) WaitForEvent(t interface {
	Helper()
	Fatalf(string, ...interface{})
}, match func(WSEvent) bool, timeout time.Duration, msgAndArgs ...interface{})

WaitForEvent polls the collected events until one matches the predicate or the timeout expires. This is preferred over time.Sleep for waiting on trailing WS events, as it adapts to CI load automatically.

type WSEvent

type WSEvent struct {
	Type     string                 `json:"type"`
	Raw      json.RawMessage        // Original JSON
	Parsed   map[string]interface{} // Parsed for assertions
	Received time.Time              // When we received it
}

WSEvent represents a received WebSocket event.
