common

package
v1.0.0-beta.190 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 12, 2025 License: Apache-2.0 Imports: 100 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultShutdownTimeout = 5 * time.Second
)

Variables

View Source
var ClickHouse = wire.NewSet(
	NewClickHouse,
)

We have configs separatly to be able to reuse wires in other projects

View Source
var Framework = wire.NewSet(
	wire.Struct(new(GlobalInitializer), "*"),
	wire.Struct(new(Runner), "*"),
)
View Source
var ProductCatalog = wire.NewSet(
	Feature,
	Plan,
)

TODO: move this back to Watermill NOTE: this is also used by the sink-worker that requires control over how the publisher is closed

View Source
var WatermillRouter = wire.NewSet(
	wire.Struct(new(router.Options), "*"),
)

Functions

func BalanceWorkerGroup

func BalanceWorkerGroup(
	ctx context.Context,
	worker *balanceworker.Worker,
	telemetryServer TelemetryServer,
) run.Group

func BalanceWorkerProvisionTopics

func BalanceWorkerProvisionTopics(conf config.BalanceWorkerConfiguration) []pkgkafka.TopicConfig

func BalanceWorkerSubjectResolver

func BalanceWorkerSubjectResolver() balanceworker.SubjectResolver

func BalanceWorkerSubscriber

func BalanceWorkerSubscriber(conf config.BalanceWorkerConfiguration, brokerOptions watermillkafka.BrokerOptions) (message.Subscriber, error)

no closer function: the subscriber is closed by the router/worker

func BillingAdapter

func BillingAdapter(
	logger *slog.Logger,
	db *entdb.Client,
) (billing.Adapter, error)

func BillingService

func BillingService(
	logger *slog.Logger,
	db *entdb.Client,
	appService app.Service,
	billingAdapter billing.Adapter,
	billingConfig config.BillingConfiguration,
	customerService customer.Service,
	featureConnector feature.FeatureConnector,
	meterRepo meter.Repository,
	streamingConnector streaming.Connector,
	eventPublisher eventbus.Publisher,
	advancementStrategy billing.AdvancementStrategy,
) (billing.Service, error)

func BillingSubscriptionValidator

func BillingSubscriptionValidator(
	billingService billing.Service,
	billingConfig config.BillingConfiguration,
) (*billingsubscription.Validator, error)

func BillingWorkerGroup

func BillingWorkerGroup(
	ctx context.Context,
	worker *billingworker.Worker,
	telemetryServer TelemetryServer,
) run.Group

func BillingWorkerProvisionTopics

func BillingWorkerProvisionTopics(conf config.BillingConfiguration) []pkgkafka.TopicConfig

func BillingWorkerSubscriber

func BillingWorkerSubscriber(conf config.BillingConfiguration, brokerOptions watermillkafka.BrokerOptions) (message.Subscriber, error)

no closer function: the subscriber is closed by the router/worker

func NewAppService

func NewAppService(logger *slog.Logger, db *entdb.Client, appsConfig config.AppsConfiguration) (app.Service, error)

func NewAppStripeService

func NewAppStripeService(logger *slog.Logger, db *entdb.Client, appsConfig config.AppsConfiguration, appService app.Service, customerService customer.Service, secretService secret.Service, billingService billing.Service) (appstripe.Service, error)

func NewBalanceWorker

func NewBalanceWorker(workerOptions balanceworker.WorkerOptions) (*balanceworker.Worker, error)

func NewBalanceWorkerOptions

func NewBalanceWorkerOptions(
	eventConfig config.EventsConfiguration,
	routerOptions router.Options,
	eventBus eventbus.Publisher,
	entitlements *registry.Entitlement,
	repo balanceworker.BalanceWorkerRepository,
	subjectResolver balanceworker.SubjectResolver,
	logger *slog.Logger,
) balanceworker.WorkerOptions

func NewBillingAutoAdvancer

func NewBillingAutoAdvancer(logger *slog.Logger, service billing.Service) (*billingworkerautoadvance.AutoAdvancer, error)

func NewBillingCollector

func NewBillingCollector(logger *slog.Logger, service billing.Service) (*billingworkercollect.InvoiceCollector, error)

func NewBillingWorker

func NewBillingWorker(workerOptions billingworker.WorkerOptions) (*billingworker.Worker, error)

func NewBillingWorkerOptions

func NewBillingWorkerOptions(
	eventConfig config.EventsConfiguration,
	routerOptions router.Options,
	eventBus eventbus.Publisher,
	billingService billing.Service,
	billingAdapter billing.Adapter,
	subscriptionServices SubscriptionServiceWithWorkflow,
	logger *slog.Logger,
) billingworker.WorkerOptions

func NewBrokerConfiguration

func NewBrokerConfiguration(
	kafkaConfig config.KafkaConfiguration,
	appMetadata Metadata,
	logger *slog.Logger,
	meter metric.Meter,
) watermillkafka.BrokerOptions

func NewClickHouse

TODO: add closer function?

func NewCustomerService

func NewCustomerService(logger *slog.Logger, db *entdb.Client, entRegistry *registry.Entitlement) (customer.Service, error)

func NewDB

func NewDB(driver *pgdriver.Driver) *sql.DB

TODO: add closer function?

func NewDefaultTextMapPropagator

func NewDefaultTextMapPropagator() propagation.TextMapPropagator

func NewEntClient

func NewEntClient(driver *entdriver.EntPostgresDriver) *db.Client

TODO: add closer function?

func NewEntPostgresDriver

func NewEntPostgresDriver(db *sql.DB, logger *slog.Logger) (*entdriver.EntPostgresDriver, func())

func NewEntitlementRegistry

func NewEntitlementRegistry(
	logger *slog.Logger,
	db *entdb.Client,
	entitlementConfig config.EntitlementsConfiguration,
	streamingConnector streaming.Connector,
	meterRepository meter.Repository,
	eventPublisher eventbus.Publisher,
) *registry.Entitlement

func NewEventBusPublisher

func NewEventBusPublisher(
	publisher message.Publisher,
	conf config.EventsConfiguration,
	logger *slog.Logger,
) (eventbus.Publisher, error)

func NewFeatureConnector

func NewFeatureConnector(logger *slog.Logger, db *entdb.Client, meterRepo meter.Repository) feature.FeatureConnector

func NewFlushHandler

func NewFlushHandler(
	eventsConfig config.EventsConfiguration,
	sinkConfig config.SinkConfiguration,
	messagePublisher message.Publisher,
	eventPublisher eventbus.Publisher,
	logger *slog.Logger,
	meter metric.Meter,
) (flushhandler.FlushEventHandler, error)

func NewHealthChecker

func NewHealthChecker(logger *slog.Logger) health.Health

func NewInMemoryRepository

func NewInMemoryRepository(meters []*models.Meter) *meter.InMemoryRepository

func NewIngestCollector

func NewIngestCollector(
	conf config.Configuration,
	kafkaCollector *kafkaingest.Collector,
	logger *slog.Logger,
	meter metric.Meter,
) (ingest.Collector, func(), error)

func NewKafkaAdminClient

func NewKafkaAdminClient(conf config.KafkaConfiguration) (*kafka.AdminClient, error)

func NewKafkaIngestCollector

func NewKafkaIngestCollector(
	config config.KafkaIngestConfiguration,
	producer *kafka.Producer,
	topicResolver topicresolver.Resolver,
	topicProvisioner pkgkafka.TopicProvisioner,
) (*kafkaingest.Collector, error)

func NewKafkaMetrics

func NewKafkaMetrics(meter metric.Meter) (*kafkametrics.Metrics, error)

func NewKafkaNamespaceHandler

func NewKafkaNamespaceHandler(
	topicResolver topicresolver.Resolver,
	topicProvisioner pkgkafka.TopicProvisioner,
	conf config.KafkaIngestConfiguration,
) (*kafkaingest.NamespaceHandler, error)

func NewKafkaProducer

func NewKafkaProducer(conf config.KafkaIngestConfiguration, logger *slog.Logger) (*kafka.Producer, error)

TODO: use ingest config directly? TODO: use kafka config directly? TODO: add closer function?

func NewKafkaTopicProvisioner

func NewKafkaTopicProvisioner(conf pkgkafka.TopicProvisionerConfig) (pkgkafka.TopicProvisioner, error)

TODO: do we need a separate constructor for the sake of a custom error message?

func NewKafkaTopicProvisionerConfig

func NewKafkaTopicProvisionerConfig(
	adminClient *kafka.AdminClient,
	logger *slog.Logger,
	meter metric.Meter,
	settings config.TopicProvisionerConfig,
) pkgkafka.TopicProvisionerConfig

TODO: fill struct fields automatically?

func NewLogger

func NewLogger(conf config.LogTelemetryConfig, res *resource.Resource, loggerProvider log.LoggerProvider, metadata Metadata) *slog.Logger

func NewLoggerProvider

func NewLoggerProvider(ctx context.Context, conf config.LogTelemetryConfig, res *resource.Resource) (*sdklog.LoggerProvider, func(), error)

func NewMeter

func NewMeter(meterProvider metric.MeterProvider, metadata Metadata) metric.Meter

func NewMeterProvider

func NewMeterProvider(ctx context.Context, conf config.MetricsTelemetryConfig, res *resource.Resource, logger *slog.Logger) (*sdkmetric.MeterProvider, func(), error)

func NewNamespaceHandlers

func NewNamespaceHandlers(
	kafkaHandler *kafkaingest.NamespaceHandler,
	clickHouseHandler streaming.Connector,
) []namespace.Handler

func NewNamespaceManager

func NewNamespaceManager(
	handlers []namespace.Handler,
	conf config.NamespaceConfiguration,
) (*namespace.Manager, error)

func NewNotificationService

func NewNotificationService(
	logger *slog.Logger,
	db *entdb.Client,
	notificationConfig config.NotificationConfiguration,
	svixConfig config.SvixConfig,
	featureConnector feature.FeatureConnector,
) (notification.Service, error)

func NewPlanService

func NewPlanService(
	logger *slog.Logger,
	db *entdb.Client,
	productCatalogConf config.ProductCatalogConfiguration,
	featureConnector feature.FeatureConnector,
) (plan.Service, error)

func NewPostgresDriver

func NewPostgresDriver(
	ctx context.Context,
	conf config.PostgresConfig,
	meterProvider metric.MeterProvider,
	meter metric.Meter,
	tracerProvider trace.TracerProvider,
	logger *slog.Logger,
) (*pgdriver.Driver, func(), error)

func NewPublisher

func NewPublisher(
	ctx context.Context,
	options watermillkafka.PublisherOptions,
	logger *slog.Logger,
) (message.Publisher, func(), error)

NOTE: this is also used by the sink-worker that requires control over how the publisher is closed

func NewServerPublisher

func NewServerPublisher(
	ctx context.Context,
	options watermillkafka.PublisherOptions,
	logger *slog.Logger,
) (message.Publisher, func(), error)

func NewSinkWorkerPublisher

func NewSinkWorkerPublisher(
	ctx context.Context,
	options watermillkafka.PublisherOptions,
	logger *slog.Logger,
) (message.Publisher, func(), error)

the sink-worker requires control over how the publisher is closed

func NewStreamingConnector

func NewStreamingConnector(
	ctx context.Context,
	conf config.AggregationConfiguration,
	clickHouse clickhouse.Conn,
	meterRepository meter.Repository,
	logger *slog.Logger,
) (streaming.Connector, error)

func NewTelemetryResource

func NewTelemetryResource(metadata Metadata) *resource.Resource

func NewTelemetryRouterHook

func NewTelemetryRouterHook(meterProvider metric.MeterProvider, tracerProvider trace.TracerProvider) func(chi.Router)

func NewTracer

func NewTracer(tracerProvider trace.TracerProvider, metadata Metadata) trace.Tracer

func NewTracerProvider

func NewTracerProvider(ctx context.Context, conf config.TraceTelemetryConfig, res *resource.Resource, logger *slog.Logger) (*sdktrace.TracerProvider, func(), error)

func NewUnsafeSecretService

func NewUnsafeSecretService(logger *slog.Logger, db *entdb.Client) (*secretservice.Service, error)

func NotificationServiceProvisionTopics

func NotificationServiceProvisionTopics(conf config.NotificationConfiguration) []pkgkafka.TopicConfig

func ServerProvisionTopics

func ServerProvisionTopics(conf config.EventsConfiguration) []pkgkafka.TopicConfig

func SinkWorkerProvisionTopics

func SinkWorkerProvisionTopics(conf config.EventsConfiguration) []pkgkafka.TopicConfig

Types

type AppSandboxProvisioner

type AppSandboxProvisioner func() error

func NewAppSandboxProvisioner

func NewAppSandboxProvisioner(ctx context.Context, logger *slog.Logger, appsConfig config.AppsConfiguration, appService app.Service, namespaceManager *namespace.Manager, billingService billing.Service) (AppSandboxProvisioner, error)

type BalanceWorkerEntitlementRepo

type BalanceWorkerEntitlementRepo interface {
	entitlement.EntitlementRepo
	balanceworker.BalanceWorkerRepository
}

func NewBalanceWorkerEntitlementRepo

func NewBalanceWorkerEntitlementRepo(db *db.Client) BalanceWorkerEntitlementRepo

type GlobalInitializer

type GlobalInitializer struct {
	Logger            *slog.Logger
	MeterProvider     metric.MeterProvider
	TracerProvider    trace.TracerProvider
	TextMapPropagator propagation.TextMapPropagator
}

GlobalInitializer initializes global variables for the application.

func (*GlobalInitializer) SetGlobals

func (i *GlobalInitializer) SetGlobals()

SetGlobals initializes global variables for the application.

It is intended to be embedded into application structs and be called from func main.

type LevelHandler

type LevelHandler struct {
	// contains filtered or unexported fields
}

LevelHandler is a slog.Handler that filters log records based on the log level.

func NewLevelHandler

func NewLevelHandler(handler slog.Handler, level slog.Leveler) *LevelHandler

NewLevelHandler returns a new LevelHandler.

func (*LevelHandler) Enabled

func (h *LevelHandler) Enabled(ctx context.Context, level slog.Level) bool

func (*LevelHandler) Handle

func (h *LevelHandler) Handle(ctx context.Context, record slog.Record) error

func (*LevelHandler) WithAttrs

func (h *LevelHandler) WithAttrs(attrs []slog.Attr) slog.Handler

func (*LevelHandler) WithGroup

func (h *LevelHandler) WithGroup(name string) slog.Handler

type Metadata

type Metadata struct {
	ServiceName       string
	Version           string
	Environment       string
	OpenTelemetryName string
}

Metadata provides information about the service to components that need it (eg. telemetry).

func NewMetadata

func NewMetadata(conf config.Configuration, version string, serviceName string) Metadata

type Migrator

type Migrator struct {
	Config config.PostgresConfig
	Client *db.Client
	Logger *slog.Logger
}

Migrator executes database migrations.

func (Migrator) Migrate

func (m Migrator) Migrate(ctx context.Context) error

type Runner

type Runner struct {
	Group  run.Group
	Logger *slog.Logger
}

Runner is a helper struct that runs a group of services.

func (Runner) Run

func (r Runner) Run()

type SubscriptionServiceWithWorkflow

type SubscriptionServiceWithWorkflow struct {
	Service                 subscription.Service
	WorkflowService         subscription.WorkflowService
	PlanSubscriptionService plansubscription.PlanSubscriptionService
}

Combine Srvice and WorkflowService into one struct We do this to able to initialize the Service and WorkflowService together and share the same subscriptionRepo.

func NewSubscriptionServices

func NewSubscriptionServices(
	logger *slog.Logger,
	db *entdb.Client,
	productcatalogConfig config.ProductCatalogConfiguration,
	entitlementConfig config.EntitlementsConfiguration,
	featureConnector feature.FeatureConnector,
	entitlementRegistry *registry.Entitlement,
	customerService customer.Service,
	planService plan.Service,
	eventPublisher eventbus.Publisher,
	billingSubscriptionValidator *billingsubscription.Validator,
) (SubscriptionServiceWithWorkflow, error)

type TelemetryHandler

type TelemetryHandler http.Handler

func NewTelemetryHandler

func NewTelemetryHandler(metricsConf config.MetricsTelemetryConfig, healthChecker health.Health) TelemetryHandler

type TelemetryServer

type TelemetryServer = *http.Server

func NewTelemetryServer

func NewTelemetryServer(conf config.TelemetryConfig, handler TelemetryHandler) (TelemetryServer, func())

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL