queue

package
v0.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 14, 2025 License: MIT Imports: 23 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// Job kinds
	JobKindEventProcessing = "event_processing"
	JobKindWebhookDelivery = "webhook_delivery"

	// Queue names
	QueueEventProcessing = "events"
	QueueWebhookDelivery = "webhooks"
	QueueDefault         = river.QueueDefault
)

Variables

This section is empty.

Functions

func NewJobInserter

func NewJobInserter(client *river.Client[pgx.Tx]) *jobInserter

NewJobInserter creates a new JobInserter

Types

type EventArgs

type EventArgs struct {
	EventID    string            `json:"event_id"`
	Namespace  string            `json:"namespace"`
	Event      string            `json:"event"`
	TTLSeconds int64             `json:"ttl_seconds"`
	Metadata   map[string]string `json:"metadata"`
	CreatedAt  time.Time         `json:"created_at"`
}

EventArgs represents an event processing job Contains only essential identifiers - the payload is stored in the database

func (EventArgs) InsertOpts

func (EventArgs) InsertOpts() river.InsertOpts

func (EventArgs) Kind

func (EventArgs) Kind() string

Kind returns the job kind for River queue

type EventProcessingWorker

type EventProcessingWorker struct {
	river.WorkerDefaults[EventArgs]
	// contains filtered or unexported fields
}

EventProcessingWorker processes events and triggers webhook deliveries

func NewEventProcessingWorker

func NewEventProcessingWorker(webhookRepo store.RepositoryInterface, jobInserter JobInserter) *EventProcessingWorker

NewEventProcessingWorker creates a new event processing worker with a river client

func (*EventProcessingWorker) Work

Work processes an event and creates webhook delivery jobs

type JobInserter

type JobInserter interface {
	Insert(ctx context.Context, args river.JobArgs) (*rivertype.JobInsertResult, error)
	BatchInsert(ctx context.Context, args []river.JobArgs) ([]*rivertype.JobInsertResult, error)
}

QueueManagerInterface defines the interface for queue management.

type JobInserterWithTracing

type JobInserterWithTracing struct {
	JobInserter
	// contains filtered or unexported fields
}

JobInserterWithTracing implements JobInserter interface instrumented with open telemetry spans

func NewJobInserterWithTracing

func NewJobInserterWithTracing(base JobInserter, instance string, spanDecorator ...func(span trace.Span, params, results map[string]interface{})) JobInserterWithTracing

NewJobInserterWithTracing returns JobInserterWithTracing

func (JobInserterWithTracing) BatchInsert

func (_d JobInserterWithTracing) BatchInsert(ctx context.Context, args []river.JobArgs) (jpa1 []*rivertype.JobInsertResult, err error)

BatchInsert implements JobInserter

func (JobInserterWithTracing) Insert

Insert implements JobInserter

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager handles the River queue management

func NewManager

func NewManager(ctx context.Context, webhookRepo store.RepositoryInterface, dbPool *pgxpool.Pool) (*Manager, error)

NewManager creates a new queue manager

func (*Manager) GetClient

func (m *Manager) GetClient() *river.Client[pgx.Tx]

func (*Manager) GetJobInserter

func (m *Manager) GetJobInserter() JobInserter

func (*Manager) Start

func (m *Manager) Start(ctx context.Context) error

Start starts the queue processing

func (*Manager) Stop

func (m *Manager) Stop(ctx context.Context) error

Stop stops the queue processing

type WebhookArgs

type WebhookArgs struct {
	DeliveryID  string    `json:"delivery_id"`
	WebhookID   string    `json:"webhook_id"`
	EventID     string    `json:"event_id"`
	ExpiresAt   time.Time `json:"expires_at"`
	Namespace   string    `json:"namespace"`
	MaxAttempts int       `json:"max_attempts"`
}

WebhookArgs represents a webhook delivery job Contains only essential identifiers - webhook config and event payload retrieved from database

func (WebhookArgs) InsertOpts

func (w WebhookArgs) InsertOpts() river.InsertOpts

func (WebhookArgs) Kind

func (WebhookArgs) Kind() string

Kind returns the job kind for River queue

type WebhookWorker

type WebhookWorker struct {
	river.WorkerDefaults[WebhookArgs]
	// contains filtered or unexported fields
}

WebhookWorker handles webhook delivery jobs

func NewWebhookWorker

func NewWebhookWorker(webhookRepo store.RepositoryInterface) *WebhookWorker

NewWebhookWorker creates a new webhook worker

func (*WebhookWorker) Work

func (w *WebhookWorker) Work(ctx context.Context, job *river.Job[WebhookArgs]) error

Work processes the webhook delivery job

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL