Documentation
¶
Index ¶
- Constants
- func IsBrokersAreValid(brokers []string) error
- func IsKafkaTopicValid(s KafkaTopicIdentifier) error
- func Run() error
- type Consumer
- func (Consumer) ExtractHeadersFromMessage(msg *sarama.ConsumerMessage) (*storage.GitHubWebhookData, error)
- func (Consumer) ExtractUserInfo(payload any) (int64, string)
- func (c *Consumer) Ping() error
- func (c Consumer) Start() error
- func (c Consumer) StoreGitHubMessage(msg *sarama.ConsumerMessage) error
- func (Consumer) UnmarshalPayload(event github.Event, value []byte, target string) (any, error)
- func (c Consumer) Worker(workerID int, messages <-chan *sarama.ConsumerMessage)
- type ConsumerConfigFactoryFunc
- type ConsumerFactoryFunc
- type GitHubKafkaConsumer
- type KafkaConsumer
- type KafkaTopicIdentifier
- type Option
- func WithBackoff(d time.Duration) Option
- func WithBrokers(brokers []string) Option
- func WithDialTimeout(d time.Duration) Option
- func WithLogger(l *slog.Logger) Option
- func WithMaxRetries(i int) Option
- func WithPartition(i int) Option
- func WithReadTimeout(d time.Duration) Option
- func WithStorage(st storage.Storer) Option
- func WithTopic(s KafkaTopicIdentifier) Option
- func WithWriteTimeout(d time.Duration) Option
- type TCPAddrs
Constants ¶
const ( DefaultKafkaBrokers = "127.0.0.1:9094" DefaultKafkaConsumerPartition = 0 DefaultKafkaConsumerDialTimeout = 30 * time.Second DefaultKafkaConsumerReadTimeout = 30 * time.Second DefaultKafkaConsumerWriteTimeout = 30 * time.Second DefaultKafkaConsumerBackoff = 2 * time.Second DefaultKafkaConsumerMaxRetries = 10 )
defaults values.
Variables ¶
This section is empty.
Functions ¶
func IsBrokersAreValid ¶
IsBrokersAreValid validates if the given brokers are pointing valid tcp addrs.
func IsKafkaTopicValid ¶
func IsKafkaTopicValid(s KafkaTopicIdentifier) error
IsKafkaTopicValid validates if given topic is supported.
Types ¶
type Consumer ¶
type Consumer struct {
ConsumerFactory ConsumerFactoryFunc
ConsumerConfig ConsumerConfigFactoryFunc
Logger *slog.Logger
Storage storage.Storer
Consumer sarama.Consumer
Topic KafkaTopicIdentifier
Brokers []string
DialTimeout time.Duration
ReadTimeout time.Duration
WriteTimeout time.Duration
Backoff time.Duration
MaxRetries uint8
Partition int32
}
Consumer represents kafa consumer setup.
func (Consumer) ExtractHeadersFromMessage ¶
func (Consumer) ExtractHeadersFromMessage(msg *sarama.ConsumerMessage) (*storage.GitHubWebhookData, error)
ExtractHeadersFromMessage extracts Kafka message headers.
func (Consumer) ExtractUserInfo ¶
ExtractUserInfo extracts user information from payload.
func (Consumer) StoreGitHubMessage ¶
func (c Consumer) StoreGitHubMessage(msg *sarama.ConsumerMessage) error
StoreGitHubMessage stores consumed message to database.
func (Consumer) UnmarshalPayload ¶
UnmarshalPayload unmarshals incoming value to event related struct.
type ConsumerConfigFactoryFunc ¶
ConsumerConfigFactoryFunc is a type for handling config.
type ConsumerFactoryFunc ¶
ConsumerFactoryFunc is a type for handling consumer.
type GitHubKafkaConsumer ¶
type GitHubKafkaConsumer interface {
KafkaConsumer
StoreGitHubMessage(msg *sarama.ConsumerMessage) error
}
GitHubKafkaConsumer defines Kafka GitHub message consumer.
type KafkaConsumer ¶
type KafkaConsumer interface {
Start() error
Ping() error
Worker(workerID int, messages <-chan *sarama.ConsumerMessage)
}
KafkaConsumer defines kafka consumer behaviours.
type KafkaTopicIdentifier ¶
type KafkaTopicIdentifier string
KafkaTopicIdentifier represents custom type.
const ( KafkaTopicIdentifierGitHub KafkaTopicIdentifier = "github" KafkaTopicIdentifierGitLab KafkaTopicIdentifier = "gitlab" )
constants.
func (KafkaTopicIdentifier) String ¶
func (s KafkaTopicIdentifier) String() string
type Option ¶
Option represents option function type.
func WithDialTimeout ¶
WithDialTimeout sets dial timeout.
func WithReadTimeout ¶
WithReadTimeout sets read timeout.
func WithWriteTimeout ¶
WithWriteTimeout sets write timeout.