Documentation
¶
Index ¶
- Constants
- Variables
- func BalanceWorkerGroup(ctx context.Context, worker *balanceworker.Worker, ...) run.Group
- func BalanceWorkerProvisionTopics(conf config.BalanceWorkerConfiguration) []pkgkafka.TopicConfig
- func BalanceWorkerSubjectResolver() balanceworker.SubjectResolver
- func BalanceWorkerSubscriber(conf config.BalanceWorkerConfiguration, ...) (message.Subscriber, error)
- func BillingAdapter(logger *slog.Logger, db *entdb.Client) (billing.Adapter, error)
- func BillingService(logger *slog.Logger, db *entdb.Client, appService app.Service, ...) (billing.Service, error)
- func BillingWorkerGroup(ctx context.Context, worker *billingworker.Worker, ...) run.Group
- func BillingWorkerProvisionTopics(conf config.BillingConfiguration) []pkgkafka.TopicConfig
- func BillingWorkerSubscriber(conf config.BillingConfiguration, brokerOptions watermillkafka.BrokerOptions) (message.Subscriber, error)
- func NewAppService(logger *slog.Logger, db *entdb.Client, appsConfig config.AppsConfiguration) (app.Service, error)
- func NewAppStripeService(logger *slog.Logger, db *entdb.Client, appsConfig config.AppsConfiguration, ...) (appstripe.Service, error)
- func NewBalanceWorker(workerOptions balanceworker.WorkerOptions) (*balanceworker.Worker, error)
- func NewBalanceWorkerOptions(eventConfig config.EventsConfiguration, routerOptions router.Options, ...) balanceworker.WorkerOptions
- func NewBillingWorker(workerOptions billingworker.WorkerOptions) (*billingworker.Worker, error)
- func NewBillingWorkerOptions(eventConfig config.EventsConfiguration, routerOptions router.Options, ...) billingworker.WorkerOptions
- func NewBrokerConfiguration(kafkaConfig config.KafkaConfiguration, logConfig config.LogTelemetryConfig, ...) watermillkafka.BrokerOptions
- func NewClickHouse(conf config.ClickHouseAggregationConfiguration) (clickhouse.Conn, error)
- func NewCustomerService(logger *slog.Logger, db *entdb.Client) (customer.Service, error)
- func NewDB(driver *pgdriver.Driver) *sql.DB
- func NewDefaultTextMapPropagator() propagation.TextMapPropagator
- func NewEntClient(driver *entdriver.EntPostgresDriver) *db.Client
- func NewEntPostgresDriver(db *sql.DB, logger *slog.Logger) (*entdriver.EntPostgresDriver, func())
- func NewEntitlementRegistry(logger *slog.Logger, db *entdb.Client, ...) *registry.Entitlement
- func NewEventBusPublisher(publisher message.Publisher, conf config.EventsConfiguration, ...) (eventbus.Publisher, error)
- func NewFeatureConnector(logger *slog.Logger, db *entdb.Client, meterRepo meter.Repository) feature.FeatureConnector
- func NewFlushHandler(eventsConfig config.EventsConfiguration, sinkConfig config.SinkConfiguration, ...) (flushhandler.FlushEventHandler, error)
- func NewHealthChecker(logger *slog.Logger) health.Health
- func NewInMemoryRepository(meters []*models.Meter) *meter.InMemoryRepository
- func NewIngestCollector(conf config.Configuration, kafkaCollector *kafkaingest.Collector, ...) (ingest.Collector, func(), error)
- func NewKafkaAdminClient(conf config.KafkaConfiguration) (*kafka.AdminClient, error)
- func NewKafkaIngestCollector(config config.KafkaIngestConfiguration, producer *kafka.Producer, ...) (*kafkaingest.Collector, error)
- func NewKafkaMetrics(meter metric.Meter) (*kafkametrics.Metrics, error)
- func NewKafkaNamespaceHandler(topicResolver topicresolver.Resolver, ...) (*kafkaingest.NamespaceHandler, error)
- func NewKafkaProducer(conf config.KafkaIngestConfiguration, logger *slog.Logger) (*kafka.Producer, error)
- func NewKafkaTopicProvisioner(conf pkgkafka.TopicProvisionerConfig) (pkgkafka.TopicProvisioner, error)
- func NewKafkaTopicProvisionerConfig(adminClient *kafka.AdminClient, logger *slog.Logger, meter metric.Meter, ...) pkgkafka.TopicProvisionerConfig
- func NewLogger(conf config.LogTelemetryConfig, res *resource.Resource, ...) *slog.Logger
- func NewLoggerProvider(ctx context.Context, conf config.LogTelemetryConfig, res *resource.Resource) (*sdklog.LoggerProvider, func(), error)
- func NewMeter(meterProvider metric.MeterProvider, metadata Metadata) metric.Meter
- func NewMeterProvider(ctx context.Context, conf config.MetricsTelemetryConfig, ...) (*sdkmetric.MeterProvider, func(), error)
- func NewNamespaceHandlers(kafkaHandler *kafkaingest.NamespaceHandler, ...) []namespace.Handler
- func NewNamespaceManager(handlers []namespace.Handler, conf config.NamespaceConfiguration) (*namespace.Manager, error)
- func NewNamespacedTopicResolver(config config.KafkaIngestConfiguration) (*topicresolver.NamespacedTopicResolver, error)
- func NewNotificationService(logger *slog.Logger, db *entdb.Client, ...) (notification.Service, error)
- func NewPlanService(logger *slog.Logger, db *entdb.Client, ...) (plan.Service, error)
- func NewPostgresDriver(ctx context.Context, conf config.PostgresConfig, ...) (*pgdriver.Driver, func(), error)
- func NewPublisher(ctx context.Context, options watermillkafka.PublisherOptions, ...) (message.Publisher, func(), error)
- func NewServerPublisher(ctx context.Context, conf config.EventsConfiguration, ...) (message.Publisher, func(), error)
- func NewSinkWorkerPublisher(ctx context.Context, options watermillkafka.PublisherOptions, ...) (message.Publisher, func(), error)
- func NewStreamingConnector(ctx context.Context, conf config.AggregationConfiguration, ...) (streaming.Connector, error)
- func NewTelemetryResource(metadata Metadata) *resource.Resource
- func NewTelemetryRouterHook(meterProvider metric.MeterProvider, tracerProvider trace.TracerProvider) func(chi.Router)
- func NewTracer(tracerProvider trace.TracerProvider, metadata Metadata) trace.Tracer
- func NewTracerProvider(ctx context.Context, conf config.TraceTelemetryConfig, res *resource.Resource, ...) (*sdktrace.TracerProvider, func(), error)
- func NewUnsafeSecretService(logger *slog.Logger, db *entdb.Client) (*secretservice.Service, error)
- func NotificationServiceProvisionTopics(conf config.NotificationConfiguration) []pkgkafka.TopicConfig
- func ServerProvisionTopics(conf config.EventsConfiguration) []pkgkafka.TopicConfig
- func SinkWorkerProvisionTopics(conf config.EventsConfiguration) []pkgkafka.TopicConfig
- type AppSandboxProvisioner
- type BalanceWorkerEntitlementRepo
- type GlobalInitializer
- type LevelHandler
- type Metadata
- type Migrator
- type Runner
- type SubscriptionServiceWithWorkflow
- type TelemetryHandler
- type TelemetryServer
Constants ¶
const (
DefaultShutdownTimeout = 5 * time.Second
)
Variables ¶
var App = wire.NewSet( NewAppService, NewAppStripeService, NewAppSandboxProvisioner, )
var BalanceWorker = wire.NewSet( BalanceWorkerProvisionTopics, BalanceWorkerSubscriber, wire.Struct(new(registrybuilder.EntitlementOptions), "*"), registrybuilder.GetEntitlementRegistry, NewBalanceWorkerOptions, NewBalanceWorker, BalanceWorkerGroup, )
var BalanceWorkerAdapter = wire.NewSet( NewBalanceWorkerEntitlementRepo, wire.Bind(new(balanceworker.BalanceWorkerRepository), new(BalanceWorkerEntitlementRepo)), BalanceWorkerSubjectResolver, )
var Billing = wire.NewSet( BillingService, BillingAdapter, )
var BillingWorker = wire.NewSet( App, Customer, Secret, BillingWorkerProvisionTopics, BillingWorkerSubscriber, NewFeatureConnector, BillingAdapter, BillingService, NewBillingWorkerOptions, NewBillingWorker, BillingWorkerGroup, )
var ClickHouse = wire.NewSet( NewClickHouse, )
var Config = wire.NewSet( wire.FieldsOf(new(config.Configuration), "Apps"), wire.FieldsOf(new(config.Configuration), "Aggregation"), wire.FieldsOf(new(config.Configuration), "Billing"), wire.FieldsOf(new(config.AggregationConfiguration), "ClickHouse"), wire.FieldsOf(new(config.Configuration), "Postgres"), wire.FieldsOf(new(config.Configuration), "Entitlements"), wire.FieldsOf(new(config.Configuration), "Events"), wire.FieldsOf(new(config.KafkaIngestConfiguration), "KafkaConfiguration"), wire.FieldsOf(new(config.Configuration), "Ingest"), wire.FieldsOf(new(config.IngestConfiguration), "Kafka"), wire.FieldsOf(new(config.KafkaIngestConfiguration), "TopicProvisionerConfig"), wire.FieldsOf(new(config.Configuration), "Namespace"), wire.FieldsOf(new(config.Configuration), "Notification"), wire.FieldsOf(new(config.Configuration), "ProductCatalog"), wire.FieldsOf(new(config.Configuration), "Svix"), wire.FieldsOf(new(config.Configuration), "Telemetry"), wire.FieldsOf(new(config.TelemetryConfig), "Metrics"), wire.FieldsOf(new(config.TelemetryConfig), "Trace"), wire.FieldsOf(new(config.TelemetryConfig), "Log"), )
Configs are kept separately so that the wires can be reused in other projects.
var Customer = wire.NewSet( NewCustomerService, )
var Database = wire.NewSet( wire.Struct(new(Migrator), "*"), NewPostgresDriver, NewDB, NewEntPostgresDriver, NewEntClient, )
var Entitlement = wire.NewSet( NewEntitlementRegistry, )
var Feature = wire.NewSet( NewFeatureConnector, )
var Framework = wire.NewSet( wire.Struct(new(GlobalInitializer), "*"), wire.Struct(new(Runner), "*"), )
var Kafka = wire.NewSet( NewKafkaProducer, NewKafkaMetrics, KafkaTopic, )
var KafkaNamespaceResolver = wire.NewSet( NewNamespacedTopicResolver, wire.Bind(new(topicresolver.Resolver), new(*topicresolver.NamespacedTopicResolver)), NewKafkaNamespaceHandler, NewNamespaceHandlers, )
var KafkaTopic = wire.NewSet( NewKafkaAdminClient, NewKafkaTopicProvisionerConfig, NewKafkaTopicProvisioner, )
var MeterInMemory = wire.NewSet( wire.FieldsOf(new(config.Configuration), "Meters"), wire.Bind(new(meter.Repository), new(*meter.InMemoryRepository)), NewInMemoryRepository, )
var Namespace = wire.NewSet( NewNamespaceManager, )
var Notification = wire.NewSet( NewNotificationService, )
var Plan = wire.NewSet( NewPlanService, )
var ProductCatalog = wire.NewSet( Feature, Plan, )
var Secret = wire.NewSet( wire.Bind(new(secret.Service), new(*secretservice.Service)), NewUnsafeSecretService, )
var Streaming = wire.NewSet( NewStreamingConnector, )
var Subscription = wire.NewSet( NewSubscriptionServices, )
var Telemetry = wire.NewSet( NewTelemetryResource, NewLoggerProvider, wire.Bind(new(log.LoggerProvider), new(*sdklog.LoggerProvider)), NewLogger, NewMeterProvider, wire.Bind(new(metric.MeterProvider), new(*sdkmetric.MeterProvider)), NewMeter, NewTracerProvider, wire.Bind(new(trace.TracerProvider), new(*sdktrace.TracerProvider)), NewTracer, NewHealthChecker, NewTelemetryHandler, NewTelemetryServer, )
var Watermill = wire.NewSet( WatermillNoPublisher, NewPublisher, )
var WatermillNoPublisher = wire.NewSet( NewBrokerConfiguration, wire.Struct(new(watermillkafka.PublisherOptions), "*"), NewEventBusPublisher, )
TODO: move this back to Watermill. NOTE: this is also used by the sink-worker, which requires control over how the publisher is closed.
Functions ¶
func BalanceWorkerGroup ¶
func BalanceWorkerGroup( ctx context.Context, worker *balanceworker.Worker, telemetryServer TelemetryServer, ) run.Group
func BalanceWorkerProvisionTopics ¶
func BalanceWorkerProvisionTopics(conf config.BalanceWorkerConfiguration) []pkgkafka.TopicConfig
func BalanceWorkerSubjectResolver ¶
func BalanceWorkerSubjectResolver() balanceworker.SubjectResolver
func BalanceWorkerSubscriber ¶
func BalanceWorkerSubscriber(conf config.BalanceWorkerConfiguration, brokerOptions watermillkafka.BrokerOptions) (message.Subscriber, error)
no closer function: the subscriber is closed by the router/worker
func BillingAdapter ¶
func BillingService ¶
func BillingService( logger *slog.Logger, db *entdb.Client, appService app.Service, appStripeService appstripe.Service, billingAdapter billing.Adapter, billingConfig config.BillingConfiguration, customerService customer.Service, featureConnector feature.FeatureConnector, meterRepo meter.Repository, streamingConnector streaming.Connector, ) (billing.Service, error)
func BillingWorkerGroup ¶
func BillingWorkerGroup( ctx context.Context, worker *billingworker.Worker, telemetryServer TelemetryServer, ) run.Group
func BillingWorkerProvisionTopics ¶
func BillingWorkerProvisionTopics(conf config.BillingConfiguration) []pkgkafka.TopicConfig
func BillingWorkerSubscriber ¶
func BillingWorkerSubscriber(conf config.BillingConfiguration, brokerOptions watermillkafka.BrokerOptions) (message.Subscriber, error)
no closer function: the subscriber is closed by the router/worker
func NewAppService ¶
func NewAppStripeService ¶
func NewBalanceWorker ¶
func NewBalanceWorker(workerOptions balanceworker.WorkerOptions) (*balanceworker.Worker, error)
func NewBalanceWorkerOptions ¶
func NewBalanceWorkerOptions( eventConfig config.EventsConfiguration, routerOptions router.Options, eventBus eventbus.Publisher, entitlements *registry.Entitlement, repo balanceworker.BalanceWorkerRepository, subjectResolver balanceworker.SubjectResolver, logger *slog.Logger, ) balanceworker.WorkerOptions
func NewBillingWorker ¶
func NewBillingWorker(workerOptions billingworker.WorkerOptions) (*billingworker.Worker, error)
func NewBillingWorkerOptions ¶
func NewBillingWorkerOptions( eventConfig config.EventsConfiguration, routerOptions router.Options, eventBus eventbus.Publisher, billingService billing.Service, billingAdapter billing.Adapter, logger *slog.Logger, ) billingworker.WorkerOptions
func NewBrokerConfiguration ¶
func NewBrokerConfiguration( kafkaConfig config.KafkaConfiguration, logConfig config.LogTelemetryConfig, appMetadata Metadata, logger *slog.Logger, meter metric.Meter, ) watermillkafka.BrokerOptions
func NewClickHouse ¶
func NewClickHouse(conf config.ClickHouseAggregationConfiguration) (clickhouse.Conn, error)
TODO: add closer function?
func NewCustomerService ¶
func NewDefaultTextMapPropagator ¶
func NewDefaultTextMapPropagator() propagation.TextMapPropagator
func NewEntClient ¶
func NewEntClient(driver *entdriver.EntPostgresDriver) *db.Client
TODO: add closer function?
func NewEntPostgresDriver ¶
func NewEntitlementRegistry ¶
func NewEntitlementRegistry( logger *slog.Logger, db *entdb.Client, entitlementConfig config.EntitlementsConfiguration, streamingConnector streaming.Connector, meterRepository meter.Repository, eventPublisher eventbus.Publisher, ) *registry.Entitlement
func NewEventBusPublisher ¶
func NewFeatureConnector ¶
func NewFeatureConnector(logger *slog.Logger, db *entdb.Client, meterRepo meter.Repository) feature.FeatureConnector
func NewFlushHandler ¶
func NewFlushHandler( eventsConfig config.EventsConfiguration, sinkConfig config.SinkConfiguration, messagePublisher message.Publisher, eventPublisher eventbus.Publisher, logger *slog.Logger, meter metric.Meter, ) (flushhandler.FlushEventHandler, error)
func NewInMemoryRepository ¶
func NewInMemoryRepository(meters []*models.Meter) *meter.InMemoryRepository
func NewIngestCollector ¶
func NewKafkaAdminClient ¶
func NewKafkaAdminClient(conf config.KafkaConfiguration) (*kafka.AdminClient, error)
func NewKafkaIngestCollector ¶
func NewKafkaIngestCollector( config config.KafkaIngestConfiguration, producer *kafka.Producer, topicResolver topicresolver.Resolver, topicProvisioner pkgkafka.TopicProvisioner, ) (*kafkaingest.Collector, error)
func NewKafkaMetrics ¶
func NewKafkaMetrics(meter metric.Meter) (*kafkametrics.Metrics, error)
func NewKafkaNamespaceHandler ¶
func NewKafkaNamespaceHandler( topicResolver topicresolver.Resolver, topicProvisioner pkgkafka.TopicProvisioner, conf config.KafkaIngestConfiguration, ) (*kafkaingest.NamespaceHandler, error)
func NewKafkaProducer ¶
func NewKafkaProducer(conf config.KafkaIngestConfiguration, logger *slog.Logger) (*kafka.Producer, error)
TODO: use ingest config directly? TODO: use kafka config directly? TODO: add closer function?
func NewKafkaTopicProvisioner ¶
func NewKafkaTopicProvisioner(conf pkgkafka.TopicProvisionerConfig) (pkgkafka.TopicProvisioner, error)
TODO: do we need a separate constructor for the sake of a custom error message?
func NewKafkaTopicProvisionerConfig ¶
func NewKafkaTopicProvisionerConfig( adminClient *kafka.AdminClient, logger *slog.Logger, meter metric.Meter, settings config.TopicProvisionerConfig, ) pkgkafka.TopicProvisionerConfig
TODO: fill struct fields automatically?
func NewLogger ¶
func NewLogger(conf config.LogTelemetryConfig, res *resource.Resource, loggerProvider log.LoggerProvider, metadata Metadata) *slog.Logger
func NewLoggerProvider ¶
func NewLoggerProvider(ctx context.Context, conf config.LogTelemetryConfig, res *resource.Resource) (*sdklog.LoggerProvider, func(), error)
func NewMeterProvider ¶
func NewNamespaceHandlers ¶
func NewNamespaceHandlers( kafkaHandler *kafkaingest.NamespaceHandler, clickHouseHandler streaming.Connector, ) []namespace.Handler
func NewNamespaceManager ¶
func NewNamespacedTopicResolver ¶
func NewNamespacedTopicResolver(config config.KafkaIngestConfiguration) (*topicresolver.NamespacedTopicResolver, error)
func NewNotificationService ¶
func NewNotificationService( logger *slog.Logger, db *entdb.Client, notificationConfig config.NotificationConfiguration, svixConfig config.SvixConfig, featureConnector feature.FeatureConnector, ) (notification.Service, error)
func NewPlanService ¶
func NewPostgresDriver ¶
func NewPostgresDriver( ctx context.Context, conf config.PostgresConfig, meterProvider metric.MeterProvider, meter metric.Meter, tracerProvider trace.TracerProvider, logger *slog.Logger, ) (*pgdriver.Driver, func(), error)
func NewPublisher ¶
func NewPublisher( ctx context.Context, options watermillkafka.PublisherOptions, logger *slog.Logger, ) (message.Publisher, func(), error)
NOTE: this is also used by the sink-worker that requires control over how the publisher is closed
func NewServerPublisher ¶
func NewServerPublisher( ctx context.Context, conf config.EventsConfiguration, options watermillkafka.PublisherOptions, logger *slog.Logger, ) (message.Publisher, func(), error)
func NewSinkWorkerPublisher ¶
func NewSinkWorkerPublisher( ctx context.Context, options watermillkafka.PublisherOptions, logger *slog.Logger, ) (message.Publisher, func(), error)
the sink-worker requires control over how the publisher is closed
func NewStreamingConnector ¶
func NewStreamingConnector( ctx context.Context, conf config.AggregationConfiguration, clickHouse clickhouse.Conn, meterRepository meter.Repository, logger *slog.Logger, ) (streaming.Connector, error)
func NewTelemetryResource ¶
func NewTelemetryRouterHook ¶
func NewTelemetryRouterHook(meterProvider metric.MeterProvider, tracerProvider trace.TracerProvider) func(chi.Router)
func NewTracer ¶
func NewTracer(tracerProvider trace.TracerProvider, metadata Metadata) trace.Tracer
func NewTracerProvider ¶
func NewUnsafeSecretService ¶
func NotificationServiceProvisionTopics ¶
func NotificationServiceProvisionTopics(conf config.NotificationConfiguration) []pkgkafka.TopicConfig
func ServerProvisionTopics ¶
func ServerProvisionTopics(conf config.EventsConfiguration) []pkgkafka.TopicConfig
func SinkWorkerProvisionTopics ¶
func SinkWorkerProvisionTopics(conf config.EventsConfiguration) []pkgkafka.TopicConfig
Types ¶
type AppSandboxProvisioner ¶
type AppSandboxProvisioner func() error
type BalanceWorkerEntitlementRepo ¶
type BalanceWorkerEntitlementRepo interface {
entitlement.EntitlementRepo
balanceworker.BalanceWorkerRepository
}
func NewBalanceWorkerEntitlementRepo ¶
func NewBalanceWorkerEntitlementRepo(db *db.Client) BalanceWorkerEntitlementRepo
type GlobalInitializer ¶
type GlobalInitializer struct {
Logger *slog.Logger
MeterProvider metric.MeterProvider
TracerProvider trace.TracerProvider
TextMapPropagator propagation.TextMapPropagator
}
GlobalInitializer initializes global variables for the application.
func (*GlobalInitializer) SetGlobals ¶
func (i *GlobalInitializer) SetGlobals()
SetGlobals initializes global variables for the application.
It is intended to be embedded into application structs and be called from func main.
type LevelHandler ¶
type LevelHandler struct {
// contains filtered or unexported fields
}
LevelHandler is a slog.Handler that filters log records based on the log level.
func NewLevelHandler ¶
func NewLevelHandler(handler slog.Handler, level slog.Leveler) *LevelHandler
NewLevelHandler returns a new LevelHandler.
type Metadata ¶
type Metadata struct {
ServiceName string
Version string
Environment string
OpenTelemetryName string
}
Metadata provides information about the service to components that need it (e.g. telemetry).
func NewMetadata ¶
func NewMetadata(conf config.Configuration, version string, serviceName string) Metadata
type SubscriptionServiceWithWorkflow ¶
type SubscriptionServiceWithWorkflow struct {
Service subscription.Service
WorkflowService subscription.WorkflowService
PlanSubscriptionService plansubscription.PlanSubscriptionService
}
Combine Service and WorkflowService into one struct. We do this to be able to initialize the Service and WorkflowService together and share the same subscriptionRepo.
func NewSubscriptionServices ¶
func NewSubscriptionServices( logger *slog.Logger, db *entdb.Client, productcatalogConfig config.ProductCatalogConfiguration, entitlementConfig config.EntitlementsConfiguration, featureConnector feature.FeatureConnector, entitlementRegistry *registry.Entitlement, customerService customer.Service, planService plan.Service, eventPublisher eventbus.Publisher, ) (SubscriptionServiceWithWorkflow, error)
type TelemetryHandler ¶
func NewTelemetryHandler ¶
func NewTelemetryHandler(metricsConf config.MetricsTelemetryConfig, healthChecker health.Health) TelemetryHandler
type TelemetryServer ¶
func NewTelemetryServer ¶
func NewTelemetryServer(conf config.TelemetryConfig, handler TelemetryHandler) (TelemetryServer, func())
Source Files
¶
- app.go
- billing.go
- clickhouse.go
- config.go
- customer.go
- database.go
- entitlement.go
- framework.go
- globals.go
- kafka.go
- metadata.go
- meter.go
- namespace.go
- notification.go
- openmeter_balanceworker.go
- openmeter_billingworker.go
- openmeter_notification.go
- openmeter_server.go
- openmeter_sinkworker.go
- productcatalog.go
- runner.go
- secret.go
- streaming.go
- subscription.go
- telemetry.go
- watermill.go