kafkaconsumer

package
v0.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 13, 2025 License: MIT Imports: 19 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultKafkaBrokers = "127.0.0.1:9094"

	DefaultKafkaConsumerPartition    = 0
	DefaultKafkaConsumerDialTimeout  = 30 * time.Second
	DefaultKafkaConsumerReadTimeout  = 30 * time.Second
	DefaultKafkaConsumerWriteTimeout = 30 * time.Second

	DefaultKafkaConsumerBackoff    = 2 * time.Second
	DefaultKafkaConsumerMaxRetries = 10
)

Default values.

Variables

This section is empty.

Functions

func IsBrokersAreValid

func IsBrokersAreValid(brokers []string) error

IsBrokersAreValid validates that the given brokers point to valid TCP addresses.

func IsKafkaTopicValid

func IsKafkaTopicValid(s KafkaTopicIdentifier) error

IsKafkaTopicValid validates that the given topic is supported.

func Run

func Run() error

Run runs the Kafka consumer.

Types

type Consumer

type Consumer struct {
	ConsumerFactory ConsumerFactoryFunc
	ConsumerConfig  ConsumerConfigFactoryFunc
	Logger          *slog.Logger
	Storage         storage.Storer
	Consumer        sarama.Consumer
	Topic           KafkaTopicIdentifier
	Brokers         []string
	DialTimeout     time.Duration
	ReadTimeout     time.Duration
	WriteTimeout    time.Duration
	Backoff         time.Duration
	MaxRetries      uint8
	Partition       int32
}

Consumer represents the Kafka consumer setup.

func New

func New(options ...Option) (*Consumer, error)

New instantiates a new Kafka consumer instance.

func (Consumer) ExtractHeadersFromMessage

func (Consumer) ExtractHeadersFromMessage(msg *sarama.ConsumerMessage) (*storage.GitHubWebhookData, error)

ExtractHeadersFromMessage extracts Kafka message headers.

func (Consumer) ExtractUserInfo

func (Consumer) ExtractUserInfo(payload any) (int64, string)

ExtractUserInfo extracts user information from payload.

func (*Consumer) Ping

func (c *Consumer) Ping() error

Ping checks kafka consumer availability and sets consumer instance.

func (Consumer) Start

func (c Consumer) Start() error

Start starts consumer.

func (Consumer) StoreGitHubMessage

func (c Consumer) StoreGitHubMessage(msg *sarama.ConsumerMessage) error

StoreGitHubMessage stores consumed message to database.

func (Consumer) UnmarshalPayload

func (Consumer) UnmarshalPayload(event github.Event, value []byte, target string) (any, error)

UnmarshalPayload unmarshals incoming value to event related struct.

func (Consumer) Worker

func (c Consumer) Worker(workerID int, messages <-chan *sarama.ConsumerMessage)

Worker drains message queue.

type ConsumerConfigFactoryFunc

type ConsumerConfigFactoryFunc func() *sarama.Config

ConsumerConfigFactoryFunc is a type for handling config.

type ConsumerFactoryFunc

type ConsumerFactoryFunc func(brokers []string, config *sarama.Config) (sarama.Consumer, error)

ConsumerFactoryFunc is a type for handling consumer.

type GitHubKafkaConsumer

type GitHubKafkaConsumer interface {
	KafkaConsumer
	StoreGitHubMessage(msg *sarama.ConsumerMessage) error
}

GitHubKafkaConsumer defines Kafka GitHub message consumer.

type KafkaConsumer

type KafkaConsumer interface {
	Start() error
	Ping() error
	Worker(workerID int, messages <-chan *sarama.ConsumerMessage)
}

KafkaConsumer defines kafka consumer behaviours.

type KafkaTopicIdentifier

type KafkaTopicIdentifier string

KafkaTopicIdentifier represents custom type.

const (
	KafkaTopicIdentifierGitHub KafkaTopicIdentifier = "github"
	KafkaTopicIdentifierGitLab KafkaTopicIdentifier = "gitlab"
)

Kafka topic identifier constants.

func (KafkaTopicIdentifier) String

func (s KafkaTopicIdentifier) String() string

type Option

type Option func(*Consumer) error

Option represents option function type.

func WithBackoff

func WithBackoff(d time.Duration) Option

WithBackoff sets backoff duration.

func WithBrokers

func WithBrokers(brokers []string) Option

WithBrokers sets brokers list.

func WithDialTimeout

func WithDialTimeout(d time.Duration) Option

WithDialTimeout sets dial timeout.

func WithLogger

func WithLogger(l *slog.Logger) Option

WithLogger sets logger.

func WithMaxRetries

func WithMaxRetries(i int) Option

WithMaxRetries sets max retries value.

func WithPartition

func WithPartition(i int) Option

WithPartition sets partition.

func WithReadTimeout

func WithReadTimeout(d time.Duration) Option

WithReadTimeout sets read timeout.

func WithStorage

func WithStorage(st storage.Storer) Option

WithStorage sets storage value.

func WithTopic

func WithTopic(s KafkaTopicIdentifier) Option

WithTopic sets topic name.

func WithWriteTimeout

func WithWriteTimeout(d time.Duration) Option

WithWriteTimeout sets write timeout.

type TCPAddrs

type TCPAddrs string

TCPAddrs represents a comma-separated list of TCP addresses.

func (TCPAddrs) List

func (t TCPAddrs) List() []string

List validates and returns the list of TCP addresses.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL