Documentation
¶
Index ¶
Constants ¶
const ( // Job kinds JobKindEventProcessing = "event_processing" JobKindWebhookDelivery = "webhook_delivery" // Queue names QueueEventProcessing = "events" QueueWebhookDelivery = "webhooks" QueueDefault = river.QueueDefault )
Variables ¶
This section is empty.
Functions ¶
Types ¶
type EventArgs ¶
type EventArgs struct {
EventID string `json:"event_id"`
Namespace string `json:"namespace"`
Event string `json:"event"`
TTLSeconds int64 `json:"ttl_seconds"`
Metadata map[string]string `json:"metadata"`
CreatedAt time.Time `json:"created_at"`
}
EventArgs represents an event processing job Contains only essential identifiers - the payload is stored in the database
func (EventArgs) InsertOpts ¶
func (EventArgs) InsertOpts() river.InsertOpts
type EventProcessingWorker ¶
type EventProcessingWorker struct {
river.WorkerDefaults[EventArgs]
// contains filtered or unexported fields
}
EventProcessingWorker processes events and triggers webhook deliveries
func NewEventProcessingWorker ¶
func NewEventProcessingWorker(webhookRepo store.RepositoryInterface, jobInserter JobInserter) *EventProcessingWorker
NewEventProcessingWorker creates a new event processing worker with a river client
type JobInserter ¶
type JobInserter interface {
Insert(ctx context.Context, args river.JobArgs) (*rivertype.JobInsertResult, error)
BatchInsert(ctx context.Context, args []river.JobArgs) ([]*rivertype.JobInsertResult, error)
}
QueueManagerInterface defines the interface for queue management.
type JobInserterWithTracing ¶
type JobInserterWithTracing struct {
JobInserter
// contains filtered or unexported fields
}
JobInserterWithTracing implements JobInserter interface instrumented with open telemetry spans
func NewJobInserterWithTracing ¶
func NewJobInserterWithTracing(base JobInserter, instance string, spanDecorator ...func(span trace.Span, params, results map[string]interface{})) JobInserterWithTracing
NewJobInserterWithTracing returns JobInserterWithTracing
func (JobInserterWithTracing) BatchInsert ¶
func (_d JobInserterWithTracing) BatchInsert(ctx context.Context, args []river.JobArgs) (jpa1 []*rivertype.JobInsertResult, err error)
BatchInsert implements JobInserter
func (JobInserterWithTracing) Insert ¶
func (_d JobInserterWithTracing) Insert(ctx context.Context, args river.JobArgs) (jp1 *rivertype.JobInsertResult, err error)
Insert implements JobInserter
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager handles the River queue management
func NewManager ¶
func NewManager(ctx context.Context, webhookRepo store.RepositoryInterface, dbPool *pgxpool.Pool) (*Manager, error)
NewManager creates a new queue manager
func (*Manager) GetJobInserter ¶
func (m *Manager) GetJobInserter() JobInserter
type WebhookArgs ¶
type WebhookArgs struct {
DeliveryID string `json:"delivery_id"`
WebhookID string `json:"webhook_id"`
EventID string `json:"event_id"`
ExpiresAt time.Time `json:"expires_at"`
Namespace string `json:"namespace"`
MaxAttempts int `json:"max_attempts"`
}
WebhookArgs represents a webhook delivery job Contains only essential identifiers - webhook config and event payload retrieved from database
func (WebhookArgs) InsertOpts ¶
func (w WebhookArgs) InsertOpts() river.InsertOpts
func (WebhookArgs) Kind ¶
func (WebhookArgs) Kind() string
Kind returns the job kind for River queue
type WebhookWorker ¶
type WebhookWorker struct {
river.WorkerDefaults[WebhookArgs]
// contains filtered or unexported fields
}
WebhookWorker handles webhook delivery jobs
func NewWebhookWorker ¶
func NewWebhookWorker(webhookRepo store.RepositoryInterface) *WebhookWorker
NewWebhookWorker creates a new webhook worker
func (*WebhookWorker) Work ¶
func (w *WebhookWorker) Work(ctx context.Context, job *river.Job[WebhookArgs]) error
Work processes the webhook delivery job