Documentation
¶
Index ¶
- type Event
- type EventSink
- type EventSource
- type Registry
- func (r *Registry) CreateSink(connectorType, name string, config map[string]any) (EventSink, error)
- func (r *Registry) CreateSource(connectorType, name string, config map[string]any) (EventSource, error)
- func (r *Registry) GetInstance(name string) (any, bool)
- func (r *Registry) ListSinks() []string
- func (r *Registry) ListSources() []string
- func (r *Registry) RegisterSink(connectorType string, factory SinkFactory) error
- func (r *Registry) RegisterSource(connectorType string, factory SourceFactory) error
- func (r *Registry) StopAll(ctx context.Context) error
- type SinkFactory
- type SourceFactory
- type WebhookSink
- func (ws *WebhookSink) Deliver(ctx context.Context, event Event) error
- func (ws *WebhookSink) DeliverBatch(ctx context.Context, events []Event) []error
- func (ws *WebhookSink) Healthy() bool
- func (ws *WebhookSink) Name() string
- func (ws *WebhookSink) SetClient(client *http.Client)
- func (ws *WebhookSink) Stop(_ context.Context) error
- func (ws *WebhookSink) Type() string
- type WebhookSinkRetryConfig
- type WebhookSource
- func (ws *WebhookSource) Addr() string
- func (ws *WebhookSource) Checkpoint(_ context.Context) error
- func (ws *WebhookSource) Healthy() bool
- func (ws *WebhookSource) Name() string
- func (ws *WebhookSource) Start(ctx context.Context, output chan<- Event) error
- func (ws *WebhookSource) Stop(ctx context.Context) error
- func (ws *WebhookSource) Type() string
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Event ¶
type Event struct {
ID string `json:"id"`
Source string `json:"source"`
Type string `json:"type"`
Subject string `json:"subject,omitempty"`
Time time.Time `json:"time"`
Data json.RawMessage `json:"data"`
DataSchema string `json:"dataschema,omitempty"`
DataContentType string `json:"datacontenttype,omitempty"`
// Internal metadata (not serialized to CloudEvents)
TenantID string `json:"-"`
PipelineID string `json:"-"`
IdempotencyKey string `json:"-"`
}
Event is the universal event envelope (CloudEvents compatible).
type EventSink ¶
type EventSink interface {
Name() string
Type() string
Deliver(ctx context.Context, event Event) error
DeliverBatch(ctx context.Context, events []Event) []error
Stop(ctx context.Context) error
Healthy() bool
}
EventSink defines the interface for event egress connectors.
type EventSource ¶
type EventSource interface {
Name() string
Type() string
Start(ctx context.Context, output chan<- Event) error
Stop(ctx context.Context) error
Healthy() bool
Checkpoint(ctx context.Context) error
}
EventSource defines the interface for event ingress connectors.
func WebhookSourceFactory ¶
func WebhookSourceFactory(name string, config map[string]any) (EventSource, error)
WebhookSourceFactory is a SourceFactory for creating WebhookSource instances.
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
Registry manages connector factories and instances.
func (*Registry) CreateSink ¶
CreateSink creates and tracks a new EventSink instance.
func (*Registry) CreateSource ¶
func (r *Registry) CreateSource(connectorType, name string, config map[string]any) (EventSource, error)
CreateSource creates and tracks a new EventSource instance.
func (*Registry) GetInstance ¶
GetInstance returns a running connector instance by name.
func (*Registry) ListSources ¶
ListSources returns the registered source connector type names.
func (*Registry) RegisterSink ¶
func (r *Registry) RegisterSink(connectorType string, factory SinkFactory) error
RegisterSink registers a SinkFactory for the given connector type.
func (*Registry) RegisterSource ¶
func (r *Registry) RegisterSource(connectorType string, factory SourceFactory) error
RegisterSource registers a SourceFactory for the given connector type.
type SinkFactory ¶
SinkFactory creates EventSink instances from config.
type SourceFactory ¶
type SourceFactory func(name string, config map[string]any) (EventSource, error)
SourceFactory creates EventSource instances from config.
type WebhookSink ¶
type WebhookSink struct {
// contains filtered or unexported fields
}
WebhookSink is an EventSink that delivers events to HTTP endpoints. It wraps the webhook delivery pattern used by the existing webhook.sender module, adding CloudEvents envelope support.
func NewWebhookSink ¶
func NewWebhookSink(name string, config map[string]any) (*WebhookSink, error)
NewWebhookSink creates a WebhookSink from a config map. Supported config keys: url, method, headers, retry (max_attempts, backoff).
func (*WebhookSink) Deliver ¶
func (ws *WebhookSink) Deliver(ctx context.Context, event Event) error
Deliver sends a single event to the configured HTTP endpoint with retry.
func (*WebhookSink) DeliverBatch ¶
func (ws *WebhookSink) DeliverBatch(ctx context.Context, events []Event) []error
DeliverBatch delivers multiple events individually, returning per-event errors. A nil entry in the returned slice means the corresponding event was delivered successfully.
func (*WebhookSink) Healthy ¶
func (ws *WebhookSink) Healthy() bool
Healthy returns true when the sink is operational.
func (*WebhookSink) Name ¶
func (ws *WebhookSink) Name() string
Name returns the connector instance name.
func (*WebhookSink) SetClient ¶
func (ws *WebhookSink) SetClient(client *http.Client)
SetClient sets a custom HTTP client (useful for testing).
func (*WebhookSink) Stop ¶
func (ws *WebhookSink) Stop(_ context.Context) error
Stop marks the sink as unhealthy. The underlying HTTP client does not require explicit shutdown.
func (*WebhookSink) Type ¶
func (ws *WebhookSink) Type() string
Type returns the connector type identifier.
type WebhookSinkRetryConfig ¶
type WebhookSinkRetryConfig struct {
MaxAttempts int `json:"max_attempts" yaml:"max_attempts"`
Backoff time.Duration `json:"backoff" yaml:"backoff"`
}
WebhookSinkRetryConfig controls retry behavior for webhook delivery.
type WebhookSource ¶
type WebhookSource struct {
// contains filtered or unexported fields
}
WebhookSource is an EventSource that receives HTTP webhooks and emits them as CloudEvents. It wraps the common HTTP trigger pattern used elsewhere in the workflow engine.
func NewWebhookSource ¶
func NewWebhookSource(name string, config map[string]any) (*WebhookSource, error)
NewWebhookSource creates a WebhookSource from a config map. Supported config keys: address, path, secret.
func (*WebhookSource) Addr ¶
func (ws *WebhookSource) Addr() string
Addr returns the resolved listen address. Useful when the source was started on port 0 to let the OS pick a port.
func (*WebhookSource) Checkpoint ¶
func (ws *WebhookSource) Checkpoint(_ context.Context) error
Checkpoint is a no-op for webhooks (stateless).
func (*WebhookSource) Healthy ¶
func (ws *WebhookSource) Healthy() bool
Healthy returns true when the server is running.
func (*WebhookSource) Name ¶
func (ws *WebhookSource) Name() string
Name returns the connector instance name.
func (*WebhookSource) Start ¶
func (ws *WebhookSource) Start(ctx context.Context, output chan<- Event) error
Start begins listening for HTTP webhooks, writing received events to output.
func (*WebhookSource) Stop ¶
func (ws *WebhookSource) Stop(ctx context.Context) error
Stop gracefully shuts down the HTTP server.
func (*WebhookSource) Type ¶
func (ws *WebhookSource) Type() string
Type returns the connector type identifier.