models

package
v0.13.1
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 20, 2026 | License: Apache-2.0 | Imports: 11 | Imported by: 0

Documentation

Index

Constants

View Source
// Status strings for a delivery attempt: "success" and "failed".
// NOTE(review): presumably these are the values carried in Attempt.Status —
// confirm against callers.
const (
	AttemptStatusSuccess = "success"
	AttemptStatusFailed  = "failed"
)

Variables

View Source
// Sentinel errors for topic validation. Both messages begin with
// "validation failed:"; ErrInvalidTopicsFormat is the format-specific variant.
// NOTE(review): presumably returned by Topics.Validate and/or
// Destination.Validate — confirm, and compare with errors.Is.
var (
	ErrInvalidTopics       = errors.New("validation failed: invalid topics")
	ErrInvalidTopicsFormat = errors.New("validation failed: invalid topics format")
)

Functions

func MatchFilter added in v0.13.0

func MatchFilter(filter Filter, event Event) bool

MatchFilter checks if the given event matches the filter. Returns true if no filter is set (nil or empty) or if the event matches the filter.

func RetryID added in v0.13.0

func RetryID(eventID, destinationID string) string

RetryID returns the ID used for scheduling and canceling retries. Uses event_id:destination_id to allow manual retries to cancel pending automatic retries.

Types

type Attempt added in v0.13.0

// Attempt is the record of one delivery attempt. It is identified by ID and
// linked to a tenant, an event, and a destination through TenantID, EventID,
// and DestinationID.
//
// ResponseData is a free-form map; every field is always present in the JSON
// encoding because no tag uses omitempty.
// NOTE(review): Status presumably holds AttemptStatusSuccess or
// AttemptStatusFailed, and Manual presumably marks a manually triggered
// retry — confirm against callers.
type Attempt struct {
	ID            string         `json:"id"`
	TenantID      string         `json:"tenant_id"`
	EventID       string         `json:"event_id"`
	DestinationID string         `json:"destination_id"`
	AttemptNumber int            `json:"attempt_number"`
	Manual        bool           `json:"manual"`
	Status        string         `json:"status"`
	Time          time.Time      `json:"time"`
	Code          string         `json:"code"`
	ResponseData  map[string]any `json:"response_data"`
}

type Config

// Config is an alias for MapStringString; it is the type of Destination.Config.
type Config = MapStringString

type Credentials

// Credentials is an alias for MapStringString; it is the type of
// Destination.Credentials.
type Credentials = MapStringString

type Data

// Data is a string-keyed map of arbitrary values; it is the type of Event.Data.
type Data map[string]any

func (*Data) String

func (d *Data) String() string

func (*Data) UnmarshalBinary

func (d *Data) UnmarshalBinary(data []byte) error

type DeliveryMetadata added in v0.8.0

// DeliveryMetadata is an alias for MapStringString; it is the type of
// Destination.DeliveryMetadata.
type DeliveryMetadata = MapStringString

type DeliveryTask added in v0.13.0

// DeliveryTask pairs an Event with the ID of the destination it targets.
//
// Nonce and Telemetry are tagged omitempty, so an empty nonce or a nil
// telemetry pointer is left out of the JSON encoding.
// NOTE(review): Attempt looks like the attempt counter, and Nonce appears to
// be populated only for manual retries (see NewManualDeliveryTask) — confirm.
type DeliveryTask struct {
	Event         Event              `json:"event"`
	DestinationID string             `json:"destination_id"`
	Attempt       int                `json:"attempt"`
	Manual        bool               `json:"manual"`
	Nonce         string             `json:"nonce,omitempty"`
	Telemetry     *DeliveryTelemetry `json:"telemetry,omitempty"`
}

DeliveryTask represents a task to deliver an event to a destination. This is a message type (no ID) used by: publishmq -> deliverymq, retry -> deliverymq

func NewDeliveryTask added in v0.13.0

func NewDeliveryTask(event Event, destinationID string) DeliveryTask

NewDeliveryTask creates a new DeliveryTask for an event and destination.

func NewManualDeliveryTask added in v0.13.0

func NewManualDeliveryTask(event Event, destinationID string) DeliveryTask

NewManualDeliveryTask creates a new DeliveryTask for a manual retry. Each manual retry gets a unique nonce so separate /retry requests are not deduplicated.

func (*DeliveryTask) FromMessage added in v0.13.0

func (t *DeliveryTask) FromMessage(msg *mqs.Message) error

func (*DeliveryTask) IdempotencyKey added in v0.13.0

func (t *DeliveryTask) IdempotencyKey() string

IdempotencyKey returns the key used for idempotency checks. Manual retries include a nonce so each /retry request gets its own idempotency key, while MQ redeliveries of the same message (same nonce) are still deduplicated. Nonce was added to fix a regression from #653 where removing DeliveryEvent.ID made the manual retry idempotency key static per event+destination.

func (*DeliveryTask) ToMessage added in v0.13.0

func (t *DeliveryTask) ToMessage() (*mqs.Message, error)

type DeliveryTelemetry added in v0.13.0

// DeliveryTelemetry holds the trace and span identifiers attached to a
// DeliveryTask through its Telemetry field.
//
// The fields carry no struct tags, so encoding/json uses the Go field names
// ("TraceID", "SpanID") as keys.
type DeliveryTelemetry struct {
	TraceID string
	SpanID  string
}

type Destination

// Destination describes a delivery destination and the tenant it belongs to
// (TenantID).
//
// Every field carries both a json and a redis tag. Only ID, Type, CreatedAt,
// UpdatedAt, and DisabledAt have a redis name; all other fields are tagged
// redis:"-".
// NOTE(review): redis:"-" presumably excludes the field from the tag-based
// redis mapping, with those fields persisted separately (several of their
// types implement MarshalBinary/UnmarshalBinary) — confirm.
// NOTE(review): DisabledAt is a pointer, presumably nil while the destination
// is enabled — confirm against MatchEvent.
type Destination struct {
	ID               string           `json:"id" redis:"id"`
	TenantID         string           `json:"tenant_id" redis:"-"`
	Type             string           `json:"type" redis:"type"`
	Topics           Topics           `json:"topics" redis:"-"`
	Filter           Filter           `json:"filter,omitempty" redis:"-"`
	Config           Config           `json:"config" redis:"-"`
	Credentials      Credentials      `json:"credentials" redis:"-"`
	DeliveryMetadata DeliveryMetadata `json:"delivery_metadata,omitempty" redis:"-"`
	Metadata         Metadata         `json:"metadata,omitempty" redis:"-"`
	CreatedAt        time.Time        `json:"created_at" redis:"created_at"`
	UpdatedAt        time.Time        `json:"updated_at" redis:"updated_at"`
	DisabledAt       *time.Time       `json:"disabled_at" redis:"disabled_at"`
}

func (*Destination) MatchEvent added in v0.10.0

func (d *Destination) MatchEvent(event Event) bool

MatchEvent checks if the destination matches the given event. Returns true if the destination is enabled, topic matches, and filter matches.

func (*Destination) Validate

func (d *Destination) Validate(topics []string) error

type Event

// Event is the event model: an ID, the tenant and destination IDs it is
// associated with, a topic, a retry-eligibility flag, a timestamp, string
// metadata, and a free-form data payload.
//
// Telemetry is the only field tagged omitempty; a nil pointer is left out of
// the JSON encoding.
type Event struct {
	ID               string    `json:"id"`
	TenantID         string    `json:"tenant_id"`
	DestinationID    string    `json:"destination_id"`
	Topic            string    `json:"topic"`
	EligibleForRetry bool      `json:"eligible_for_retry"`
	Time             time.Time `json:"time"`
	Metadata         Metadata  `json:"metadata"`
	Data             Data      `json:"data"`

	// Telemetry data, must exist to properly trace events between publish receiver & delivery handler
	Telemetry *EventTelemetry `json:"telemetry,omitempty"`
}

func (*Event) FromMessage

func (e *Event) FromMessage(msg *mqs.Message) error

func (*Event) ToMessage

func (e *Event) ToMessage() (*mqs.Message, error)

type EventTelemetry

// EventTelemetry holds the trace and span identifiers attached to an Event,
// plus the time the event was received, stored as a string.
//
// The fields carry no struct tags, so encoding/json uses the Go field names
// as keys.
type EventTelemetry struct {
	TraceID      string
	SpanID       string
	ReceivedTime string // format time.RFC3339Nano
}

type Filter added in v0.10.0

// Filter is a map from string keys to arbitrary values; it is the type of
// Destination.Filter.
type Filter map[string]any

Filter represents a JSON schema filter for event matching. It uses the simplejsonmatch schema syntax for filtering events.

func (*Filter) MarshalBinary added in v0.10.0

func (f *Filter) MarshalBinary() ([]byte, error)

func (*Filter) UnmarshalBinary added in v0.10.0

func (f *Filter) UnmarshalBinary(data []byte) error

type LogEntry added in v0.13.0

// LogEntry bundles pointers to an Event and an Attempt under the JSON keys
// "event" and "attempt". Neither tag uses omitempty, so a nil pointer encodes
// as JSON null rather than being dropped.
type LogEntry struct {
	Event   *Event   `json:"event"`
	Attempt *Attempt `json:"attempt"`
}

LogEntry represents a message for the log queue.

IMPORTANT: Both Event and Attempt are REQUIRED. The logstore requires both to exist for proper data consistency. The logmq consumer validates this requirement and rejects entries missing either field.

func (*LogEntry) FromMessage added in v0.13.0

func (e *LogEntry) FromMessage(msg *mqs.Message) error

func (*LogEntry) ToMessage added in v0.13.0

func (e *LogEntry) ToMessage() (*mqs.Message, error)

type MapStringString

// MapStringString is a string-to-string map. Config, Credentials,
// DeliveryMetadata, and Metadata are aliases for it.
type MapStringString map[string]string

func (*MapStringString) MarshalBinary

func (m *MapStringString) MarshalBinary() ([]byte, error)

func (*MapStringString) UnmarshalBinary

func (m *MapStringString) UnmarshalBinary(data []byte) error

func (*MapStringString) UnmarshalJSON

func (m *MapStringString) UnmarshalJSON(data []byte) error

type Metadata

// Metadata is an alias for MapStringString; it is the type of the Metadata
// field on Destination, Event, and Tenant.
type Metadata = MapStringString

type Tenant

// Tenant is the tenant model: an ID, a destination count, a list of topics,
// optional metadata, and creation/update timestamps.
//
// Only ID, CreatedAt, and UpdatedAt have a redis name; DestinationsCount,
// Topics, and Metadata are tagged redis:"-". Topics here is a plain []string,
// not the Topics type.
// NOTE(review): the redis:"-" fields are presumably derived or stored
// separately — confirm against the store implementation.
type Tenant struct {
	ID                string    `json:"id" redis:"id"`
	DestinationsCount int       `json:"destinations_count" redis:"-"`
	Topics            []string  `json:"topics" redis:"-"`
	Metadata          Metadata  `json:"metadata,omitempty" redis:"-"`
	CreatedAt         time.Time `json:"created_at" redis:"created_at"`
	UpdatedAt         time.Time `json:"updated_at" redis:"updated_at"`
}

type Topics

// Topics is a list of topic names; it is the type of Destination.Topics.
// NOTE(review): the MatchesAll method suggests a match-everything form
// exists — confirm its representation in the implementation.
type Topics []string

func TopicsFromString

func TopicsFromString(s string) Topics

func (*Topics) MarshalBinary

func (t *Topics) MarshalBinary() ([]byte, error)

func (*Topics) MarshalJSON

func (t *Topics) MarshalJSON() ([]byte, error)

func (*Topics) MatchTopic added in v0.9.0

func (t *Topics) MatchTopic(eventTopic string) bool

func (*Topics) MatchesAll

func (t *Topics) MatchesAll() bool

func (*Topics) UnmarshalBinary

func (t *Topics) UnmarshalBinary(data []byte) error

func (*Topics) UnmarshalJSON

func (t *Topics) UnmarshalJSON(data []byte) error

func (*Topics) Validate

func (t *Topics) Validate(availableTopics []string) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL