Documentation
¶
Index ¶
- type BatchItem
- type ChannelRuntime
- type CodeTemplateLibrary
- type CodeTemplateLoader
- func (ctl *CodeTemplateLoader) GetLibrary(name string) (*CodeTemplateLibrary, bool)
- func (ctl *CodeTemplateLoader) Libraries() map[string]*CodeTemplateLibrary
- func (ctl *CodeTemplateLoader) LoadLibrary(name, dir string) error
- func (ctl *CodeTemplateLoader) ResolveFunction(funcName string) (string, bool)
- type ConnectorFactory
- type DefaultEngine
- func (e *DefaultEngine) CloseRuntime() error
- func (e *DefaultEngine) Config() *config.Config
- func (e *DefaultEngine) DeployChannel(ctx context.Context, channelID string) error
- func (e *DefaultEngine) GetChannelRuntime(channelID string) (*ChannelRuntime, bool)
- func (e *DefaultEngine) InitRuntime(ctx context.Context) error
- func (e *DefaultEngine) ListChannelIDs() []string
- func (e *DefaultEngine) MessageStore() storage.MessageStore
- func (e *DefaultEngine) Metrics() *observability.Metrics
- func (e *DefaultEngine) ReprocessMessage(ctx context.Context, channelID string, msg *message.Message) error
- func (e *DefaultEngine) RestartChannel(ctx context.Context, channelID string) error
- func (e *DefaultEngine) RootDir() string
- func (e *DefaultEngine) SetAlertManager(am *alerting.AlertManager)
- func (e *DefaultEngine) SetCoordinator(coord cluster.ChannelCoordinator)
- func (e *DefaultEngine) SetDeduplicator(dedup cluster.MessageDeduplicator)
- func (e *DefaultEngine) SetMessageStore(store storage.MessageStore)
- func (e *DefaultEngine) SetRedisClient(client *redis.Client, keyPrefix string)
- func (e *DefaultEngine) Start(ctx context.Context) error
- func (e *DefaultEngine) Stop(ctx context.Context) error
- func (e *DefaultEngine) UndeployChannel(ctx context.Context, channelID string) error
- func (e *DefaultEngine) WatchChannels(ctx context.Context) error
- type DestinationResult
- type Engine
- type GojaRunner
- type HotReloader
- type JSRunner
- type MapVariables
- type NodeRunner
- type Pipeline
- func (p *Pipeline) Execute(ctx context.Context, msg *message.Message) (*PipelineResult, error)
- func (p *Pipeline) ExecuteDestinationPipeline(ctx context.Context, msg *message.Message, transformed any, ...) (*message.Message, bool, error)
- func (p *Pipeline) ExecutePostprocessor(ctx context.Context, msg *message.Message, transformed any, ...) error
- func (p *Pipeline) ExecuteResponseTransformer(ctx context.Context, msg *message.Message, dest config.ChannelDestination, ...) error
- func (p *Pipeline) SetMapContext(maps *MapVariables, connectorMap *SyncMap)
- func (p *Pipeline) SetMessageStore(store storage.MessageStore)
- type PipelineResult
- type SyncMap
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ChannelRuntime ¶
type ChannelRuntime struct {
ID string
Config *config.ChannelConfig
Source connector.SourceConnector
Destinations map[string]connector.DestinationConnector
DestConfigs []config.ChannelDestination
Pipeline *Pipeline
Logger *slog.Logger
Metrics *observability.Metrics
Store storage.MessageStore
Maps *MapVariables
Dedup cluster.MessageDeduplicator
// contains filtered or unexported fields
}
type CodeTemplateLibrary ¶
type CodeTemplateLoader ¶
type CodeTemplateLoader struct {
// contains filtered or unexported fields
}
func NewCodeTemplateLoader ¶
func NewCodeTemplateLoader(projectDir string, logger *slog.Logger) *CodeTemplateLoader
func (*CodeTemplateLoader) GetLibrary ¶
func (ctl *CodeTemplateLoader) GetLibrary(name string) (*CodeTemplateLibrary, bool)
GetLibrary returns a loaded library by name.
func (*CodeTemplateLoader) Libraries ¶
func (ctl *CodeTemplateLoader) Libraries() map[string]*CodeTemplateLibrary
Libraries returns all loaded libraries.
func (*CodeTemplateLoader) LoadLibrary ¶
func (ctl *CodeTemplateLoader) LoadLibrary(name, dir string) error
LoadLibrary loads all compiled JS files from a code template library directory.
func (*CodeTemplateLoader) ResolveFunction ¶
func (ctl *CodeTemplateLoader) ResolveFunction(funcName string) (string, bool)
ResolveFunction finds a function file across all loaded libraries.
type ConnectorFactory ¶
type ConnectorFactory interface {
CreateSource(listenerCfg config.ListenerConfig) (connector.SourceConnector, error)
CreateDestination(name string, dest config.Destination) (connector.DestinationConnector, error)
}
type DefaultEngine ¶
type DefaultEngine struct {
// contains filtered or unexported fields
}
func NewDefaultEngine ¶
func NewDefaultEngine(rootDir string, cfg *config.Config, factory ConnectorFactory, logger *slog.Logger) *DefaultEngine
func (*DefaultEngine) CloseRuntime ¶
func (e *DefaultEngine) CloseRuntime() error
func (*DefaultEngine) Config ¶
func (e *DefaultEngine) Config() *config.Config
func (*DefaultEngine) DeployChannel ¶
func (e *DefaultEngine) DeployChannel(ctx context.Context, channelID string) error
func (*DefaultEngine) GetChannelRuntime ¶
func (e *DefaultEngine) GetChannelRuntime(channelID string) (*ChannelRuntime, bool)
func (*DefaultEngine) InitRuntime ¶
func (e *DefaultEngine) InitRuntime(ctx context.Context) error
func (*DefaultEngine) ListChannelIDs ¶
func (e *DefaultEngine) ListChannelIDs() []string
func (*DefaultEngine) MessageStore ¶
func (e *DefaultEngine) MessageStore() storage.MessageStore
func (*DefaultEngine) Metrics ¶
func (e *DefaultEngine) Metrics() *observability.Metrics
func (*DefaultEngine) ReprocessMessage ¶
func (*DefaultEngine) RestartChannel ¶
func (e *DefaultEngine) RestartChannel(ctx context.Context, channelID string) error
func (*DefaultEngine) RootDir ¶
func (e *DefaultEngine) RootDir() string
func (*DefaultEngine) SetAlertManager ¶
func (e *DefaultEngine) SetAlertManager(am *alerting.AlertManager)
func (*DefaultEngine) SetCoordinator ¶
func (e *DefaultEngine) SetCoordinator(coord cluster.ChannelCoordinator)
func (*DefaultEngine) SetDeduplicator ¶
func (e *DefaultEngine) SetDeduplicator(dedup cluster.MessageDeduplicator)
func (*DefaultEngine) SetMessageStore ¶
func (e *DefaultEngine) SetMessageStore(store storage.MessageStore)
func (*DefaultEngine) SetRedisClient ¶
func (e *DefaultEngine) SetRedisClient(client *redis.Client, keyPrefix string)
func (*DefaultEngine) UndeployChannel ¶
func (e *DefaultEngine) UndeployChannel(ctx context.Context, channelID string) error
func (*DefaultEngine) WatchChannels ¶
func (e *DefaultEngine) WatchChannels(ctx context.Context) error
type DestinationResult ¶
type GojaRunner ¶
type GojaRunner struct {
// contains filtered or unexported fields
}
func NewGojaRunner ¶
func NewGojaRunner() *GojaRunner
func (*GojaRunner) Close ¶
func (r *GojaRunner) Close() error
type HotReloader ¶
type HotReloader struct {
// contains filtered or unexported fields
}
func NewHotReloader ¶
func NewHotReloader(engine *DefaultEngine, channelsDir string, logger *slog.Logger) (*HotReloader, error)
func (*HotReloader) Stop ¶
func (hr *HotReloader) Stop()
type MapVariables ¶
type MapVariables struct {
// contains filtered or unexported fields
}
func NewMapVariables ¶
func NewMapVariables() *MapVariables
func (*MapVariables) ChannelMap ¶
func (mv *MapVariables) ChannelMap(channelID string) *SyncMap
func (*MapVariables) ExportForChannel ¶
func (mv *MapVariables) ExportForChannel(channelID string) map[string]any
Export returns all map data as a plain map structure for JS context injection.
func (*MapVariables) GlobalMap ¶
func (mv *MapVariables) GlobalMap() *SyncMap
func (*MapVariables) ResponseMap ¶
func (mv *MapVariables) ResponseMap(channelID string) *SyncMap
type NodeRunner ¶
type NodeRunner struct {
// contains filtered or unexported fields
}
func NewNodeRunner ¶
func NewNodeRunner(poolSize int, logger *slog.Logger) (*NodeRunner, error)
func (*NodeRunner) Close ¶
func (nr *NodeRunner) Close() error
func (*NodeRunner) PreloadModule ¶
func (nr *NodeRunner) PreloadModule(module string) error
type Pipeline ¶
type Pipeline struct {
// contains filtered or unexported fields
}
func NewPipeline ¶
func (*Pipeline) ExecuteDestinationPipeline ¶
func (*Pipeline) ExecutePostprocessor ¶
func (*Pipeline) ExecuteResponseTransformer ¶
func (*Pipeline) SetMapContext ¶
func (p *Pipeline) SetMapContext(maps *MapVariables, connectorMap *SyncMap)
func (*Pipeline) SetMessageStore ¶
func (p *Pipeline) SetMessageStore(store storage.MessageStore)
type PipelineResult ¶
type SyncMap ¶
type SyncMap struct {
// contains filtered or unexported fields
}
func NewConnectorMap ¶
func NewConnectorMap() *SyncMap
ConnectorMap creates a per-message connector map (not shared across messages).
func NewSyncMap ¶
func NewSyncMap() *SyncMap
Click to show internal directories.
Click to hide internal directories.