kafkabase

package module
v0.0.0-...-e6d1e93 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 9, 2026 License: MIT Imports: 25 Imported by: 4

Documentation

Index

Constants

View Source
const MessageIdHeader = "message_id"

Variables

View Source
var (
	ProducerMessages = func(topicId, destinationId, mode, tableName, status, errorType string) prometheus.Counter {
		return producerMessages.WithLabelValues(topicId, destinationId, mode, tableName, status, errorType)
	}

	ProducerQueueLength = promauto.NewGauge(prometheus.GaugeOpts{
		Namespace: "bulkerapp",
		Subsystem: "producer",
		Name:      "queue_length",
	})
)

Functions

func GetKafkaHeader

func GetKafkaHeader(message *kafka.Message, key string) string

func GetKafkaIntHeader

func GetKafkaIntHeader(message *kafka.Message, name string) (int, error)

func GetKafkaObjectHeader

func GetKafkaObjectHeader(message *kafka.Message, name string) (map[string]any, error)

func GetKafkaTimeHeader

func GetKafkaTimeHeader(message *kafka.Message, name string) (time.Time, error)

func KafkaErrorCode

func KafkaErrorCode(err error) string

func KafkaRetryableError

func KafkaRetryableError(err error) bool

func PutKafkaHeader

func PutKafkaHeader(headers *[]kafka.Header, key string, value string)

Types

type DummyPartitionSelector

type DummyPartitionSelector struct {
}

func (*DummyPartitionSelector) SelectPartition

func (dps *DummyPartitionSelector) SelectPartition() int32

type FailoverDestination

type FailoverDestination interface {
	Store(filePath string) error
	Name() string
}

FailoverDestination represents where to store rotated logs

type FailoverLogger

type FailoverLogger struct {
	appbase.Service
	// contains filtered or unexported fields
}

FailoverLogger handles failed Kafka messages

func NewFailoverLogger

func NewFailoverLogger(config *FailoverLoggerConfig) (*FailoverLogger, error)

func (*FailoverLogger) Close

func (f *FailoverLogger) Close() error

func (*FailoverLogger) LogPayload

func (f *FailoverLogger) LogPayload(payload []byte) error

func (*FailoverLogger) ShouldLog

func (f *FailoverLogger) ShouldLog(err error) bool

func (*FailoverLogger) Start

func (f *FailoverLogger) Start()

type FailoverLoggerConfig

type FailoverLoggerConfig struct {
	Enabled          bool
	LogAllMessages   bool // Log all delivery reports, not just errors
	BasePath         string
	RotationPeriod   time.Duration
	MaxSize          int64 // Max size in bytes
	Destinations     []FailoverDestination
	CompressOnRotate bool
}

FailoverLoggerConfig configuration for failover logger

type FailoverLoggerEnvConfig

type FailoverLoggerEnvConfig struct {
	// Enable failover logger
	FailoverLoggerEnabled bool `mapstructure:"FAILOVER_LOGGER_ENABLED" default:"false"`

	// Log all messages (not just errors)
	FailoverLoggerLogAllMessages bool `mapstructure:"FAILOVER_LOGGER_LOG_ALL_MESSAGES" default:"false"`

	// Base path for temporary files
	FailoverLoggerBasePath string `mapstructure:"FAILOVER_LOGGER_BASE_PATH" default:"/tmp/kafka_failover"`

	// Rotation settings
	FailoverLoggerRotationPeriodMinutes int   `mapstructure:"FAILOVER_LOGGER_ROTATION_PERIOD_MINUTES" default:"60"`
	FailoverLoggerMaxSizeMB             int64 `mapstructure:"FAILOVER_LOGGER_MAX_SIZE_MB" default:"100"`
	FailoverLoggerCompressOnRotate      bool  `mapstructure:"FAILOVER_LOGGER_COMPRESS" default:"true"`

	// Local destination settings
	FailoverLoggerLocalMaxOldFiles int `mapstructure:"FAILOVER_LOGGER_LOCAL_MAX_OLD_FILES" default:"10"`

	// S3 destination settings
	FailoverLoggerS3Enabled bool   `mapstructure:"FAILOVER_LOGGER_S3_ENABLED" default:"false"`
	FailoverLoggerS3Bucket  string `mapstructure:"FAILOVER_LOGGER_S3_BUCKET"`
	FailoverLoggerS3Prefix  string `mapstructure:"FAILOVER_LOGGER_S3_PREFIX" default:""`
	FailoverLoggerS3Region  string `mapstructure:"FAILOVER_LOGGER_S3_REGION" default:"us-east-1"`
}

FailoverLoggerEnvConfig for parsing failover logger config from environment

func (*FailoverLoggerEnvConfig) PostInit

func (c *FailoverLoggerEnvConfig) PostInit(settings *appbase.AppSettings) error

func (*FailoverLoggerEnvConfig) ToFailoverLoggerConfig

func (c *FailoverLoggerEnvConfig) ToFailoverLoggerConfig() (*FailoverLoggerConfig, error)

ToFailoverLoggerConfig converts environment config to FailoverLoggerConfig

type KafkaConfig

type KafkaConfig struct {
	// KafkaBootstrapServers List of Kafka brokers separated by comma. Each broker should be in format host:port.
	KafkaBootstrapServers string `mapstructure:"KAFKA_BOOTSTRAP_SERVERS"`
	KafkaSSL              bool   `mapstructure:"KAFKA_SSL" default:"false"`
	KafkaSSLSkipVerify    bool   `mapstructure:"KAFKA_SSL_SKIP_VERIFY" default:"false"`
	KafkaSSLCA            string `mapstructure:"KAFKA_SSL_CA"`
	KafkaSSLCAFile        string `mapstructure:"KAFKA_SSL_CA_FILE"`
	KafkaSecurityProtocol string `mapstructure:"KAFKA_SECURITY_PROTOCOL"`

	// Kafka authorization as JSON object {"mechanism": "SCRAM-SHA-256|PLAIN", "username": "user", "password": "password"}
	KafkaSASL string `mapstructure:"KAFKA_SASL"`

	KafkaSessionTimeoutMs     int    `mapstructure:"KAFKA_SESSION_TIMEOUT_MS" default:"45000"`
	KafkaMaxPollIntervalMs    int    `mapstructure:"KAFKA_MAX_POLL_INTERVAL_MS" default:"300000"`
	KafkaTopicCompression     string `mapstructure:"KAFKA_TOPIC_COMPRESSION" default:"snappy"`
	KafkaTopicRetentionHours  int    `mapstructure:"KAFKA_TOPIC_RETENTION_HOURS" default:"48"`
	KafkaTopicSegmentHours    int    `mapstructure:"KAFKA_TOPIC_SEGMENT_HOURS" default:"24"`
	KafkaTopicPrefix          string `mapstructure:"KAFKA_TOPIC_PREFIX" default:""`
	KafkaFetchMessageMaxBytes int    `mapstructure:"KAFKA_FETCH_MESSAGE_MAX_BYTES" default:"1048576"`

	KafkaRetryTopicSegmentBytes              int    `mapstructure:"KAFKA_RETRY_TOPIC_SEGMENT_BYTES" default:"104857600"`
	KafkaRetryTopicRetentionHours            int    `mapstructure:"KAFKA_RETRY_TOPIC_RETENTION_HOURS" default:"48"`
	KafkaDeadTopicRetentionHours             int    `mapstructure:"KAFKA_DEAD_TOPIC_RETENTION_HOURS" default:"168"`
	KafkaTopicReplicationFactor              int    `mapstructure:"KAFKA_TOPIC_REPLICATION_FACTOR"`
	KafkaAdminMetadataTimeoutMs              int    `mapstructure:"KAFKA_ADMIN_METADATA_TIMEOUT_MS" default:"1000"`
	KafkaConsumerPartitionsAssigmentStrategy string `mapstructure:"KAFKA_CONSUMER_PARTITIONS_ASSIGMENT_STRATEGY" default:"roundrobin"`

	// KafkaDestinationsTopicName destination topic for /ingest endpoint
	KafkaDestinationsTopicName       string `mapstructure:"KAFKA_DESTINATIONS_TOPIC_NAME" default:"destination-messages"`
	KafkaDestinationsTopicPartitions int    `mapstructure:"KAFKA_DESTINATIONS_TOPIC_PARTITIONS" default:"16"`

	KafkaDestinationsRetryTopicName      string `mapstructure:"KAFKA_DESTINATIONS_RETRY_TOPIC_NAME" default:"destination-messages-retry"`
	KafkaDestinationsDeadLetterTopicName string `mapstructure:"KAFKA_DESTINATIONS_DEAD_LETTER_TOPIC_NAME" default:"destination-messages-dead-letter"`

	// ProducerWaitForDeliveryMs For ProduceSync only is a timeout for producer to wait for delivery report.
	ProducerQueueSize int `mapstructure:"PRODUCER_QUEUE_SIZE" default:"100000"`
	// ProducerQueueSizeThreshold when queue size reaches this value, health check will return unhealthy
	ProducerQueueSizeThreshold float64 `mapstructure:"PRODUCER_QUEUE_SIZE_THRESHOLD" default:"0.5"`
	ProducerBatchSize          int     `mapstructure:"PRODUCER_BATCH_SIZE" default:"65535"`
	ProducerLingerMs           int     `mapstructure:"PRODUCER_LINGER_MS" default:"1000"`
	ProducerWaitForDeliveryMs  int     `mapstructure:"PRODUCER_WAIT_FOR_DELIVERY_MS" default:"1000"`

	// Failover logger configuration
	FailoverLoggerEnvConfig `mapstructure:",squash"`
}

func (*KafkaConfig) GetKafkaConfig

func (ac *KafkaConfig) GetKafkaConfig() *kafka.ConfigMap

GetKafkaConfig returns kafka config

func (*KafkaConfig) PostInit

func (c *KafkaConfig) PostInit(settings *appbase.AppSettings) error

type LocalFileDestination

type LocalFileDestination struct {
	// contains filtered or unexported fields
}

LocalFileDestination stores files locally

func NewLocalFileDestination

func NewLocalFileDestination(basePath string, maxOldFiles int, keepCompressed bool) *LocalFileDestination

func (*LocalFileDestination) Name

func (l *LocalFileDestination) Name() string

func (*LocalFileDestination) Store

func (l *LocalFileDestination) Store(filePath string) error

type MetricsLabelsFunc

type MetricsLabelsFunc func(topicId string, status, errText string) (topic, destinationId, mode, tableName, st string, err string)

type PartitionSelector

type PartitionSelector interface {
	SelectPartition() int32
}

type Producer

type Producer struct {
	appbase.Service
	// contains filtered or unexported fields
}

func NewProducer

func NewProducer(config *KafkaConfig, kafkaConfig *kafka.ConfigMap, reportQueueLength bool, metricsLabelFunc MetricsLabelsFunc) (*Producer, error)

NewProducer creates new Producer

func (*Producer) Close

func (p *Producer) Close() error

Close closes producer

func (*Producer) ProduceAsync

func (p *Producer) ProduceAsync(topic string, messageKey string, event []byte, headers map[string]string, partition int32, messageId string, failover bool) error

ProduceAsync TODO: transactional delivery? produces messages to kafka

func (*Producer) ProduceSync

func (p *Producer) ProduceSync(topic string, event kafka.Message) error

ProduceSync TODO: transactional delivery? produces messages to kafka

func (*Producer) QueueSize

func (p *Producer) QueueSize() (int, error)

func (*Producer) Start

func (p *Producer) Start()

type S3Destination

type S3Destination struct {
	// contains filtered or unexported fields
}

S3Destination stores files in S3

func NewS3Destination

func NewS3Destination(bucket, prefix string, awsConfig aws.Config) (*S3Destination, error)

func (*S3Destination) Name

func (s *S3Destination) Name() string

func (*S3Destination) Store

func (s *S3Destination) Store(filePath string) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL