server

package
v1.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 19, 2026 License: Apache-2.0 Imports: 57 Imported by: 0

Documentation

Overview

Copyright 2026 Teradata

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Index

Constants

View Source
const (
	// DefaultProgressBufferSize is the default buffer size for progress event channels
	DefaultProgressBufferSize = 10
)
View Source
const (
	// UserIDHeader is the gRPC metadata key for the user ID.
	UserIDHeader = "x-user-id"
)

Variables

This section is empty.

Functions

func ContainsUIIntent added in v1.2.0

func ContainsUIIntent(msg string) bool

ContainsUIIntent reports whether msg suggests the user wants a UI visualization. Case-insensitive keyword scan — fast and deterministic, no LLM cost.

func ConvertMessage

func ConvertMessage(m *agent.Message) *loomv1.Message

ConvertMessage converts an agent.Message to proto format.

func ConvertSession

func ConvertSession(s *agent.Session) *loomv1.Session

ConvertSession converts an agent.Session to proto format.

func ConvertTool

func ConvertTool(t shuttle.Tool) *loomv1.ToolDefinition

ConvertTool converts a shuttle.Tool to proto format with rich metadata. Attempts to load metadata from YAML files; falls back to basic info if not found. Logs warnings if metadata loading fails but continues with basic tool definition.

func GenerateSessionID

func GenerateSessionID() string

GenerateSessionID generates a new session ID.

func UIAppTools added in v1.2.0

func UIAppTools(compiler AppCompiler, provider AppProvider) []shuttle.Tool

UIAppTools returns shuttle.Tool implementations that allow server-side agents to create, update, list, and delete MCP UI apps. These tools are registered lazily — only when ContainsUIIntent signals that the user wants a visualization.

func UserIDStreamInterceptor added in v1.2.0

func UserIDStreamInterceptor(cfg UserIDConfig) grpc.StreamServerInterceptor

UserIDStreamInterceptor extracts X-User-ID from gRPC metadata and injects into context for streaming RPCs.

func UserIDUnaryInterceptor added in v1.2.0

func UserIDUnaryInterceptor(cfg UserIDConfig) grpc.UnaryServerInterceptor

UserIDUnaryInterceptor extracts X-User-ID from gRPC metadata and injects into context via ContextWithUserID. Returns codes.Unauthenticated if missing and RequireUserID is true.

func ValidateProviders added in v1.2.0

func ValidateProviders(ctx context.Context, agents map[string]*agent.Agent) error

ValidateProviders performs a preflight health check on all configured LLM providers. It deduplicates providers across agents (many agents often share one provider) and runs all checks concurrently, so startup time is bounded by the slowest provider rather than O(agents × latency).

Called during server startup to ensure all providers are reachable before serving requests. Returns an error if any provider fails the health check.

Types

type AdminServer added in v1.2.0

type AdminServer struct {
	loomv1.UnimplementedAdminServiceServer
	// contains filtered or unexported fields
}

AdminServer implements the AdminService gRPC service. It wraps an AdminStorage backend to expose admin operations via gRPC. Access is optionally gated by a token passed in the "x-admin-token" gRPC metadata header.

func NewAdminServer added in v1.2.0

func NewAdminServer(store agent.AdminStorage, adminToken string) *AdminServer

NewAdminServer creates a new admin gRPC server. If adminToken is non-empty, every RPC will require the caller to supply a matching "x-admin-token" metadata header. An empty adminToken disables the check. Returns nil if no admin storage is configured (graceful degradation).

func (*AdminServer) CountSessionsByUser added in v1.2.0

CountSessionsByUser returns session counts grouped by user.

func (*AdminServer) GetSystemStats added in v1.2.0

GetSystemStats returns aggregate system statistics across all users.

func (*AdminServer) ListAllSessions added in v1.2.0

ListAllSessions lists sessions across all users (bypasses RLS).

type AppCompiler added in v1.2.0

type AppCompiler interface {
	Compile(spec *loomv1.UIAppSpec) ([]byte, error)
	Validate(spec *loomv1.UIAppSpec) error
	ListComponentTypes() []*loomv1.ComponentType
}

AppCompiler compiles UIAppSpec to HTML. Separated from AppProvider to keep the registry focused on storage and the compiler focused on validation/rendering.

type AppHTMLProvider added in v1.2.0

type AppHTMLProvider interface {
	AppNames() []string
	AppHTML(name string) ([]byte, error)
}

AppHTMLProvider provides HTML content for UI apps. Used by the HTTP server to serve apps in the browser.

type AppProvider added in v1.2.0

type AppProvider interface {
	ListAppInfo() []apps.AppInfo
	GetAppHTML(name string) ([]byte, *apps.AppInfo, error)
	CreateApp(name, displayName, description string, html []byte, overwrite bool) (*apps.AppInfo, bool, error)
	UpdateApp(name, displayName, description string, html []byte) (*apps.AppInfo, error)
	DeleteApp(name string) error
}

AppProvider provides UI app information to the gRPC server. Implemented by UIResourceRegistry to avoid tight coupling between the server package and the apps package internals.

type BusMessage added in v1.1.0

type BusMessage struct {
	// contains filtered or unexported fields
}

BusMessage wraps a bus message with its topic for processing

type CORSConfig added in v1.0.2

type CORSConfig struct {
	Enabled          bool
	AllowedOrigins   []string
	AllowedMethods   []string
	AllowedHeaders   []string
	ExposedHeaders   []string
	AllowCredentials bool
	MaxAge           int
}

CORSConfig holds CORS configuration

func DefaultCORSConfig added in v1.0.2

func DefaultCORSConfig() CORSConfig

DefaultCORSConfig returns a permissive CORS configuration

type HTTPServer

type HTTPServer struct {
	// contains filtered or unexported fields
}

HTTPServer wraps gRPC server with HTTP/REST+SSE endpoints

func NewHTTPServer

func NewHTTPServer(grpcServer *MultiAgentServer, httpAddr, grpcAddr string, logger *zap.Logger) *HTTPServer

NewHTTPServer creates an HTTP server that proxies to gRPC

func NewHTTPServerWithCORS added in v1.0.2

func NewHTTPServerWithCORS(grpcServer *MultiAgentServer, httpAddr, grpcAddr string, logger *zap.Logger, corsConfig CORSConfig) *HTTPServer

NewHTTPServerWithCORS creates an HTTP server with custom CORS configuration

func (*HTTPServer) SetAppHTMLProvider added in v1.2.0

func (h *HTTPServer) SetAppHTMLProvider(p AppHTMLProvider)

SetAppHTMLProvider sets the provider used to serve UI apps over HTTP. Must be called before Start(); not safe for concurrent use.

func (*HTTPServer) Start

func (h *HTTPServer) Start(ctx context.Context) error

Start starts the HTTP server

func (*HTTPServer) Stop

func (h *HTTPServer) Stop(ctx context.Context) error

Stop gracefully stops the HTTP server

type JudgeServer added in v1.2.0

type JudgeServer struct {
	loomv1.UnimplementedJudgeServiceServer
	// contains filtered or unexported fields
}

JudgeServer implements loomv1.JudgeServiceServer. It stores judge configurations in memory and executes evaluations via judges.LLMJudge (no hawk build tag required).

func NewJudgeServer added in v1.2.0

func NewJudgeServer(tracer observability.Tracer, logger *zap.Logger) *JudgeServer

NewJudgeServer creates a new JudgeServer with the given tracer and logger.

func (*JudgeServer) EvaluateWithJudges added in v1.2.0

func (s *JudgeServer) EvaluateWithJudges(ctx context.Context, req *loomv1.EvaluateRequest) (*loomv1.EvaluateResponse, error)

EvaluateWithJudges runs named judges against the provided context and returns aggregated results.

func (*JudgeServer) EvaluateWithJudgesStream added in v1.2.0

EvaluateWithJudgesStream runs judges with streaming progress events.

func (*JudgeServer) GetJudgeConfig added in v1.2.0

func (s *JudgeServer) GetJudgeConfig(id string) (*loomv1.JudgeConfig, error)

GetJudgeConfig returns the judge config for the given ID. Used by Server to resolve req.JudgeId in ABTest.

func (*JudgeServer) GetJudgeHistory added in v1.2.0

GetJudgeHistory returns empty history (no persistence in this implementation).

func (*JudgeServer) RegisterJudge added in v1.2.0

RegisterJudge stores a judge configuration.

func (*JudgeServer) SetProviderPool added in v1.2.0

func (s *JudgeServer) SetProviderPool(pool map[string]agent.LLMProvider, defaultP agent.LLMProvider)

SetProviderPool configures the provider pool and default fallback provider.

type LearningService

type LearningService struct {
	loomv1.UnimplementedLearningAgentServiceServer
	// contains filtered or unexported fields
}

LearningService implements the LearningAgentService gRPC interface. It wraps the LearningAgent and provides gRPC service endpoints.

func NewLearningService

func NewLearningService(agent *learning.LearningAgent) *LearningService

NewLearningService creates a new LearningService gRPC wrapper

func (*LearningService) AnalyzePatternEffectiveness

AnalyzePatternEffectiveness analyzes pattern performance over a time window

func (*LearningService) ApplyImprovement

ApplyImprovement applies an improvement proposal

func (*LearningService) GenerateImprovements

GenerateImprovements generates improvement proposals based on analysis

func (*LearningService) GetImprovementHistory

GetImprovementHistory retrieves improvement history

func (*LearningService) RollbackImprovement

RollbackImprovement rolls back a previously applied improvement

func (*LearningService) StreamPatternMetrics

StreamPatternMetrics streams real-time pattern metrics 🚨 CRITICAL: This implements the streaming RPC using MessageBus

type MultiAgentServer

type MultiAgentServer struct {
	loomv1.UnimplementedLoomServiceServer
	// contains filtered or unexported fields
}

MultiAgentServer implements the LoomService gRPC server with support for multiple agents. It routes requests to the appropriate agent based on agent_id in the request.

func NewMultiAgentServer

func NewMultiAgentServer(agents map[string]*agent.Agent, store agent.SessionStorage) *MultiAgentServer

NewMultiAgentServer creates a new multi-agent LoomService server.

func (*MultiAgentServer) ABTest added in v1.2.0

ABTest runs an A/B test across multiple named providers. Delegates to the internal single-agent Server so the implementation is shared.

func (*MultiAgentServer) AddAgent

func (s *MultiAgentServer) AddAgent(id string, ag *agent.Agent)

AddAgent adds a new agent to the server at runtime

func (*MultiAgentServer) AddMCPServer

AddMCPServer adds a new MCP server configuration.

func (*MultiAgentServer) AnswerClarificationQuestion

AnswerClarificationQuestion provides an answer to a clarification question asked by an agent.

func (*MultiAgentServer) ConfigureCommunication

ConfigureCommunication initializes the tri-modal communication system for all agents. This should be called after NewMultiAgentServer() but before starting the server.

The three communication modes: 1. MessageBus - Broadcast/pub-sub for topic-based multicast 2. MessageQueue - Point-to-point async messaging with queuing 3. SharedMemoryStore - Zero-copy data sharing with namespaces

func (*MultiAgentServer) ConfigureLegacyCommunication

func (s *MultiAgentServer) ConfigureLegacyCommunication(refStore communication.ReferenceStore, policy *communication.PolicyManager) error

ConfigureLegacyCommunication initializes basic inter-agent communication with reference store and policy. Deprecated: Use ConfigureCommunication() from communication_handlers.go for tri-modal communication.

func (*MultiAgentServer) ConfigureScheduler

func (s *MultiAgentServer) ConfigureScheduler(sched *scheduler.Scheduler)

ConfigureScheduler injects the workflow scheduler for cron-based execution. This should be called after NewMultiAgentServer() to enable schedule management RPCs.

func (*MultiAgentServer) ConfigureSharedMemory

func (s *MultiAgentServer) ConfigureSharedMemory(config *storage.Config) error

ConfigureSharedMemory initializes shared memory for all agents with the given configuration. This should be called after NewMultiAgentServer() but before starting the server. All agents will share the same memory store, enabling efficient data passing between them.

func (*MultiAgentServer) ConfigureTLS

func (s *MultiAgentServer) ConfigureTLS(manager *tls.Manager, config *loomv1.ServerConfig)

ConfigureTLS sets the TLS manager and merges TLS config into the server configuration. This should be called after SetServerConfig() if TLS is enabled.

func (*MultiAgentServer) CreateAgentFromConfig added in v1.2.0

func (s *MultiAgentServer) CreateAgentFromConfig(ctx context.Context, req *loomv1.CreateAgentRequest) (*loomv1.AgentInfo, error)

CreateAgentFromConfig creates a new agent from the provided configuration or config file path.

func (*MultiAgentServer) CreatePattern

CreatePattern creates a new pattern at runtime for a specific agent.

func (*MultiAgentServer) CreateSession

CreateSession creates a new conversation session for an agent.

func (*MultiAgentServer) CreateUIApp added in v1.2.0

CreateUIApp creates a dynamic UI app from a declarative spec.

func (*MultiAgentServer) DeleteAgent added in v1.2.0

DeleteAgent removes an agent from the server. If force is false and the agent is running, returns FailedPrecondition. If force is true, the agent is stopped first then deleted.

func (*MultiAgentServer) DeleteArtifact

DeleteArtifact deletes an artifact (soft or hard delete).

func (*MultiAgentServer) DeleteMCPServer

DeleteMCPServer removes an MCP server.

func (*MultiAgentServer) DeleteScheduledWorkflow

func (s *MultiAgentServer) DeleteScheduledWorkflow(ctx context.Context, req *loomv1.DeleteScheduledWorkflowRequest) (*emptypb.Empty, error)

DeleteScheduledWorkflow deletes a scheduled workflow. YAML-sourced schedules cannot be deleted via RPC - they must be removed by deleting the YAML file.

func (*MultiAgentServer) DeleteSession

DeleteSession deletes a session.

func (*MultiAgentServer) DeleteSharedMemory

DeleteSharedMemory removes a value from shared memory.

func (*MultiAgentServer) DeleteUIApp added in v1.2.0

DeleteUIApp deletes a dynamic UI app.

func (*MultiAgentServer) DespawnSubAgent added in v1.1.0

DespawnSubAgent terminates a spawned sub-agent. This implements the builtin.DespawnHandler interface.

func (*MultiAgentServer) ExecuteWorkflow

ExecuteWorkflow executes a workflow pattern loaded from YAML or programmatically defined. This RPC enables automatic execution of multi-agent workflows.

func (*MultiAgentServer) GetAgent added in v1.2.0

GetAgent retrieves information about a specific agent.

func (*MultiAgentServer) GetAgentIDs

func (s *MultiAgentServer) GetAgentIDs() []string

GetAgentIDs returns available agent IDs (internal helper)

func (*MultiAgentServer) GetArtifact

GetArtifact retrieves artifact metadata.

func (*MultiAgentServer) GetArtifactContent

GetArtifactContent reads artifact file content.

func (*MultiAgentServer) GetArtifactStats

GetArtifactStats retrieves storage statistics.

func (*MultiAgentServer) GetCommunicationComponents

GetCommunicationComponents returns the communication system components (for testing/inspection).

func (*MultiAgentServer) GetCommunicationPolicy

func (s *MultiAgentServer) GetCommunicationPolicy() *communication.PolicyManager

GetCommunicationPolicy returns the communication policy (for testing/inspection).

func (*MultiAgentServer) GetConversationHistory

GetConversationHistory retrieves conversation history.

func (*MultiAgentServer) GetHealth

GetHealth performs a health check by pinging each unique LLM provider. Providers are deduplicated across agents (many agents share the same provider) and checked concurrently, so latency is O(slowest_provider) not O(agents × latency). Returns per-provider status in the components map.

func (*MultiAgentServer) GetMCPServer

GetMCPServer retrieves a specific MCP server.

func (*MultiAgentServer) GetPattern added in v1.2.0

GetPattern retrieves a specific pattern by exact name match. Searches across all agents' pattern libraries.

func (*MultiAgentServer) GetPendingPermissions added in v1.2.0

func (s *MultiAgentServer) GetPendingPermissions() map[string]*pendingPermission

GetPendingPermissions returns all pending permission requests. Useful for UI layers that need to display pending permission requests.

func (*MultiAgentServer) GetReferenceStore

func (s *MultiAgentServer) GetReferenceStore() communication.ReferenceStore

GetReferenceStore returns the reference store (for testing/inspection).

func (*MultiAgentServer) GetScheduleHistory

GetScheduleHistory returns execution history for a schedule. Includes the last N executions with their status, timing, and error information.

func (*MultiAgentServer) GetScheduledWorkflow

GetScheduledWorkflow retrieves a scheduled workflow by ID.

func (*MultiAgentServer) GetServerConfig

GetServerConfig returns the current server configuration.

func (*MultiAgentServer) GetSession

GetSession retrieves session details.

func (*MultiAgentServer) GetSharedMemory

GetSharedMemory retrieves a value from shared memory.

func (*MultiAgentServer) GetSharedMemoryStats

GetSharedMemoryStats retrieves statistics for a namespace.

func (*MultiAgentServer) GetStorageStatus added in v1.2.0

GetStorageStatus returns the health status of the storage backend.

func (*MultiAgentServer) GetTLSStatus

GetTLSStatus returns the current TLS/certificate status.

func (*MultiAgentServer) GetTrace added in v1.2.0

GetTrace retrieves a trace by ID from the server's local trace store. The observability.Tracer interface does not support trace retrieval, so the server maintains its own trace store. If no trace store is initialized, it returns FailedPrecondition. If the trace is not found, it returns NotFound.

func (*MultiAgentServer) GetUIApp added in v1.2.0

GetUIApp retrieves a specific UI app by short name, returning its metadata and HTML content.

func (*MultiAgentServer) GetWorkflowExecution added in v1.1.0

GetWorkflowExecution retrieves a specific workflow execution by ID. Returns NotFound if the execution doesn't exist.

func (*MultiAgentServer) GrantToolPermission added in v1.2.0

func (s *MultiAgentServer) GrantToolPermission(permID string, granted bool, message string, remember bool) error

GrantToolPermission provides an answer to a pending tool permission request. This is the counterpart to RequestToolPermission, called by the UI layer when the user makes a decision. It follows the same pattern as AnswerClarificationQuestion.

func (*MultiAgentServer) HealthCheckMCPServers

HealthCheckMCPServers checks health of all MCP servers.

func (*MultiAgentServer) ListAgents

ListAgents implements the gRPC ListAgents RPC method

func (*MultiAgentServer) ListArtifacts

ListArtifacts lists artifacts with optional filtering.

func (*MultiAgentServer) ListAvailableModels

ListAvailableModels lists all LLM models available to this server instance. Uses the provider factory to dynamically determine which models are actually available based on configured credentials and environment variables.

func (*MultiAgentServer) ListComponentTypes added in v1.2.0

ListComponentTypes returns the catalog of available component types for building dynamic apps.

func (*MultiAgentServer) ListMCPServerTools

ListMCPServerTools lists all tools from a specific MCP server. This queries the MCP server directly through the manager, not the agent's tool registry.

func (*MultiAgentServer) ListMCPServers

ListMCPServers lists all configured MCP servers.

func (*MultiAgentServer) ListPatterns added in v1.2.0

ListPatterns retrieves available patterns with optional filtering by domain, category, or search query. Iterates through all agents' pattern libraries and aggregates results.

func (*MultiAgentServer) ListProviders added in v1.2.0

ListProviders lists named providers in the pool configured via SetProviderPool.

func (*MultiAgentServer) ListScheduledWorkflows

ListScheduledWorkflows lists all scheduled workflows. Can optionally filter to only enabled schedules.

func (*MultiAgentServer) ListSessions

ListSessions lists all sessions across all agents.

func (*MultiAgentServer) ListSharedMemoryKeys

ListSharedMemoryKeys lists all keys matching a pattern in a namespace.

func (*MultiAgentServer) ListTools

ListTools lists all registered tools from the default agent. If req.Backend is specified, only tools for that backend are returned.

func (*MultiAgentServer) ListUIApps added in v1.2.0

ListUIApps lists all available UI apps registered with the app provider.

func (*MultiAgentServer) ListWorkflowExecutions added in v1.1.0

ListWorkflowExecutions lists workflow executions with optional filtering. Supports filtering by status and pattern type.

func (*MultiAgentServer) LoadPatterns added in v1.2.0

LoadPatterns loads pattern definitions from a directory for one or all agents. If agent_id is specified, loads patterns for that agent only. If source is provided without agent_id, loads for all agents. If force_reload is true, clears the pattern cache before loading.

func (*MultiAgentServer) PauseSchedule

PauseSchedule pauses a schedule without deleting it. The schedule will remain in the database but will not execute until resumed.

func (*MultiAgentServer) Publish

Publish publishes a message to a topic on the broadcast bus. All subscribers to the topic will receive the message.

func (*MultiAgentServer) PutSharedMemory

PutSharedMemory writes or updates a value in shared memory.

func (*MultiAgentServer) RecordTraceSpan added in v1.2.0

func (s *MultiAgentServer) RecordTraceSpan(span *observability.Span)

RecordTraceSpan records a completed span in the server's local trace store. Call this after ending a span to make it available via the GetTrace RPC.

func (*MultiAgentServer) RegisterServerProgressListener

func (s *MultiAgentServer) RegisterServerProgressListener(agentName string, logger *zap.Logger)

RegisterServerProgressListener creates and registers a progress listener for the server. This listener tracks pending clarification questions for TUI/RPC answer routing.

func (*MultiAgentServer) RegisterTool added in v1.2.0

RegisterTool registers a tool definition with the server's tool registry. The tool becomes available for discovery via SearchTools and can be used by agents.

func (*MultiAgentServer) ReloadAgent added in v1.2.0

ReloadAgent hot-reloads an agent's configuration. If reload_from_file is true, the agent's config is re-read from disk. If config is provided, it replaces the current configuration.

func (*MultiAgentServer) RemoveAgent

func (s *MultiAgentServer) RemoveAgent(id string) error

RemoveAgent removes an agent from the server

func (*MultiAgentServer) RenewCertificate

RenewCertificate manually triggers certificate renewal.

func (*MultiAgentServer) RequestToolPermission added in v1.2.0

RequestToolPermission requests user permission before executing a tool. It creates a pending permission request and waits for a response on a channel, following the same pattern as AnswerClarificationQuestion for HITL workflows.

If no answering mechanism responds within the timeout, the request times out with granted=false, timed_out=true.

func (*MultiAgentServer) RestartMCPServer

RestartMCPServer restarts a running MCP server.

func (*MultiAgentServer) ResumeSchedule

ResumeSchedule resumes a paused schedule. The schedule will start executing again according to its cron expression.

func (*MultiAgentServer) RunMigration added in v1.2.0

RunMigration runs database migrations on the storage backend.

func (*MultiAgentServer) ScheduleWorkflow

ScheduleWorkflow creates a new scheduled workflow via RPC. The schedule will be persisted in SQLite and executed automatically by the cron engine.

func (*MultiAgentServer) SearchArtifacts

SearchArtifacts performs full-text search on artifacts.

func (*MultiAgentServer) SendAndReceive

SendAndReceive sends a request message and waits for a response (RPC-style). This implements synchronous request-response communication via the message queue.

func (*MultiAgentServer) SendAsync

SendAsync sends a fire-and-forget message to another agent via the message queue. If the destination agent is offline, the message is queued for later delivery.

func (*MultiAgentServer) SetAgentRegistry

func (s *MultiAgentServer) SetAgentRegistry(registry *agent.Registry)

SetAgentRegistry injects the agent registry for workflow execution. This should be called after NewMultiAgentServer() to enable ExecuteWorkflow RPC.

func (*MultiAgentServer) SetAppCompiler added in v1.2.0

func (s *MultiAgentServer) SetAppCompiler(c AppCompiler)

SetAppCompiler sets the compiler for CreateUIApp/UpdateUIApp RPCs.

func (*MultiAgentServer) SetAppProvider added in v1.2.0

func (s *MultiAgentServer) SetAppProvider(p AppProvider)

SetAppProvider sets the app provider for ListUIApps/GetUIApp RPCs.

func (*MultiAgentServer) SetArtifactStore

func (s *MultiAgentServer) SetArtifactStore(store artifacts.ArtifactStore)

SetArtifactStore sets the artifact store for file management. This should be called after NewMultiAgentServer() to enable artifact operations.

func (*MultiAgentServer) SetClarificationConfig

func (s *MultiAgentServer) SetClarificationConfig(channelSendTimeoutMs int)

SetClarificationConfig sets the clarification question timeout configuration.

func (*MultiAgentServer) SetEvalStore added in v1.2.0

func (s *MultiAgentServer) SetEvalStore(store *evals.Store)

SetEvalStore configures the evaluation store for persisting ABTest results. It propagates the store to the internal delegate server used for ABTest.

func (*MultiAgentServer) SetJudgeServer added in v1.2.0

func (s *MultiAgentServer) SetJudgeServer(js *JudgeServer)

SetJudgeServer wires the JudgeServer for ABTest judge_id resolution. It propagates the judge server to singleServer if it has already been initialized.

func (*MultiAgentServer) SetLLMConcurrencyLimit

func (s *MultiAgentServer) SetLLMConcurrencyLimit(limit int)

SetLLMConcurrencyLimit configures the maximum number of concurrent LLM calls. This prevents rate limiting by serializing or limiting parallel LLM API calls.

limit = 1: Fully serialized (safest for strict rate limits) limit = 2-5: Some parallelism while staying under quota limit > 5: Higher throughput, higher risk of rate limiting

This should be called after NewMultiAgentServer() but before spawning workflow sub-agents.

func (*MultiAgentServer) SetLogger

func (s *MultiAgentServer) SetLogger(logger *zap.Logger)

SetLogger injects the logger for server operations. This should be called after NewMultiAgentServer() to enable logging.

func (*MultiAgentServer) SetMCPManager

func (s *MultiAgentServer) SetMCPManager(mgr *manager.Manager, configPath string, logger *zap.Logger)

SetMCPManager injects the MCP manager for runtime management. This should be called after NewMultiAgentServer() to enable MCP server management.

func (*MultiAgentServer) SetPatternTracker added in v1.2.0

func (s *MultiAgentServer) SetPatternTracker(tracker *learning.PatternEffectivenessTracker)

SetPatternTracker sets the pattern effectiveness tracker on the server. All existing and future agents (via AddAgent) will have their orchestrators wired to record metrics automatically.

func (*MultiAgentServer) SetProgressMultiplexer

func (s *MultiAgentServer) SetProgressMultiplexer(agentID string, pm *metaagent.ProgressMultiplexer)

SetProgressMultiplexer sets the progress multiplexer for an agent. This enables multi-turn conversations and progress event broadcasting.

func (*MultiAgentServer) SetProviderFactory

func (s *MultiAgentServer) SetProviderFactory(f *factory.ProviderFactory)

SetProviderFactory sets the LLM provider factory for dynamic model switching.

func (*MultiAgentServer) SetProviderPool added in v1.2.0

func (s *MultiAgentServer) SetProviderPool(pool map[string]agent.LLMProvider, active string)

SetProviderPool configures the named provider pool used by ListProviders and ABTest RPCs. It builds an internal *Server wrapping the default agent so that both RPCs can delegate to the same implementation used by the single-agent Server.

func (*MultiAgentServer) SetServerConfig added in v1.2.0

func (s *MultiAgentServer) SetServerConfig(config *loomv1.ServerConfig)

SetServerConfig sets the server configuration (network, metadata, etc.). This should be called after NewMultiAgentServer() to populate GetServerConfig responses.

func (*MultiAgentServer) SetStorageBackend added in v1.2.0

func (s *MultiAgentServer) SetStorageBackend(sb backend.StorageBackend)

SetStorageBackend sets the storage backend for health checks and migration RPCs.

func (*MultiAgentServer) SetStorageBackendType added in v1.2.0

func (s *MultiAgentServer) SetStorageBackendType(backendType loomv1.StorageBackendType)

SetStorageBackendType sets the storage backend type for health status reporting.

func (*MultiAgentServer) SetToolRegistry

func (s *MultiAgentServer) SetToolRegistry(registry *toolregistry.Registry)

SetToolRegistry injects the tool registry for dynamic tool discovery. This should be called after NewMultiAgentServer() to enable tool_search indexing of MCP tools.

func (*MultiAgentServer) SetTraceStore added in v1.2.0

func (s *MultiAgentServer) SetTraceStore(maxAge time.Duration)

SetTraceStore initializes the server's local trace store for GetTrace RPC support. This should be called after NewMultiAgentServer() to enable trace retrieval.

func (*MultiAgentServer) SetTracer added in v1.1.0

func (s *MultiAgentServer) SetTracer(tracer observability.Tracer)

SetTracer injects an observability tracer for workflow and agent tracing. This should be called after NewMultiAgentServer() to enable observability. Also ensures traceStoreLocal is initialized so GetTrace RPC works.

func (*MultiAgentServer) SharedMemoryStore

func (s *MultiAgentServer) SharedMemoryStore() *storage.SharedMemoryStore

GetSharedMemory returns the shared memory store (for testing/inspection).

func (*MultiAgentServer) Shutdown

func (s *MultiAgentServer) Shutdown(ctx context.Context) error

Shutdown gracefully shuts down the server, closing all pending question channels. This should be called during server shutdown to notify waiting agents that no more answers will be received.

func (*MultiAgentServer) SpawnSubAgent added in v1.1.0

SpawnSubAgent spawns a new agent as a child of the current session. This implements the builtin.SpawnHandler interface.

func (*MultiAgentServer) StartAgent added in v1.2.0

StartAgent starts a stopped agent, making it available for requests.

func (*MultiAgentServer) StartHotReload

func (s *MultiAgentServer) StartHotReload(ctx context.Context, logger *zap.Logger) error

StartHotReload initializes hot-reload watchers for all agents with pattern directories. Hot-reload events are automatically broadcast to clients via StreamPatternUpdates.

func (*MultiAgentServer) StartMessageQueueMonitor

func (s *MultiAgentServer) StartMessageQueueMonitor(ctx context.Context)

StartMessageQueueMonitor starts a background goroutine that monitors the message queue and notifies workflow sub-agents when they have pending messages (event-driven, not polling).

func (*MultiAgentServer) StopAgent added in v1.2.0

StopAgent stops a running agent. The agent remains registered but will not process requests.

func (*MultiAgentServer) StopHotReload

func (s *MultiAgentServer) StopHotReload() error

StopHotReload stops all hot-reload watchers. Should be called during server shutdown.

func (*MultiAgentServer) StreamPatternUpdates

StreamPatternUpdates streams pattern update events to clients in real-time. Used by TUI and other clients to show live pattern changes without polling.

func (*MultiAgentServer) StreamWeave

StreamWeave streams agent execution progress.

func (*MultiAgentServer) StreamWorkflow

StreamWorkflow executes a workflow and streams progress updates to the client. This provides real-time feedback during long-running multi-agent workflows.

func (*MultiAgentServer) Subscribe

Subscribe creates a subscription to a topic and streams messages back to the client. The stream will remain open until the client closes it or an error occurs.

func (*MultiAgentServer) SubscribeToSession

SubscribeToSession subscribes to real-time updates for a session. Streams updates when new messages arrive in the session conversation. This allows clients to receive asynchronous responses from workflow coordinators and sub-agents without polling.

func (*MultiAgentServer) SwitchModel

SwitchModel switches the LLM provider/model for a specific agent. The agent is identified by agent_id in the request.

func (*MultiAgentServer) TestMCPServerConnection

TestMCPServerConnection tests an MCP server configuration without persisting it. This allows users to validate their configuration before saving.

func (*MultiAgentServer) TriggerScheduledWorkflow

TriggerScheduledWorkflow manually triggers a scheduled workflow immediately. This bypasses the cron schedule and executes the workflow right away.

func (*MultiAgentServer) UpdateAgent

func (s *MultiAgentServer) UpdateAgent(id string, ag *agent.Agent) error

UpdateAgent replaces an existing agent with a new instance (for hot-reload). The new agent will inherit shared memory and communication configuration if set.

func (*MultiAgentServer) UpdateMCPServer

UpdateMCPServer updates an existing MCP server configuration.

func (*MultiAgentServer) UpdateScheduledWorkflow

UpdateScheduledWorkflow updates an existing scheduled workflow. YAML-sourced schedules cannot be updated via RPC - they must be modified by editing the YAML file.

func (*MultiAgentServer) UpdateUIApp added in v1.2.0

UpdateUIApp updates an existing dynamic app's spec.

func (*MultiAgentServer) UploadArtifact

UploadArtifact uploads a file to artifacts storage.

func (*MultiAgentServer) WatchSharedMemory

WatchSharedMemory watches for changes to keys in a namespace and streams updates.

func (*MultiAgentServer) Weave

Weave executes a user query using the specified agent.

type PatternEventBroadcaster

type PatternEventBroadcaster struct {
	// contains filtered or unexported fields
}

PatternEventBroadcaster broadcasts pattern update events to multiple subscribers. Thread-safe for concurrent subscribe/unsubscribe/broadcast operations.

func NewPatternEventBroadcaster

func NewPatternEventBroadcaster() *PatternEventBroadcaster

NewPatternEventBroadcaster creates a new pattern event broadcaster.

func (*PatternEventBroadcaster) Broadcast

func (b *PatternEventBroadcaster) Broadcast(event *loomv1.PatternUpdateEvent)

Broadcast sends an event to all subscribers. Non-blocking: if a subscriber's channel is full, the event is dropped for that subscriber.

func (*PatternEventBroadcaster) BroadcastPatternCreated

func (b *PatternEventBroadcaster) BroadcastPatternCreated(agentID, patternName, category, filePath string)

BroadcastPatternCreated broadcasts a pattern creation event.

func (*PatternEventBroadcaster) BroadcastPatternDeleted

func (b *PatternEventBroadcaster) BroadcastPatternDeleted(agentID, patternName, category string)

BroadcastPatternDeleted broadcasts a pattern deletion event.

func (*PatternEventBroadcaster) BroadcastPatternModified

func (b *PatternEventBroadcaster) BroadcastPatternModified(agentID, patternName, category, filePath string)

BroadcastPatternModified broadcasts a pattern modification event.

func (*PatternEventBroadcaster) BroadcastPatternValidationFailed

func (b *PatternEventBroadcaster) BroadcastPatternValidationFailed(agentID, patternName, errorMsg string)

BroadcastPatternValidationFailed broadcasts a pattern validation failure event.

func (*PatternEventBroadcaster) Close

func (b *PatternEventBroadcaster) Close()

Close closes all subscriber channels.

func (*PatternEventBroadcaster) Subscribe

func (b *PatternEventBroadcaster) Subscribe() chan *loomv1.PatternUpdateEvent

Subscribe registers a new subscriber and returns a channel for receiving events. The channel buffer size is 100 to handle bursts of events. Caller must call Unsubscribe() to clean up.

func (*PatternEventBroadcaster) Unsubscribe

func (b *PatternEventBroadcaster) Unsubscribe(ch chan *loomv1.PatternUpdateEvent)

Unsubscribe removes a subscriber and closes their channel.

type PubSubEvent added in v1.1.0

type PubSubEvent struct {
	Type      string    // "agent_message"
	Topic     string    // Topic name
	FromAgent string    // Agent ID that sent the message
	ToAgents  int       // Number of agents that received the message
	Content   string    // Message content
	Timestamp time.Time // When the message was published
}

PubSubEvent represents a pub/sub message event for SSE streaming

type Server

type Server struct {
	loomv1.UnimplementedLoomServiceServer
	// contains filtered or unexported fields
}

Server implements the LoomService gRPC server.

func NewServer

func NewServer(ag *agent.Agent, store agent.SessionStorage) *Server

NewServer creates a new LoomService server.

func (*Server) ABTest added in v1.2.0

ABTest runs an A/B test across multiple named providers.

func (*Server) CreateSession

func (s *Server) CreateSession(ctx context.Context, req *loomv1.CreateSessionRequest) (*loomv1.Session, error)

CreateSession creates a new conversation session.

func (*Server) DeleteSession

DeleteSession deletes a session.

func (*Server) GetConversationHistory

func (s *Server) GetConversationHistory(ctx context.Context, req *loomv1.GetConversationHistoryRequest) (*loomv1.ConversationHistory, error)

GetConversationHistory retrieves conversation history.

func (*Server) GetHealth

func (s *Server) GetHealth(ctx context.Context, req *loomv1.GetHealthRequest) (*loomv1.HealthStatus, error)

GetHealth performs a health check by pinging each configured LLM provider. Returns per-component status in the components map with keys like "llm.agent", "llm.judge", etc. Overall status is "healthy" if all pass, "degraded" if some fail, "unhealthy" if all fail.

func (*Server) GetPattern

func (s *Server) GetPattern(ctx context.Context, req *loomv1.GetPatternRequest) (*loomv1.Pattern, error)

GetPattern retrieves a specific pattern.

func (*Server) GetProviderPool added in v1.2.0

func (s *Server) GetProviderPool() map[string]agent.LLMProvider

GetProviderPool returns the server's named provider pool (may be nil).

func (*Server) GetSession

func (s *Server) GetSession(ctx context.Context, req *loomv1.GetSessionRequest) (*loomv1.Session, error)

GetSession retrieves session details.

func (*Server) GetTrace

func (s *Server) GetTrace(ctx context.Context, req *loomv1.GetTraceRequest) (*loomv1.Trace, error)

GetTrace retrieves execution trace.

func (*Server) ListAvailableModels

ListAvailableModels lists all available LLM models/providers.

func (*Server) ListAvailableModelsLegacy

func (s *Server) ListAvailableModelsLegacy(ctx context.Context, req *loomv1.ListAvailableModelsRequest) (*loomv1.ListAvailableModelsResponse, error)

ListAvailableModelsLegacy is the old static implementation (deprecated).

func (*Server) ListPatterns

ListPatterns lists available patterns.

func (*Server) ListProviders added in v1.2.0

ListProviders lists named providers in the pool.

func (*Server) ListSessions

ListSessions lists all sessions.

func (*Server) ListTools

ListTools lists all registered tools. If req.Backend is specified, only tools for that backend are returned.

func (*Server) LoadPatterns

LoadPatterns loads pattern definitions.

func (*Server) RegisterTool

RegisterTool dynamically registers a new tool.

func (*Server) RequestToolPermission

func (s *Server) RequestToolPermission(ctx context.Context, req *loomv1.ToolPermissionRequest) (*loomv1.ToolPermissionResponse, error)

RequestToolPermission requests user permission to execute a tool.

func (*Server) SetActiveProviderName added in v1.2.0

func (s *Server) SetActiveProviderName(name string)

SetActiveProviderName updates the tracked active provider name.

func (*Server) SetEvalStore added in v1.2.0

func (s *Server) SetEvalStore(store *evals.Store)

SetEvalStore configures the evaluation store for persisting ABTest results.

func (*Server) SetJudgeServer added in v1.2.0

func (s *Server) SetJudgeServer(js *JudgeServer)

SetJudgeServer wires an in-memory JudgeServer so that ABTest can resolve req.JudgeId.

func (*Server) SetProviderFactory

func (s *Server) SetProviderFactory(f *factory.ProviderFactory)

SetProviderFactory sets the LLM provider factory for dynamic model switching.

func (*Server) SetProviderPool added in v1.2.0

func (s *Server) SetProviderPool(pool map[string]agent.LLMProvider, active string)

SetProviderPool configures the server's named provider pool.

func (*Server) StreamWeave

func (s *Server) StreamWeave(req *loomv1.WeaveRequest, stream loomv1.LoomService_StreamWeaveServer) error

StreamWeave streams agent execution progress.

func (*Server) SwitchModel

SwitchModel switches the LLM model/provider for a session.

func (*Server) Weave

Weave executes a user query using the agent.

type UserIDConfig added in v1.2.0

type UserIDConfig struct {
	// RequireUserID when true returns Unauthenticated if X-User-ID is missing.
	RequireUserID bool

	// DefaultUserID is used when RequireUserID is false and no header is present.
	// Falls back to "default-user" if empty.
	DefaultUserID string

	// Logger is used for audit logging of user ID extraction. If nil, a no-op
	// logger is used.
	Logger *zap.Logger
}

UserIDConfig controls the behavior of the user ID interceptors.

type WorkflowExecution

type WorkflowExecution struct {
	ExecutionID string
	Pattern     *loomv1.WorkflowPattern
	Result      *loomv1.WorkflowResult
	StartTime   time.Time
	EndTime     time.Time
	Status      WorkflowStatus
	Error       string // Error message if failed
}

WorkflowExecution represents a single workflow execution with its metadata and results.

type WorkflowStatus

type WorkflowStatus string

WorkflowStatus represents the execution state.

const (
	WorkflowStatusRunning   WorkflowStatus = "running"
	WorkflowStatusCompleted WorkflowStatus = "completed"
	WorkflowStatusFailed    WorkflowStatus = "failed"
	WorkflowStatusCanceled  WorkflowStatus = "canceled"
)

type WorkflowStore

type WorkflowStore struct {
	// contains filtered or unexported fields
}

WorkflowStore persists workflow execution history in memory. In production, this could be backed by a database.

func NewWorkflowStore

func NewWorkflowStore() *WorkflowStore

NewWorkflowStore creates a new workflow execution store.

func (*WorkflowStore) Count

func (s *WorkflowStore) Count() int

Count returns the total number of workflow executions.

func (*WorkflowStore) Delete

func (s *WorkflowStore) Delete(executionID string) error

Delete removes a workflow execution from the store.

func (*WorkflowStore) Get

func (s *WorkflowStore) Get(executionID string) (*WorkflowExecution, error)

Get retrieves a workflow execution by ID.

func (*WorkflowStore) List

func (s *WorkflowStore) List(status WorkflowStatus) []*WorkflowExecution

List returns all workflow executions, optionally filtered by status.

func (*WorkflowStore) Store

func (s *WorkflowStore) Store(exec *WorkflowExecution)

Store persists a workflow execution record.

func (*WorkflowStore) StoreResult

func (s *WorkflowStore) StoreResult(executionID string, result *loomv1.WorkflowResult) error

StoreResult updates the result of a completed workflow execution.

func (*WorkflowStore) UpdateStatus

func (s *WorkflowStore) UpdateStatus(executionID string, status WorkflowStatus, errorMsg string) error

UpdateStatus updates the status of a workflow execution.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL