source

package
v0.0.0-...-dac86b4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 16, 2026 License: MIT Imports: 9 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewPostgresCDCSourceFactory

func NewPostgresCDCSourceFactory() connector.SourceFactory

NewPostgresCDCSourceFactory returns a SourceFactory for PostgreSQL CDC.

func NewRedisStreamSourceFactory

func NewRedisStreamSourceFactory() connector.SourceFactory

NewRedisStreamSourceFactory returns a SourceFactory for Redis Streams.

func NewSQSSinkFactory

func NewSQSSinkFactory() connector.SinkFactory

NewSQSSinkFactory returns a SinkFactory for AWS SQS.

func NewSQSSourceFactory

func NewSQSSourceFactory() connector.SourceFactory

NewSQSSourceFactory returns a SourceFactory for AWS SQS.

func PostgresCDCSourceFactory

func PostgresCDCSourceFactory(name string, config map[string]any) (connector.EventSource, error)

PostgresCDCSourceFactory creates PostgresCDCSource instances from config maps. Note: The returned source requires a PGListener to be injected before Start().

func RedisStreamSourceFactory

func RedisStreamSourceFactory(name string, config map[string]any) (connector.EventSource, error)

RedisStreamSourceFactory creates RedisStreamSource instances from config maps. Note: The returned source requires a RedisStreamClient to be injected before Start().

func RegisterBuiltinSources

func RegisterBuiltinSources(registry *connector.Registry) error

RegisterBuiltinSources registers all built-in source and sink factories with the given connector registry. This is the main entry point for bootstrapping the connector system with the standard set of connectors.

func SQSSinkFactory

func SQSSinkFactory(name string, config map[string]any) (connector.EventSink, error)

SQSSinkFactory creates SQSSink instances from config maps. Note: The returned sink requires an SQSClient to be injected before Deliver().

func SQSSourceFactory

func SQSSourceFactory(name string, config map[string]any) (connector.EventSource, error)

SQSSourceFactory creates SQSSource instances from config maps. Note: The returned source requires an SQSClient to be injected before Start().

Types

type ChangeEvent

type ChangeEvent struct {
	Table     string          `json:"table"`
	Operation string          `json:"operation"` // insert, update, delete
	Data      json.RawMessage `json:"data"`
	OldData   json.RawMessage `json:"old_data,omitempty"`
	Timestamp time.Time       `json:"timestamp"`
}

ChangeEvent represents a single row-level change from PostgreSQL.

type PGListener

type PGListener interface {
	// Connect establishes the database connection.
	Connect(ctx context.Context, dsn string) error
	// Listen starts listening on the given channel.
	Listen(ctx context.Context, channel string) error
	// WaitForNotification blocks until a notification arrives or context is cancelled.
	WaitForNotification(ctx context.Context) (payload string, err error)
	// Close closes the connection.
	Close(ctx context.Context) error
}

PGListener abstracts the PostgreSQL LISTEN/NOTIFY mechanism for testability. In production, this would wrap pgx.Conn; in tests, a mock implementation is used.

type PostgresCDCConfig

type PostgresCDCConfig struct {
	DSN          string        `json:"dsn" yaml:"dsn"`
	Tables       []string      `json:"tables" yaml:"tables"`
	Channel      string        `json:"channel" yaml:"channel"`             // LISTEN/NOTIFY channel name
	PollInterval time.Duration `json:"poll_interval" yaml:"poll_interval"` // fallback polling interval
}

PostgresCDCConfig holds the configuration for the PostgreSQL CDC source.

type PostgresCDCSource

type PostgresCDCSource struct {
	// contains filtered or unexported fields
}

PostgresCDCSource is an EventSource that watches PostgreSQL tables for changes using LISTEN/NOTIFY and optional polling. It converts row-level changes into CloudEvents-compatible Event structs.

func NewPostgresCDCSource

func NewPostgresCDCSource(name string, config map[string]any) (*PostgresCDCSource, error)

NewPostgresCDCSource creates a new PostgresCDCSource from a config map. Supported config keys: dsn, tables, channel, poll_interval.

func NewPostgresCDCSourceWithListener

func NewPostgresCDCSourceWithListener(name string, config map[string]any, listener PGListener) (*PostgresCDCSource, error)

NewPostgresCDCSourceWithListener creates a PostgresCDCSource with a custom PGListener. This is primarily used for testing with mock listeners.

func (*PostgresCDCSource) Checkpoint

func (s *PostgresCDCSource) Checkpoint(_ context.Context) error

Checkpoint is a no-op for PostgreSQL LISTEN/NOTIFY (stateless notifications).

func (*PostgresCDCSource) Healthy

func (s *PostgresCDCSource) Healthy() bool

Healthy returns true when the source is connected and listening.

func (*PostgresCDCSource) Name

func (s *PostgresCDCSource) Name() string

Name returns the connector instance name.

func (*PostgresCDCSource) Start

func (s *PostgresCDCSource) Start(ctx context.Context, output chan<- connector.Event) error

Start begins listening for PostgreSQL changes and writing events to the output channel.

func (*PostgresCDCSource) Stop

func (s *PostgresCDCSource) Stop(ctx context.Context) error

Stop gracefully shuts down the CDC listener.

func (*PostgresCDCSource) Type

func (s *PostgresCDCSource) Type() string

Type returns the connector type identifier.

type RedisStreamClient

type RedisStreamClient interface {
	// Connect establishes the Redis connection.
	Connect(ctx context.Context, addr string) error
	// CreateGroup creates a consumer group if it does not already exist.
	// The "startID" parameter is typically "$" (latest) or "0" (all history).
	CreateGroup(ctx context.Context, stream, group, startID string) error
	// ReadGroup reads messages from the stream using the consumer group.
	// Returns up to count messages; blocks until messages are available or ctx is cancelled.
	ReadGroup(ctx context.Context, stream, group, consumer string, count int) ([]RedisStreamMessage, error)
	// Ack acknowledges processing of the given message IDs.
	Ack(ctx context.Context, stream, group string, ids ...string) error
	// Close closes the Redis connection.
	Close() error
}

RedisStreamClient abstracts the Redis Streams API for testability. In production, this would wrap go-redis; in tests, a mock implementation is used.

type RedisStreamConfig

type RedisStreamConfig struct {
	Addr      string `json:"addr" yaml:"addr"`
	Stream    string `json:"stream" yaml:"stream"`
	Group     string `json:"group" yaml:"group"`       // consumer group name
	Consumer  string `json:"consumer" yaml:"consumer"` // consumer instance name
	BatchSize int    `json:"batch_size" yaml:"batch_size"`
}

RedisStreamConfig holds the configuration for the Redis Streams source.

type RedisStreamMessage

type RedisStreamMessage struct {
	ID     string            // stream message ID (e.g., "1645000000000-0")
	Fields map[string]string // key-value pairs from the stream entry
}

RedisStreamMessage represents a single message read from a Redis Stream.

type RedisStreamSource

type RedisStreamSource struct {
	// contains filtered or unexported fields
}

RedisStreamSource is an EventSource that reads from a Redis Stream using consumer groups. Each message is converted to a CloudEvents Event.

func NewRedisStreamSource

func NewRedisStreamSource(name string, config map[string]any) (*RedisStreamSource, error)

NewRedisStreamSource creates a new RedisStreamSource from a config map. Supported config keys: addr, stream, group, consumer, batch_size.

func NewRedisStreamSourceWithClient

func NewRedisStreamSourceWithClient(name string, config map[string]any, client RedisStreamClient) (*RedisStreamSource, error)

NewRedisStreamSourceWithClient creates a RedisStreamSource with a custom client. This is primarily used for testing with mock clients.

func (*RedisStreamSource) Checkpoint

func (s *RedisStreamSource) Checkpoint(_ context.Context) error

Checkpoint is a no-op; Redis Streams tracks position via consumer groups.

func (*RedisStreamSource) Healthy

func (s *RedisStreamSource) Healthy() bool

Healthy returns true when the source is connected and reading.

func (*RedisStreamSource) Name

func (s *RedisStreamSource) Name() string

Name returns the connector instance name.

func (*RedisStreamSource) Start

func (s *RedisStreamSource) Start(ctx context.Context, output chan<- connector.Event) error

Start begins reading from the Redis Stream and writing events to the output channel.

func (*RedisStreamSource) Stop

func (s *RedisStreamSource) Stop(ctx context.Context) error

Stop gracefully shuts down the Redis Stream reader.

func (*RedisStreamSource) Type

func (s *RedisStreamSource) Type() string

Type returns the connector type identifier.

type SQSBatchEntry

type SQSBatchEntry struct {
	ID         string
	Body       string
	Attributes map[string]string
}

SQSBatchEntry represents a single entry in a SendMessageBatch call.

type SQSBatchResult

type SQSBatchResult struct {
	ID        string
	MessageID string
	Error     error
}

SQSBatchResult represents the result of a single entry in a SendMessageBatch call.

type SQSClient

type SQSClient interface {
	// ReceiveMessages polls the SQS queue for up to maxMessages.
	ReceiveMessages(ctx context.Context, queueURL string, maxMessages, waitTimeSeconds int) ([]SQSMessage, error)
	// DeleteMessage removes a message from the queue after successful processing.
	DeleteMessage(ctx context.Context, queueURL, receiptHandle string) error
	// SendMessage sends a message to the SQS queue.
	SendMessage(ctx context.Context, queueURL, body string, attributes map[string]string) (messageID string, err error)
	// SendMessageBatch sends multiple messages to the SQS queue.
	SendMessageBatch(ctx context.Context, queueURL string, entries []SQSBatchEntry) ([]SQSBatchResult, error)
}

SQSClient abstracts the AWS SQS API for testability. In production, this would wrap the AWS SDK v2 SQS client; in tests, a mock is used.

type SQSConfig

type SQSConfig struct {
	QueueURL        string `json:"queue_url" yaml:"queue_url"`
	Region          string `json:"region" yaml:"region"`
	MaxMessages     int    `json:"max_messages" yaml:"max_messages"`
	WaitTimeSeconds int    `json:"wait_time_seconds" yaml:"wait_time_seconds"`
}

SQSConfig holds the configuration for the AWS SQS source and sink.

type SQSMessage

type SQSMessage struct {
	MessageID     string
	ReceiptHandle string
	Body          string
	Attributes    map[string]string
}

SQSMessage represents a single message received from SQS.

type SQSSink

type SQSSink struct {
	// contains filtered or unexported fields
}

SQSSink is an EventSink that delivers events to an AWS SQS queue.

func NewSQSSink

func NewSQSSink(name string, config map[string]any) (*SQSSink, error)

NewSQSSink creates a new SQSSink from a config map. Supported config keys: queue_url, region.

func NewSQSSinkWithClient

func NewSQSSinkWithClient(name string, config map[string]any, client SQSClient) (*SQSSink, error)

NewSQSSinkWithClient creates an SQSSink with a custom SQS client. This is primarily used for testing with mock clients.

func (*SQSSink) Deliver

func (s *SQSSink) Deliver(ctx context.Context, event connector.Event) error

Deliver sends a single event to the SQS queue.

func (*SQSSink) DeliverBatch

func (s *SQSSink) DeliverBatch(ctx context.Context, events []connector.Event) []error

DeliverBatch sends multiple events to the SQS queue. Returns per-event errors.

func (*SQSSink) Healthy

func (s *SQSSink) Healthy() bool

Healthy returns true when the sink is operational.

func (*SQSSink) Name

func (s *SQSSink) Name() string

Name returns the connector instance name.

func (*SQSSink) Stop

func (s *SQSSink) Stop(_ context.Context) error

Stop marks the sink as unhealthy.

func (*SQSSink) Type

func (s *SQSSink) Type() string

Type returns the connector type identifier.

type SQSSource

type SQSSource struct {
	// contains filtered or unexported fields
}

SQSSource is an EventSource that polls an AWS SQS queue and emits CloudEvents-compatible events for each message received.

func NewSQSSource

func NewSQSSource(name string, config map[string]any) (*SQSSource, error)

NewSQSSource creates a new SQSSource from a config map. Supported config keys: queue_url, region, max_messages, wait_time_seconds.

func NewSQSSourceWithClient

func NewSQSSourceWithClient(name string, config map[string]any, client SQSClient) (*SQSSource, error)

NewSQSSourceWithClient creates an SQSSource with a custom SQS client. This is primarily used for testing with mock clients.

func (*SQSSource) Checkpoint

func (s *SQSSource) Checkpoint(_ context.Context) error

Checkpoint is a no-op for SQS (messages are deleted after processing).

func (*SQSSource) Healthy

func (s *SQSSource) Healthy() bool

Healthy returns true when the source is polling.

func (*SQSSource) Name

func (s *SQSSource) Name() string

Name returns the connector instance name.

func (*SQSSource) Start

func (s *SQSSource) Start(ctx context.Context, output chan<- connector.Event) error

Start begins polling the SQS queue and writing events to the output channel.

func (*SQSSource) Stop

func (s *SQSSource) Stop(_ context.Context) error

Stop gracefully shuts down the SQS poller.

func (*SQSSource) Type

func (s *SQSSource) Type() string

Type returns the connector type identifier.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL