common

package
v1.0.0-beta.201 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 12, 2025 License: Apache-2.0 Imports: 108 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultShutdownTimeout = 5 * time.Second
)

Variables

View Source
var ClickHouse = wire.NewSet(
	NewClickHouse,
)

We have configs separatly to be able to reuse wires in other projects

View Source
var Framework = wire.NewSet(
	wire.Struct(new(GlobalInitializer), "*"),
	wire.Struct(new(Runner), "*"),
)
View Source
var ProductCatalog = wire.NewSet(
	Feature,
	Plan,
)
View Source
var ProgressManager = wire.NewSet(
	NewProgressManager,
)

TODO: move this back to Watermill NOTE: this is also used by the sink-worker that requires control over how the publisher is closed

View Source
var WatermillRouter = wire.NewSet(
	wire.Struct(new(router.Options), "*"),
)

Functions

func BalanceWorkerGroup

func BalanceWorkerGroup(
	ctx context.Context,
	worker *balanceworker.Worker,
	telemetryServer TelemetryServer,
) run.Group

func BalanceWorkerProvisionTopics

func BalanceWorkerProvisionTopics(conf config.BalanceWorkerConfiguration) []pkgkafka.TopicConfig

func BalanceWorkerSubjectResolver

func BalanceWorkerSubjectResolver() balanceworker.SubjectResolver

func BalanceWorkerSubscriber

func BalanceWorkerSubscriber(conf config.BalanceWorkerConfiguration, brokerOptions watermillkafka.BrokerOptions) (message.Subscriber, error)

no closer function: the subscriber is closed by the router/worker

func BillingAdapter

func BillingAdapter(
	logger *slog.Logger,
	db *entdb.Client,
) (billing.Adapter, error)

func BillingService

func BillingService(
	logger *slog.Logger,
	db *entdb.Client,
	appService app.Service,
	billingAdapter billing.Adapter,
	billingConfig config.BillingConfiguration,
	customerService customer.Service,
	featureConnector feature.FeatureConnector,
	meterService meter.Service,
	streamingConnector streaming.Connector,
	eventPublisher eventbus.Publisher,
	advancementStrategy billing.AdvancementStrategy,
) (billing.Service, error)

func BillingSubscriptionValidator

func BillingSubscriptionValidator(
	billingService billing.Service,
	billingConfig config.BillingConfiguration,
) (*billingsubscription.Validator, error)

func BillingWorkerGroup

func BillingWorkerGroup(
	ctx context.Context,
	worker *billingworker.Worker,
	telemetryServer TelemetryServer,
) run.Group

func BillingWorkerProvisionTopics

func BillingWorkerProvisionTopics(conf config.BillingConfiguration) []pkgkafka.TopicConfig

func BillingWorkerSubscriber

func BillingWorkerSubscriber(conf config.BillingConfiguration, brokerOptions watermillkafka.BrokerOptions) (message.Subscriber, error)

no closer function: the subscriber is closed by the router/worker

func NewAppService

func NewAppService(logger *slog.Logger, db *entdb.Client, appsConfig config.AppsConfiguration) (app.Service, error)

func NewAppStripeService

func NewAppStripeService(logger *slog.Logger, db *entdb.Client, appsConfig config.AppsConfiguration, appService app.Service, customerService customer.Service, secretService secret.Service, billingService billing.Service) (appstripe.Service, error)

func NewBalanceWorker

func NewBalanceWorker(workerOptions balanceworker.WorkerOptions) (*balanceworker.Worker, error)

func NewBalanceWorkerOptions

func NewBalanceWorkerOptions(
	eventConfig config.EventsConfiguration,
	routerOptions router.Options,
	eventBus eventbus.Publisher,
	entitlements *registry.Entitlement,
	repo balanceworker.BalanceWorkerRepository,
	subjectResolver balanceworker.SubjectResolver,
	logger *slog.Logger,
) balanceworker.WorkerOptions

func NewBillingAutoAdvancer

func NewBillingAutoAdvancer(logger *slog.Logger, service billing.Service) (*billingworkerautoadvance.AutoAdvancer, error)

func NewBillingCollector

func NewBillingCollector(logger *slog.Logger, service billing.Service) (*billingworkercollect.InvoiceCollector, error)

func NewBillingWorker

func NewBillingWorker(workerOptions billingworker.WorkerOptions) (*billingworker.Worker, error)

func NewBillingWorkerOptions

func NewBillingWorkerOptions(
	eventConfig config.EventsConfiguration,
	routerOptions router.Options,
	eventBus eventbus.Publisher,
	billingService billing.Service,
	billingAdapter billing.Adapter,
	subscriptionServices SubscriptionServiceWithWorkflow,
	logger *slog.Logger,
) billingworker.WorkerOptions

func NewBrokerConfiguration

func NewBrokerConfiguration(
	kafkaConfig config.KafkaConfiguration,
	appMetadata Metadata,
	logger *slog.Logger,
	meter metric.Meter,
) watermillkafka.BrokerOptions

func NewClickHouse

TODO: add closer function?

func NewCustomerService

func NewCustomerService(logger *slog.Logger, db *entdb.Client, entRegistry *registry.Entitlement) (customer.Service, error)

func NewDB

func NewDB(driver *pgdriver.Driver) *sql.DB

TODO: add closer function?

func NewDefaultTextMapPropagator

func NewDefaultTextMapPropagator() propagation.TextMapPropagator

func NewEntClient

func NewEntClient(driver *entdriver.EntPostgresDriver) *db.Client

TODO: add closer function?

func NewEntPostgresDriver

func NewEntPostgresDriver(db *sql.DB, logger *slog.Logger) (*entdriver.EntPostgresDriver, func())

func NewEntitlementRegistry

func NewEntitlementRegistry(
	logger *slog.Logger,
	db *entdb.Client,
	tracer trace.Tracer,
	entitlementConfig config.EntitlementsConfiguration,
	streamingConnector streaming.Connector,
	meterService meter.Service,
	eventPublisher eventbus.Publisher,
) *registry.Entitlement

func NewEventBusPublisher

func NewEventBusPublisher(
	publisher message.Publisher,
	conf config.EventsConfiguration,
	logger *slog.Logger,
) (eventbus.Publisher, error)

func NewFeatureConnector

func NewFeatureConnector(logger *slog.Logger, db *entdb.Client, meterService meter.Service) feature.FeatureConnector

func NewFlushHandler

func NewFlushHandler(
	eventsConfig config.EventsConfiguration,
	sinkConfig config.SinkConfiguration,
	messagePublisher message.Publisher,
	eventPublisher eventbus.Publisher,
	logger *slog.Logger,
	meter metric.Meter,
) (flushhandler.FlushEventHandler, error)

func NewHealthChecker

func NewHealthChecker(logger *slog.Logger) health.Health

func NewIngestCollector

func NewIngestCollector(
	conf config.Configuration,
	kafkaCollector *kafkaingest.Collector,
	logger *slog.Logger,
	meter metric.Meter,
	tracer trace.Tracer,
) (ingest.Collector, func(), error)

func NewKafkaAdminClient

func NewKafkaAdminClient(conf config.KafkaConfiguration) (*kafka.AdminClient, error)

func NewKafkaIngestCollector

func NewKafkaIngestCollector(
	config config.KafkaIngestConfiguration,
	producer *kafka.Producer,
	topicResolver topicresolver.Resolver,
	topicProvisioner pkgkafka.TopicProvisioner,
	logger *slog.Logger,
	tracer trace.Tracer,
) (*kafkaingest.Collector, error)

func NewKafkaMetrics

func NewKafkaMetrics(meter metric.Meter) (*kafkametrics.Metrics, error)

func NewKafkaNamespaceHandler

func NewKafkaNamespaceHandler(
	topicResolver topicresolver.Resolver,
	topicProvisioner pkgkafka.TopicProvisioner,
	conf config.KafkaIngestConfiguration,
) (*kafkaingest.NamespaceHandler, error)

func NewKafkaProducer

func NewKafkaProducer(conf config.KafkaIngestConfiguration, logger *slog.Logger) (*kafka.Producer, error)

TODO: use ingest config directly? TODO: use kafka config directly? TODO: add closer function?

func NewKafkaTopicProvisioner

func NewKafkaTopicProvisioner(conf pkgkafka.TopicProvisionerConfig) (pkgkafka.TopicProvisioner, error)

TODO: do we need a separate constructor for the sake of a custom error message?

func NewKafkaTopicProvisionerConfig

func NewKafkaTopicProvisionerConfig(
	adminClient *kafka.AdminClient,
	logger *slog.Logger,
	meter metric.Meter,
	settings config.TopicProvisionerConfig,
) pkgkafka.TopicProvisionerConfig

TODO: fill struct fields automatically?

func NewLogger

func NewLogger(conf config.LogTelemetryConfig, res *resource.Resource, loggerProvider log.LoggerProvider, metadata Metadata) *slog.Logger

func NewLoggerProvider

func NewLoggerProvider(ctx context.Context, conf config.LogTelemetryConfig, res *resource.Resource) (*sdklog.LoggerProvider, func(), error)

func NewMeter

func NewMeter(meterProvider metric.MeterProvider, metadata Metadata) metric.Meter

func NewMeterEventService

func NewMeterEventService(
	streamingConnector streaming.Connector,
	meterService meter.Service,
) meterevent.Service

func NewMeterManageService

func NewMeterManageService(
	ctx context.Context,
	db *entdb.Client,
	logger *slog.Logger,
	entitlementRegistry *registry.Entitlement,
	namespaceManager *namespace.Manager,
	streamingConnector streaming.Connector,
) (meter.ManageService, error)

func NewMeterProvider

func NewMeterProvider(ctx context.Context, conf config.MetricsTelemetryConfig, res *resource.Resource, logger *slog.Logger) (*sdkmetric.MeterProvider, func(), error)

func NewMeterService

func NewMeterService(
	logger *slog.Logger,
	db *entdb.Client,
) (meter.Service, error)

func NewNamespaceHandlers

func NewNamespaceHandlers(
	kafkaHandler *kafkaingest.NamespaceHandler,
	clickHouseHandler streaming.Connector,
) []namespace.Handler

func NewNamespaceManager

func NewNamespaceManager(
	handlers []namespace.Handler,
	conf config.NamespaceConfiguration,
) (*namespace.Manager, error)

func NewNotificationService

func NewNotificationService(
	logger *slog.Logger,
	db *entdb.Client,
	notificationConfig config.NotificationConfiguration,
	svixConfig config.SvixConfig,
	featureConnector feature.FeatureConnector,
) (notification.Service, error)

func NewPlanService

func NewPlanService(
	logger *slog.Logger,
	db *entdb.Client,
	productCatalogConf config.ProductCatalogConfiguration,
	featureConnector feature.FeatureConnector,
) (plan.Service, error)

func NewPortalService

func NewPortalService(conf config.PortalConfiguration) (portal.Service, error)

func NewPostgresDriver

func NewPostgresDriver(
	ctx context.Context,
	conf config.PostgresConfig,
	meterProvider metric.MeterProvider,
	meter metric.Meter,
	tracerProvider trace.TracerProvider,
	logger *slog.Logger,
) (*pgdriver.Driver, func(), error)

func NewProgressManager

func NewProgressManager(logger *slog.Logger, conf config.ProgressManagerConfiguration) (progressmanager.Service, error)

NewProgressManager creates a new progress manager service.

func NewPublisher

func NewPublisher(
	ctx context.Context,
	options watermillkafka.PublisherOptions,
	logger *slog.Logger,
) (message.Publisher, func(), error)

NOTE: this is also used by the sink-worker that requires control over how the publisher is closed

func NewServerPublisher

func NewServerPublisher(
	ctx context.Context,
	options watermillkafka.PublisherOptions,
	logger *slog.Logger,
) (message.Publisher, func(), error)

func NewSinkWorkerPublisher

func NewSinkWorkerPublisher(
	ctx context.Context,
	options watermillkafka.PublisherOptions,
	logger *slog.Logger,
) (message.Publisher, func(), error)

the sink-worker requires control over how the publisher is closed

func NewStreamingConnector

func NewStreamingConnector(
	ctx context.Context,
	conf config.AggregationConfiguration,
	clickHouse clickhouse.Conn,
	logger *slog.Logger,
	progressmanager progressmanager.Service,
) (streaming.Connector, error)

func NewTelemetryResource

func NewTelemetryResource(metadata Metadata) *resource.Resource

func NewTelemetryRouterHook

func NewTelemetryRouterHook(meterProvider metric.MeterProvider, tracerProvider trace.TracerProvider) func(chi.Router)

func NewTerminationCheckerActor

func NewTerminationCheckerActor(r *TerminationChecker, logger *slog.Logger) (execute func() error, interrupt func(error), err error)

NewTerminationCheckerActor returns actor functions (execute, interrupt) which can be passed to run.Add() to register it. Its state is set to TerminationStateStopping as soon as interrupt function is called.

func NewTracer

func NewTracer(tracerProvider trace.TracerProvider, metadata Metadata) trace.Tracer

func NewTracerProvider

func NewTracerProvider(ctx context.Context, conf config.TraceTelemetryConfig, res *resource.Resource, logger *slog.Logger) (*sdktrace.TracerProvider, func(), error)

func NewUnsafeSecretService

func NewUnsafeSecretService(logger *slog.Logger, db *entdb.Client) (*secretservice.Service, error)

func NotificationServiceProvisionTopics

func NotificationServiceProvisionTopics(conf config.NotificationConfiguration) []pkgkafka.TopicConfig

func ServerProvisionTopics

func ServerProvisionTopics(conf config.EventsConfiguration) []pkgkafka.TopicConfig

func SinkWorkerProvisionTopics

func SinkWorkerProvisionTopics(conf config.EventsConfiguration) []pkgkafka.TopicConfig

Types

type AppSandboxProvisioner

type AppSandboxProvisioner func() error

func NewAppSandboxProvisioner

func NewAppSandboxProvisioner(ctx context.Context, logger *slog.Logger, appsConfig config.AppsConfiguration, appService app.Service, namespaceManager *namespace.Manager, billingService billing.Service) (AppSandboxProvisioner, error)

type BalanceWorkerEntitlementRepo

type BalanceWorkerEntitlementRepo interface {
	entitlement.EntitlementRepo
	balanceworker.BalanceWorkerRepository
}

func NewBalanceWorkerEntitlementRepo

func NewBalanceWorkerEntitlementRepo(db *db.Client) BalanceWorkerEntitlementRepo

type GlobalInitializer

type GlobalInitializer struct {
	Logger            *slog.Logger
	MeterProvider     metric.MeterProvider
	TracerProvider    trace.TracerProvider
	TextMapPropagator propagation.TextMapPropagator
}

GlobalInitializer initializes global variables for the application.

func (*GlobalInitializer) SetGlobals

func (i *GlobalInitializer) SetGlobals()

SetGlobals initializes global variables for the application.

It is intended to be embedded into application structs and be called from func main.

type LevelHandler

type LevelHandler struct {
	// contains filtered or unexported fields
}

LevelHandler is a slog.Handler that filters log records based on the log level.

func NewLevelHandler

func NewLevelHandler(handler slog.Handler, level slog.Leveler) *LevelHandler

NewLevelHandler returns a new LevelHandler.

func (*LevelHandler) Enabled

func (h *LevelHandler) Enabled(ctx context.Context, level slog.Level) bool

func (*LevelHandler) Handle

func (h *LevelHandler) Handle(ctx context.Context, record slog.Record) error

func (*LevelHandler) WithAttrs

func (h *LevelHandler) WithAttrs(attrs []slog.Attr) slog.Handler

func (*LevelHandler) WithGroup

func (h *LevelHandler) WithGroup(name string) slog.Handler

type Metadata

type Metadata struct {
	ServiceName       string
	Version           string
	Environment       string
	OpenTelemetryName string
}

Metadata provides information about the service to components that need it (eg. telemetry).

func NewMetadata

func NewMetadata(conf config.Configuration, version string, serviceName string) Metadata

type MeterConfigInitializer

type MeterConfigInitializer = func(ctx context.Context) error

func NewMeterConfigInitializer

func NewMeterConfigInitializer(
	logger *slog.Logger,
	configMeters []*meter.Meter,
	meterManagerService meter.ManageService,
	namespaceManager *namespace.Manager,
) MeterConfigInitializer

type Migrator

type Migrator struct {
	Config config.PostgresConfig
	Client *db.Client
	Logger *slog.Logger
}

Migrator executes database migrations.

func (Migrator) Migrate

func (m Migrator) Migrate(ctx context.Context) error

type Runner

type Runner struct {
	Group  run.Group
	Logger *slog.Logger
}

Runner is a helper struct that runs a group of services.

func (Runner) Run

func (r Runner) Run()

type RuntimeMetricsCollector

type RuntimeMetricsCollector struct {
	// contains filtered or unexported fields
}

func NewRuntimeMetricsCollector

func NewRuntimeMetricsCollector(
	meterProvider metric.MeterProvider,
	conf config.TelemetryConfig,
	logger *slog.Logger,
) (RuntimeMetricsCollector, error)

func (RuntimeMetricsCollector) Start

func (c RuntimeMetricsCollector) Start() error

type SubscriptionServiceWithWorkflow

type SubscriptionServiceWithWorkflow struct {
	Service                 subscription.Service
	WorkflowService         subscription.WorkflowService
	PlanSubscriptionService plansubscription.PlanSubscriptionService
}

Combine Srvice and WorkflowService into one struct We do this to able to initialize the Service and WorkflowService together and share the same subscriptionRepo.

func NewSubscriptionServices

func NewSubscriptionServices(
	logger *slog.Logger,
	db *entdb.Client,
	productcatalogConfig config.ProductCatalogConfiguration,
	entitlementConfig config.EntitlementsConfiguration,
	featureConnector feature.FeatureConnector,
	entitlementRegistry *registry.Entitlement,
	customerService customer.Service,
	planService plan.Service,
	eventPublisher eventbus.Publisher,
	billingSubscriptionValidator *billingsubscription.Validator,
) (SubscriptionServiceWithWorkflow, error)

type TelemetryHandler

type TelemetryHandler http.Handler

func NewTelemetryHandler

func NewTelemetryHandler(
	metricsConf config.MetricsTelemetryConfig,
	healthChecker health.Health,
	runtimeMetricsCollector RuntimeMetricsCollector,
	logger *slog.Logger,
) TelemetryHandler

type TelemetryServer

type TelemetryServer = *http.Server

func NewTelemetryServer

func NewTelemetryServer(conf config.TelemetryConfig, handler TelemetryHandler) (TelemetryServer, func())

type TerminationChecker

type TerminationChecker struct {
	// contains filtered or unexported fields
}

func NewTerminationChecker

func NewTerminationChecker(conf config.TerminationConfig, healthChecker health.Health) (*TerminationChecker, error)

NewTerminationChecker returns a new TerminationChecker and registers the TerminationChecker to the provided health checker.

func (*TerminationChecker) Execute

func (s *TerminationChecker) Execute(_ context.Context) (interface{}, error)

func (*TerminationChecker) IsRunning

func (s *TerminationChecker) IsRunning() bool

func (*TerminationChecker) IsTerminating

func (s *TerminationChecker) IsTerminating() bool

func (*TerminationChecker) Name

func (s *TerminationChecker) Name() string

func (*TerminationChecker) Terminate

func (s *TerminationChecker) Terminate(reason error)

func (*TerminationChecker) WaitForPropagation

func (s *TerminationChecker) WaitForPropagation(ctx context.Context) error

WaitForPropagation blocks for

type TerminationState

type TerminationState = string
const (
	TerminationStateRunning     TerminationState = "running"
	TerminationStateTerminating TerminationState = "terminating"
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL