workers

package
v0.15.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 3, 2026 License: Apache-2.0 Imports: 40 Imported by: 0

Documentation

Index

Constants

View Source
const (
	UsageSyncCreatedServiceName     = "superplane." + messages.CanvasExchange + "." + messages.OrganizationCreatedRoutingKey + ".worker-consumer"
	UsageSyncPlanChangedServiceName = "superplane." + messages.CanvasExchange + "." + messages.OrganizationPlanChangedRoutingKey + ".worker-consumer"
	UsageSyncConnectionName         = "superplane"
)
View Source
const InvitationEmailConnectionName = "superplane"
View Source
const InvitationEmailServiceName = "superplane" + "." + messages.CanvasExchange + "." + messages.InvitationCreatedRoutingKey + ".worker-consumer"
View Source
const MagicCodeEmailConnectionName = "superplane"
View Source
const MagicCodeEmailServiceName = "superplane" + "." + messages.CanvasExchange + "." + messages.MagicCodeRequestedRoutingKey + ".worker-consumer"
View Source
const NotificationEmailConnectionName = "superplane"
View Source
const NotificationEmailServiceName = "superplane" + "." + messages.CanvasExchange + "." + messages.NotificationEmailRequestedRoutingKey + ".worker-consumer"

Variables

View Source
var ErrRecordLocked = errors.New("record locked")

Functions

This section is empty.

Types

type CanvasCleanupWorker added in v0.6.0

type CanvasCleanupWorker struct {
	// contains filtered or unexported fields
}

func NewCanvasCleanupWorker added in v0.6.0

func NewCanvasCleanupWorker() *CanvasCleanupWorker

func (*CanvasCleanupWorker) LockAndProcessCanvas added in v0.6.0

func (w *CanvasCleanupWorker) LockAndProcessCanvas(canvas models.Canvas) error

func (*CanvasCleanupWorker) Start added in v0.6.0

func (w *CanvasCleanupWorker) Start(ctx context.Context)

type EventDistributer

type EventDistributer struct {
	// contains filtered or unexported fields
}

EventDistributer coordinates message consumption from RabbitMQ and distributes events to websocket clients

func NewEventDistributer

func NewEventDistributer(wsHub *ws.Hub) *EventDistributer

NewEventDistributer creates a new event distributer coordinator

func (*EventDistributer) Shutdown

func (e *EventDistributer) Shutdown(ctx context.Context) error

Shutdown gracefully stops the worker

func (*EventDistributer) Start

func (e *EventDistributer) Start() error

Start begins consuming messages from RabbitMQ for all relevant routing keys

type EventRetentionWorker added in v0.13.0

type EventRetentionWorker struct {
	// contains filtered or unexported fields
}

func NewEventRetentionWorker added in v0.13.0

func NewEventRetentionWorker(usageService usage.Service) *EventRetentionWorker

func (*EventRetentionWorker) LockAndProcessRootEvent added in v0.13.0

func (w *EventRetentionWorker) LockAndProcessRootEvent(rootEvent models.CanvasEvent, referenceTime time.Time) error

func (*EventRetentionWorker) Start added in v0.13.0

func (w *EventRetentionWorker) Start(ctx context.Context)

type EventRouter added in v0.6.0

type EventRouter struct {
	// contains filtered or unexported fields
}

func NewEventRouter added in v0.6.0

func NewEventRouter(rabbitMQURL string) *EventRouter

func (*EventRouter) Consume added in v0.12.0

func (w *EventRouter) Consume(delivery tackle.Delivery) error

func (*EventRouter) LockAndProcessEvent added in v0.6.0

func (w *EventRouter) LockAndProcessEvent(logger *log.Entry, event models.CanvasEvent) error

func (*EventRouter) Name added in v0.12.0

func (w *EventRouter) Name() string

func (*EventRouter) Start added in v0.6.0

func (w *EventRouter) Start(ctx context.Context)

func (*EventRouter) StartRabbitMQConsumer added in v0.12.0

func (w *EventRouter) StartRabbitMQConsumer(ctx context.Context)

type IntegrationCleanupWorker added in v0.6.0

type IntegrationCleanupWorker struct {
	// contains filtered or unexported fields
}

func NewIntegrationCleanupWorker added in v0.6.0

func NewIntegrationCleanupWorker(registry *registry.Registry, encryptor crypto.Encryptor, baseURL string) *IntegrationCleanupWorker

func (*IntegrationCleanupWorker) LockAndProcessIntegration added in v0.6.0

func (w *IntegrationCleanupWorker) LockAndProcessIntegration(integration models.Integration) error

func (*IntegrationCleanupWorker) Start added in v0.6.0

type IntegrationRequestWorker added in v0.6.0

type IntegrationRequestWorker struct {
	// contains filtered or unexported fields
}

func NewIntegrationRequestWorker added in v0.6.0

func NewIntegrationRequestWorker(encryptor crypto.Encryptor, registry *registry.Registry, oidcProvider oidc.Provider, baseURL string, webhooksBaseURL string) *IntegrationRequestWorker

func (*IntegrationRequestWorker) LockAndProcessRequest added in v0.6.0

func (w *IntegrationRequestWorker) LockAndProcessRequest(request models.IntegrationRequest) error

func (*IntegrationRequestWorker) Start added in v0.6.0

type InvitationEmailConsumer

type InvitationEmailConsumer struct {
	Consumer     *tackle.Consumer
	RabbitMQURL  string
	EmailService services.EmailService
	BaseURL      string
}

func NewInvitationEmailConsumer

func NewInvitationEmailConsumer(rabbitMQURL string, emailService services.EmailService, baseURL string) *InvitationEmailConsumer

func (*InvitationEmailConsumer) Consume

func (c *InvitationEmailConsumer) Consume(delivery tackle.Delivery) error

func (*InvitationEmailConsumer) Start

func (c *InvitationEmailConsumer) Start() error

func (*InvitationEmailConsumer) Stop

func (c *InvitationEmailConsumer) Stop()

type MagicCodeEmailConsumer added in v0.14.0

type MagicCodeEmailConsumer struct {
	Consumer     *tackle.Consumer
	RabbitMQURL  string
	EmailService services.EmailService
	BaseURL      string
}

func NewMagicCodeEmailConsumer added in v0.14.0

func NewMagicCodeEmailConsumer(rabbitMQURL string, emailService services.EmailService, baseURL string) *MagicCodeEmailConsumer

func (*MagicCodeEmailConsumer) Consume added in v0.14.0

func (c *MagicCodeEmailConsumer) Consume(delivery tackle.Delivery) error

func (*MagicCodeEmailConsumer) Start added in v0.14.0

func (c *MagicCodeEmailConsumer) Start() error

func (*MagicCodeEmailConsumer) Stop added in v0.14.0

func (c *MagicCodeEmailConsumer) Stop()

type NodeExecutor added in v0.6.0

type NodeExecutor struct {
	// contains filtered or unexported fields
}

func NewNodeExecutor added in v0.6.0

func NewNodeExecutor(encryptor crypto.Encryptor, registry *registry.Registry, baseURL string, webhookBaseURL string, rabbitMQURL string) *NodeExecutor

func (*NodeExecutor) Consume added in v0.12.0

func (w *NodeExecutor) Consume(delivery tackle.Delivery) error

func (*NodeExecutor) LockAndProcessNodeExecution added in v0.6.0

func (w *NodeExecutor) LockAndProcessNodeExecution(id uuid.UUID) error

func (*NodeExecutor) Name added in v0.12.0

func (w *NodeExecutor) Name() string

func (*NodeExecutor) Start added in v0.6.0

func (w *NodeExecutor) Start(ctx context.Context)

func (*NodeExecutor) StartRabbitMQConsumer added in v0.12.0

func (w *NodeExecutor) StartRabbitMQConsumer(ctx context.Context)

type NodeQueueWorker added in v0.6.0

type NodeQueueWorker struct {
	// contains filtered or unexported fields
}

func NewNodeQueueWorker added in v0.6.0

func NewNodeQueueWorker(registry *registry.Registry, rabbitMQURL string) *NodeQueueWorker

func (*NodeQueueWorker) Consume added in v0.12.0

func (w *NodeQueueWorker) Consume(delivery tackle.Delivery) error

func (*NodeQueueWorker) LockAndProcessNode added in v0.6.0

func (w *NodeQueueWorker) LockAndProcessNode(logger *log.Entry, node models.CanvasNode) error

func (*NodeQueueWorker) Name added in v0.12.0

func (w *NodeQueueWorker) Name() string

func (*NodeQueueWorker) Start added in v0.6.0

func (w *NodeQueueWorker) Start(ctx context.Context)

func (*NodeQueueWorker) StartRabbitMQConsumer added in v0.12.0

func (w *NodeQueueWorker) StartRabbitMQConsumer(ctx context.Context)

type NodeRequestWorker

type NodeRequestWorker struct {
	// contains filtered or unexported fields
}

func NewNodeRequestWorker

func NewNodeRequestWorker(encryptor crypto.Encryptor, registry *registry.Registry, webhookBaseURL string) *NodeRequestWorker

func (*NodeRequestWorker) LockAndProcessRequest

func (w *NodeRequestWorker) LockAndProcessRequest(request models.CanvasNodeRequest) error

func (*NodeRequestWorker) Start

func (w *NodeRequestWorker) Start(ctx context.Context)

type NotificationEmailConsumer added in v0.0.43

type NotificationEmailConsumer struct {
	Consumer     *tackle.Consumer
	RabbitMQURL  string
	EmailService services.EmailService
	AuthService  authorization.Authorization
}

func NewNotificationEmailConsumer added in v0.0.43

func NewNotificationEmailConsumer(
	rabbitMQURL string,
	emailService services.EmailService,
	authService authorization.Authorization,
) *NotificationEmailConsumer

func (*NotificationEmailConsumer) Consume added in v0.0.43

func (c *NotificationEmailConsumer) Consume(delivery tackle.Delivery) error

func (*NotificationEmailConsumer) Start added in v0.0.43

func (c *NotificationEmailConsumer) Start() error

func (*NotificationEmailConsumer) Stop added in v0.0.43

func (c *NotificationEmailConsumer) Stop()

type OrganizationCleanupWorker added in v0.13.0

type OrganizationCleanupWorker struct {
	// contains filtered or unexported fields
}

func NewOrganizationCleanupWorker added in v0.13.0

func NewOrganizationCleanupWorker() *OrganizationCleanupWorker

func (*OrganizationCleanupWorker) LockAndProcessOrganization added in v0.13.0

func (w *OrganizationCleanupWorker) LockAndProcessOrganization(organization models.Organization) error

func (*OrganizationCleanupWorker) Start added in v0.13.0

type UsageSyncWorker added in v0.13.0

type UsageSyncWorker struct {
	CreatedConsumer     *tackle.Consumer
	PlanChangedConsumer *tackle.Consumer
	RabbitMQURL         string
	UsageService        usage.Service
}

func NewUsageSyncWorker added in v0.13.0

func NewUsageSyncWorker(rabbitMQURL string, usageService usage.Service) *UsageSyncWorker

func (*UsageSyncWorker) Consume added in v0.13.0

func (w *UsageSyncWorker) Consume(delivery tackle.Delivery) error

func (*UsageSyncWorker) ConsumeOrganizationPlanChanged added in v0.13.0

func (w *UsageSyncWorker) ConsumeOrganizationPlanChanged(delivery tackle.Delivery) error

func (*UsageSyncWorker) Start added in v0.13.0

func (w *UsageSyncWorker) Start(ctx context.Context)

func (*UsageSyncWorker) Stop added in v0.13.0

func (w *UsageSyncWorker) Stop()

type WebhookCleanupWorker

type WebhookCleanupWorker struct {
	// contains filtered or unexported fields
}

func NewWebhookCleanupWorker

func NewWebhookCleanupWorker(encryptor crypto.Encryptor, registry *registry.Registry, baseURL string) *WebhookCleanupWorker

func (*WebhookCleanupWorker) LockAndProcessWebhook

func (w *WebhookCleanupWorker) LockAndProcessWebhook(webhook models.Webhook) error

func (*WebhookCleanupWorker) Start

func (w *WebhookCleanupWorker) Start(ctx context.Context)

type WebhookProvisioner

type WebhookProvisioner struct {
	// contains filtered or unexported fields
}

func NewWebhookProvisioner

func NewWebhookProvisioner(baseURL string, encryptor crypto.Encryptor, registry *registry.Registry) *WebhookProvisioner

func (*WebhookProvisioner) LockAndProcessWebhook

func (w *WebhookProvisioner) LockAndProcessWebhook(webhook models.Webhook) error

LockAndProcessWebhook processes a webhook in 3 phases to avoid holding a DB connection during potentially long-running external API calls:

  • Phase 1 (short tx): Lock the webhook and set state to "provisioning"
  • Phase 2 (no tx): Run the external handler.Setup() call
  • Phase 3 (short tx): Set state to "ready" or handle errors

func (*WebhookProvisioner) Start

func (w *WebhookProvisioner) Start(ctx context.Context)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL