common

package
v1.0.0-beta.228 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 19, 2026 License: Apache-2.0 Imports: 212 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultShutdownTimeout = 5 * time.Second
)

Variables

Billing is the Wire provider set for the billing and charges stack. Downstream consumers should depend on BillingRegistry.

View Source
var ClickHouse = wire.NewSet(
	NewClickHouse,
)
View Source
var Config = wire.NewSet(

	wire.FieldsOf(new(config.Configuration), "Apps"),

	wire.FieldsOf(new(config.Configuration), "Aggregation"),

	wire.FieldsOf(new(config.Configuration), "Billing"),
	wire.FieldsOf(new(config.BillingConfiguration), "AdvancementStrategy"),
	wire.FieldsOf(new(config.BillingConfiguration), "FeatureSwitches"),

	wire.FieldsOf(new(config.AggregationConfiguration), "ClickHouse"),

	wire.FieldsOf(new(config.Configuration), "Customer"),

	wire.FieldsOf(new(config.Configuration), "Credits"),

	wire.FieldsOf(new(config.Configuration), "Postgres"),

	wire.FieldsOf(new(config.Configuration), "Entitlements"),

	wire.FieldsOf(new(config.Configuration), "Events"),
	wire.FieldsOf(new(config.Configuration), "Dedupe"),

	wire.FieldsOf(new(config.KafkaIngestConfiguration), "KafkaConfiguration"),
	wire.FieldsOf(new(config.Configuration), "Ingest"),
	wire.FieldsOf(new(config.IngestConfiguration), "Kafka"),
	wire.FieldsOf(new(config.KafkaIngestConfiguration), "TopicProvisioner"),

	wire.FieldsOf(new(config.Configuration), "Namespace"),

	wire.FieldsOf(new(config.Configuration), "Notification"),
	wire.FieldsOf(new(config.NotificationConfiguration), "Webhook"),

	wire.FieldsOf(new(config.ProductCatalogConfiguration), "Subscription"),

	wire.FieldsOf(new(config.Configuration), "Portal"),

	wire.FieldsOf(new(config.Configuration), "ProductCatalog"),

	wire.FieldsOf(new(config.Configuration), "ProgressManager"),

	wire.FieldsOf(new(config.Configuration), "ReservedEventTypes"),

	wire.FieldsOf(new(config.Configuration), "Svix"),

	wire.FieldsOf(new(config.Configuration), "Telemetry"),
	wire.FieldsOf(new(config.TelemetryConfig), "Metrics"),
	wire.FieldsOf(new(config.TelemetryConfig), "Trace"),
	wire.FieldsOf(new(config.TelemetryConfig), "Log"),

	wire.FieldsOf(new(config.Configuration), "Termination"),
)

Configs are kept in a separate provider set so that the wires can be reused in other projects.

View Source
var Framework = wire.NewSet(
	wire.Struct(new(GlobalInitializer), "*"),
	wire.Struct(new(Runner), "*"),
)

LedgerStack is the full provider set for the ledger stack. Callers must provide *entdb.Client and *lockr.Locker (e.g. via common.Lockr).

View Source
var Lockr = wire.NewSet(
	NewLocker,
)
View Source
var NewFeatureResolver = featureresolver.New

NotificationService is a wire set for the notification service; it can be used in places where only the service is required, without Svix and the event handler.

View Source
var ProductCatalog = wire.NewSet(
	Feature,
	Cost,
	Plan,
	Addon,
	PlanAddon,
)
View Source
var ProgressManager = wire.NewSet(
	NewProgressManager,
)
View Source
var StaticNamespace = wire.NewSet(
	NewStaticNamespaceDecoder,
)

TODO: move this back to Watermill. NOTE: this is also used by the sink-worker, which requires control over how the publisher is closed.

View Source
var WatermillRouter = wire.NewSet(
	wire.Struct(new(router.Options), "*"),
)

Functions

func BalanceWorkerGroup

func BalanceWorkerGroup(
	ctx context.Context,
	worker *balanceworker.Worker,
	telemetryServer TelemetryServer,
) run.Group

func BalanceWorkerSubscriber

func BalanceWorkerSubscriber(conf config.BalanceWorkerConfiguration, brokerOptions watermillkafka.BrokerOptions) (message.Subscriber, error)

no closer function: the subscriber is closed by the router/worker

func BillingAdapter

func BillingAdapter(
	logger *slog.Logger,
	db *entdb.Client,
) (billing.Adapter, error)

func BillingWorkerGroup

func BillingWorkerGroup(
	ctx context.Context,
	worker *billingworker.Worker,
	telemetryServer TelemetryServer,
) run.Group

func BillingWorkerProvisionTopics

func BillingWorkerProvisionTopics(conf config.BillingConfiguration) []pkgkafka.TopicConfig

func BillingWorkerSubscriber

func BillingWorkerSubscriber(conf config.BillingConfiguration, brokerOptions watermillkafka.BrokerOptions) (message.Subscriber, error)

no closer function: the subscriber is closed by the router/worker

func NewAddonService

func NewAddonService(
	logger *slog.Logger,
	db *entdb.Client,
	featureResolver productcatalog.FeatureResolver,
	taxCodeService taxcode.Service,
	publisher eventbus.Publisher,
) (addon.Service, error)

func NewAppCustomInvoicingService

func NewAppCustomInvoicingService(logger *slog.Logger, db *entdb.Client, appsConfig config.AppsConfiguration, appService app.Service, customerService customer.Service, secretService secret.Service, billingRegistry BillingRegistry, publisher eventbus.Publisher) (appcustominvoicing.Service, error)

func NewAppSandboxFactory

func NewAppSandboxFactory(
	appsConfig config.AppsConfiguration,
	appService app.Service,
	billing BillingRegistry,
) (*appsandbox.Factory, error)

func NewAppService

func NewAppService(
	logger *slog.Logger,
	db *entdb.Client,
	publisher eventbus.Publisher,
) (app.Service, error)

func NewAppStripeService

func NewAppStripeService(logger *slog.Logger, db *entdb.Client, appsConfig config.AppsConfiguration, appService app.Service, customerService customer.Service, secretService secret.Service, billingRegistry BillingRegistry, publisher eventbus.Publisher) (appstripe.Service, error)

func NewBalanceWorker

func NewBalanceWorker(workerOptions balanceworker.WorkerOptions) (*balanceworker.Worker, error)

func NewBalanceWorkerOptions

func NewBalanceWorkerOptions(
	eventConfig config.EventsConfiguration,
	routerOptions router.Options,
	eventBus eventbus.Publisher,
	entitlements *registry.Entitlement,
	repo balanceworker.BalanceWorkerRepository,
	notificationService notification.Service,
	subjectService subject.Service,
	customerService customer.Service,
	logger *slog.Logger,
	balanceWorkerConfiguration config.BalanceWorkerConfiguration,
) balanceworker.WorkerOptions

func NewBillingAutoAdvancer

func NewBillingAutoAdvancer(logger *slog.Logger, billingRegistry BillingRegistry) (*billingworkerautoadvance.AutoAdvancer, error)

func NewBillingCustomerOverrideService

func NewBillingCustomerOverrideService(billingRegistry BillingRegistry) billing.CustomerOverrideService

func NewBillingRatingService

func NewBillingRatingService() rating.Service

func NewBillingSubscriptionReconciler

func NewBillingSubscriptionReconciler(logger *slog.Logger, subsServices SubscriptionServiceWithWorkflow, subscriptionSync subscriptionsync.Service, customerService customer.Service) (*reconciler.Reconciler, error)

func NewBillingSubscriptionSyncAdapter

func NewBillingSubscriptionSyncAdapter(db *entdb.Client) (subscriptionsync.Adapter, error)

func NewBillingSubscriptionSyncService

func NewBillingSubscriptionSyncService(logger *slog.Logger, subsServices SubscriptionServiceWithWorkflow, billingRegistry BillingRegistry, subscriptionSyncAdapter subscriptionsync.Adapter, tracer trace.Tracer, creditsConfig config.CreditsConfiguration) (subscriptionsync.Service, error)

func NewBillingWorker

func NewBillingWorker(workerOptions billingworker.WorkerOptions) (*billingworker.Worker, error)

func NewBillingWorkerOptions

func NewBillingWorkerOptions(
	eventConfig config.EventsConfiguration,
	routerOptions router.Options,
	eventBus eventbus.Publisher,
	billingRegistry BillingRegistry,
	subscriptionServices SubscriptionServiceWithWorkflow,
	subscriptionSyncService subscriptionsync.Service,
	billingFsConfig config.BillingFeatureSwitchesConfiguration,
	logger *slog.Logger,
) billingworker.WorkerOptions

func NewBrokerConfiguration

func NewBrokerConfiguration(
	kafkaConfig config.KafkaConfiguration,
	appMetadata Metadata,
	logger *slog.Logger,
	meter metric.Meter,
) watermillkafka.BrokerOptions

func NewChargesAdapter

func NewChargesAdapter(
	db *entdb.Client,
	logger *slog.Logger,
) (charges.Adapter, error)

func NewChargesAutoAdvancer

func NewChargesAutoAdvancer(logger *slog.Logger, billingRegistry BillingRegistry) (*chargesworkeradvance.AutoAdvancer, error)

func NewChargesCollectorService

func NewChargesCollectorService(
	ledgerService ledger.Ledger,
	balanceQuerier ledger.BalanceQuerier,
	accountResolver ledger.AccountResolver,
	accountService ledgeraccount.Service,
) ledgercollector.Service

func NewChargesCreditPurchaseAdapter

func NewChargesCreditPurchaseAdapter(
	db *entdb.Client,
	logger *slog.Logger,
	metaAdapter meta.Adapter,
) (creditpurchase.Adapter, error)

func NewChargesCreditPurchaseHandler

func NewChargesCreditPurchaseHandler(
	ledgerService ledger.Ledger,
	balanceQuerier ledger.BalanceQuerier,
	accountResolver ledger.AccountResolver,
	accountService ledgeraccount.Service,
) creditpurchase.Handler

func NewChargesCreditPurchaseService

func NewChargesCreditPurchaseService(
	creditPurchaseAdapter creditpurchase.Adapter,
	creditPurchaseHandler creditpurchase.Handler,
	lineageService lineage.Service,
	metaAdapter meta.Adapter,
) (creditpurchase.Service, error)

func NewChargesFlatFeeAdapter

func NewChargesFlatFeeAdapter(
	db *entdb.Client,
	logger *slog.Logger,
	metaAdapter meta.Adapter,
) (flatfee.Adapter, error)

func NewChargesFlatFeeHandler

func NewChargesFlatFeeHandler(
	ledgerService ledger.Ledger,
	balanceQuerier ledger.BalanceQuerier,
	accountResolver ledger.AccountResolver,
	accountService ledgeraccount.Service,
	collectorService ledgercollector.Service,
) flatfee.Handler

func NewChargesFlatFeeService

func NewChargesFlatFeeService(
	flatFeeAdapter flatfee.Adapter,
	flatFeeHandler flatfee.Handler,
	lineageService lineage.Service,
	metaAdapter meta.Adapter,
	locker *lockr.Locker,
	ratingService rating.Service,
) (flatfee.Service, error)

func NewChargesLineageAdapter

func NewChargesLineageAdapter(
	db *entdb.Client,
) (lineage.Adapter, error)

func NewChargesLineageService

func NewChargesLineageService(
	lineageAdapter lineage.Adapter,
) (lineage.Service, error)

func NewChargesMetaAdapter

func NewChargesMetaAdapter(
	db *entdb.Client,
	logger *slog.Logger,
) (meta.Adapter, error)

func NewChargesService

func NewChargesService(
	logger *slog.Logger,
	rootAdapter charges.Adapter,
	metaAdapter meta.Adapter,
	featureService feature.FeatureConnector,
	flatFeeSvc flatfee.Service,
	creditPurchaseSvc creditpurchase.Service,
	usageBasedSvc usagebased.Service,
	billingService billing.Service,
	recognizerService recognizer.Service,
	fsNamespaceLockdown []string,
) (charges.Service, error)

func NewChargesUsageBasedAdapter

func NewChargesUsageBasedAdapter(
	db *entdb.Client,
	logger *slog.Logger,
	metaAdapter meta.Adapter,
) (usagebased.Adapter, error)

func NewChargesUsageBasedHandler

func NewChargesUsageBasedHandler(
	ledgerService ledger.Ledger,
	balanceQuerier ledger.BalanceQuerier,
	accountResolver ledger.AccountResolver,
	accountService ledgeraccount.Service,
	collectorService ledgercollector.Service,
) usagebased.Handler

func NewChargesUsageBasedService

func NewChargesUsageBasedService(
	usageBasedAdapter usagebased.Adapter,
	usageBasedHandler usagebased.Handler,
	lineageService lineage.Service,
	locker *lockr.Locker,
	metaAdapter meta.Adapter,
	billingService billing.Service,
	featureService feature.FeatureConnector,
	ratingService rating.Service,
	streamingConnector streaming.Connector,
) (usagebased.Service, error)

func NewClickHouse

func NewClickHouse(ctx context.Context, conf config.ClickHouseAggregationConfiguration, tracer trace.Tracer, meter metric.Meter, logger *slog.Logger) (clickhouse.Conn, func(), error)

func NewCostService

func NewCostService(
	featureConnector feature.FeatureConnector,
	meterService meter.Service,
	streamingConnector streaming.Connector,
	llmcostService llmcost.Service,
) (cost.Service, error)

func NewCreditGrantService

func NewCreditGrantService(
	billingRegistry BillingRegistry,
	customerService customer.Service,
) (creditgrant.Service, error)

func NewCurrencyService

func NewCurrencyService(logger *slog.Logger, db *entdb.Client) (currencies.CurrencyService, error)

func NewCustomerBalanceFacade

func NewCustomerBalanceFacade(service customerbalance.Service) (*customerbalance.Facade, error)

func NewCustomerBalanceService

func NewCustomerBalanceService(
	creditsConfig config.CreditsConfiguration,
	historicalLedger ledger.Ledger,
	balanceQuerier ledger.BalanceQuerier,
	accountResolver ledger.AccountResolver,
	accountService ledgeraccount.Service,
	billingRegistry BillingRegistry,
) (customerbalance.Service, error)

func NewCustomerService

func NewCustomerService(
	logger *slog.Logger,
	db *entdb.Client,
	eventPublisher eventbus.Publisher,
) (customer.Service, error)

func NewDB

func NewDB(driver *pgdriver.Driver) *sql.DB

TODO: add closer function?

func NewDefaultTextMapPropagator

func NewDefaultTextMapPropagator() propagation.TextMapPropagator

func NewEmptyProvisionTopics

func NewEmptyProvisionTopics() []pkgkafka.TopicConfig

func NewEntClient

func NewEntClient(driver *entdriver.EntPostgresDriver) *db.Client

TODO: add closer function?

func NewEntPostgresDriver

func NewEntPostgresDriver(db *sql.DB, logger *slog.Logger) (*entdriver.EntPostgresDriver, func())

func NewEntitlementRegistry

func NewEntitlementRegistry(
	logger *slog.Logger,
	db *entdb.Client,
	tracer trace.Tracer,
	entitlementConfig config.EntitlementsConfiguration,
	streamingConnector streaming.Connector,
	meterService meter.Service,
	eventPublisher eventbus.Publisher,
	locker *lockr.Locker,
	customerService customer.Service,
) (*registry.Entitlement, error)

func NewEventBusPublisher

func NewEventBusPublisher(
	publisher message.Publisher,
	conf config.EventsConfiguration,
	logger *slog.Logger,
) (eventbus.Publisher, error)

func NewFeatureConnector

func NewFeatureConnector(
	logger *slog.Logger,
	db *entdb.Client,
	meterService meter.Service,
	publisher eventbus.Publisher,
) feature.FeatureConnector

func NewFlushHandlerManager

func NewFlushHandlerManager(
	sinkConfig config.SinkConfiguration,
	messagePublisher message.Publisher,
	logger *slog.Logger,
	handlers []flushhandler.FlushEventHandler,
) (flushhandler.FlushEventHandler, func(), error)

func NewHealthChecker

func NewHealthChecker(logger *slog.Logger) health.Health

func NewIngestCollector

func NewIngestCollector(
	dedupeConfig config.DedupeConfiguration,
	kafkaCollector *kafkaingest.Collector,
	logger *slog.Logger,
	meter metric.Meter,
	tracer trace.Tracer,
) (ingest.Collector, func(), error)

func NewIngestService

func NewIngestService(
	collector ingest.Collector,
	logger *slog.Logger,
) (ingest.Service, error)

func NewKafkaAdminClient

func NewKafkaAdminClient(conf config.KafkaConfiguration) (*kafka.AdminClient, error)

func NewKafkaConsumer

func NewKafkaConsumer(conf pkgkafka.ConsumerConfig, logger *slog.Logger) (*kafka.Consumer, func(), error)

func NewKafkaIngestCollector

func NewKafkaIngestCollector(
	config config.KafkaIngestConfiguration,
	producer *kafka.Producer,
	topicResolver topicresolver.Resolver,
	topicProvisioner pkgkafka.TopicProvisioner,
	logger *slog.Logger,
	tracer trace.Tracer,
) (*kafkaingest.Collector, error)

func NewKafkaIngestNamespaceHandler

func NewKafkaIngestNamespaceHandler(
	topicResolver topicresolver.Resolver,
	topicProvisioner pkgkafka.TopicProvisioner,
	ingestConfig config.KafkaIngestConfiguration,
) (*kafkaingest.NamespaceHandler, error)

func NewKafkaMetrics

func NewKafkaMetrics(meter metric.Meter) (*kafkametrics.Metrics, error)

func NewKafkaProducer

func NewKafkaProducer(conf config.KafkaIngestConfiguration, logger *slog.Logger, meta Metadata) (*kafka.Producer, error)

TODO: add closer function?

func NewKafkaTopicProvisioner

func NewKafkaTopicProvisioner(
	kafkaConfig config.KafkaConfiguration,
	settings config.TopicProvisionerConfig,
	logger *slog.Logger,
	meter metric.Meter,
) (pkgkafka.TopicProvisioner, error)

func NewLLMCostService

func NewLLMCostService(logger *slog.Logger, db *entdb.Client) (llmcost.Service, error)

func NewLLMCostSyncJob

func NewLLMCostSyncJob(logger *slog.Logger, db *entdb.Client) (*llmcostsync.SyncJob, error)

func NewLedgerAccountCatalog

func NewLedgerAccountCatalog(accountSvc ledgeraccount.Service) ledger.AccountCatalog

func NewLedgerAccountLocker

func NewLedgerAccountLocker(accountSvc ledgeraccount.Service) ledger.AccountLocker

func NewLedgerAccountRepo

func NewLedgerAccountRepo(db *entdb.Client) ledgeraccount.Repo

func NewLedgerAccountResolver

func NewLedgerAccountResolver(accountResolver customerLedgerProvisioner) ledger.AccountResolver

func NewLedgerAccountService

func NewLedgerAccountService(
	creditsConfig config.CreditsConfiguration,
	repo ledgeraccount.Repo,
	locker *lockr.Locker,
) ledgeraccount.Service

func NewLedgerBalanceQuerier

func NewLedgerBalanceQuerier(historicalLedger ledgerReadWriter) ledger.BalanceQuerier

func NewLedgerHistoricalLedger

func NewLedgerHistoricalLedger(
	creditsConfig config.CreditsConfiguration,
	repo historical.Repo,
	accountCatalog ledger.AccountCatalog,
	accountLocker ledger.AccountLocker,
	routingValidator ledger.RoutingValidator,
) ledgerReadWriter

func NewLedgerHistoricalRepo

func NewLedgerHistoricalRepo(db *entdb.Client) historical.Repo

func NewLedgerNamespaceHandler

func NewLedgerNamespaceHandler(accountResolver ledger.AccountResolver) namespace.Handler

func NewLedgerResolversRepo

func NewLedgerResolversRepo(db *entdb.Client) resolvers.CustomerAccountRepo

func NewLedgerResolversService

func NewLedgerResolversService(
	creditsConfig config.CreditsConfiguration,
	accountSvc ledgeraccount.Service,
	repo resolvers.CustomerAccountRepo,
	locker *lockr.Locker,
) customerLedgerProvisioner

func NewLedgerRoutingValidator

func NewLedgerRoutingValidator() ledger.RoutingValidator

func NewLedgerService

func NewLedgerService(historicalLedger ledgerReadWriter) ledger.Ledger

func NewLocker

func NewLocker(
	logger *slog.Logger,
) (*lockr.Locker, error)

func NewLogger

func NewLogger(conf config.LogTelemetryConfig, res *resource.Resource, loggerProvider log.LoggerProvider, metadata Metadata, additionalMiddlewares []slogmulti.Middleware) *slog.Logger

func NewLoggerProvider

func NewLoggerProvider(ctx context.Context, conf config.LogTelemetryConfig, res *resource.Resource) (*sdklog.LoggerProvider, func(), error)

func NewMeter

func NewMeter(meterProvider metric.MeterProvider, metadata Metadata) metric.Meter

func NewMeterAdapter

func NewMeterAdapter(
	logger *slog.Logger,
	db *entdb.Client,
) (*adapter.Adapter, error)

func NewMeterEventService

func NewMeterEventService(
	streamingConnector streaming.Connector,
	customerService customer.Service,
	meterService meter.Service,
) meterevent.Service

func NewMeterManageService

func NewMeterManageService(
	meterAdapter *adapter.Adapter,
	namespaceManager *namespace.Manager,
	publisher eventbus.Publisher,
	reservedEventTypes []*meter.EventTypePattern,
) meter.ManageService

func NewMeterProvider

func NewMeterProvider(ctx context.Context, conf config.MetricsTelemetryConfig, res *resource.Resource, logger *slog.Logger) (*sdkmetric.MeterProvider, func(), error)

func NewMeterService

func NewMeterService(
	meterAdapter *adapter.Adapter,
) meter.Service

func NewNamespaceManager

func NewNamespaceManager(
	conf config.NamespaceConfiguration,
) (*namespace.Manager, error)

func NewNoopKafkaTopicProvisioner

func NewNoopKafkaTopicProvisioner() pkgkafka.TopicProvisioner

func NewNoopNotificationWebhookHandler

func NewNoopNotificationWebhookHandler(
	logger *slog.Logger,
) (notificationwebhook.Handler, error)

func NewNotificationAdapter

func NewNotificationAdapter(
	logger *slog.Logger,
	db *entdb.Client,
) (notification.Repository, error)

func NewNotificationService

func NewNotificationService(
	logger *slog.Logger,
	adapter notification.Repository,
	webhook notificationwebhook.Handler,
	featureConnector feature.FeatureConnector,
) (notification.Service, error)

func NewNotificationWebhookHandler

func NewNotificationWebhookHandler(
	logger *slog.Logger,
	tracer trace.Tracer,
	webhookConfig config.WebhookConfiguration,
	svixClient *svix.Svix,
) (notificationwebhook.Handler, error)

func NewPlanAddonService

func NewPlanAddonService(
	logger *slog.Logger,
	db *entdb.Client,
	planService plan.Service,
	addonService addon.Service,
	publisher eventbus.Publisher,
) (planaddon.Service, error)

func NewPlanService

func NewPlanService(
	logger *slog.Logger,
	db *entdb.Client,
	featureResolver productcatalog.FeatureResolver,
	taxCodeService taxcode.Service,
	publisher eventbus.Publisher,
) (plan.Service, error)

func NewPortalService

func NewPortalService(conf config.PortalConfiguration) (portal.Service, error)

func NewPostgresDriver

func NewPostgresDriver(
	ctx context.Context,
	conf config.PostgresConfig,
	meterProvider metric.MeterProvider,
	meter metric.Meter,
	tracerProvider trace.TracerProvider,
	logger *slog.Logger,
) (*pgdriver.Driver, func(), error)

func NewProgressManager

func NewProgressManager(logger *slog.Logger, conf config.ProgressManagerConfiguration) (progressmanager.Service, error)

NewProgressManager creates a new progress manager service.

func NewPublisher

func NewPublisher(
	ctx context.Context,
	options watermillkafka.PublisherOptions,
	logger *slog.Logger,
) (message.Publisher, func(), error)

NOTE: this is also used by the sink-worker that requires control over how the publisher is closed

func NewRecognizerService

func NewRecognizerService(
	db *entdb.Client,
	ledgerService ledger.Ledger,
	balanceQuerier ledger.BalanceQuerier,
	accountResolver ledger.AccountResolver,
	accountService ledgeraccount.Service,
	lineageService lineage.Service,
) (recognizer.Service, error)

func NewReservedEventTypePatterns

func NewReservedEventTypePatterns(reserved []config.ReservedEventTypePattern) ([]*meter.EventTypePattern, error)

func NewRouterHooks

func NewRouterHooks(
	telemetry TelemetryMiddlewareHook,
) *server.RouterHooks

func NewServerPublisher

func NewServerPublisher(
	ctx context.Context,
	options watermillkafka.PublisherOptions,
	logger *slog.Logger,
) (message.Publisher, func(), error)

func NewSink

func NewSink(
	conf config.SinkConfiguration,
	logger *slog.Logger,
	metricMeter metric.Meter,
	tracer trace.Tracer,
	kafkaConsumer *SinkKafkaConsumer,
	sinkStorage sink.Storage,
	deduplicator dedupe.Deduplicator,
	meterService meter.Service,
	topicResolver topicresolver.Resolver,
	flushHandler flushhandler.FlushEventHandler,
) (*sink.Sink, func(), error)

func NewSinkDeduplicator

func NewSinkDeduplicator(conf config.SinkConfiguration, logger *slog.Logger) (dedupe.Deduplicator, func(), error)

func NewSinkKafkaConsumer

func NewSinkKafkaConsumer(conf config.SinkConfiguration, logger *slog.Logger) (*kafka.Consumer, func(), error)

func NewSinkStorage

func NewSinkStorage(
	streaming streaming.Connector,
) (sink.Storage, error)

func NewSinkWorkerPublisher

func NewSinkWorkerPublisher(
	ctx context.Context,
	options watermillkafka.PublisherOptions,
	logger *slog.Logger,
) (message.Publisher, func(), error)

the sink-worker requires control over how the publisher is closed

func NewStreamingConnector

func NewStreamingConnector(
	ctx context.Context,
	conf config.AggregationConfiguration,
	clickHouse clickhouse.Conn,
	logger *slog.Logger,
	progressmanager progressmanager.Service,
	namespaceManager *namespace.Manager,
) (streaming.Connector, error)

func NewSubjectAdapter

func NewSubjectAdapter(
	db *entdb.Client,
) (subject.Adapter, error)

func NewSubjectCustomerHook

func NewSubjectCustomerHook(
	subject subject.Service,
	customer customer.Service,
	logger *slog.Logger,
	tracer trace.Tracer,
) (subjecthooks.CustomerSubjectHook, error)

func NewSubjectService

func NewSubjectService(
	adapter subject.Adapter,
) (subject.Service, error)

func NewSvixAPIClient

func NewSvixAPIClient(
	config config.SvixConfig,
	meterProvider metric.MeterProvider,
	tracerProvider trace.TracerProvider,
) (*svix.Svix, error)

func NewTaxCodeAdapter

func NewTaxCodeAdapter(logger *slog.Logger, db *entdb.Client) (taxcode.Repository, error)

func NewTaxCodeService

func NewTaxCodeService(
	logger *slog.Logger,
	adapter taxcode.Repository,
) (taxcode.Service, error)

func NewTelemetryResource

func NewTelemetryResource(metadata Metadata) *resource.Resource

func NewTerminationCheckerActor

func NewTerminationCheckerActor(r *TerminationChecker, logger *slog.Logger) (execute func() error, interrupt func(error), err error)

NewTerminationCheckerActor returns actor functions (execute, interrupt) which can be passed to run.Add() to register it. Its state is set to TerminationStateTerminating as soon as the interrupt function is called.

func NewTracer

func NewTracer(tracerProvider trace.TracerProvider, metadata Metadata) trace.Tracer

func NewTracerProvider

func NewTracerProvider(ctx context.Context, conf config.TraceTelemetryConfig, res *resource.Resource, logger *slog.Logger) (*sdktrace.TracerProvider, func(), error)

func NewUnsafeSecretService

func NewUnsafeSecretService(logger *slog.Logger, db *entdb.Client) (*secretservice.Service, error)

func NotificationServiceProvisionTopics

func NotificationServiceProvisionTopics(conf config.NotificationConfiguration) []pkgkafka.TopicConfig

func ServerProvisionTopics

func ServerProvisionTopics(conf config.EventsConfiguration) []pkgkafka.TopicConfig

func SinkWorkerProvisionTopics

func SinkWorkerProvisionTopics(conf config.EventsConfiguration) []pkgkafka.TopicConfig

func TelemetryLoggerNoAdditionalMiddlewares

func TelemetryLoggerNoAdditionalMiddlewares() []slogmulti.Middleware

Types

type AppRegistry

type AppRegistry struct {
	Service            app.Service
	SandboxProvisioner AppSandboxProvisioner
	Stripe             appstripe.Service
	CustomInvoicing    appcustominvoicing.Service
}

func NewAppRegistry

func NewAppRegistry(
	Service app.Service,
	SandboxProvisioner AppSandboxProvisioner,
	Stripe appstripe.Service,
	CustomInvoicing appcustominvoicing.Service,
) AppRegistry

type AppSandboxProvisioner

type AppSandboxProvisioner func(ctx context.Context, orgID string) error

func NewAppSandboxProvisioner

func NewAppSandboxProvisioner(ctx context.Context, logger *slog.Logger, appsConfig config.AppsConfiguration, appService app.Service, namespaceManager *namespace.Manager, billingRegistry BillingRegistry, _ *appsandbox.Factory,
) (AppSandboxProvisioner, error)

type BalanceWorkerEntitlementRepo

type BalanceWorkerEntitlementRepo interface {
	entitlement.EntitlementRepo
	balanceworker.BalanceWorkerRepository
}

func NewBalanceWorkerEntitlementRepo

func NewBalanceWorkerEntitlementRepo(db *db.Client) BalanceWorkerEntitlementRepo

type BillingRegistry

type BillingRegistry struct {
	Billing billing.Service
	Charges *ChargesRegistry
}

BillingRegistry bundles the billing and charges services. External callers that need billing or charges should depend on BillingRegistry rather than individual services.

func NewBillingRegistry

func NewBillingRegistry(
	logger *slog.Logger,
	appService app.Service,
	billingAdapter billing.Adapter,
	billingRatingService rating.Service,
	customerService customer.Service,
	featureConnector feature.FeatureConnector,
	meterService meter.Service,
	streamingConnector streaming.Connector,
	eventPublisher eventbus.Publisher,
	billingConfig config.BillingConfiguration,
	subscriptionServices SubscriptionServiceWithWorkflow,
	db *entdb.Client,
	fsConfig config.BillingFeatureSwitchesConfiguration,
	creditsConfig config.CreditsConfiguration,
	tracer trace.Tracer,
	taxCodeService taxcode.Service,
	locker *lockr.Locker,
	ledgerService ledger.Ledger,
	balanceQuerier ledger.BalanceQuerier,
	accountResolver ledger.AccountResolver,
	accountService ledgeraccount.Service,
) (BillingRegistry, error)

NewBillingRegistry assembles the billing and optional charges services.

func (BillingRegistry) ChargesServiceOrNil

func (r BillingRegistry) ChargesServiceOrNil() charges.Service

type ChargesRegistry

type ChargesRegistry struct {
	Service               charges.Service
	FlatFeeService        flatfee.Service
	UsageBasedService     usagebased.Service
	CreditPurchaseService creditpurchase.Service
	RecognizerService     recognizer.Service
}

ChargesRegistry groups all charge-type services.

type CustomerEntitlementValidatorHook

type CustomerEntitlementValidatorHook customerservicehooks.EntitlementValidatorHook

func NewCustomerEntitlementValidatorServiceHook

func NewCustomerEntitlementValidatorServiceHook(
	logger *slog.Logger,
	entitlementRegistry *registry.Entitlement,
	customerService customer.Service,
) (CustomerEntitlementValidatorHook, error)

type CustomerLedgerHook

type CustomerLedgerHook ledgerresolvers.CustomerLedgerHook

func NewCustomerLedgerServiceHook

func NewCustomerLedgerServiceHook(
	creditsConfig config.CreditsConfiguration,
	tracer trace.Tracer,
	accountResolver customerLedgerProvisioner,
	customerService customer.Service,
) (CustomerLedgerHook, error)

type CustomerSubjectHook

type CustomerSubjectHook customerservicehooks.SubjectCustomerHook

func NewCustomerSubjectServiceHook

func NewCustomerSubjectServiceHook(
	config config.CustomerConfiguration,
	logger *slog.Logger,
	tracer trace.Tracer,
	subjectService subject.Service,
	customerService customer.Service,
	customerOverrideService billing.CustomerOverrideService,
) (CustomerSubjectHook, error)

type FFXConfigContextMiddleware

type FFXConfigContextMiddleware api.MiddlewareFunc

func NewFFXConfigContextMiddleware

func NewFFXConfigContextMiddleware(
	subsConfig config.SubscriptionConfiguration,
	namespaceDriver namespacedriver.NamespaceDecoder,
	logger *slog.Logger,
) FFXConfigContextMiddleware

NewFFXConfigContextMiddleware creates a middleware hook that sets the feature flag access context on the request context. This hook MUST register after any session authentication step so user namespaces are available.

type GlobalInitializer

type GlobalInitializer struct {
	Logger            *slog.Logger
	MeterProvider     metric.MeterProvider
	TracerProvider    trace.TracerProvider
	TextMapPropagator propagation.TextMapPropagator
}

GlobalInitializer initializes global variables for the application.

func (*GlobalInitializer) SetGlobals

func (i *GlobalInitializer) SetGlobals()

SetGlobals initializes global variables for the application.

It is intended to be embedded into application structs and be called from func main.

type IngestNotificationHandler

type IngestNotificationHandler flushhandler.FlushEventHandler

func NewIngestNotificationHandler

func NewIngestNotificationHandler(
	sinkConfig config.SinkConfiguration,
	eventPublisher eventbus.Publisher,
	meter metric.Meter,
	logger *slog.Logger,
) (IngestNotificationHandler, error)

type LevelHandler

type LevelHandler struct {
	// contains filtered or unexported fields
}

LevelHandler is a slog.Handler that filters log records based on the log level.

func NewLevelHandler

func NewLevelHandler(handler slog.Handler, level slog.Leveler) *LevelHandler

NewLevelHandler returns a new LevelHandler.

func (*LevelHandler) Enabled

func (h *LevelHandler) Enabled(ctx context.Context, level slog.Level) bool

func (*LevelHandler) Handle

func (h *LevelHandler) Handle(ctx context.Context, record slog.Record) error

func (*LevelHandler) WithAttrs

func (h *LevelHandler) WithAttrs(attrs []slog.Attr) slog.Handler

func (*LevelHandler) WithGroup

func (h *LevelHandler) WithGroup(name string) slog.Handler

type Metadata

type Metadata struct {
	ServiceName       string
	Version           string
	Environment       string
	OpenTelemetryName string

	AdditionalAttributes []attribute.KeyValue
}

Metadata provides information about the service to components that need it (e.g. telemetry).

func NewMetadata

func NewMetadata(conf config.Configuration, version string, serviceName string, additionalAttributes ...attribute.KeyValue) Metadata

type MeterConfigInitializer

type MeterConfigInitializer = func(ctx context.Context) error

func NewMeterConfigInitializer

func NewMeterConfigInitializer(
	logger *slog.Logger,
	configMeters []*meter.Meter,
	meterManagerService meter.ManageService,
	namespaceManager *namespace.Manager,
) MeterConfigInitializer

type Migrator

type Migrator struct {
	Config config.PostgresConfig
	Client *db.Client
	Logger *slog.Logger
}

Migrator executes database migrations.

func (Migrator) Migrate

func (m Migrator) Migrate(ctx context.Context) error

type Runner

type Runner struct {
	Group  run.Group
	Logger *slog.Logger
}

Runner is a helper struct that runs a group of services.

func (Runner) Run

func (r Runner) Run()

type RuntimeMetricsCollector

type RuntimeMetricsCollector struct{}

func NewRuntimeMetricsCollector

func NewRuntimeMetricsCollector(
	meterProvider metric.MeterProvider,
	logger *slog.Logger,
) (RuntimeMetricsCollector, error)

type SinkKafkaConsumer

type SinkKafkaConsumer = kafka.Consumer

type SubscriptionServiceWithWorkflow

type SubscriptionServiceWithWorkflow struct {
	Service                  subscription.Service
	WorkflowService          subscriptionworkflow.Service
	PlanSubscriptionService  plansubscription.PlanSubscriptionService
	SubscriptionAddonService subscriptionaddon.Service
}

TODO: break up to multiple initializers

func NewSubscriptionServices

func NewSubscriptionServices(
	logger *slog.Logger,
	db *entdb.Client,
	featureConnector feature.FeatureConnector,
	entitlementRegistry *registry.Entitlement,
	customerService customer.Service,
	planService plan.Service,
	planAddonService planaddon.Service,
	addonService addon.Service,
	eventPublisher eventbus.Publisher,
	lockr *lockr.Locker,
	featureFlags ffx.Service,
	taxCodeService taxcode.Service,
) (SubscriptionServiceWithWorkflow, error)

type TelemetryHandler

type TelemetryHandler http.Handler

func NewTelemetryHandler

func NewTelemetryHandler(
	metricsConf config.MetricsTelemetryConfig,
	healthChecker health.Health,
	logger *slog.Logger,
) TelemetryHandler

type TelemetryMiddlewareHook

type TelemetryMiddlewareHook server.MiddlewareHook

func NewTelemetryRouterHook

func NewTelemetryRouterHook(meterProvider metric.MeterProvider, tracerProvider trace.TracerProvider) TelemetryMiddlewareHook

type TelemetryServer

type TelemetryServer = *http.Server

func NewTelemetryServer

func NewTelemetryServer(conf config.TelemetryConfig, handler TelemetryHandler) (TelemetryServer, func())

type TerminationChecker

type TerminationChecker struct {
	// contains filtered or unexported fields
}

func NewTerminationChecker

func NewTerminationChecker(conf config.TerminationConfig, healthChecker health.Health) (*TerminationChecker, error)

NewTerminationChecker returns a new TerminationChecker and registers the TerminationChecker to the provided health checker.

func (*TerminationChecker) Execute

func (s *TerminationChecker) Execute(_ context.Context) (interface{}, error)

func (*TerminationChecker) IsRunning

func (s *TerminationChecker) IsRunning() bool

func (*TerminationChecker) IsTerminating

func (s *TerminationChecker) IsTerminating() bool

func (*TerminationChecker) Name

func (s *TerminationChecker) Name() string

func (*TerminationChecker) Terminate

func (s *TerminationChecker) Terminate(reason error)

func (*TerminationChecker) WaitForPropagation

func (s *TerminationChecker) WaitForPropagation(ctx context.Context) error

WaitForPropagation blocks for

type TerminationState

type TerminationState = string
const (
	TerminationStateRunning     TerminationState = "running"
	TerminationStateTerminating TerminationState = "terminating"
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL