models

package
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 23, 2026 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Index

Constants

View Source
const (
	AttemptStatusSuccess = "success"
	AttemptStatusFailed  = "failed"
)

Variables

View Source
var (
	ErrInvalidTopics       = errors.New("validation failed: invalid topics")
	ErrInvalidTopicsFormat = errors.New("validation failed: invalid topics format")
)

Functions

func MatchFilter added in v0.13.0

func MatchFilter(filter Filter, event Event) bool

MatchFilter checks if the given event matches the filter. Returns true if no filter is set (nil or empty) or if the event matches the filter.

func RetryID added in v0.13.0

func RetryID(eventID, destinationID string) string

RetryID returns the ID used for scheduling and canceling retries. Uses event_id:destination_id to allow manual retries to cancel pending automatic retries.

Types

type Attempt added in v0.13.0

type Attempt struct {
	ID              string                 `json:"id"`
	TenantID        string                 `json:"tenant_id"`
	EventID         string                 `json:"event_id"`
	DestinationID   string                 `json:"destination_id"`
	DestinationType string                 `json:"destination_type"`
	AttemptNumber   int                    `json:"attempt_number"`
	Manual          bool                   `json:"manual"`
	Status          string                 `json:"status"`
	Time            time.Time              `json:"time"`
	Code            string                 `json:"code"`
	ResponseData    map[string]interface{} `json:"response_data"`
}

type Config

type Config = MapStringString

type Credentials

type Credentials = MapStringString

type Data

type Data = json.RawMessage

Data holds the event payload as raw JSON bytes so that key order, whitespace and numeric precision survive every serialisation hop.

type DeliveryMetadata added in v0.8.0

type DeliveryMetadata = MapStringString

type DeliveryTask added in v0.13.0

type DeliveryTask struct {
	Event         Event              `json:"event"`
	DestinationID string             `json:"destination_id"`
	Attempt       int                `json:"attempt"`
	Manual        bool               `json:"manual"`
	Telemetry     *DeliveryTelemetry `json:"telemetry,omitempty"`
}

DeliveryTask represents a task to deliver an event to a destination. This is a message type (no ID) used by: publishmq -> deliverymq, retry -> deliverymq

func NewDeliveryTask added in v0.13.0

func NewDeliveryTask(event Event, destinationID string) DeliveryTask

NewDeliveryTask creates a new DeliveryTask for an event and destination.

func NewManualDeliveryTask added in v0.13.0

func NewManualDeliveryTask(event Event, destinationID string, attemptNumber int) DeliveryTask

NewManualDeliveryTask creates a new DeliveryTask for a manual retry. attemptNumber is the 1-indexed attempt number derived from the count of prior attempts.

func (*DeliveryTask) FromMessage added in v0.13.0

func (t *DeliveryTask) FromMessage(msg *mqs.Message) error

func (*DeliveryTask) IdempotencyKey added in v0.13.0

func (t *DeliveryTask) IdempotencyKey() string

IdempotencyKey returns the key used for idempotency checks. Uses event_id:destination_id:attempt_number so that:

  • Manual and auto retries with the same attempt_number are deduplicated (race protection)
  • Each new attempt gets a fresh key (no need to clear on failure)
  • MQ redeliveries of the same message are still deduplicated

func (*DeliveryTask) ToMessage added in v0.13.0

func (t *DeliveryTask) ToMessage() (*mqs.Message, error)

type DeliveryTelemetry added in v0.13.0

type DeliveryTelemetry struct {
	TraceID string
	SpanID  string
}

type Destination

type Destination struct {
	ID               string           `json:"id" redis:"id"`
	TenantID         string           `json:"tenant_id" redis:"-"`
	Type             string           `json:"type" redis:"type"`
	Topics           Topics           `json:"topics" redis:"-"`
	Filter           Filter           `json:"filter,omitempty" redis:"-"`
	Config           Config           `json:"config" redis:"-"`
	Credentials      Credentials      `json:"credentials" redis:"-"`
	DeliveryMetadata DeliveryMetadata `json:"delivery_metadata,omitempty" redis:"-"`
	Metadata         Metadata         `json:"metadata,omitempty" redis:"-"`
	CreatedAt        time.Time        `json:"created_at" redis:"created_at"`
	UpdatedAt        time.Time        `json:"updated_at" redis:"updated_at"`
	DisabledAt       *time.Time       `json:"disabled_at" redis:"disabled_at"`
}

func (*Destination) MatchEvent added in v0.10.0

func (d *Destination) MatchEvent(event Event) bool

MatchEvent checks if the destination matches the given event. Returns true if the destination is enabled, topic matches, and filter matches.

func (*Destination) Validate

func (d *Destination) Validate(topics []string) error

type Event

type Event struct {
	ID                    string    `json:"id"`
	TenantID              string    `json:"tenant_id"`
	DestinationID         string    `json:"destination_id"`
	MatchedDestinationIDs []string  `json:"matched_destination_ids"`
	Topic                 string    `json:"topic"`
	EligibleForRetry      bool      `json:"eligible_for_retry"`
	Time                  time.Time `json:"time"`
	Metadata              Metadata  `json:"metadata"`
	Data                  Data      `json:"data"`

	// Telemetry data, must exist to properly trace events between publish receiver & delivery handler
	Telemetry *EventTelemetry `json:"telemetry,omitempty"`
}

func (*Event) FromMessage

func (e *Event) FromMessage(msg *mqs.Message) error

func (*Event) ParsedData added in v0.13.2

func (e *Event) ParsedData() (map[string]any, error)

ParsedData unmarshals the raw JSON Data into a map[string]any. This is used by code that needs to inspect individual fields (e.g. filters, partition-key extraction) without losing the original byte representation.

func (*Event) ToMessage

func (e *Event) ToMessage() (*mqs.Message, error)

type EventTelemetry

type EventTelemetry struct {
	TraceID      string
	SpanID       string
	ReceivedTime string // format time.RFC3339Nano
}

type Filter added in v0.10.0

type Filter map[string]any

Filter represents a JSON schema filter for event matching. It uses the simplejsonmatch schema syntax for filtering events.

func (*Filter) MarshalBinary added in v0.10.0

func (f *Filter) MarshalBinary() ([]byte, error)

func (*Filter) UnmarshalBinary added in v0.10.0

func (f *Filter) UnmarshalBinary(data []byte) error

type LogEntry added in v0.13.0

type LogEntry struct {
	Event       *Event       `json:"event"`
	Attempt     *Attempt     `json:"attempt"`
	Destination *Destination `json:"destination,omitempty"` // carried for alert evaluation in logmq; ignored by logstore
}

LogEntry represents a message for the log queue.

IMPORTANT: Both Event and Attempt are REQUIRED. The logstore requires both to exist for proper data consistency. The logmq consumer validates this requirement and rejects entries missing either field.

func (*LogEntry) FromMessage added in v0.13.0

func (e *LogEntry) FromMessage(msg *mqs.Message) error

func (*LogEntry) ToMessage added in v0.13.0

func (e *LogEntry) ToMessage() (*mqs.Message, error)

type MapStringString

type MapStringString map[string]string

func (*MapStringString) MarshalBinary

func (m *MapStringString) MarshalBinary() ([]byte, error)

func (*MapStringString) UnmarshalBinary

func (m *MapStringString) UnmarshalBinary(data []byte) error

func (*MapStringString) UnmarshalJSON

func (m *MapStringString) UnmarshalJSON(data []byte) error

type Metadata

type Metadata = MapStringString

type Tenant

type Tenant struct {
	ID                string    `json:"id" redis:"id"`
	DestinationsCount int       `json:"destinations_count" redis:"-"`
	Topics            []string  `json:"topics" redis:"-"`
	Metadata          Metadata  `json:"metadata,omitempty" redis:"-"`
	CreatedAt         time.Time `json:"created_at" redis:"created_at"`
	UpdatedAt         time.Time `json:"updated_at" redis:"updated_at"`
}

type Topics

type Topics []string

func TopicsFromString

func TopicsFromString(s string) Topics

func (*Topics) MarshalBinary

func (t *Topics) MarshalBinary() ([]byte, error)

func (*Topics) MarshalJSON

func (t *Topics) MarshalJSON() ([]byte, error)

func (*Topics) MatchTopic added in v0.9.0

func (t *Topics) MatchTopic(eventTopic string) bool

func (*Topics) MatchesAll

func (t *Topics) MatchesAll() bool

func (*Topics) UnmarshalBinary

func (t *Topics) UnmarshalBinary(data []byte) error

func (*Topics) UnmarshalJSON

func (t *Topics) UnmarshalJSON(data []byte) error

func (*Topics) Validate

func (t *Topics) Validate(availableTopics []string) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL