Documentation
¶
Index ¶
- Constants
- Variables
- func GetKafkaHeader(message *kafka.Message, key string) string
- func GetKafkaIntHeader(message *kafka.Message, name string) (int, error)
- func GetKafkaObjectHeader(message *kafka.Message, name string) (map[string]any, error)
- func GetKafkaTimeHeader(message *kafka.Message, name string) (time.Time, error)
- func KafkaErrorCode(err error) string
- func KafkaRetryableError(err error) bool
- func PutKafkaHeader(headers *[]kafka.Header, key string, value string)
- type DummyPartitionSelector
- type FailoverDestination
- type FailoverLogger
- type FailoverLoggerConfig
- type FailoverLoggerEnvConfig
- type KafkaConfig
- type LocalFileDestination
- type MetricsLabelsFunc
- type PartitionSelector
- type Producer
- type S3Destination
Constants ¶
View Source
const MessageIdHeader = "message_id"
Variables ¶
View Source
var ( ProducerMessages = func(topicId, destinationId, mode, tableName, status, errorType string) prometheus.Counter { return producerMessages.WithLabelValues(topicId, destinationId, mode, tableName, status, errorType) } ProducerQueueLength = promauto.NewGauge(prometheus.GaugeOpts{ Namespace: "bulkerapp", Subsystem: "producer", Name: "queue_length", }) )
Functions ¶
func GetKafkaObjectHeader ¶
func GetKafkaTimeHeader ¶
func KafkaErrorCode ¶
func KafkaRetryableError ¶
Types ¶
type DummyPartitionSelector ¶
type DummyPartitionSelector struct {
}
func (*DummyPartitionSelector) SelectPartition ¶
func (dps *DummyPartitionSelector) SelectPartition() int32
type FailoverDestination ¶
FailoverDestination represents where to store rotated logs
type FailoverLogger ¶
FailoverLogger handles failed Kafka messages
func NewFailoverLogger ¶
func NewFailoverLogger(config *FailoverLoggerConfig) (*FailoverLogger, error)
func (*FailoverLogger) Close ¶
func (f *FailoverLogger) Close() error
func (*FailoverLogger) LogPayload ¶
func (f *FailoverLogger) LogPayload(payload []byte) error
func (*FailoverLogger) ShouldLog ¶
func (f *FailoverLogger) ShouldLog(err error) bool
func (*FailoverLogger) Start ¶
func (f *FailoverLogger) Start()
type FailoverLoggerConfig ¶
type FailoverLoggerConfig struct {
Enabled bool
LogAllMessages bool // Log all delivery reports, not just errors
BasePath string
RotationPeriod time.Duration
MaxSize int64 // Max size in bytes
Destinations []FailoverDestination
CompressOnRotate bool
}
FailoverLoggerConfig configuration for failover logger
type FailoverLoggerEnvConfig ¶
type FailoverLoggerEnvConfig struct {
// Enable failover logger
FailoverLoggerEnabled bool `mapstructure:"FAILOVER_LOGGER_ENABLED" default:"false"`
// Log all messages (not just errors)
FailoverLoggerLogAllMessages bool `mapstructure:"FAILOVER_LOGGER_LOG_ALL_MESSAGES" default:"false"`
// Base path for temporary files
FailoverLoggerBasePath string `mapstructure:"FAILOVER_LOGGER_BASE_PATH" default:"/tmp/kafka_failover"`
// Rotation settings
FailoverLoggerRotationPeriodMinutes int `mapstructure:"FAILOVER_LOGGER_ROTATION_PERIOD_MINUTES" default:"60"`
FailoverLoggerMaxSizeMB int64 `mapstructure:"FAILOVER_LOGGER_MAX_SIZE_MB" default:"100"`
FailoverLoggerCompressOnRotate bool `mapstructure:"FAILOVER_LOGGER_COMPRESS" default:"true"`
// Local destination settings
FailoverLoggerLocalMaxOldFiles int `mapstructure:"FAILOVER_LOGGER_LOCAL_MAX_OLD_FILES" default:"10"`
// S3 destination settings
FailoverLoggerS3Enabled bool `mapstructure:"FAILOVER_LOGGER_S3_ENABLED" default:"false"`
FailoverLoggerS3Bucket string `mapstructure:"FAILOVER_LOGGER_S3_BUCKET"`
FailoverLoggerS3Prefix string `mapstructure:"FAILOVER_LOGGER_S3_PREFIX" default:""`
FailoverLoggerS3Region string `mapstructure:"FAILOVER_LOGGER_S3_REGION" default:"us-east-1"`
}
FailoverLoggerEnvConfig for parsing failover logger config from environment
func (*FailoverLoggerEnvConfig) PostInit ¶
func (c *FailoverLoggerEnvConfig) PostInit(settings *appbase.AppSettings) error
func (*FailoverLoggerEnvConfig) ToFailoverLoggerConfig ¶
func (c *FailoverLoggerEnvConfig) ToFailoverLoggerConfig() (*FailoverLoggerConfig, error)
ToFailoverLoggerConfig converts environment config to FailoverLoggerConfig
type KafkaConfig ¶
type KafkaConfig struct {
// KafkaBootstrapServers List of Kafka brokers separated by comma. Each broker should be in format host:port.
KafkaBootstrapServers string `mapstructure:"KAFKA_BOOTSTRAP_SERVERS"`
KafkaSSL bool `mapstructure:"KAFKA_SSL" default:"false"`
KafkaSSLSkipVerify bool `mapstructure:"KAFKA_SSL_SKIP_VERIFY" default:"false"`
KafkaSSLCA string `mapstructure:"KAFKA_SSL_CA"`
KafkaSSLCAFile string `mapstructure:"KAFKA_SSL_CA_FILE"`
KafkaSecurityProtocol string `mapstructure:"KAFKA_SECURITY_PROTOCOL"`
// Kafka authorization as JSON object {"mechanism": "SCRAM-SHA-256|PLAIN", "username": "user", "password": "password"}
KafkaSASL string `mapstructure:"KAFKA_SASL"`
KafkaSessionTimeoutMs int `mapstructure:"KAFKA_SESSION_TIMEOUT_MS" default:"45000"`
KafkaMaxPollIntervalMs int `mapstructure:"KAFKA_MAX_POLL_INTERVAL_MS" default:"300000"`
KafkaTopicCompression string `mapstructure:"KAFKA_TOPIC_COMPRESSION" default:"snappy"`
KafkaTopicRetentionHours int `mapstructure:"KAFKA_TOPIC_RETENTION_HOURS" default:"48"`
KafkaTopicSegmentHours int `mapstructure:"KAFKA_TOPIC_SEGMENT_HOURS" default:"24"`
KafkaTopicPrefix string `mapstructure:"KAFKA_TOPIC_PREFIX" default:""`
KafkaFetchMessageMaxBytes int `mapstructure:"KAFKA_FETCH_MESSAGE_MAX_BYTES" default:"1048576"`
KafkaRetryTopicSegmentBytes int `mapstructure:"KAFKA_RETRY_TOPIC_SEGMENT_BYTES" default:"104857600"`
KafkaRetryTopicRetentionHours int `mapstructure:"KAFKA_RETRY_TOPIC_RETENTION_HOURS" default:"48"`
KafkaDeadTopicRetentionHours int `mapstructure:"KAFKA_DEAD_TOPIC_RETENTION_HOURS" default:"168"`
KafkaTopicReplicationFactor int `mapstructure:"KAFKA_TOPIC_REPLICATION_FACTOR"`
KafkaAdminMetadataTimeoutMs int `mapstructure:"KAFKA_ADMIN_METADATA_TIMEOUT_MS" default:"1000"`
KafkaConsumerPartitionsAssigmentStrategy string `mapstructure:"KAFKA_CONSUMER_PARTITIONS_ASSIGMENT_STRATEGY" default:"roundrobin"`
// KafkaDestinationsTopicName destination topic for /ingest endpoint
KafkaDestinationsTopicName string `mapstructure:"KAFKA_DESTINATIONS_TOPIC_NAME" default:"destination-messages"`
KafkaDestinationsTopicPartitions int `mapstructure:"KAFKA_DESTINATIONS_TOPIC_PARTITIONS" default:"16"`
KafkaDestinationsRetryTopicName string `mapstructure:"KAFKA_DESTINATIONS_RETRY_TOPIC_NAME" default:"destination-messages-retry"`
KafkaDestinationsDeadLetterTopicName string `mapstructure:"KAFKA_DESTINATIONS_DEAD_LETTER_TOPIC_NAME" default:"destination-messages-dead-letter"`
// ProducerWaitForDeliveryMs For ProduceSync only is a timeout for producer to wait for delivery report.
ProducerQueueSize int `mapstructure:"PRODUCER_QUEUE_SIZE" default:"100000"`
// ProducerQueueSizeThreshold when queue size reaches this value, health check will return unhealthy
ProducerQueueSizeThreshold float64 `mapstructure:"PRODUCER_QUEUE_SIZE_THRESHOLD" default:"0.5"`
ProducerBatchSize int `mapstructure:"PRODUCER_BATCH_SIZE" default:"65535"`
ProducerLingerMs int `mapstructure:"PRODUCER_LINGER_MS" default:"1000"`
ProducerWaitForDeliveryMs int `mapstructure:"PRODUCER_WAIT_FOR_DELIVERY_MS" default:"1000"`
// Failover logger configuration
FailoverLoggerEnvConfig `mapstructure:",squash"`
}
func (*KafkaConfig) GetKafkaConfig ¶
func (ac *KafkaConfig) GetKafkaConfig() *kafka.ConfigMap
GetKafkaConfig returns kafka config
func (*KafkaConfig) PostInit ¶
func (c *KafkaConfig) PostInit(settings *appbase.AppSettings) error
type LocalFileDestination ¶
type LocalFileDestination struct {
// contains filtered or unexported fields
}
LocalFileDestination stores files locally
func NewLocalFileDestination ¶
func NewLocalFileDestination(basePath string, maxOldFiles int, keepCompressed bool) *LocalFileDestination
func (*LocalFileDestination) Name ¶
func (l *LocalFileDestination) Name() string
func (*LocalFileDestination) Store ¶
func (l *LocalFileDestination) Store(filePath string) error
type MetricsLabelsFunc ¶
type PartitionSelector ¶
type PartitionSelector interface {
SelectPartition() int32
}
type Producer ¶
func NewProducer ¶
func NewProducer(config *KafkaConfig, kafkaConfig *kafka.ConfigMap, reportQueueLength bool, metricsLabelFunc MetricsLabelsFunc) (*Producer, error)
NewProducer creates new Producer
func (*Producer) ProduceAsync ¶
func (p *Producer) ProduceAsync(topic string, messageKey string, event []byte, headers map[string]string, partition int32, messageId string, failover bool) error
ProduceAsync TODO: transactional delivery? produces messages to kafka
func (*Producer) ProduceSync ¶
ProduceSync TODO: transactional delivery? produces messages to kafka
type S3Destination ¶
type S3Destination struct {
// contains filtered or unexported fields
}
S3Destination stores files in S3
func NewS3Destination ¶
func NewS3Destination(bucket, prefix string, awsConfig aws.Config) (*S3Destination, error)
func (*S3Destination) Name ¶
func (s *S3Destination) Name() string
func (*S3Destination) Store ¶
func (s *S3Destination) Store(filePath string) error
Click to show internal directories.
Click to hide internal directories.