Documentation
¶
Index ¶
- Constants
- Variables
- func BalanceWorkerGroup(ctx context.Context, worker *balanceworker.Worker, ...) run.Group
- func BalanceWorkerProvisionTopics(conf config.BalanceWorkerConfiguration, ...) []pkgkafka.TopicConfig
- func BalanceWorkerSubscriber(conf config.BalanceWorkerConfiguration, ...) (message.Subscriber, error)
- func BillingAdapter(logger *slog.Logger, db *entdb.Client) (billing.Adapter, error)
- func BillingService(logger *slog.Logger, appService app.Service, billingAdapter billing.Adapter, ...) (billing.Service, error)
- func BillingWorkerGroup(ctx context.Context, worker *billingworker.Worker, ...) run.Group
- func BillingWorkerProvisionTopics(conf config.BillingConfiguration) []pkgkafka.TopicConfig
- func BillingWorkerSubscriber(conf config.BillingConfiguration, brokerOptions watermillkafka.BrokerOptions) (message.Subscriber, error)
- func NewAddonService(logger *slog.Logger, db *entdb.Client, ...) (addon.Service, error)
- func NewAppCustomInvoicingService(logger *slog.Logger, db *entdb.Client, appsConfig config.AppsConfiguration, ...) (appcustominvoicing.Service, error)
- func NewAppSandboxFactory(appsConfig config.AppsConfiguration, appService app.Service, ...) (*appsandbox.Factory, error)
- func NewAppService(logger *slog.Logger, db *entdb.Client, publisher eventbus.Publisher) (app.Service, error)
- func NewAppStripeService(logger *slog.Logger, db *entdb.Client, appsConfig config.AppsConfiguration, ...) (appstripe.Service, error)
- func NewBalanceWorker(workerOptions balanceworker.WorkerOptions) (*balanceworker.Worker, error)
- func NewBalanceWorkerFilterStateStorage(conf config.BalanceWorkerConfiguration) (balanceworker.FilterStateStorage, error)
- func NewBalanceWorkerOptions(eventConfig config.EventsConfiguration, routerOptions router.Options, ...) balanceworker.WorkerOptions
- func NewBillingAutoAdvancer(logger *slog.Logger, service billing.Service) (*billingworkerautoadvance.AutoAdvancer, error)
- func NewBillingCollector(logger *slog.Logger, service billing.Service, ...) (*billingworkercollect.InvoiceCollector, error)
- func NewBillingSubscriptionHandler(logger *slog.Logger, subsServices SubscriptionServiceWithWorkflow, ...) (*billingworkersubscription.Handler, error)
- func NewBillingSubscriptionReconciler(logger *slog.Logger, subsServices SubscriptionServiceWithWorkflow, ...) (*billingworkersubscription.Reconciler, error)
- func NewBillingWorker(workerOptions billingworker.WorkerOptions) (*billingworker.Worker, error)
- func NewBillingWorkerOptions(eventConfig config.EventsConfiguration, routerOptions router.Options, ...) billingworker.WorkerOptions
- func NewBrokerConfiguration(kafkaConfig config.KafkaConfiguration, appMetadata Metadata, ...) watermillkafka.BrokerOptions
- func NewClickHouse(conf config.ClickHouseAggregationConfiguration, tracer trace.Tracer) (clickhouse.Conn, error)
- func NewCustomerService(logger *slog.Logger, db *entdb.Client, entRegistry *registry.Entitlement, ...) (customer.Service, error)
- func NewDB(driver *pgdriver.Driver) *sql.DB
- func NewDefaultTextMapPropagator() propagation.TextMapPropagator
- func NewEmptyProvisionTopics() []pkgkafka.TopicConfig
- func NewEntClient(driver *entdriver.EntPostgresDriver) *db.Client
- func NewEntPostgresDriver(db *sql.DB, logger *slog.Logger) (*entdriver.EntPostgresDriver, func())
- func NewEntitlementRegistry(logger *slog.Logger, db *entdb.Client, tracer trace.Tracer, ...) *registry.Entitlement
- func NewEventBusPublisher(publisher message.Publisher, conf config.EventsConfiguration, ...) (eventbus.Publisher, error)
- func NewFeatureConnector(logger *slog.Logger, db *entdb.Client, meterService meter.Service, ...) feature.FeatureConnector
- func NewFlushHandler(eventsConfig config.EventsConfiguration, sinkConfig config.SinkConfiguration, ...) (flushhandler.FlushEventHandler, error)
- func NewHealthChecker(logger *slog.Logger) health.Health
- func NewIngestCollector(dedupeConfig config.DedupeConfiguration, kafkaCollector *kafkaingest.Collector, ...) (ingest.Collector, func(), error)
- func NewIngestService(collector ingest.Collector, logger *slog.Logger) (*ingest.Service, error)
- func NewKafkaAdminClient(conf config.KafkaConfiguration) (*kafka.AdminClient, error)
- func NewKafkaIngestCollector(config config.KafkaIngestConfiguration, producer *kafka.Producer, ...) (*kafkaingest.Collector, error)
- func NewKafkaIngestNamespaceHandler(topicResolver topicresolver.Resolver, ...) (*kafkaingest.NamespaceHandler, error)
- func NewKafkaMetrics(meter metric.Meter) (*kafkametrics.Metrics, error)
- func NewKafkaProducer(conf config.KafkaIngestConfiguration, logger *slog.Logger, meta Metadata) (*kafka.Producer, error)
- func NewKafkaTopicProvisioner(kafkaConfig config.KafkaConfiguration, settings config.TopicProvisionerConfig, ...) (pkgkafka.TopicProvisioner, error)
- func NewLocker(logger *slog.Logger) (*lockr.Locker, error)
- func NewLogger(conf config.LogTelemetryConfig, res *resource.Resource, ...) *slog.Logger
- func NewLoggerProvider(ctx context.Context, conf config.LogTelemetryConfig, res *resource.Resource) (*sdklog.LoggerProvider, func(), error)
- func NewMeter(meterProvider metric.MeterProvider, metadata Metadata) metric.Meter
- func NewMeterAdapter(logger *slog.Logger, db *entdb.Client) (*adapter.Adapter, error)
- func NewMeterEventService(streamingConnector streaming.Connector, customerService customer.Service, ...) meterevent.Service
- func NewMeterManageService(meterAdapter *adapter.Adapter, namespaceManager *namespace.Manager, ...) meter.ManageService
- func NewMeterProvider(ctx context.Context, conf config.MetricsTelemetryConfig, ...) (*sdkmetric.MeterProvider, func(), error)
- func NewMeterService(meterAdapter *adapter.Adapter) meter.Service
- func NewNamespaceManager(conf config.NamespaceConfiguration) (*namespace.Manager, error)
- func NewNamespacedTopicResolver(config config.KafkaIngestConfiguration) (*topicresolver.NamespacedTopicResolver, error)
- func NewNoopKafkaTopicProvisioner() pkgkafka.TopicProvisioner
- func NewNoopNotificationEventHandler() (notification.EventHandler, func(), error)
- func NewNoopNotificationWebhookHandler(logger *slog.Logger) (notificationwebhook.Handler, error)
- func NewNotificationAdapter(logger *slog.Logger, db *entdb.Client) (notification.Repository, error)
- func NewNotificationEventHandler(config config.NotificationConfiguration, logger *slog.Logger, ...) (notification.EventHandler, func(), error)
- func NewNotificationService(logger *slog.Logger, adapter notification.Repository, ...) (notification.Service, error)
- func NewNotificationWebhookHandler(logger *slog.Logger, tracer trace.Tracer, ...) (notificationwebhook.Handler, error)
- func NewPlanAddonService(logger *slog.Logger, db *entdb.Client, planService plan.Service, ...) (planaddon.Service, error)
- func NewPlanService(logger *slog.Logger, db *entdb.Client, ...) (plan.Service, error)
- func NewPortalService(conf config.PortalConfiguration) (portal.Service, error)
- func NewPostAuthMiddlewares(ffx FFXConfigContextMiddleware) server.PostAuthMiddlewares
- func NewPostgresDriver(ctx context.Context, conf config.PostgresConfig, ...) (*pgdriver.Driver, func(), error)
- func NewProgressManager(logger *slog.Logger, conf config.ProgressManagerConfiguration) (progressmanager.Service, error)
- func NewPublisher(ctx context.Context, options watermillkafka.PublisherOptions, ...) (message.Publisher, func(), error)
- func NewRouterHooks(telemetry TelemetryMiddlewareHook) *server.RouterHooks
- func NewServerPublisher(ctx context.Context, options watermillkafka.PublisherOptions, ...) (message.Publisher, func(), error)
- func NewSinkWorkerPublisher(ctx context.Context, options watermillkafka.PublisherOptions, ...) (message.Publisher, func(), error)
- func NewStaticNamespaceDecoder(conf config.NamespaceConfiguration) namespacedriver.NamespaceDecoder
- func NewStreamingConnector(ctx context.Context, conf config.AggregationConfiguration, ...) (streaming.Connector, error)
- func NewSubjectAdapter(db *entdb.Client) (subject.Adapter, error)
- func NewSubjectCustomerHook(subject subject.Service, customer customer.Service, logger *slog.Logger, ...) (subjecthooks.CustomerSubjectHook, error)
- func NewSubjectEntitlementValidatorHook(logger *slog.Logger, entitlementRegistry *registry.Entitlement, ...) (subjecthooks.EntitlementValidatorHook, error)
- func NewSubjectService(adapter subject.Adapter) (subject.Service, error)
- func NewSvixAPIClient(config config.SvixConfig, meterProvider metric.MeterProvider, ...) (*svix.Svix, error)
- func NewTelemetryResource(metadata Metadata) *resource.Resource
- func NewTerminationCheckerActor(r *TerminationChecker, logger *slog.Logger) (execute func() error, interrupt func(error), err error)
- func NewTracer(tracerProvider trace.TracerProvider, metadata Metadata) trace.Tracer
- func NewTracerProvider(ctx context.Context, conf config.TraceTelemetryConfig, res *resource.Resource, ...) (*sdktrace.TracerProvider, func(), error)
- func NewUnsafeSecretService(logger *slog.Logger, db *entdb.Client) (*secretservice.Service, error)
- func NotificationServiceProvisionTopics(conf config.NotificationConfiguration) []pkgkafka.TopicConfig
- func ServerProvisionTopics(conf config.EventsConfiguration) []pkgkafka.TopicConfig
- func SinkWorkerProvisionTopics(conf config.EventsConfiguration) []pkgkafka.TopicConfig
- func TelemetryLoggerNoAdditionalMiddlewares() []slogmulti.Middleware
- type AppRegistry
- type AppSandboxProvisioner
- type BalanceWorkerEntitlementRepo
- type CustomerEntitlementValidatorHook
- type CustomerSubjectHook
- type CustomerSubjectValidatorHook
- type FFXConfigContextMiddleware
- type GlobalInitializer
- type LevelHandler
- type Metadata
- type MeterConfigInitializer
- type Migrator
- type Runner
- type RuntimeMetricsCollector
- type SubscriptionServiceWithWorkflow
- type TelemetryHandler
- type TelemetryMiddlewareHook
- type TelemetryServer
- type TerminationChecker
- func (s *TerminationChecker) Execute(_ context.Context) (interface{}, error)
- func (s *TerminationChecker) IsRunning() bool
- func (s *TerminationChecker) IsTerminating() bool
- func (s *TerminationChecker) Name() string
- func (s *TerminationChecker) Terminate(reason error)
- func (s *TerminationChecker) WaitForPropagation(ctx context.Context) error
- type TerminationState
Constants ¶
const (
DefaultShutdownTimeout = 5 * time.Second
)
Variables ¶
var Addon = wire.NewSet( NewAddonService, )
var App = wire.NewSet( NewAppRegistry, NewAppService, NewAppStripeService, NewAppSandboxFactory, NewAppSandboxProvisioner, NewAppCustomInvoicingService, )
var BalanceWorker = wire.NewSet( BalanceWorkerProvisionTopics, BalanceWorkerSubscriber, NewEntitlementRegistry, NewBalanceWorkerOptions, NewBalanceWorker, BalanceWorkerGroup, NewBalanceWorkerFilterStateStorage, )
var BalanceWorkerAdapter = wire.NewSet( NewBalanceWorkerEntitlementRepo, wire.Bind(new(balanceworker.BalanceWorkerRepository), new(BalanceWorkerEntitlementRepo)), )
var Billing = wire.NewSet( BillingService, BillingAdapter, wire.Bind(new(billing.CustomerOverrideService), new(billing.Service)), )
var BillingWorker = wire.NewSet( App, Customer, Secret, BillingWorkerProvisionTopics, BillingWorkerSubscriber, Lockr, FFX, Subscription, ProductCatalog, Entitlement, BillingAdapter, BillingService, NewBillingWorkerOptions, NewBillingWorker, NewBillingSubscriptionHandler, BillingWorkerGroup, )
var ClickHouse = wire.NewSet( NewClickHouse, )
var Config = wire.NewSet( wire.FieldsOf(new(config.Configuration), "Apps"), wire.FieldsOf(new(config.Configuration), "Aggregation"), wire.FieldsOf(new(config.Configuration), "Billing"), wire.FieldsOf(new(config.BillingConfiguration), "AdvancementStrategy"), wire.FieldsOf(new(config.BillingConfiguration), "FeatureSwitches"), wire.FieldsOf(new(config.AggregationConfiguration), "ClickHouse"), wire.FieldsOf(new(config.Configuration), "Customer"), wire.FieldsOf(new(config.Configuration), "Postgres"), wire.FieldsOf(new(config.Configuration), "Entitlements"), wire.FieldsOf(new(config.Configuration), "Events"), wire.FieldsOf(new(config.Configuration), "Dedupe"), wire.FieldsOf(new(config.KafkaIngestConfiguration), "KafkaConfiguration"), wire.FieldsOf(new(config.Configuration), "Ingest"), wire.FieldsOf(new(config.IngestConfiguration), "Kafka"), wire.FieldsOf(new(config.KafkaIngestConfiguration), "TopicProvisioner"), wire.FieldsOf(new(config.Configuration), "Namespace"), wire.FieldsOf(new(config.Configuration), "Notification"), wire.FieldsOf(new(config.NotificationConfiguration), "Webhook"), wire.FieldsOf(new(config.ProductCatalogConfiguration), "Subscription"), wire.FieldsOf(new(config.Configuration), "Portal"), wire.FieldsOf(new(config.Configuration), "ProductCatalog"), wire.FieldsOf(new(config.Configuration), "ProgressManager"), wire.FieldsOf(new(config.Configuration), "Svix"), wire.FieldsOf(new(config.Configuration), "Telemetry"), wire.FieldsOf(new(config.TelemetryConfig), "Metrics"), wire.FieldsOf(new(config.TelemetryConfig), "Trace"), wire.FieldsOf(new(config.TelemetryConfig), "Log"), wire.FieldsOf(new(config.Configuration), "Termination"), )
We have configs separatly to be able to reuse wires in other projects
var Customer = wire.NewSet( NewCustomerService, )
var Database = wire.NewSet( wire.Struct(new(Migrator), "*"), NewPostgresDriver, NewDB, NewEntPostgresDriver, NewEntClient, )
var Entitlement = wire.NewSet( NewEntitlementRegistry, )
var FFX = wire.NewSet( ffx.NewContextService, )
var Feature = wire.NewSet( NewFeatureConnector, )
var Framework = wire.NewSet( wire.Struct(new(GlobalInitializer), "*"), wire.Struct(new(Runner), "*"), )
var Kafka = wire.NewSet( NewKafkaProducer, NewKafkaMetrics, NewKafkaTopicProvisioner, )
var KafkaIngest = wire.NewSet( NewKafkaIngestNamespaceHandler, )
var KafkaNamespaceResolver = wire.NewSet( NewNamespacedTopicResolver, wire.Bind(new(topicresolver.Resolver), new(*topicresolver.NamespacedTopicResolver)), )
var Lockr = wire.NewSet( NewLocker, )
var Meter = wire.NewSet( NewMeterService, NewMeterAdapter, )
var MeterEvent = wire.NewSet( NewMeterEventService, )
var MeterManage = wire.NewSet( Meter, NewMeterManageService, )
var MeterManageWithConfigMeters = wire.NewSet( wire.FieldsOf(new(config.Configuration), "Meters"), Meter, NewMeterManageService, NewMeterConfigInitializer, )
var Namespace = wire.NewSet( NewNamespaceManager, )
var Notification = wire.NewSet( NewNotificationAdapter, NewNotificationService, NewNotificationWebhookHandler, NewNotificationEventHandler, )
var NotificationService = wire.NewSet( NewNotificationAdapter, NewNotificationService, NewNoopNotificationWebhookHandler, NewNoopNotificationEventHandler, )
NotificationService is a wire set for the notification service, it can be used at places where only the service is required without svix and event handler.
var Plan = wire.NewSet( NewPlanService, )
var PlanAddon = wire.NewSet( NewPlanAddonService, )
var Portal = wire.NewSet( NewPortalService, )
var ProgressManager = wire.NewSet( NewProgressManager, )
var Secret = wire.NewSet( wire.Bind(new(secret.Service), new(*secretservice.Service)), NewUnsafeSecretService, )
var Server = wire.NewSet( NewTelemetryRouterHook, NewFFXConfigContextMiddleware, NewRouterHooks, NewPostAuthMiddlewares, )
var StaticNamespace = wire.NewSet( NewStaticNamespaceDecoder, )
var Streaming = wire.NewSet( NewStreamingConnector, )
var Subject = wire.NewSet( NewSubjectService, NewSubjectAdapter, )
var Subscription = wire.NewSet( NewSubscriptionServices, )
var Telemetry = wire.NewSet( NewTelemetryResource, NewLoggerProvider, wire.Bind(new(log.LoggerProvider), new(*sdklog.LoggerProvider)), NewLogger, NewMeterProvider, wire.Bind(new(metric.MeterProvider), new(*sdkmetric.MeterProvider)), NewMeter, NewTracerProvider, wire.Bind(new(trace.TracerProvider), new(*sdktrace.TracerProvider)), NewTracer, NewHealthChecker, NewTelemetryHandler, NewTelemetryServer, NewRuntimeMetricsCollector, )
var TelemetryWithoutServer = wire.NewSet( NewTelemetryResource, NewLoggerProvider, wire.Bind(new(log.LoggerProvider), new(*sdklog.LoggerProvider)), NewLogger, NewMeterProvider, wire.Bind(new(metric.MeterProvider), new(*sdkmetric.MeterProvider)), NewMeter, NewTracerProvider, wire.Bind(new(trace.TracerProvider), new(*sdktrace.TracerProvider)), NewTracer, NewRuntimeMetricsCollector, )
var Watermill = wire.NewSet( WatermillNoPublisher, NewPublisher, )
var WatermillNoPublisher = wire.NewSet( NewBrokerConfiguration, wire.Struct(new(watermillkafka.PublisherOptions), "*"), NewEventBusPublisher, )
TODO: move this back to Watermill NOTE: this is also used by the sink-worker that requires control over how the publisher is closed
var WatermillNoTopicProvisioning = wire.NewSet( NewNoopKafkaTopicProvisioner, NewEmptyProvisionTopics, )
Functions ¶
func BalanceWorkerGroup ¶
func BalanceWorkerGroup( ctx context.Context, worker *balanceworker.Worker, telemetryServer TelemetryServer, ) run.Group
func BalanceWorkerProvisionTopics ¶
func BalanceWorkerProvisionTopics(conf config.BalanceWorkerConfiguration, eventsConfig config.EventsConfiguration) []pkgkafka.TopicConfig
func BalanceWorkerSubscriber ¶
func BalanceWorkerSubscriber(conf config.BalanceWorkerConfiguration, brokerOptions watermillkafka.BrokerOptions) (message.Subscriber, error)
no closer function: the subscriber is closed by the router/worker
func BillingAdapter ¶
func BillingService ¶
func BillingService( logger *slog.Logger, appService app.Service, billingAdapter billing.Adapter, customerService customer.Service, featureConnector feature.FeatureConnector, meterService meter.Service, streamingConnector streaming.Connector, eventPublisher eventbus.Publisher, billingConfig config.BillingConfiguration, subscriptionServices SubscriptionServiceWithWorkflow, fsConfig config.BillingFeatureSwitchesConfiguration, tracer trace.Tracer, ) (billing.Service, error)
func BillingWorkerGroup ¶
func BillingWorkerGroup( ctx context.Context, worker *billingworker.Worker, telemetryServer TelemetryServer, ) run.Group
func BillingWorkerProvisionTopics ¶
func BillingWorkerProvisionTopics(conf config.BillingConfiguration) []pkgkafka.TopicConfig
func BillingWorkerSubscriber ¶
func BillingWorkerSubscriber(conf config.BillingConfiguration, brokerOptions watermillkafka.BrokerOptions) (message.Subscriber, error)
no closer function: the subscriber is closed by the router/worker
func NewAddonService ¶
func NewAppSandboxFactory ¶
func NewAppSandboxFactory( appsConfig config.AppsConfiguration, appService app.Service, billingService billing.Service, ) (*appsandbox.Factory, error)
func NewAppService ¶
func NewAppStripeService ¶
func NewBalanceWorker ¶
func NewBalanceWorker(workerOptions balanceworker.WorkerOptions) (*balanceworker.Worker, error)
func NewBalanceWorkerFilterStateStorage ¶
func NewBalanceWorkerFilterStateStorage(conf config.BalanceWorkerConfiguration) (balanceworker.FilterStateStorage, error)
func NewBalanceWorkerOptions ¶
func NewBalanceWorkerOptions( eventConfig config.EventsConfiguration, routerOptions router.Options, eventBus eventbus.Publisher, entitlements *registry.Entitlement, repo balanceworker.BalanceWorkerRepository, notificationService notification.Service, subjectService subject.Service, customerService customer.Service, logger *slog.Logger, filterStateStorage balanceworker.FilterStateStorage, ) balanceworker.WorkerOptions
func NewBillingAutoAdvancer ¶
func NewBillingAutoAdvancer(logger *slog.Logger, service billing.Service) (*billingworkerautoadvance.AutoAdvancer, error)
func NewBillingCollector ¶
func NewBillingCollector(logger *slog.Logger, service billing.Service, fs config.BillingFeatureSwitchesConfiguration) (*billingworkercollect.InvoiceCollector, error)
func NewBillingSubscriptionReconciler ¶
func NewBillingSubscriptionReconciler(logger *slog.Logger, subsServices SubscriptionServiceWithWorkflow, subscriptionSync *billingworkersubscription.Handler, customerService customer.Service) (*billingworkersubscription.Reconciler, error)
func NewBillingWorker ¶
func NewBillingWorker(workerOptions billingworker.WorkerOptions) (*billingworker.Worker, error)
func NewBillingWorkerOptions ¶
func NewBillingWorkerOptions( eventConfig config.EventsConfiguration, routerOptions router.Options, eventBus eventbus.Publisher, billingService billing.Service, billingAdapter billing.Adapter, subscriptionServices SubscriptionServiceWithWorkflow, subsSyncHandler *billingworkersubscription.Handler, logger *slog.Logger, ) billingworker.WorkerOptions
func NewBrokerConfiguration ¶
func NewBrokerConfiguration( kafkaConfig config.KafkaConfiguration, appMetadata Metadata, logger *slog.Logger, meter metric.Meter, ) watermillkafka.BrokerOptions
func NewClickHouse ¶
func NewClickHouse(conf config.ClickHouseAggregationConfiguration, tracer trace.Tracer) (clickhouse.Conn, error)
TODO: add closer function?
func NewCustomerService ¶
func NewDefaultTextMapPropagator ¶
func NewDefaultTextMapPropagator() propagation.TextMapPropagator
func NewEmptyProvisionTopics ¶
func NewEmptyProvisionTopics() []pkgkafka.TopicConfig
func NewEntClient ¶
func NewEntClient(driver *entdriver.EntPostgresDriver) *db.Client
TODO: add closer function?
func NewEntPostgresDriver ¶
func NewEntitlementRegistry ¶
func NewEventBusPublisher ¶
func NewFeatureConnector ¶
func NewFlushHandler ¶
func NewFlushHandler( eventsConfig config.EventsConfiguration, sinkConfig config.SinkConfiguration, messagePublisher message.Publisher, eventPublisher eventbus.Publisher, logger *slog.Logger, meter metric.Meter, ) (flushhandler.FlushEventHandler, error)
func NewIngestCollector ¶
func NewIngestService ¶
func NewKafkaAdminClient ¶
func NewKafkaAdminClient(conf config.KafkaConfiguration) (*kafka.AdminClient, error)
func NewKafkaIngestCollector ¶
func NewKafkaIngestCollector( config config.KafkaIngestConfiguration, producer *kafka.Producer, topicResolver topicresolver.Resolver, topicProvisioner pkgkafka.TopicProvisioner, logger *slog.Logger, tracer trace.Tracer, ) (*kafkaingest.Collector, error)
func NewKafkaIngestNamespaceHandler ¶
func NewKafkaIngestNamespaceHandler( topicResolver topicresolver.Resolver, topicProvisioner pkgkafka.TopicProvisioner, ingestConfig config.KafkaIngestConfiguration, ) (*kafkaingest.NamespaceHandler, error)
func NewKafkaMetrics ¶
func NewKafkaMetrics(meter metric.Meter) (*kafkametrics.Metrics, error)
func NewKafkaProducer ¶
func NewKafkaProducer(conf config.KafkaIngestConfiguration, logger *slog.Logger, meta Metadata) (*kafka.Producer, error)
TODO: add closer function?
func NewKafkaTopicProvisioner ¶
func NewKafkaTopicProvisioner( kafkaConfig config.KafkaConfiguration, settings config.TopicProvisionerConfig, logger *slog.Logger, meter metric.Meter, ) (pkgkafka.TopicProvisioner, error)
func NewLogger ¶
func NewLogger(conf config.LogTelemetryConfig, res *resource.Resource, loggerProvider log.LoggerProvider, metadata Metadata, additionalMiddlewares []slogmulti.Middleware) *slog.Logger
func NewLoggerProvider ¶
func NewLoggerProvider(ctx context.Context, conf config.LogTelemetryConfig, res *resource.Resource) (*sdklog.LoggerProvider, func(), error)
func NewMeterAdapter ¶
func NewMeterEventService ¶
func NewMeterManageService ¶
func NewMeterProvider ¶
func NewNamespaceManager ¶
func NewNamespaceManager( conf config.NamespaceConfiguration, ) (*namespace.Manager, error)
func NewNamespacedTopicResolver ¶
func NewNamespacedTopicResolver(config config.KafkaIngestConfiguration) (*topicresolver.NamespacedTopicResolver, error)
func NewNoopKafkaTopicProvisioner ¶
func NewNoopKafkaTopicProvisioner() pkgkafka.TopicProvisioner
func NewNoopNotificationEventHandler ¶
func NewNoopNotificationEventHandler() (notification.EventHandler, func(), error)
func NewNoopNotificationWebhookHandler ¶
func NewNoopNotificationWebhookHandler( logger *slog.Logger, ) (notificationwebhook.Handler, error)
func NewNotificationAdapter ¶
func NewNotificationAdapter( logger *slog.Logger, db *entdb.Client, ) (notification.Repository, error)
func NewNotificationEventHandler ¶
func NewNotificationEventHandler( config config.NotificationConfiguration, logger *slog.Logger, tracer trace.Tracer, adapter notification.Repository, webhook notificationwebhook.Handler, ) (notification.EventHandler, func(), error)
func NewNotificationService ¶
func NewNotificationService( logger *slog.Logger, adapter notification.Repository, webhook notificationwebhook.Handler, eventHandler notification.EventHandler, featureConnector feature.FeatureConnector, ) (notification.Service, error)
func NewPlanAddonService ¶
func NewPlanService ¶
func NewPortalService ¶
func NewPortalService(conf config.PortalConfiguration) (portal.Service, error)
func NewPostAuthMiddlewares ¶
func NewPostAuthMiddlewares( ffx FFXConfigContextMiddleware, ) server.PostAuthMiddlewares
func NewPostgresDriver ¶
func NewPostgresDriver( ctx context.Context, conf config.PostgresConfig, meterProvider metric.MeterProvider, meter metric.Meter, tracerProvider trace.TracerProvider, logger *slog.Logger, ) (*pgdriver.Driver, func(), error)
func NewProgressManager ¶
func NewProgressManager(logger *slog.Logger, conf config.ProgressManagerConfiguration) (progressmanager.Service, error)
NewProgressManager creates a new progress manager service.
func NewPublisher ¶
func NewPublisher( ctx context.Context, options watermillkafka.PublisherOptions, logger *slog.Logger, ) (message.Publisher, func(), error)
NOTE: this is also used by the sink-worker that requires control over how the publisher is closed
func NewRouterHooks ¶
func NewRouterHooks( telemetry TelemetryMiddlewareHook, ) *server.RouterHooks
func NewServerPublisher ¶
func NewServerPublisher( ctx context.Context, options watermillkafka.PublisherOptions, logger *slog.Logger, ) (message.Publisher, func(), error)
func NewSinkWorkerPublisher ¶
func NewSinkWorkerPublisher( ctx context.Context, options watermillkafka.PublisherOptions, logger *slog.Logger, ) (message.Publisher, func(), error)
the sink-worker requires control over how the publisher is closed
func NewStaticNamespaceDecoder ¶
func NewStaticNamespaceDecoder( conf config.NamespaceConfiguration, ) namespacedriver.NamespaceDecoder
func NewStreamingConnector ¶
func NewStreamingConnector( ctx context.Context, conf config.AggregationConfiguration, clickHouse clickhouse.Conn, logger *slog.Logger, progressmanager progressmanager.Service, namespaceManager *namespace.Manager, ) (streaming.Connector, error)
func NewSubjectCustomerHook ¶
func NewSubjectEntitlementValidatorHook ¶
func NewSubjectEntitlementValidatorHook( logger *slog.Logger, entitlementRegistry *registry.Entitlement, subjectService subject.Service, ) (subjecthooks.EntitlementValidatorHook, error)
func NewSubjectService ¶
func NewSvixAPIClient ¶
func NewSvixAPIClient( config config.SvixConfig, meterProvider metric.MeterProvider, tracerProvider trace.TracerProvider, ) (*svix.Svix, error)
func NewTelemetryResource ¶
func NewTerminationCheckerActor ¶
func NewTerminationCheckerActor(r *TerminationChecker, logger *slog.Logger) (execute func() error, interrupt func(error), err error)
NewTerminationCheckerActor returns actor functions (execute, interrupt) which can be passed to run.Add() to register it. Its state is set to TerminationStateStopping as soon as interrupt function is called.
func NewTracer ¶
func NewTracer(tracerProvider trace.TracerProvider, metadata Metadata) trace.Tracer
func NewTracerProvider ¶
func NewUnsafeSecretService ¶
func NotificationServiceProvisionTopics ¶
func NotificationServiceProvisionTopics(conf config.NotificationConfiguration) []pkgkafka.TopicConfig
func ServerProvisionTopics ¶
func ServerProvisionTopics(conf config.EventsConfiguration) []pkgkafka.TopicConfig
func SinkWorkerProvisionTopics ¶
func SinkWorkerProvisionTopics(conf config.EventsConfiguration) []pkgkafka.TopicConfig
func TelemetryLoggerNoAdditionalMiddlewares ¶
func TelemetryLoggerNoAdditionalMiddlewares() []slogmulti.Middleware
Types ¶
type AppRegistry ¶
type AppRegistry struct {
Service app.Service
SandboxProvisioner AppSandboxProvisioner
Stripe appstripe.Service
CustomInvoicing appcustominvoicing.Service
}
func NewAppRegistry ¶
func NewAppRegistry( Service app.Service, SandboxProvisioner AppSandboxProvisioner, Stripe appstripe.Service, CustomInvoicing appcustominvoicing.Service, ) AppRegistry
type AppSandboxProvisioner ¶
type BalanceWorkerEntitlementRepo ¶
type BalanceWorkerEntitlementRepo interface {
entitlement.EntitlementRepo
balanceworker.BalanceWorkerRepository
}
func NewBalanceWorkerEntitlementRepo ¶
func NewBalanceWorkerEntitlementRepo(db *db.Client) BalanceWorkerEntitlementRepo
type CustomerEntitlementValidatorHook ¶
type CustomerEntitlementValidatorHook customerservicehooks.EntitlementValidatorHook
func NewCustomerEntitlementValidatorServiceHook ¶
func NewCustomerEntitlementValidatorServiceHook( logger *slog.Logger, entitlementRegistry *registry.Entitlement, customerService customer.Service, ) (CustomerEntitlementValidatorHook, error)
type CustomerSubjectHook ¶
type CustomerSubjectHook customerservicehooks.SubjectCustomerHook
func NewCustomerSubjectServiceHook ¶
func NewCustomerSubjectServiceHook( config config.CustomerConfiguration, logger *slog.Logger, tracer trace.Tracer, subjectService subject.Service, customerService customer.Service, customerOverrideService billing.CustomerOverrideService, ) (CustomerSubjectHook, error)
type CustomerSubjectValidatorHook ¶
type CustomerSubjectValidatorHook customerservicehooks.SubjectValidatorHook
type FFXConfigContextMiddleware ¶
type FFXConfigContextMiddleware api.MiddlewareFunc
func NewFFXConfigContextMiddleware ¶
func NewFFXConfigContextMiddleware( subsConfig config.SubscriptionConfiguration, namespaceDriver namespacedriver.NamespaceDecoder, logger *slog.Logger, ) FFXConfigContextMiddleware
NewFFXConfigContextMiddleware creates a middleware hook that sets the feature flag access context on the request context. This hook MUST register after any session authentication step so user namespaces are available.
type GlobalInitializer ¶
type GlobalInitializer struct {
Logger *slog.Logger
MeterProvider metric.MeterProvider
TracerProvider trace.TracerProvider
TextMapPropagator propagation.TextMapPropagator
}
GlobalInitializer initializes global variables for the application.
func (*GlobalInitializer) SetGlobals ¶
func (i *GlobalInitializer) SetGlobals()
SetGlobals initializes global variables for the application.
It is intended to be embedded into application structs and be called from func main.
type LevelHandler ¶
type LevelHandler struct {
// contains filtered or unexported fields
}
LevelHandler is a slog.Handler that filters log records based on the log level.
func NewLevelHandler ¶
func NewLevelHandler(handler slog.Handler, level slog.Leveler) *LevelHandler
NewLevelHandler returns a new LevelHandler.
type Metadata ¶
type Metadata struct {
ServiceName string
Version string
Environment string
OpenTelemetryName string
AdditionalAttributes []attribute.KeyValue
}
Metadata provides information about the service to components that need it (eg. telemetry).
func NewMetadata ¶
type MeterConfigInitializer ¶
func NewMeterConfigInitializer ¶
func NewMeterConfigInitializer( logger *slog.Logger, configMeters []*meter.Meter, meterManagerService meter.ManageService, namespaceManager *namespace.Manager, ) MeterConfigInitializer
type RuntimeMetricsCollector ¶
type RuntimeMetricsCollector struct {
// contains filtered or unexported fields
}
func NewRuntimeMetricsCollector ¶
func NewRuntimeMetricsCollector( meterProvider metric.MeterProvider, conf config.TelemetryConfig, logger *slog.Logger, ) (RuntimeMetricsCollector, error)
func (RuntimeMetricsCollector) Start ¶
func (c RuntimeMetricsCollector) Start() error
type SubscriptionServiceWithWorkflow ¶
type SubscriptionServiceWithWorkflow struct {
Service subscription.Service
WorkflowService subscriptionworkflow.Service
PlanSubscriptionService plansubscription.PlanSubscriptionService
SubscriptionAddonService subscriptionaddon.Service
}
TODO: break up to multiple initializers
func NewSubscriptionServices ¶
func NewSubscriptionServices( logger *slog.Logger, db *entdb.Client, featureConnector feature.FeatureConnector, entitlementRegistry *registry.Entitlement, customerService customer.Service, planService plan.Service, planAddonService planaddon.Service, addonService addon.Service, eventPublisher eventbus.Publisher, lockr *lockr.Locker, featureFlags ffx.Service, ) (SubscriptionServiceWithWorkflow, error)
type TelemetryHandler ¶
func NewTelemetryHandler ¶
func NewTelemetryHandler( metricsConf config.MetricsTelemetryConfig, healthChecker health.Health, runtimeMetricsCollector RuntimeMetricsCollector, logger *slog.Logger, ) TelemetryHandler
type TelemetryMiddlewareHook ¶
type TelemetryMiddlewareHook server.MiddlewareHook
func NewTelemetryRouterHook ¶
func NewTelemetryRouterHook(meterProvider metric.MeterProvider, tracerProvider trace.TracerProvider) TelemetryMiddlewareHook
type TelemetryServer ¶
func NewTelemetryServer ¶
func NewTelemetryServer(conf config.TelemetryConfig, handler TelemetryHandler) (TelemetryServer, func())
type TerminationChecker ¶
type TerminationChecker struct {
// contains filtered or unexported fields
}
func NewTerminationChecker ¶
func NewTerminationChecker(conf config.TerminationConfig, healthChecker health.Health) (*TerminationChecker, error)
NewTerminationChecker returns a new TerminationChecker and registers the TerminationChecker to the provided health checker.
func (*TerminationChecker) Execute ¶
func (s *TerminationChecker) Execute(_ context.Context) (interface{}, error)
func (*TerminationChecker) IsRunning ¶
func (s *TerminationChecker) IsRunning() bool
func (*TerminationChecker) IsTerminating ¶
func (s *TerminationChecker) IsTerminating() bool
func (*TerminationChecker) Name ¶
func (s *TerminationChecker) Name() string
func (*TerminationChecker) Terminate ¶
func (s *TerminationChecker) Terminate(reason error)
func (*TerminationChecker) WaitForPropagation ¶
func (s *TerminationChecker) WaitForPropagation(ctx context.Context) error
WaitForPropagation blocks for
type TerminationState ¶
type TerminationState = string
const ( TerminationStateRunning TerminationState = "running" TerminationStateTerminating TerminationState = "terminating" )
Source Files
¶
- app.go
- billing.go
- clickhouse.go
- config.go
- customer.go
- database.go
- entitlement.go
- ffx.go
- framework.go
- globals.go
- kafka.go
- locker.go
- metadata.go
- meter.go
- meterevent.go
- namespace.go
- notification.go
- openmeter_balanceworker.go
- openmeter_billingworker.go
- openmeter_notification.go
- openmeter_server.go
- openmeter_sinkworker.go
- portal.go
- productcatalog.go
- progressmanager.go
- runner.go
- secret.go
- server.go
- streaming.go
- subject.go
- subscription.go
- svix.go
- telemetry.go
- termination.go
- watermill.go