Versions in this module Expand all Collapse all v1 v1.0.0 Sep 13, 2025 Changes in this version + const ErrCodeCircuitBreakerOpen + const ErrCodeCircuitBreakerTimeout + const ErrCodeDuplicatePluginName + const ErrCodeExecTransportError + const ErrCodeGRPCTransportError + const ErrCodeHTTPTransportError + const ErrCodeHealthCheckFailed + const ErrCodeHealthCheckTimeout + const ErrCodeInvalidEndpointFormat + const ErrCodeInvalidEndpointURL + const ErrCodeInvalidJSONConfig + const ErrCodeInvalidPluginName + const ErrCodeInvalidTransport + const ErrCodeLoadBalancerFailed + const ErrCodeMissingAPIKey + const ErrCodeMissingBasicCredentials + const ErrCodeMissingBearerToken + const ErrCodeMissingEndpoint + const ErrCodeMissingExecutable + const ErrCodeMissingMTLSCerts + const ErrCodeMissingSocketPath + const ErrCodeNoAvailablePlugins + const ErrCodeNoPluginsConfigured + const ErrCodePluginConnectionFailed + const ErrCodePluginExecutionFailed + const ErrCodePluginNotEnabled + const ErrCodePluginNotFound + const ErrCodePluginTimeout + const ErrCodeRateLimitExceeded + const ErrCodeUnixTransportError + const ErrCodeUnsupportedAuthMethod + const ErrCodeUnsupportedTransport + func NewAuthConfigValidationError(cause error) *errors.Error + func NewCircuitBreakerOpenError(pluginName string) *errors.Error + func NewCircuitBreakerTimeoutError(pluginName string, timeout interface{}) *errors.Error + func NewDuplicatePluginNameError(name string) *errors.Error + func NewExecTransportError(cause error) *errors.Error + func NewGRPCTransportError(cause error) *errors.Error + func NewHTTPTransportError(cause error) *errors.Error + func NewHealthCheckFailedError(pluginName string, cause error) *errors.Error + func NewHealthCheckTimeoutError(pluginName string, timeout interface{}) *errors.Error + func NewInvalidEndpointFormatError() *errors.Error + func NewInvalidEndpointURLError(endpoint string, cause error) *errors.Error + func NewInvalidJSONConfigError(cause error) *errors.Error + func NewInvalidPluginNameError(name string) *errors.Error + func NewInvalidTransportError() *errors.Error + func NewLoadBalancerFailedError(cause error) *errors.Error + func NewMissingAPIKeyError() *errors.Error + func NewMissingBasicCredentialsError() *errors.Error + func NewMissingBearerTokenError() *errors.Error + func NewMissingEndpointError(transport TransportType) *errors.Error + func NewMissingExecutableError() *errors.Error + func NewMissingMTLSCertsError() *errors.Error + func NewMissingSocketPathError() *errors.Error + func NewNoAvailablePluginsError(pluginType string) *errors.Error + func NewNoPluginsConfiguredError() *errors.Error + func NewPluginConnectionFailedError(name string, cause error) *errors.Error + func NewPluginExecutionFailedError(name string, cause error) *errors.Error + func NewPluginNotEnabledError(name string) *errors.Error + func NewPluginNotFoundError(name string) *errors.Error + func NewPluginTimeoutError(name string, timeout interface{}) *errors.Error + func NewPluginValidationError(pluginIndex int, cause error) *errors.Error + func NewRateLimitExceededError(pluginName string, limit interface{}) *errors.Error + func NewUnixTransportError(cause error) *errors.Error + func NewUnsupportedAuthMethodError(method AuthMethod) *errors.Error + func NewUnsupportedTransportError(transport TransportType) *errors.Error + type AuthConfig struct + APIKey string + CAFile string + CertFile string + Headers map[string]string + KeyFile string + Method AuthMethod + Password string + Token string + Username string + func (ac *AuthConfig) Validate() error + type AuthMethod string + const AuthAPIKey + const AuthBasic + const AuthBearer + const AuthCustom + const AuthMTLS + const AuthNone + type CircuitBreaker struct + func NewCircuitBreaker(config CircuitBreakerConfig) *CircuitBreaker + func (cb *CircuitBreaker) AllowRequest() bool + func (cb *CircuitBreaker) GetState() CircuitBreakerState + func (cb *CircuitBreaker) GetStats() CircuitBreakerStats + func (cb *CircuitBreaker) RecordFailure() + func (cb *CircuitBreaker) RecordSuccess() + func (cb *CircuitBreaker) Reset() + type CircuitBreakerConfig struct + Enabled bool + FailureThreshold int + MinRequestThreshold int + RecoveryTimeout time.Duration + SuccessThreshold int + type CircuitBreakerState int32 + const StateClosed + const StateHalfOpen + const StateOpen + func (s CircuitBreakerState) String() string + type CircuitBreakerStats struct + FailureCount int64 + LastFailure time.Time + RequestCount int64 + State CircuitBreakerState + SuccessCount int64 + type CommonPluginMetrics struct + ActiveRequests GaugeMetric + CircuitBreakerState GaugeMetric + ErrorCount CounterMetric + RequestCount CounterMetric + RequestDuration HistogramMetric + func CreateCommonPluginMetrics(collector EnhancedMetricsCollector) *CommonPluginMetrics + func (cpm *CommonPluginMetrics) DecrementActiveRequests(pluginName string) + func (cpm *CommonPluginMetrics) IncrementActiveRequests(pluginName string) + func (cpm *CommonPluginMetrics) RecordRequest(pluginName string, duration time.Duration, err error) + func (cpm *CommonPluginMetrics) SetCircuitBreakerState(pluginName string, state int) + type ConnectionConfig struct + ConnectionTimeout time.Duration + DisableCompression bool + IdleTimeout time.Duration + KeepAlive bool + MaxConnections int + MaxIdleConnections int + RequestTimeout time.Duration + type CounterMetric interface + Add func(value float64, labelValues ...string) + Inc func(labelValues ...string) + type DefaultEnhancedMetricsCollector struct + func NewDefaultEnhancedMetricsCollector() *DefaultEnhancedMetricsCollector + func (demc *DefaultEnhancedMetricsCollector) CounterWithLabels(name, description string, labelNames ...string) CounterMetric + func (demc *DefaultEnhancedMetricsCollector) GaugeWithLabels(name, description string, labelNames ...string) GaugeMetric + func (demc *DefaultEnhancedMetricsCollector) GetPrometheusMetrics() []PrometheusMetric + func (demc *DefaultEnhancedMetricsCollector) HistogramWithLabels(name, description string, buckets []float64, labelNames ...string) HistogramMetric + type DefaultMetricsCollector struct + func NewDefaultMetricsCollector() *DefaultMetricsCollector + func (dmc *DefaultMetricsCollector) GetMetrics() map[string]interface{} + func (dmc *DefaultMetricsCollector) IncrementCounter(name string, labels map[string]string, value int64) + func (dmc *DefaultMetricsCollector) RecordCustomMetric(name string, labels map[string]string, value interface{}) + func (dmc *DefaultMetricsCollector) RecordHistogram(name string, labels map[string]string, value float64) + func (dmc *DefaultMetricsCollector) SetGauge(name string, labels map[string]string, value float64) + type DiscoveryConfig struct + Directories []string + Enabled bool + Patterns []string + WatchMode bool + type EnhancedMetricsCollector interface + CounterWithLabels func(name, description string, labelNames ...string) CounterMetric + GaugeWithLabels func(name, description string, labelNames ...string) GaugeMetric + GetPrometheusMetrics func() []PrometheusMetric + HistogramWithLabels func(name, description string, buckets []float64, labelNames ...string) HistogramMetric + func MigrateToEnhancedMetrics(legacy MetricsCollector) EnhancedMetricsCollector + func NewEnhancedMetricsCollector() EnhancedMetricsCollector + type ExecutionContext struct + Headers map[string]string + MaxRetries int + Metadata map[string]string + RequestID string + Timeout time.Duration + type GRPCPlugin struct + func NewGRPCPlugin[Req, Resp any](config PluginConfig, logger *slog.Logger) (*GRPCPlugin[Req, Resp], error) + func (g *GRPCPlugin[Req, Resp]) Close() error + func (g *GRPCPlugin[Req, Resp]) Execute(ctx context.Context, execCtx ExecutionContext, request Req) (Resp, error) + func (g *GRPCPlugin[Req, Resp]) Health(ctx context.Context) HealthStatus + func (g *GRPCPlugin[Req, Resp]) Info() PluginInfo + type GRPCPluginFactory struct + func NewGRPCPluginFactory[Req, Resp any](logger *slog.Logger) *GRPCPluginFactory[Req, Resp] + func (f *GRPCPluginFactory[Req, Resp]) CreatePlugin(config PluginConfig) (Plugin[Req, Resp], error) + func (f *GRPCPluginFactory[Req, Resp]) SupportedTransports() []string + func (f *GRPCPluginFactory[Req, Resp]) ValidateConfig(config PluginConfig) error + type GRPCPluginService interface + Execute func(ctx context.Context, request []byte) ([]byte, error) + Health func(ctx context.Context) error + Info func(ctx context.Context) ([]byte, error) + type GRPCPluginServiceClient struct + func NewGRPCPluginServiceClient(conn *grpc.ClientConn) *GRPCPluginServiceClient + func (c *GRPCPluginServiceClient) Execute(ctx context.Context, request []byte) ([]byte, error) + func (c *GRPCPluginServiceClient) Health(ctx context.Context) error + func (c *GRPCPluginServiceClient) Info(ctx context.Context) ([]byte, error) + type GaugeMetric interface + Add func(value float64, labelValues ...string) + Dec func(labelValues ...string) + Inc func(labelValues ...string) + Set func(value float64, labelValues ...string) + type GlobalMetrics struct + ActiveRequests int64 + TotalErrors int64 + TotalRequests int64 + type HTTPPlugin struct + func (p *HTTPPlugin[Req, Resp]) Close() error + func (p *HTTPPlugin[Req, Resp]) Execute(ctx context.Context, execCtx ExecutionContext, request Req) (Resp, error) + func (p *HTTPPlugin[Req, Resp]) Health(ctx context.Context) HealthStatus + func (p *HTTPPlugin[Req, Resp]) Info() PluginInfo + type HTTPPluginFactory struct + func NewHTTPPluginFactory[Req, Resp any]() *HTTPPluginFactory[Req, Resp] + func (f *HTTPPluginFactory[Req, Resp]) CreatePlugin(config PluginConfig) (Plugin[Req, Resp], error) + func (f *HTTPPluginFactory[Req, Resp]) SupportedTransports() []string + func (f *HTTPPluginFactory[Req, Resp]) ValidateConfig(config PluginConfig) error + type HTTPPluginRequest struct + Data T + Headers map[string]string + Metadata map[string]string + RequestID string + Timeout string + type HTTPPluginResponse struct + Data T + Error string + Metadata map[string]string + RequestID string + type HealthCheckConfig struct + Enabled bool + Endpoint string + FailureLimit int + Interval time.Duration + Timeout time.Duration + type HealthChecker struct + func NewHealthChecker(plugin interface{ ... }, config HealthCheckConfig) *HealthChecker + func (hc *HealthChecker) Check() HealthStatus + func (hc *HealthChecker) Done() <-chan struct{} + func (hc *HealthChecker) GetConsecutiveFailures() int64 + func (hc *HealthChecker) GetLastCheck() time.Time + func (hc *HealthChecker) IsRunning() bool + func (hc *HealthChecker) Start() + func (hc *HealthChecker) Stop() + type HealthMonitor struct + func NewHealthMonitor() *HealthMonitor + func (hm *HealthMonitor) AddChecker(name string, checker *HealthChecker) + func (hm *HealthMonitor) GetAllStatus() map[string]HealthStatus + func (hm *HealthMonitor) GetOverallHealth() HealthStatus + func (hm *HealthMonitor) GetStatus(name string) (HealthStatus, bool) + func (hm *HealthMonitor) RemoveChecker(name string) + func (hm *HealthMonitor) Shutdown() + func (hm *HealthMonitor) UpdateStatus(name string, status HealthStatus) + type HealthStatus struct + LastCheck time.Time + Message string + Metadata map[string]string + ResponseTime time.Duration + Status PluginStatus + type HistogramMetric interface + Observe func(value float64, labelValues ...string) + type LoadBalanceRequest struct + Key string + Preferences []string + Priority int + RequestID string + type LoadBalancer struct + func NewLoadBalancer[Req, Resp any](strategy LoadBalancingStrategy, logger *slog.Logger) *LoadBalancer[Req, Resp] + func (lb *LoadBalancer[Req, Resp]) AddPlugin(name string, plugin Plugin[Req, Resp], weight, priority int) error + func (lb *LoadBalancer[Req, Resp]) DisablePlugin(name string) error + func (lb *LoadBalancer[Req, Resp]) EnablePlugin(name string) error + func (lb *LoadBalancer[Req, Resp]) Execute(ctx context.Context, execCtx ExecutionContext, lbReq LoadBalanceRequest, ...) (Resp, error) + func (lb *LoadBalancer[Req, Resp]) GetStats() map[string]LoadBalancerStats + func (lb *LoadBalancer[Req, Resp]) RemovePlugin(name string) error + func (lb *LoadBalancer[Req, Resp]) SelectPlugin(lbReq LoadBalanceRequest) (string, Plugin[Req, Resp], error) + type LoadBalancerStats struct + ActiveConnections int32 + AverageLatency time.Duration + Enabled bool + FailedRequests int64 + HealthScore int32 + LastUsed time.Time + PluginName string + Priority int + SuccessfulRequests int64 + TotalRequests int64 + Weight int + func (stats LoadBalancerStats) GetSuccessRate() float64 + type LoadBalancingStrategy string + const StrategyConsistentHash + const StrategyLeastConnections + const StrategyLeastLatency + const StrategyPriority + const StrategyRandom + const StrategyRoundRobin + const StrategyWeightedRandom + type Manager struct + func NewManager[Req, Resp any](logger *slog.Logger) *Manager[Req, Resp] + func (m *Manager[Req, Resp]) Execute(ctx context.Context, pluginName string, request Req) (Resp, error) + func (m *Manager[Req, Resp]) ExecuteWithOptions(ctx context.Context, pluginName string, execCtx ExecutionContext, request Req) (Resp, error) + func (m *Manager[Req, Resp]) GetMetrics() ManagerMetrics + func (m *Manager[Req, Resp]) GetPlugin(name string) (Plugin[Req, Resp], error) + func (m *Manager[Req, Resp]) Health() map[string]HealthStatus + func (m *Manager[Req, Resp]) ListPlugins() map[string]HealthStatus + func (m *Manager[Req, Resp]) LoadFromConfig(config ManagerConfig) error + func (m *Manager[Req, Resp]) Register(plugin Plugin[Req, Resp]) error + func (m *Manager[Req, Resp]) RegisterFactory(pluginType string, factory PluginFactory[Req, Resp]) error + func (m *Manager[Req, Resp]) ReloadConfig(config ManagerConfig) error + func (m *Manager[Req, Resp]) ReloadConfigWithStrategy(config ManagerConfig, strategy ReloadStrategy) error + func (m *Manager[Req, Resp]) Shutdown(ctx context.Context) error + func (m *Manager[Req, Resp]) Unregister(name string) error + type ManagerConfig struct + DefaultCircuitBreaker CircuitBreakerConfig + DefaultConnection ConnectionConfig + DefaultHealthCheck HealthCheckConfig + DefaultRateLimit RateLimitConfig + DefaultRetry RetryConfig + Discovery DiscoveryConfig + LogLevel string + MetricsPort int + Plugins []PluginConfig + func GetDefaultManagerConfig() ManagerConfig + func (mc *ManagerConfig) ApplyDefaults() + func (mc *ManagerConfig) FromJSON(data []byte) error + func (mc *ManagerConfig) ToJSON() ([]byte, error) + func (mc *ManagerConfig) Validate() error + type ManagerMetrics struct + CircuitBreakerTrips atomic.Int64 + HealthCheckFailures atomic.Int64 + RequestDuration atomic.Int64 + RequestsFailure atomic.Int64 + RequestsSuccess atomic.Int64 + RequestsTotal atomic.Int64 + type MetricsCollector interface + GetMetrics func() map[string]interface{} + IncrementCounter func(name string, labels map[string]string, value int64) + RecordCustomMetric func(name string, labels map[string]string, value interface{}) + RecordHistogram func(name string, labels map[string]string, value float64) + SetGauge func(name string, labels map[string]string, value float64) + type ObservabilityConfig struct + EnhancedMetricsCollector EnhancedMetricsCollector + ErrorMetrics bool + HealthMetrics bool + LogLevel string + LoggingEnabled bool + MetricsCollector MetricsCollector + MetricsEnabled bool + MetricsPrefix string + PerformanceMetrics bool + StructuredLogging bool + TracingEnabled bool + TracingProvider TracingProvider + TracingSampleRate float64 + func DefaultObservabilityConfig() ObservabilityConfig + func EnhancedObservabilityConfig() ObservabilityConfig + type ObservabilityReport struct + GeneratedAt time.Time + Global GlobalMetrics + Plugins map[string]PluginMetricsReport + UpTime time.Duration + type ObservableManager struct + func NewObservableManager[Req, Resp any](baseManager *Manager[Req, Resp], config ObservabilityConfig) *ObservableManager[Req, Resp] + func (om *ObservableManager[Req, Resp]) ExecuteWithObservability(ctx context.Context, pluginName string, execCtx ExecutionContext, request Req) (Resp, error) + func (om *ObservableManager[Req, Resp]) GetObservabilityMetrics() ObservabilityReport + type Plugin interface + Close func() error + Execute func(ctx context.Context, execCtx ExecutionContext, request Req) (Resp, error) + Health func(ctx context.Context) HealthStatus + Info func() PluginInfo + type PluginConfig struct + Annotations map[string]string + Args []string + Auth AuthConfig + CircuitBreaker CircuitBreakerConfig + Connection ConnectionConfig + Enabled bool + Endpoint string + Env []string + Executable string + HealthCheck HealthCheckConfig + Labels map[string]string + Name string + Options map[string]interface{} + Priority int + RateLimit RateLimitConfig + Retry RetryConfig + Transport TransportType + Type string + WorkDir string + func (pc *PluginConfig) Validate() error + type PluginDiff struct + Added []PluginConfig + Removed []string + Unchanged []string + Updated []PluginUpdate + type PluginFactory interface + CreatePlugin func(config PluginConfig) (Plugin[Req, Resp], error) + SupportedTransports func() []string + ValidateConfig func(config PluginConfig) error + type PluginInfo struct + Author string + Capabilities []string + Description string + Metadata map[string]string + Name string + Version string + type PluginLoadMetrics struct + ActiveConnections atomic.Int32 + AverageLatency atomic.Int64 + FailedRequests atomic.Int64 + HealthScore atomic.Int32 + LastLatency atomic.Int64 + LastUpdate atomic.Int64 + SuccessfulRequests atomic.Int64 + TotalRequests atomic.Int64 + type PluginManager interface + Execute func(ctx context.Context, pluginName string, request Req) (Resp, error) + ExecuteWithOptions func(ctx context.Context, pluginName string, execCtx ExecutionContext, request Req) (Resp, error) + GetPlugin func(name string) (Plugin[Req, Resp], error) + Health func() map[string]HealthStatus + ListPlugins func() map[string]HealthStatus + LoadFromConfig func(config ManagerConfig) error + Register func(plugin Plugin[Req, Resp]) error + ReloadConfig func(config ManagerConfig) error + Shutdown func(ctx context.Context) error + Unregister func(name string) error + type PluginMetricsReport struct + ActiveRequests int64 + AuthErrors int64 + AvgLatency time.Duration + CircuitBreakerTrips int64 + ConnectionErrors int64 + FailedRequests int64 + HealthCheckFailed int64 + HealthCheckTotal int64 + MaxLatency time.Duration + MinLatency time.Duration + OtherErrors int64 + SuccessRate float64 + SuccessfulRequests int64 + TimeoutErrors int64 + TotalRequests int64 + type PluginObservabilityMetrics struct + ActiveRequests *atomic.Int64 + AuthErrors *atomic.Int64 + AvgLatency *atomic.Int64 + CircuitBreakerState string + CircuitBreakerTrips *atomic.Int64 + ConnectionErrors *atomic.Int64 + FailedRequests *atomic.Int64 + HealthCheckFailed *atomic.Int64 + HealthCheckTotal *atomic.Int64 + HealthStatus string + LastHealthCheck *atomic.Int64 + MaxLatency *atomic.Int64 + MinLatency *atomic.Int64 + OtherErrors *atomic.Int64 + SuccessfulRequests *atomic.Int64 + TimeoutErrors *atomic.Int64 + TotalLatency *atomic.Int64 + TotalRequests *atomic.Int64 + type PluginReloader struct + func NewPluginReloader[Req, Resp any](manager *Manager[Req, Resp], options ReloadOptions, logger *slog.Logger) *PluginReloader[Req, Resp] + func (r *PluginReloader[Req, Resp]) ReloadWithIntelligentDiff(ctx context.Context, newConfig ManagerConfig) error + type PluginStatus int + const StatusDegraded + const StatusHealthy + const StatusOffline + const StatusUnhealthy + const StatusUnknown + func (s PluginStatus) String() string + type PluginUpdate struct + Changes []string + Name string + NewConfig PluginConfig + OldConfig PluginConfig + type PluginWrapper struct + Active *atomic.Int32 + Enabled *atomic.Bool + LastUsed *atomic.Int64 + Plugin Plugin[Req, Resp] + Priority int + Weight int + type PrometheusBucket struct + Count uint64 + UpperBound float64 + type PrometheusMetric struct + Buckets []PrometheusBucket + Description string + Labels map[string]string + Name string + Type string + Value float64 + type RateLimitConfig struct + BurstSize int + Enabled bool + RequestsPerSecond float64 + TimeWindow time.Duration + type RateLimiter struct + func NewRateLimiter(config RateLimitConfig) *RateLimiter + func (rl *RateLimiter) Allow() bool + type ReloadOptions struct + DrainTimeout time.Duration + GracefulTimeout time.Duration + HealthCheckTimeout time.Duration + MaxConcurrentReloads int + RollbackOnFailure bool + Strategy ReloadStrategy + func DefaultReloadOptions() ReloadOptions + type ReloadStrategy string + const ReloadStrategyGraceful + const ReloadStrategyRecreate + const ReloadStrategyRolling + type RetryConfig struct + InitialInterval time.Duration + MaxInterval time.Duration + MaxRetries int + Multiplier float64 + RandomJitter bool + type Span interface + Context func() interface{} + Finish func() + SetAttribute func(key string, value interface{}) + SetStatus func(code SpanStatusCode, message string) + type SpanStatusCode int + const SpanStatusError + const SpanStatusOK + const SpanStatusTimeout + type TracingProvider interface + ExtractContext func(headers map[string]string) context.Context + InjectContext func(ctx context.Context) map[string]string + StartSpan func(ctx context.Context, operationName string) (context.Context, Span) + type TransportType string + const TransportExecutable + const TransportGRPC + const TransportGRPCTLS + const TransportHTTP + const TransportHTTPS + const TransportUnix + type UnixConnectionPool struct + func NewUnixConnectionPool(socketPath string, maxConnections int) *UnixConnectionPool + func (p *UnixConnectionPool) Close() error + func (p *UnixConnectionPool) GetConnection() (net.Conn, error) + func (p *UnixConnectionPool) ReturnConnection(conn net.Conn) + type UnixSocketMessage struct + Data json.RawMessage + Headers map[string]string + RequestID string + Timeout int64 + Type string + type UnixSocketPlugin struct + func NewUnixSocketPlugin[Req, Resp any](config PluginConfig, logger *slog.Logger) (*UnixSocketPlugin[Req, Resp], error) + func (u *UnixSocketPlugin[Req, Resp]) Close() error + func (u *UnixSocketPlugin[Req, Resp]) Execute(ctx context.Context, execCtx ExecutionContext, request Req) (Resp, error) + func (u *UnixSocketPlugin[Req, Resp]) Health(ctx context.Context) HealthStatus + func (u *UnixSocketPlugin[Req, Resp]) Info() PluginInfo + type UnixSocketPluginFactory struct + func NewUnixSocketPluginFactory[Req, Resp any](logger *slog.Logger) *UnixSocketPluginFactory[Req, Resp] + func (f *UnixSocketPluginFactory[Req, Resp]) CreatePlugin(config PluginConfig) (Plugin[Req, Resp], error) + func (f *UnixSocketPluginFactory[Req, Resp]) SupportedTransports() []string + func (f *UnixSocketPluginFactory[Req, Resp]) ValidateConfig(config PluginConfig) error + type UnixSocketResponse struct + Data json.RawMessage + Error string + Headers map[string]string + RequestID string + Success bool + Type string