Documentation ¶
Index ¶
- func NewPostgresCDCSourceFactory() connector.SourceFactory
- func NewRedisStreamSourceFactory() connector.SourceFactory
- func NewSQSSinkFactory() connector.SinkFactory
- func NewSQSSourceFactory() connector.SourceFactory
- func PostgresCDCSourceFactory(name string, config map[string]any) (connector.EventSource, error)
- func RedisStreamSourceFactory(name string, config map[string]any) (connector.EventSource, error)
- func RegisterBuiltinSources(registry *connector.Registry) error
- func SQSSinkFactory(name string, config map[string]any) (connector.EventSink, error)
- func SQSSourceFactory(name string, config map[string]any) (connector.EventSource, error)
- type ChangeEvent
- type PGListener
- type PostgresCDCConfig
- type PostgresCDCSource
- func (s *PostgresCDCSource) Checkpoint(_ context.Context) error
- func (s *PostgresCDCSource) Healthy() bool
- func (s *PostgresCDCSource) Name() string
- func (s *PostgresCDCSource) Start(ctx context.Context, output chan<- connector.Event) error
- func (s *PostgresCDCSource) Stop(ctx context.Context) error
- func (s *PostgresCDCSource) Type() string
- type RedisStreamClient
- type RedisStreamConfig
- type RedisStreamMessage
- type RedisStreamSource
- func (s *RedisStreamSource) Checkpoint(_ context.Context) error
- func (s *RedisStreamSource) Healthy() bool
- func (s *RedisStreamSource) Name() string
- func (s *RedisStreamSource) Start(ctx context.Context, output chan<- connector.Event) error
- func (s *RedisStreamSource) Stop(ctx context.Context) error
- func (s *RedisStreamSource) Type() string
- type SQSBatchEntry
- type SQSBatchResult
- type SQSClient
- type SQSConfig
- type SQSMessage
- type SQSSink
- func (s *SQSSink) Deliver(ctx context.Context, event connector.Event) error
- func (s *SQSSink) DeliverBatch(ctx context.Context, events []connector.Event) []error
- func (s *SQSSink) Healthy() bool
- func (s *SQSSink) Name() string
- func (s *SQSSink) Stop(_ context.Context) error
- func (s *SQSSink) Type() string
- type SQSSource
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewPostgresCDCSourceFactory ¶
func NewPostgresCDCSourceFactory() connector.SourceFactory
NewPostgresCDCSourceFactory returns a SourceFactory for PostgreSQL CDC.
func NewRedisStreamSourceFactory ¶
func NewRedisStreamSourceFactory() connector.SourceFactory
NewRedisStreamSourceFactory returns a SourceFactory for Redis Streams.
func NewSQSSinkFactory ¶
func NewSQSSinkFactory() connector.SinkFactory
NewSQSSinkFactory returns a SinkFactory for AWS SQS.
func NewSQSSourceFactory ¶
func NewSQSSourceFactory() connector.SourceFactory
NewSQSSourceFactory returns a SourceFactory for AWS SQS.
func PostgresCDCSourceFactory ¶
func PostgresCDCSourceFactory(name string, config map[string]any) (connector.EventSource, error)
PostgresCDCSourceFactory creates PostgresCDCSource instances from config maps. Note: The returned source requires a PGListener to be injected before Start().
func RedisStreamSourceFactory ¶
func RedisStreamSourceFactory(name string, config map[string]any) (connector.EventSource, error)
RedisStreamSourceFactory creates RedisStreamSource instances from config maps. Note: The returned source requires a RedisStreamClient to be injected before Start().
func RegisterBuiltinSources ¶
func RegisterBuiltinSources(registry *connector.Registry) error
RegisterBuiltinSources registers all built-in source and sink factories with the given connector registry. This is the main entry point for bootstrapping the connector system with the standard set of connectors.
func SQSSinkFactory ¶
func SQSSinkFactory(name string, config map[string]any) (connector.EventSink, error)
SQSSinkFactory creates SQSSink instances from config maps. Note: The returned sink requires an SQSClient to be injected before Deliver().
func SQSSourceFactory ¶
func SQSSourceFactory(name string, config map[string]any) (connector.EventSource, error)
SQSSourceFactory creates SQSSource instances from config maps. Note: The returned source requires an SQSClient to be injected before Start().
Types ¶
type ChangeEvent ¶
type ChangeEvent struct {
Table string `json:"table"`
Operation string `json:"operation"` // insert, update, delete
Data json.RawMessage `json:"data"`
OldData json.RawMessage `json:"old_data,omitempty"`
Timestamp time.Time `json:"timestamp"`
}
ChangeEvent represents a single row-level change from PostgreSQL.
type PGListener ¶
type PGListener interface {
// Connect establishes the database connection.
Connect(ctx context.Context, dsn string) error
// Listen starts listening on the given channel.
Listen(ctx context.Context, channel string) error
// WaitForNotification blocks until a notification arrives or context is cancelled.
WaitForNotification(ctx context.Context) (payload string, err error)
// Close closes the connection.
Close(ctx context.Context) error
}
PGListener abstracts the PostgreSQL LISTEN/NOTIFY mechanism for testability. In production, this would wrap pgx.Conn; in tests, a mock implementation is used.
type PostgresCDCConfig ¶
type PostgresCDCConfig struct {
DSN string `json:"dsn" yaml:"dsn"`
Tables []string `json:"tables" yaml:"tables"`
Channel string `json:"channel" yaml:"channel"` // LISTEN/NOTIFY channel name
PollInterval time.Duration `json:"poll_interval" yaml:"poll_interval"` // fallback polling interval
}
PostgresCDCConfig holds the configuration for the PostgreSQL CDC source.
type PostgresCDCSource ¶
type PostgresCDCSource struct {
// contains filtered or unexported fields
}
PostgresCDCSource is an EventSource that watches PostgreSQL tables for changes using LISTEN/NOTIFY and optional polling. It converts row-level changes into CloudEvents-compatible Event structs.
func NewPostgresCDCSource ¶
func NewPostgresCDCSource(name string, config map[string]any) (*PostgresCDCSource, error)
NewPostgresCDCSource creates a new PostgresCDCSource from a config map. Supported config keys: dsn, tables, channel, poll_interval.
func NewPostgresCDCSourceWithListener ¶
func NewPostgresCDCSourceWithListener(name string, config map[string]any, listener PGListener) (*PostgresCDCSource, error)
NewPostgresCDCSourceWithListener creates a PostgresCDCSource with a custom PGListener. This is primarily used for testing with mock listeners.
func (*PostgresCDCSource) Checkpoint ¶
func (s *PostgresCDCSource) Checkpoint(_ context.Context) error
Checkpoint is a no-op for PostgreSQL LISTEN/NOTIFY (stateless notifications).
func (*PostgresCDCSource) Healthy ¶
func (s *PostgresCDCSource) Healthy() bool
Healthy returns true when the source is connected and listening.
func (*PostgresCDCSource) Name ¶
func (s *PostgresCDCSource) Name() string
Name returns the connector instance name.
func (*PostgresCDCSource) Start ¶
func (s *PostgresCDCSource) Start(ctx context.Context, output chan<- connector.Event) error
Start begins listening for PostgreSQL changes and writing events to the output channel.
func (*PostgresCDCSource) Stop ¶
func (s *PostgresCDCSource) Stop(ctx context.Context) error
Stop gracefully shuts down the CDC listener.
func (*PostgresCDCSource) Type ¶
func (s *PostgresCDCSource) Type() string
Type returns the connector type identifier.
type RedisStreamClient ¶
type RedisStreamClient interface {
// Connect establishes the Redis connection.
Connect(ctx context.Context, addr string) error
// CreateGroup creates a consumer group if it does not already exist.
// The "startID" parameter is typically "$" (latest) or "0" (all history).
CreateGroup(ctx context.Context, stream, group, startID string) error
// ReadGroup reads messages from the stream using the consumer group.
// Returns up to count messages; blocks until messages are available or ctx is cancelled.
ReadGroup(ctx context.Context, stream, group, consumer string, count int) ([]RedisStreamMessage, error)
// Ack acknowledges processing of the given message IDs.
Ack(ctx context.Context, stream, group string, ids ...string) error
// Close closes the Redis connection.
Close() error
}
RedisStreamClient abstracts the Redis Streams API for testability. In production, this would wrap go-redis; in tests, a mock implementation is used.
type RedisStreamConfig ¶
type RedisStreamConfig struct {
Addr string `json:"addr" yaml:"addr"`
Stream string `json:"stream" yaml:"stream"`
Group string `json:"group" yaml:"group"` // consumer group name
Consumer string `json:"consumer" yaml:"consumer"` // consumer instance name
BatchSize int `json:"batch_size" yaml:"batch_size"`
}
RedisStreamConfig holds the configuration for the Redis Streams source.
type RedisStreamMessage ¶
type RedisStreamMessage struct {
ID string // stream message ID (e.g., "1645000000000-0")
Fields map[string]string // key-value pairs from the stream entry
}
RedisStreamMessage represents a single message read from a Redis Stream.
type RedisStreamSource ¶
type RedisStreamSource struct {
// contains filtered or unexported fields
}
RedisStreamSource is an EventSource that reads from a Redis Stream using consumer groups. Each message is converted to a CloudEvents Event.
func NewRedisStreamSource ¶
func NewRedisStreamSource(name string, config map[string]any) (*RedisStreamSource, error)
NewRedisStreamSource creates a new RedisStreamSource from a config map. Supported config keys: addr, stream, group, consumer, batch_size.
func NewRedisStreamSourceWithClient ¶
func NewRedisStreamSourceWithClient(name string, config map[string]any, client RedisStreamClient) (*RedisStreamSource, error)
NewRedisStreamSourceWithClient creates a RedisStreamSource with a custom client. This is primarily used for testing with mock clients.
func (*RedisStreamSource) Checkpoint ¶
func (s *RedisStreamSource) Checkpoint(_ context.Context) error
Checkpoint is a no-op; Redis Streams tracks position via consumer groups.
func (*RedisStreamSource) Healthy ¶
func (s *RedisStreamSource) Healthy() bool
Healthy returns true when the source is connected and reading.
func (*RedisStreamSource) Name ¶
func (s *RedisStreamSource) Name() string
Name returns the connector instance name.
func (*RedisStreamSource) Start ¶
func (s *RedisStreamSource) Start(ctx context.Context, output chan<- connector.Event) error
Start begins reading from the Redis Stream and writing events to the output channel.
func (*RedisStreamSource) Stop ¶
func (s *RedisStreamSource) Stop(ctx context.Context) error
Stop gracefully shuts down the Redis Stream reader.
func (*RedisStreamSource) Type ¶
func (s *RedisStreamSource) Type() string
Type returns the connector type identifier.
type SQSBatchEntry ¶
SQSBatchEntry represents a single entry in a SendMessageBatch call.
type SQSBatchResult ¶
SQSBatchResult represents the result of a single entry in a SendMessageBatch call.
type SQSClient ¶
type SQSClient interface {
// ReceiveMessages polls the SQS queue for up to maxMessages.
ReceiveMessages(ctx context.Context, queueURL string, maxMessages, waitTimeSeconds int) ([]SQSMessage, error)
// DeleteMessage removes a message from the queue after successful processing.
DeleteMessage(ctx context.Context, queueURL, receiptHandle string) error
// SendMessage sends a message to the SQS queue.
SendMessage(ctx context.Context, queueURL, body string, attributes map[string]string) (messageID string, err error)
// SendMessageBatch sends multiple messages to the SQS queue.
SendMessageBatch(ctx context.Context, queueURL string, entries []SQSBatchEntry) ([]SQSBatchResult, error)
}
SQSClient abstracts the AWS SQS API for testability. In production, this would wrap the AWS SDK v2 SQS client; in tests, a mock is used.
type SQSConfig ¶
type SQSConfig struct {
QueueURL string `json:"queue_url" yaml:"queue_url"`
Region string `json:"region" yaml:"region"`
MaxMessages int `json:"max_messages" yaml:"max_messages"`
WaitTimeSeconds int `json:"wait_time_seconds" yaml:"wait_time_seconds"`
}
SQSConfig holds the configuration for the AWS SQS source and sink.
type SQSMessage ¶
type SQSMessage struct {
MessageID string
ReceiptHandle string
Body string
Attributes map[string]string
}
SQSMessage represents a single message received from SQS.
type SQSSink ¶
type SQSSink struct {
// contains filtered or unexported fields
}
SQSSink is an EventSink that delivers events to an AWS SQS queue.
func NewSQSSink ¶
NewSQSSink creates a new SQSSink from a config map. Supported config keys: queue_url, region.
func NewSQSSinkWithClient ¶
NewSQSSinkWithClient creates an SQSSink with a custom SQS client. This is primarily used for testing with mock clients.
func (*SQSSink) DeliverBatch ¶
func (s *SQSSink) DeliverBatch(ctx context.Context, events []connector.Event) []error
DeliverBatch sends multiple events to the SQS queue. Returns per-event errors.
type SQSSource ¶
type SQSSource struct {
// contains filtered or unexported fields
}
SQSSource is an EventSource that polls an AWS SQS queue and emits CloudEvents-compatible events for each message received.
func NewSQSSource ¶
NewSQSSource creates a new SQSSource from a config map. Supported config keys: queue_url, region, max_messages, wait_time_seconds.
func NewSQSSourceWithClient ¶
func NewSQSSourceWithClient(name string, config map[string]any, client SQSClient) (*SQSSource, error)
NewSQSSourceWithClient creates an SQSSource with a custom SQS client. This is primarily used for testing with mock clients.
func (*SQSSource) Checkpoint ¶
Checkpoint is a no-op for SQS (messages are deleted after processing).
func (*SQSSource) Start ¶
Start begins polling the SQS queue and writing events to the output channel.