runtime

package
v1.0.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 14, 2026 License: MPL-2.0 Imports: 34 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BatchItem

type BatchItem struct {
	Raw    []byte
	Parsed any
	Output any
}

type ChannelRuntime

type ChannelRuntime struct {
	ID           string
	Config       *config.ChannelConfig
	Source       connector.SourceConnector
	Destinations map[string]connector.DestinationConnector
	DestConfigs  []config.ChannelDestination
	Pipeline     *Pipeline
	Logger       *slog.Logger
	Metrics      *observability.Metrics
	Store        storage.MessageStore
	Maps         *MapVariables
	Dedup        cluster.MessageDeduplicator
	// contains filtered or unexported fields
}

func (*ChannelRuntime) HandleMessage added in v1.0.6

func (cr *ChannelRuntime) HandleMessage(ctx context.Context, msg *message.Message) error

func (*ChannelRuntime) Start

func (cr *ChannelRuntime) Start(ctx context.Context) error

func (*ChannelRuntime) Stop

func (cr *ChannelRuntime) Stop(ctx context.Context) error

type CodeTemplateLibrary

type CodeTemplateLibrary struct {
	Name      string
	Directory string
	Functions map[string]string
}

type CodeTemplateLoader

type CodeTemplateLoader struct {
	// contains filtered or unexported fields
}

func NewCodeTemplateLoader

func NewCodeTemplateLoader(projectDir string, logger *slog.Logger) *CodeTemplateLoader

func (*CodeTemplateLoader) GetLibrary

func (ctl *CodeTemplateLoader) GetLibrary(name string) (*CodeTemplateLibrary, bool)

GetLibrary returns a loaded library by name.

func (*CodeTemplateLoader) Libraries

func (ctl *CodeTemplateLoader) Libraries() map[string]*CodeTemplateLibrary

Libraries returns all loaded libraries.

func (*CodeTemplateLoader) LoadLibrary

func (ctl *CodeTemplateLoader) LoadLibrary(name, dir string) error

LoadLibrary loads all compiled JS files from a code template library directory.

func (*CodeTemplateLoader) ResolveFunction

func (ctl *CodeTemplateLoader) ResolveFunction(funcName string) (string, bool)

ResolveFunction finds a function file across all loaded libraries.

type ConnectorFactory

type ConnectorFactory interface {
	CreateSource(listenerCfg config.ListenerConfig) (connector.SourceConnector, error)
	CreateDestination(name string, dest config.Destination) (connector.DestinationConnector, error)
}

type DefaultEngine

type DefaultEngine struct {
	// contains filtered or unexported fields
}

func NewDefaultEngine

func NewDefaultEngine(rootDir string, cfg *config.Config, factory ConnectorFactory, logger *slog.Logger) *DefaultEngine

func (*DefaultEngine) CloseRuntime

func (e *DefaultEngine) CloseRuntime() error

func (*DefaultEngine) Config

func (e *DefaultEngine) Config() *config.Config

func (*DefaultEngine) DeployChannel

func (e *DefaultEngine) DeployChannel(ctx context.Context, channelID string) error

func (*DefaultEngine) GetChannelRuntime

func (e *DefaultEngine) GetChannelRuntime(channelID string) (*ChannelRuntime, bool)

func (*DefaultEngine) InitRuntime

func (e *DefaultEngine) InitRuntime(ctx context.Context) error

func (*DefaultEngine) ListChannelIDs

func (e *DefaultEngine) ListChannelIDs() []string

func (*DefaultEngine) MessageStore

func (e *DefaultEngine) MessageStore() storage.MessageStore

func (*DefaultEngine) Metrics

func (e *DefaultEngine) Metrics() *observability.Metrics

func (*DefaultEngine) ReprocessMessage

func (e *DefaultEngine) ReprocessMessage(ctx context.Context, channelID string, msg *message.Message) error

func (*DefaultEngine) RestartChannel

func (e *DefaultEngine) RestartChannel(ctx context.Context, channelID string) error

func (*DefaultEngine) RootDir

func (e *DefaultEngine) RootDir() string

func (*DefaultEngine) SetAlertManager

func (e *DefaultEngine) SetAlertManager(am *alerting.AlertManager)

func (*DefaultEngine) SetCoordinator

func (e *DefaultEngine) SetCoordinator(coord cluster.ChannelCoordinator)

func (*DefaultEngine) SetDeduplicator

func (e *DefaultEngine) SetDeduplicator(dedup cluster.MessageDeduplicator)

func (*DefaultEngine) SetMessageStore

func (e *DefaultEngine) SetMessageStore(store storage.MessageStore)

func (*DefaultEngine) SetRedisClient

func (e *DefaultEngine) SetRedisClient(client *redis.Client, keyPrefix string)

func (*DefaultEngine) Start

func (e *DefaultEngine) Start(ctx context.Context) error

func (*DefaultEngine) Stop

func (e *DefaultEngine) Stop(ctx context.Context) error

func (*DefaultEngine) UndeployChannel

func (e *DefaultEngine) UndeployChannel(ctx context.Context, channelID string) error

func (*DefaultEngine) WatchChannels

func (e *DefaultEngine) WatchChannels(ctx context.Context) error

type DestinationResult

type DestinationResult struct {
	Name     string
	Success  bool
	Response *message.Response
	Error    string
}

type Engine

type Engine interface {
	Start(ctx context.Context) error
	Stop(ctx context.Context) error
}

type HotReloader

type HotReloader struct {
	// contains filtered or unexported fields
}

func NewHotReloader

func NewHotReloader(engine *DefaultEngine, channelsDir string, logger *slog.Logger) (*HotReloader, error)

func (*HotReloader) Start

func (hr *HotReloader) Start(ctx context.Context) error

func (*HotReloader) Stop

func (hr *HotReloader) Stop()

type MapVariables

type MapVariables struct {
	// contains filtered or unexported fields
}

func NewMapVariables

func NewMapVariables() *MapVariables

func (*MapVariables) ChannelMap

func (mv *MapVariables) ChannelMap(channelID string) *SyncMap

func (*MapVariables) ExportForChannel

func (mv *MapVariables) ExportForChannel(channelID string) map[string]any

Export returns all map data as a plain map structure for JS context injection.

func (*MapVariables) GlobalMap

func (mv *MapVariables) GlobalMap() *SyncMap

func (*MapVariables) ResponseMap

func (mv *MapVariables) ResponseMap(channelID string) *SyncMap

type NodeRunner

type NodeRunner struct {
	// contains filtered or unexported fields
}

func NewNodeRunner

func NewNodeRunner(poolSize int, logger *slog.Logger) (*NodeRunner, error)

func (*NodeRunner) Call

func (nr *NodeRunner) Call(fn string, entrypoint string, args ...any) (any, error)

func (*NodeRunner) Close

func (nr *NodeRunner) Close() error

func (*NodeRunner) PreloadModule

func (nr *NodeRunner) PreloadModule(module string) error

type Phase added in v1.0.7

type Phase string

Phase represents a point in the pipeline where plugins can execute.

const (
	PhaseBeforeValidation  Phase = "before_validation"
	PhaseAfterValidation   Phase = "after_validation"
	PhaseBeforeTransform   Phase = "before_transform"
	PhaseAfterTransform    Phase = "after_transform"
	PhaseBeforeDestination Phase = "before_destination"
	PhaseAfterDestination  Phase = "after_destination"
)

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

func NewPipeline

func NewPipeline(channelDir, projectDir, channelID string, cfg *config.ChannelConfig, runner *NodeRunner, logger *slog.Logger) *Pipeline

func (*Pipeline) Execute

func (p *Pipeline) Execute(ctx context.Context, msg *message.Message) (*PipelineResult, error)

func (*Pipeline) ExecuteDestinationPipeline

func (p *Pipeline) ExecuteDestinationPipeline(ctx context.Context, msg *message.Message, transformed any, sourceIntuMsg map[string]any, dest config.ChannelDestination) (*message.Message, bool, error)

func (*Pipeline) ExecutePostprocessor

func (p *Pipeline) ExecutePostprocessor(ctx context.Context, msg *message.Message, transformed any, results []DestinationResult) error

func (*Pipeline) ExecuteResponseTransformer

func (p *Pipeline) ExecuteResponseTransformer(ctx context.Context, msg *message.Message, dest config.ChannelDestination, resp *message.Response) error

func (*Pipeline) SetMapContext

func (p *Pipeline) SetMapContext(maps *MapVariables, connectorMap *SyncMap)

func (*Pipeline) SetMessageStore

func (p *Pipeline) SetMessageStore(store storage.MessageStore)

func (*Pipeline) SetResolvedDestinations added in v1.0.4

func (p *Pipeline) SetResolvedDestinations(dests map[string]config.Destination)

type PipelineResult

type PipelineResult struct {
	Filtered      bool
	Output        any
	OutputBytes   []byte
	OutputMsg     *message.Message
	RouteTo       []string
	DestResults   []DestinationResult
	BatchItems    []BatchItem
	SourceIntuMsg map[string]any
}

type PipelineStage added in v1.0.7

type PipelineStage interface {
	Name() string
	Phase() Phase
	Process(ctx context.Context, msg *message.Message) (*message.Message, error)
}

PipelineStage is the Go interface for custom pipeline plugins. Each stage declares its Name, the Phase at which it runs, and a Process function that receives and returns a message.

type PluginRegistry added in v1.0.7

type PluginRegistry struct {
	// contains filtered or unexported fields
}

PluginRegistry holds the ordered set of plugins for a channel, indexed by phase for fast lookup during pipeline execution.

func NewPluginRegistry added in v1.0.7

func NewPluginRegistry() *PluginRegistry

func (*PluginRegistry) Execute added in v1.0.7

func (r *PluginRegistry) Execute(ctx context.Context, phase Phase, msg *message.Message, logger *slog.Logger) (*message.Message, error)

Execute runs all plugins registered for the given phase in order. Returns the (possibly modified) message or the first error encountered.

func (*PluginRegistry) HasPlugins added in v1.0.7

func (r *PluginRegistry) HasPlugins(phase Phase) bool

func (*PluginRegistry) Register added in v1.0.7

func (r *PluginRegistry) Register(stage PipelineStage)

type ScriptPlugin added in v1.0.7

type ScriptPlugin struct {
	// contains filtered or unexported fields
}

ScriptPlugin implements PipelineStage by delegating to a TypeScript/JS function via the Node.js runner. This is the standard plugin type registered through channel YAML.

func NewScriptPlugin added in v1.0.7

func NewScriptPlugin(cfg config.PluginConfig, channelDir, projectDir string, runner *NodeRunner, logger *slog.Logger) (*ScriptPlugin, error)

func (*ScriptPlugin) Name added in v1.0.7

func (sp *ScriptPlugin) Name() string

func (*ScriptPlugin) Phase added in v1.0.7

func (sp *ScriptPlugin) Phase() Phase

func (*ScriptPlugin) Process added in v1.0.7

func (sp *ScriptPlugin) Process(ctx context.Context, msg *message.Message) (*message.Message, error)

type SyncMap

type SyncMap struct {
	// contains filtered or unexported fields
}

func NewConnectorMap

func NewConnectorMap() *SyncMap

ConnectorMap creates a per-message connector map (not shared across messages).

func NewSyncMap

func NewSyncMap() *SyncMap

func (*SyncMap) Clear

func (m *SyncMap) Clear()

func (*SyncMap) Get

func (m *SyncMap) Get(key string) (any, bool)

func (*SyncMap) Put

func (m *SyncMap) Put(key string, value any)

func (*SyncMap) Remove

func (m *SyncMap) Remove(key string)

func (*SyncMap) Snapshot

func (m *SyncMap) Snapshot() map[string]any

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL