Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
View Source
// GetDatasetClient builds a Dataset API client pointed at cfg.DatasetAPIURL.
//
// It is declared as a package-level function variable, so it can be
// reassigned (presumably so tests can substitute a stub; confirm with callers).
var GetDatasetClient = func(cfg *config.Config) clients.DatasetClient {
	datasetURL := cfg.DatasetAPIURL
	return dataset.NewAPIClient(datasetURL)
}
GetDatasetClient gets the Dataset API client
View Source
var GetHTTPServer = func(bindAddr string, router http.Handler) HTTPServer { s := dphttp.NewServer(bindAddr, router) s.HandleOSSignals = false return s }
GetHTTPServer creates an HTTP Server with the provided bind address and router
View Source
var GetHealthCheck = func(cfg *config.Config, buildTime, gitCommit, version string) (HealthChecker, error) { versionInfo, err := healthcheck.NewVersionInfo(buildTime, gitCommit, version) if err != nil { return nil, fmt.Errorf("failed to get version info: %w", err) } hc := healthcheck.New( versionInfo, cfg.HealthCheckCriticalTimeout, cfg.HealthCheckInterval, ) return &hc, nil }
GetHealthCheck creates a healthcheck with versionInfo
View Source
var GetKafkaConsumer = func(ctx context.Context, cfg *config.Kafka) (kafka.IConsumerGroup, error) { if cfg == nil { return nil, errors.New("cannot create a kafka consumer without kafka config") } kafkaOffset := kafka.OffsetNewest if cfg.OffsetOldest { kafkaOffset = kafka.OffsetOldest } cgConfig := &kafka.ConsumerGroupConfig{ BrokerAddrs: cfg.Addr, Topic: cfg.ContentUpdatedTopic, GroupName: cfg.ContentUpdatedGroup, MinBrokersHealthy: &cfg.ConsumerMinBrokersHealthy, KafkaVersion: &cfg.Version, Offset: &kafkaOffset, } if cfg.SecProtocol == config.KafkaTLSProtocol { cgConfig.SecurityConfig = kafka.GetSecurityConfig( cfg.SecCACerts, cfg.SecClientCert, cfg.SecClientKey, cfg.SecSkipVerify, ) } return kafka.NewConsumerGroup(ctx, cgConfig) }
GetKafkaConsumer returns a Kafka Consumer group
View Source
var GetKafkaProducer = func(ctx context.Context, cfg *config.Kafka) (kafka.IProducer, error) { if cfg == nil { return nil, errors.New("cannot create a kafka producer without kafka config") } pConfig := &kafka.ProducerConfig{ BrokerAddrs: cfg.Addr, Topic: cfg.ProducerTopic, MinBrokersHealthy: &cfg.ProducerMinBrokersHealthy, KafkaVersion: &cfg.Version, MaxMessageBytes: &cfg.MaxBytes, } if cfg.SecProtocol == config.KafkaTLSProtocol { pConfig.SecurityConfig = kafka.GetSecurityConfig( cfg.SecCACerts, cfg.SecClientCert, cfg.SecClientKey, cfg.SecSkipVerify, ) } return kafka.NewProducer(ctx, pConfig) }
GetKafkaProducer creates a Kafka producer for the configured producer topic, returning an error if the Kafka config is nil
View Source
// GetZebedee builds a Zebedee client pointed at cfg.ZebedeeURL.
//
// It is declared as a package-level function variable, so it can be
// reassigned (presumably so tests can substitute a stub; confirm with callers).
var GetZebedee = func(cfg *config.Config) clients.ZebedeeClient {
	zebedeeURL := cfg.ZebedeeURL
	return zebedee.New(zebedeeURL)
}
GetZebedee gets the Zebedee Client
Functions ¶
This section is empty.
Types ¶
type HTTPServer ¶
HTTPServer defines the required methods from the HTTP server
type HealthChecker ¶
// HealthChecker lists the healthcheck methods this package depends on.
// GetHealthCheck returns a value of this interface type.
type HealthChecker interface {
// Handler takes a response writer and a request, i.e. it has the shape of an
// http.HandlerFunc. NOTE(review): presumably it serves the current health
// status over HTTP; confirm against the healthcheck implementation.
Handler(w http.ResponseWriter, req *http.Request)
// Start takes a context. NOTE(review): presumably it begins running the
// registered checks; confirm against the healthcheck implementation.
Start(ctx context.Context)
// Stop takes no arguments and returns nothing. NOTE(review): presumably the
// counterpart to Start; confirm.
Stop()
// AddAndGetCheck registers checker under name and returns the resulting
// check, or an error.
AddAndGetCheck(name string, checker healthcheck.Checker) (check *healthcheck.Check, err error)
// Subscribe associates the subscriber s with the given checks.
// NOTE(review): notification semantics are not visible here; confirm.
Subscribe(s healthcheck.Subscriber, checks ...*healthcheck.Check)
}
HealthChecker defines the required methods from Healthcheck
type Service ¶
// Service groups the configuration, HTTP server, health checker, Kafka
// consumer/producer and API clients held by the service.
type Service struct {
// Cfg is the service configuration.
Cfg *config.Config
// Server is the HTTP server; its type matches what GetHTTPServer returns.
Server HTTPServer
// HealthCheck is the health checker; its type matches what GetHealthCheck returns.
HealthCheck HealthChecker
// Consumer is the Kafka consumer group; its type matches what GetKafkaConsumer returns.
Consumer kafka.IConsumerGroup
// Producer is the Kafka producer; its type matches what GetKafkaProducer returns.
Producer kafka.IProducer
// ZebedeeCli is the Zebedee client; its type matches what GetZebedee returns.
ZebedeeCli clients.ZebedeeClient
// DatasetCli is the Dataset API client; its type matches what GetDatasetClient returns.
DatasetCli clients.DatasetClient
}
Service contains all the configs, server and clients to run the event handler service
Click to show internal directories.
Click to hide internal directories.