Documentation ¶
Index ¶
- Constants
- Variables
- func NewExtension(opts ...ConfigOption) forge.Extension
- func NewExtensionWithConfig(config Config) forge.Extension
- type ClientStats
- type Config
- type ConfigOption
- func WithBrokers(brokers ...string) ConfigOption
- func WithClientID(clientID string) ConfigOption
- func WithCompression(compression string) ConfigOption
- func WithConfig(config Config) ConfigOption
- func WithConsumer(enabled bool) ConfigOption
- func WithConsumerGroup(groupID string) ConfigOption
- func WithIdempotent(enabled bool) ConfigOption
- func WithMetrics(enable bool) ConfigOption
- func WithProducer(enabled bool) ConfigOption
- func WithRequireConfig(require bool) ConfigOption
- func WithSASL(mechanism, username, password string) ConfigOption
- func WithTLS(certFile, keyFile, caFile string, skipVerify bool) ConfigOption
- func WithTracing(enable bool) ConfigOption
- func WithVersion(version string) ConfigOption
- type ConsumerGroupHandler
- type Extension
- type HashGeneratorFcn
- type Kafka
- type KafkaService
- func (s *KafkaService) Client() Kafka
- func (s *KafkaService) Close() error
- func (s *KafkaService) Consume(ctx context.Context, topics []string, handler MessageHandler, ...) error
- func (s *KafkaService) CreateTopic(ctx context.Context, topic string, partitions int, replicationFactor int16) error
- func (s *KafkaService) DeleteTopic(ctx context.Context, topic string) error
- func (s *KafkaService) GetTopicMetadata(ctx context.Context, topic string) (*TopicMetadata, error)
- func (s *KafkaService) Health(ctx context.Context) error
- func (s *KafkaService) ListTopics(ctx context.Context) ([]string, error)
- func (s *KafkaService) Name() string
- func (s *KafkaService) Ping(ctx context.Context) error
- func (s *KafkaService) Produce(ctx context.Context, message Message) error
- func (s *KafkaService) ProduceBatch(ctx context.Context, messages []Message) error
- func (s *KafkaService) Start(ctx context.Context) error
- func (s *KafkaService) Stop(ctx context.Context) error
- func (s *KafkaService) StopConsuming() error
- type MessageHandler
- type MessageHeader
- type PartitionMetadata
- type ProducerMessage
- type TopicConfig
- type TopicMetadata
- type XDGSCRAMClient
Constants ¶
const (
// ServiceKey is the DI key for the Kafka service.
ServiceKey = "kafka"
)
DI container keys for Kafka extension services.
Variables ¶
var (
    // ErrClientNotInitialized is returned when client is not initialized
    ErrClientNotInitialized = errors.New("kafka: client not initialized")
    // ErrProducerNotEnabled is returned when producer operations are attempted but producer is disabled
    ErrProducerNotEnabled = errors.New("kafka: producer not enabled")
    // ErrConsumerNotEnabled is returned when consumer operations are attempted but consumer is disabled
    ErrConsumerNotEnabled = errors.New("kafka: consumer not enabled")
    // ErrAlreadyConsuming is returned when attempting to start consuming while already consuming
    ErrAlreadyConsuming = errors.New("kafka: already consuming")
    // ErrNotConsuming is returned when attempting to stop consuming while not consuming
    ErrNotConsuming = errors.New("kafka: not consuming")
    // ErrInConsumerGroup is returned when attempting consumer operations while in consumer group
    ErrInConsumerGroup = errors.New("kafka: already in consumer group")
    // ErrNotInConsumerGroup is returned when attempting to leave consumer group while not in one
    ErrNotInConsumerGroup = errors.New("kafka: not in consumer group")
    // ErrSendFailed is returned when message send fails
    ErrSendFailed = errors.New("kafka: send failed")
    // ErrConsumeFailed is returned when message consumption fails
    ErrConsumeFailed = errors.New("kafka: consume failed")
    // ErrTopicNotFound is returned when topic doesn't exist
    ErrTopicNotFound = errors.New("kafka: topic not found")
    // ErrInvalidPartition is returned when partition is invalid
    ErrInvalidPartition = errors.New("kafka: invalid partition")
    // ErrConnectionFailed is returned when connection fails
    ErrConnectionFailed = errors.New("kafka: connection failed")
    // ErrClientClosed is returned when operations are attempted on closed client
    ErrClientClosed = errors.New("kafka: client closed")
)
Functions ¶
func NewExtension ¶
func NewExtension(opts ...ConfigOption) forge.Extension
NewExtension creates a new Kafka extension
func NewExtensionWithConfig ¶
func NewExtensionWithConfig(config Config) forge.Extension
NewExtensionWithConfig creates a new Kafka extension with a complete config
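A minimal construction sketch, assuming this package is imported as kafka; the broker address, client ID, and group name are placeholders:

// Build the extension with functional options.
ext := kafka.NewExtension(
    kafka.WithBrokers("localhost:9092"),
    kafka.WithClientID("orders-service"),
    kafka.WithProducer(true),
    kafka.WithConsumer(true),
    kafka.WithConsumerGroup("orders-workers"),
)

// Or supply a complete Config up front.
ext = kafka.NewExtensionWithConfig(kafka.Config{
    Brokers:         []string{"localhost:9092"},
    ClientID:        "orders-service",
    ProducerEnabled: true,
})
_ = ext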
Types ¶
type ClientStats ¶
type ClientStats struct {
Connected bool
ConnectTime time.Time
MessagesSent int64
MessagesReceived int64
BytesSent int64
BytesReceived int64
Errors int64
LastError error
LastErrorTime time.Time
ActiveConsumers int
ActiveProducers int
}
ClientStats contains client statistics
type Config ¶
type Config struct {
// Connection settings
Brokers []string `json:"brokers" yaml:"brokers" mapstructure:"brokers"`
ClientID string `json:"client_id" yaml:"client_id" mapstructure:"client_id"`
Version string `json:"version" yaml:"version" mapstructure:"version"` // Kafka version (e.g., "3.0.0")
DialTimeout time.Duration `json:"dial_timeout" yaml:"dial_timeout" mapstructure:"dial_timeout"`
ReadTimeout time.Duration `json:"read_timeout" yaml:"read_timeout" mapstructure:"read_timeout"`
WriteTimeout time.Duration `json:"write_timeout" yaml:"write_timeout" mapstructure:"write_timeout"`
KeepAlive time.Duration `json:"keep_alive" yaml:"keep_alive" mapstructure:"keep_alive"`
// TLS/SASL
EnableTLS bool `json:"enable_tls" yaml:"enable_tls" mapstructure:"enable_tls"`
TLSCertFile string `json:"tls_cert_file,omitempty" yaml:"tls_cert_file,omitempty" mapstructure:"tls_cert_file"`
TLSKeyFile string `json:"tls_key_file,omitempty" yaml:"tls_key_file,omitempty" mapstructure:"tls_key_file"`
TLSCAFile string `json:"tls_ca_file,omitempty" yaml:"tls_ca_file,omitempty" mapstructure:"tls_ca_file"`
TLSSkipVerify bool `json:"tls_skip_verify" yaml:"tls_skip_verify" mapstructure:"tls_skip_verify"`
EnableSASL bool `json:"enable_sasl" yaml:"enable_sasl" mapstructure:"enable_sasl"`
SASLMechanism string `json:"sasl_mechanism,omitempty" yaml:"sasl_mechanism,omitempty" mapstructure:"sasl_mechanism"` // PLAIN, SCRAM-SHA-256, SCRAM-SHA-512
SASLUsername string `json:"sasl_username,omitempty" yaml:"sasl_username,omitempty" mapstructure:"sasl_username"`
SASLPassword string `json:"sasl_password,omitempty" yaml:"sasl_password,omitempty" mapstructure:"sasl_password"`
// Producer settings
ProducerEnabled bool `json:"producer_enabled" yaml:"producer_enabled" mapstructure:"producer_enabled"`
ProducerMaxMessageBytes int `json:"producer_max_message_bytes" yaml:"producer_max_message_bytes" mapstructure:"producer_max_message_bytes"`
ProducerCompression string `json:"producer_compression" yaml:"producer_compression" mapstructure:"producer_compression"` // none, gzip, snappy, lz4, zstd
ProducerFlushMessages int `json:"producer_flush_messages" yaml:"producer_flush_messages" mapstructure:"producer_flush_messages"`
ProducerFlushFrequency time.Duration `json:"producer_flush_frequency" yaml:"producer_flush_frequency" mapstructure:"producer_flush_frequency"`
ProducerRetryMax int `json:"producer_retry_max" yaml:"producer_retry_max" mapstructure:"producer_retry_max"`
ProducerIdempotent bool `json:"producer_idempotent" yaml:"producer_idempotent" mapstructure:"producer_idempotent"`
ProducerAcks string `json:"producer_acks" yaml:"producer_acks" mapstructure:"producer_acks"` // none, local, all
// Consumer settings
ConsumerEnabled bool `json:"consumer_enabled" yaml:"consumer_enabled" mapstructure:"consumer_enabled"`
ConsumerGroupID string `json:"consumer_group_id,omitempty" yaml:"consumer_group_id,omitempty" mapstructure:"consumer_group_id"`
ConsumerOffsets string `json:"consumer_offsets" yaml:"consumer_offsets" mapstructure:"consumer_offsets"` // newest, oldest
ConsumerMaxWait time.Duration `json:"consumer_max_wait" yaml:"consumer_max_wait" mapstructure:"consumer_max_wait"`
ConsumerFetchMin int32 `json:"consumer_fetch_min" yaml:"consumer_fetch_min" mapstructure:"consumer_fetch_min"`
ConsumerFetchMax int32 `json:"consumer_fetch_max" yaml:"consumer_fetch_max" mapstructure:"consumer_fetch_max"`
ConsumerIsolation string `json:"consumer_isolation" yaml:"consumer_isolation" mapstructure:"consumer_isolation"` // read_uncommitted, read_committed
// Consumer group settings
ConsumerGroupRebalance string `json:"consumer_group_rebalance" yaml:"consumer_group_rebalance" mapstructure:"consumer_group_rebalance"` // range, roundrobin, sticky
ConsumerGroupSession time.Duration `json:"consumer_group_session" yaml:"consumer_group_session" mapstructure:"consumer_group_session"`
ConsumerGroupHeartbeat time.Duration `json:"consumer_group_heartbeat" yaml:"consumer_group_heartbeat" mapstructure:"consumer_group_heartbeat"`
// Metadata settings
MetadataRetryMax int `json:"metadata_retry_max" yaml:"metadata_retry_max" mapstructure:"metadata_retry_max"`
MetadataRetryBackoff time.Duration `json:"metadata_retry_backoff" yaml:"metadata_retry_backoff" mapstructure:"metadata_retry_backoff"`
MetadataRefreshFreq time.Duration `json:"metadata_refresh_freq" yaml:"metadata_refresh_freq" mapstructure:"metadata_refresh_freq"`
MetadataFullRefresh bool `json:"metadata_full_refresh" yaml:"metadata_full_refresh" mapstructure:"metadata_full_refresh"`
// Observability
EnableMetrics bool `json:"enable_metrics" yaml:"enable_metrics" mapstructure:"enable_metrics"`
EnableTracing bool `json:"enable_tracing" yaml:"enable_tracing" mapstructure:"enable_tracing"`
EnableLogging bool `json:"enable_logging" yaml:"enable_logging" mapstructure:"enable_logging"`
// Config loading flags
RequireConfig bool `json:"-" yaml:"-" mapstructure:"-"`
}
Config contains configuration for the Kafka extension
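For illustration, a Config can also be populated directly. Every field shown is from the struct above, but the values are placeholders rather than package defaults (imports omitted; package assumed imported as kafka):

cfg := kafka.Config{
    Brokers:      []string{"broker-1:9092", "broker-2:9092"},
    ClientID:     "payments",
    Version:      "3.0.0",
    DialTimeout:  10 * time.Second,
    ReadTimeout:  10 * time.Second,
    WriteTimeout: 10 * time.Second,

    ProducerEnabled:     true,
    ProducerCompression: "snappy",
    ProducerAcks:        "all",
    ProducerIdempotent:  true,

    ConsumerEnabled:   true,
    ConsumerGroupID:   "payments-workers",
    ConsumerOffsets:   "oldest",
    ConsumerIsolation: "read_committed",

    EnableMetrics: true,
    EnableLogging: true,
}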
func (*Config) ToSaramaConfig ¶
ToSaramaConfig converts the Config to an equivalent Sarama configuration
type ConfigOption ¶
type ConfigOption func(*Config)
ConfigOption is a functional option for Config
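Because ConfigOption is simply func(*Config), callers can write options of their own for fields not covered by the helpers below. WithDialTimeout here is a hypothetical example, not part of this package (imports omitted):

// WithDialTimeout is a hypothetical custom option that sets Config.DialTimeout.
func WithDialTimeout(d time.Duration) kafka.ConfigOption {
    return func(c *kafka.Config) {
        c.DialTimeout = d
    }
}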
func WithBrokers ¶
func WithBrokers(brokers ...string) ConfigOption
func WithClientID ¶
func WithClientID(clientID string) ConfigOption
func WithCompression ¶
func WithCompression(compression string) ConfigOption
func WithConfig ¶
func WithConfig(config Config) ConfigOption
func WithConsumer ¶
func WithConsumer(enabled bool) ConfigOption
func WithConsumerGroup ¶
func WithConsumerGroup(groupID string) ConfigOption
func WithIdempotent ¶
func WithIdempotent(enabled bool) ConfigOption
func WithMetrics ¶
func WithMetrics(enable bool) ConfigOption
func WithProducer ¶
func WithProducer(enabled bool) ConfigOption
func WithRequireConfig ¶
func WithRequireConfig(require bool) ConfigOption
func WithSASL ¶
func WithSASL(mechanism, username, password string) ConfigOption
func WithTLS ¶
func WithTLS(certFile, keyFile, caFile string, skipVerify bool) ConfigOption
func WithTracing ¶
func WithTracing(enable bool) ConfigOption
func WithVersion ¶
func WithVersion(version string) ConfigOption
type ConsumerGroupHandler ¶
type ConsumerGroupHandler interface {
Setup(sarama.ConsumerGroupSession) error
Cleanup(sarama.ConsumerGroupSession) error
ConsumeClaim(sarama.ConsumerGroupSession, sarama.ConsumerGroupClaim) error
}
ConsumerGroupHandler handles consumer group messages
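A minimal handler sketch following the usual Sarama consumer-group pattern; loggingHandler is illustrative only (imports omitted):

type loggingHandler struct{}

func (loggingHandler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (loggingHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

func (loggingHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for msg := range claim.Messages() {
        log.Printf("topic=%s partition=%d offset=%d", msg.Topic, msg.Partition, msg.Offset)
        sess.MarkMessage(msg, "") // mark progress so offsets can be committed
    }
    return nil
}

// Usage sketch: err := client.JoinConsumerGroup(ctx, "orders-workers", []string{"orders"}, loggingHandler{})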
type Extension ¶
type Extension struct {
*forge.BaseExtension
// contains filtered or unexported fields
}
Extension implements forge.Extension for Kafka functionality. The extension is a lightweight facade that loads configuration and registers services.
func (*Extension) Health ¶
Health checks the extension health. Service health is managed by Vessel through KafkaService.Health().
type HashGeneratorFcn ¶
HashGeneratorFcn is a function type for generating hash functions
var (
    // SHA256 is a hash generator for SHA-256
    SHA256 HashGeneratorFcn = sha256.New
    // SHA512 is a hash generator for SHA-512
    SHA512 HashGeneratorFcn = sha512.New
)
type Kafka ¶
type Kafka interface {
// Producer operations
SendMessage(topic string, key, value []byte) error
SendMessageAsync(topic string, key, value []byte) error
SendMessages(messages []*ProducerMessage) error
// Consumer operations
Consume(ctx context.Context, topics []string, handler MessageHandler) error
ConsumePartition(ctx context.Context, topic string, partition int32, offset int64, handler MessageHandler) error
StopConsume() error
// Consumer group operations
JoinConsumerGroup(ctx context.Context, groupID string, topics []string, handler ConsumerGroupHandler) error
LeaveConsumerGroup(ctx context.Context) error
// Admin operations
CreateTopic(topic string, config TopicConfig) error
DeleteTopic(topic string) error
ListTopics() ([]string, error)
DescribeTopic(topic string) (*TopicMetadata, error)
GetPartitions(topic string) ([]int32, error)
GetOffset(topic string, partition int32, time int64) (int64, error)
// Client info
GetProducer() sarama.SyncProducer
GetAsyncProducer() sarama.AsyncProducer
GetConsumer() sarama.Consumer
GetConsumerGroup() sarama.ConsumerGroup
GetClient() sarama.Client
GetStats() ClientStats
// Lifecycle
Close() error
// Health
Ping(ctx context.Context) error
}
Kafka represents a unified Kafka client interface
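A rough usage sketch against this interface; svc is assumed to be a *KafkaService, and error handling is abbreviated:

var client kafka.Kafka = svc.Client()

// Produce a single message synchronously.
if err := client.SendMessage("orders", []byte("order-42"), []byte(`{"total": 99.5}`)); err != nil {
    log.Printf("send failed: %v", err)
}

// Inspect the cluster.
topics, err := client.ListTopics()
if err == nil {
    log.Printf("topics: %v", topics)
}

// Read client-side counters.
stats := client.GetStats()
log.Printf("sent=%d errors=%d", stats.MessagesSent, stats.Errors)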
type KafkaService ¶
type KafkaService struct {
// contains filtered or unexported fields
}
KafkaService wraps a Kafka client and provides lifecycle management. It implements Vessel's di.Service interface so Vessel can manage its lifecycle.
func NewKafkaService ¶
func NewKafkaService(config Config, logger forge.Logger, metrics forge.Metrics) (*KafkaService, error)
NewKafkaService creates a new Kafka service with the given configuration. This is the constructor that will be registered with the DI container.
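A lifecycle sketch for running the service outside of Vessel; logger and metrics stand in for whatever forge.Logger and forge.Metrics implementations the host application supplies, and cfg is a Config as above (imports omitted):

svc, err := kafka.NewKafkaService(cfg, logger, metrics)
if err != nil {
    log.Fatalf("kafka: %v", err)
}

ctx := context.Background()
if err := svc.Start(ctx); err != nil { // verifies the connection
    log.Fatalf("kafka start: %v", err)
}
defer svc.Stop(ctx)

if err := svc.Health(ctx); err != nil {
    log.Printf("kafka unhealthy: %v", err)
}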
func (*KafkaService) Client ¶
func (s *KafkaService) Client() Kafka
Client returns the underlying Kafka client.
func (*KafkaService) Close ¶
func (s *KafkaService) Close() error
func (*KafkaService) Consume ¶
func (s *KafkaService) Consume(ctx context.Context, topics []string, handler MessageHandler, opts ...ConsumeOption) error
func (*KafkaService) CreateTopic ¶
func (s *KafkaService) CreateTopic(ctx context.Context, topic string, partitions int, replicationFactor int16) error
func (*KafkaService) DeleteTopic ¶
func (s *KafkaService) DeleteTopic(ctx context.Context, topic string) error
func (*KafkaService) GetTopicMetadata ¶
func (s *KafkaService) GetTopicMetadata(ctx context.Context, topic string) (*TopicMetadata, error)
func (*KafkaService) Health ¶
func (s *KafkaService) Health(ctx context.Context) error
Health checks if the Kafka service is healthy.
func (*KafkaService) ListTopics ¶
func (s *KafkaService) ListTopics(ctx context.Context) ([]string, error)
func (*KafkaService) Name ¶
func (s *KafkaService) Name() string
Name returns the service name for Vessel's lifecycle management.
func (*KafkaService) Produce ¶
func (s *KafkaService) Produce(ctx context.Context, message Message) error
func (*KafkaService) ProduceBatch ¶
func (s *KafkaService) ProduceBatch(ctx context.Context, messages []Message) error
func (*KafkaService) Start ¶
func (s *KafkaService) Start(ctx context.Context) error
Start starts the Kafka service by verifying connection. This is called automatically by Vessel during container.Start().
func (*KafkaService) Stop ¶
func (s *KafkaService) Stop(ctx context.Context) error
Stop stops the Kafka service by closing the client. This is called automatically by Vessel during container.Stop().
func (*KafkaService) StopConsuming ¶
func (s *KafkaService) StopConsuming() error
type MessageHandler ¶
type MessageHandler func(message *sarama.ConsumerMessage) error
MessageHandler processes incoming Kafka messages
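A handler is a plain function over *sarama.ConsumerMessage. The sketch below pairs one with KafkaService.Consume; svc and ctx are assumed to be in scope, and whether Consume blocks until StopConsuming is not specified by this listing:

handler := func(msg *sarama.ConsumerMessage) error {
    log.Printf("key=%s value=%s", msg.Key, msg.Value)
    return nil // returning an error signals a failed message to the consumer
}

if err := svc.Consume(ctx, []string{"orders"}, handler); err != nil {
    log.Printf("consume: %v", err)
}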
type MessageHeader ¶
MessageHeader represents a Kafka message header
type PartitionMetadata ¶
PartitionMetadata contains partition metadata
type ProducerMessage ¶
type ProducerMessage struct {
Topic string
Key []byte
Value []byte
Headers []MessageHeader
Partition int32
Offset int64
Timestamp time.Time
}
ProducerMessage represents a message to be produced
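A sketch of building messages for SendMessages on the Kafka interface; Partition, Offset, and Timestamp are normally filled in by the client or broker and are left zero here (client as in the earlier sketch):

msgs := []*kafka.ProducerMessage{
    {Topic: "orders", Key: []byte("order-42"), Value: []byte(`{"status":"created"}`)},
    {Topic: "orders", Key: []byte("order-43"), Value: []byte(`{"status":"created"}`)},
}
if err := client.SendMessages(msgs); err != nil {
    log.Printf("batch send: %v", err)
}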
type TopicConfig ¶
type TopicConfig struct {
NumPartitions int32
ReplicationFactor int16
ConfigEntries map[string]*string
}
TopicConfig contains topic configuration
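A sketch of creating a topic through the Kafka interface; the topic name and settings are placeholders, and retention.ms is a standard Kafka topic-level config key:

retention := "604800000" // 7 days, in milliseconds
err := client.CreateTopic("orders", kafka.TopicConfig{
    NumPartitions:     6,
    ReplicationFactor: 3,
    ConfigEntries: map[string]*string{
        "retention.ms": &retention,
    },
})
if err != nil {
    log.Printf("create topic: %v", err)
}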
type TopicMetadata ¶
type TopicMetadata struct {
Name string
Partitions []PartitionMetadata
Config map[string]string
}
TopicMetadata contains topic metadata
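A short sketch of reading topic metadata via DescribeTopic on the Kafka interface (client as in the earlier sketches):

md, err := client.DescribeTopic("orders")
if err != nil {
    log.Printf("describe: %v", err)
} else {
    log.Printf("topic %s has %d partitions", md.Name, len(md.Partitions))
}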
type XDGSCRAMClient ¶
type XDGSCRAMClient struct {
*scram.Client
*scram.ClientConversation
HashGeneratorFcn scram.HashGeneratorFcn
}
XDGSCRAMClient implements the SCRAM client interface
func (*XDGSCRAMClient) Begin ¶
func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error)
Begin starts the SCRAM authentication process
func (*XDGSCRAMClient) Done ¶
func (x *XDGSCRAMClient) Done() bool
Done returns true if the authentication is complete
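For reference, this is the usual way a SCRAM client like this is wired into a Sarama configuration when SASL is set up by hand; normally Config.ToSaramaConfig does this when EnableSASL is true. The sketch assumes XDGSCRAMClient satisfies sarama.SCRAMClient and that SHA512 is assignable to the HashGeneratorFcn field:

saramaCfg := sarama.NewConfig()
saramaCfg.Net.SASL.Enable = true
saramaCfg.Net.SASL.User = "user"
saramaCfg.Net.SASL.Password = "secret"
saramaCfg.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
saramaCfg.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
    // If HashGeneratorFcn and scram.HashGeneratorFcn are distinct defined types,
    // wrap SHA512 in an explicit conversion here.
    return &kafka.XDGSCRAMClient{HashGeneratorFcn: kafka.SHA512}
}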