Documentation ¶
Index ¶
- func GetTopics(bootstrapKafkaUrl string, regex *regexp.Regexp) (result []string, err error)
- func GetWorkerTopics(config configuration.Config) (result []string, err error)
- func ImportIdToTopic(id string) string
- func InitTopics(bootstrapUrl string, config []kafka.ConfigEntry, topics ...string) (err error)
- func NewKafkaLastOffsetConsumer(ctx context.Context, wg *sync.WaitGroup, broker string, groupId string, ...) error
- func NewKafkaLastOffsetConsumerGroup(ctx context.Context, wg *sync.WaitGroup, broker string, groupId string, ...) error
- func ServiceIdToTopic(id string) string
- func Start(basectx context.Context, wg *sync.WaitGroup, config configuration.Config, ...) error
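- type UpdatableConsumer
- func NewUpdatableConsumer(ctx context.Context, config configuration.Config, onMsg func(msg model.ConsumerMessage) error, onError func(topic string, err error)) *UpdatableConsumer
- func (this *UpdatableConsumer) UpdateTopics(topics []string) error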
- type Worker
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func GetWorkerTopics ¶
func GetWorkerTopics(config configuration.Config) (result []string, err error)
func ImportIdToTopic ¶
func ImportIdToTopic(id string) string
func InitTopics ¶
func InitTopics(bootstrapUrl string, config []kafka.ConfigEntry, topics ...string) (err error)
func ServiceIdToTopic ¶
func ServiceIdToTopic(id string) string
Types ¶
type UpdatableConsumer ¶ added in v1.0.38
type UpdatableConsumer struct {
// contains filtered or unexported fields
}
func NewUpdatableConsumer ¶ added in v1.0.38
func NewUpdatableConsumer(ctx context.Context, config configuration.Config, onMsg func(msg model.ConsumerMessage) error, onError func(topic string, err error)) *UpdatableConsumer
func (*UpdatableConsumer) UpdateTopics ¶ added in v1.0.38
func (this *UpdatableConsumer) UpdateTopics(topics []string) error
type Worker ¶
type Worker interface {
Do(msg model.ConsumerMessage) error
HandleDeploymentUpdateSignal()
}
Click to show internal directories.
Click to hide internal directories.