Documentation
¶
Index ¶
- Constants
- Variables
- type AESCipher
- type Cipher
- type Config
- type Credentials
- type Data
- type Delivery
- type DeliveryEvent
- type DeliveryEventTelemetry
- type DeliveryMetadata
- type Destination
- type DestinationFilter
- type DestinationSummary
- type EntityStore
- type EntityStoreOption
- type Event
- type EventTelemetry
- type Filter
- type ListDestinationByTenantOpts
- type MapStringString
- type Metadata
- type Tenant
- type Topics
- func (t *Topics) MarshalBinary() ([]byte, error)
- func (t *Topics) MarshalJSON() ([]byte, error)
- func (t *Topics) MatchTopic(eventTopic string) bool
- func (t *Topics) MatchesAll() bool
- func (t *Topics) UnmarshalBinary(data []byte) error
- func (t *Topics) UnmarshalJSON(data []byte) error
- func (t *Topics) Validate(availableTopics []string) error
Constants ¶
const ( DeliveryStatusSuccess = "success" DeliveryStatusFailed = "failed" )
Variables ¶
var ( ErrInvalidTopics = errors.New("validation failed: invalid topics") ErrInvalidTopicsFormat = errors.New("validation failed: invalid topics format") )
var ( ErrTenantNotFound = errors.New("tenant does not exist") ErrTenantDeleted = errors.New("tenant has been deleted") ErrDuplicateDestination = errors.New("destination already exists") ErrDestinationNotFound = errors.New("destination does not exist") ErrDestinationDeleted = errors.New("destination has been deleted") ErrMaxDestinationsPerTenantReached = errors.New("maximum number of destinations per tenant reached") )
Functions ¶
This section is empty.
Types ¶
type Cipher ¶
func NewAESCipher ¶
type Config ¶
type Config = MapStringString
type Credentials ¶
type Credentials = MapStringString
type Delivery ¶
type Delivery struct {
ID string `json:"id"`
DeliveryEventID string `json:"delivery_event_id"`
EventID string `json:"event_id"`
DestinationID string `json:"destination_id"`
Status string `json:"status"`
Time time.Time `json:"time"`
Code string `json:"code"`
ResponseData map[string]interface{} `json:"response_data"`
}
type DeliveryEvent ¶
type DeliveryEvent struct {
ID string
Attempt int
DestinationID string
Event Event
Delivery *Delivery
Telemetry *DeliveryEventTelemetry
Manual bool // Indicates if this is a manual retry
}
func NewDeliveryEvent ¶
func NewDeliveryEvent(event Event, destinationID string) DeliveryEvent
func NewManualDeliveryEvent ¶
func NewManualDeliveryEvent(event Event, destinationID string) DeliveryEvent
func (*DeliveryEvent) FromMessage ¶
func (e *DeliveryEvent) FromMessage(msg *mqs.Message) error
func (*DeliveryEvent) GetRetryID ¶
func (e *DeliveryEvent) GetRetryID() string
GetRetryID returns the ID used for scheduling retries. We use Event.ID instead of DeliveryEvent.ID because: 1. Each event should only have one scheduled retry at a time 2. Event.ID is always accessible, while DeliveryEvent.ID may require additional queries in retry scenarios
type DeliveryEventTelemetry ¶
type DeliveryMetadata ¶ added in v0.8.0
type DeliveryMetadata = MapStringString
type Destination ¶
type Destination struct {
ID string `json:"id" redis:"id"`
TenantID string `json:"tenant_id" redis:"-"`
Type string `json:"type" redis:"type"`
Topics Topics `json:"topics" redis:"-"`
Filter Filter `json:"filter,omitempty" redis:"-"`
Config Config `json:"config" redis:"-"`
Credentials Credentials `json:"credentials" redis:"-"`
DeliveryMetadata DeliveryMetadata `json:"delivery_metadata,omitempty" redis:"-"`
Metadata Metadata `json:"metadata,omitempty" redis:"-"`
CreatedAt time.Time `json:"created_at" redis:"created_at"`
UpdatedAt time.Time `json:"updated_at" redis:"updated_at"`
DisabledAt *time.Time `json:"disabled_at" redis:"disabled_at"`
}
func (*Destination) MatchEvent ¶ added in v0.10.0
func (d *Destination) MatchEvent(event Event) bool
MatchEvent checks if the destination matches the given event. Returns true if the destination is enabled, topic matches, and filter matches.
func (*Destination) ToSummary ¶
func (d *Destination) ToSummary() *DestinationSummary
func (*Destination) Validate ¶
func (d *Destination) Validate(topics []string) error
type DestinationFilter ¶
type DestinationSummary ¶
type DestinationSummary struct {
ID string `json:"id"`
Type string `json:"type"`
Topics Topics `json:"topics"`
Filter Filter `json:"filter,omitempty"`
Disabled bool `json:"disabled"`
}
func (*DestinationSummary) MarshalBinary ¶
func (ds *DestinationSummary) MarshalBinary() ([]byte, error)
func (*DestinationSummary) MatchFilter ¶ added in v0.10.0
func (ds *DestinationSummary) MatchFilter(event Event) bool
MatchFilter checks if the given event matches the destination's filter. Returns true if no filter is set (nil or empty) or if the event matches the filter.
func (*DestinationSummary) UnmarshalBinary ¶
func (ds *DestinationSummary) UnmarshalBinary(data []byte) error
type EntityStore ¶
type EntityStore interface {
RetrieveTenant(ctx context.Context, tenantID string) (*Tenant, error)
UpsertTenant(ctx context.Context, tenant Tenant) error
DeleteTenant(ctx context.Context, tenantID string) error
ListDestinationByTenant(ctx context.Context, tenantID string, options ...ListDestinationByTenantOpts) ([]Destination, error)
RetrieveDestination(ctx context.Context, tenantID, destinationID string) (*Destination, error)
CreateDestination(ctx context.Context, destination Destination) error
UpsertDestination(ctx context.Context, destination Destination) error
DeleteDestination(ctx context.Context, tenantID, destinationID string) error
MatchEvent(ctx context.Context, event Event) ([]DestinationSummary, error)
}
func NewEntityStore ¶
func NewEntityStore(redisClient redis.Cmdable, opts ...EntityStoreOption) EntityStore
type EntityStoreOption ¶
type EntityStoreOption func(*entityStoreImpl)
func WithAvailableTopics ¶
func WithAvailableTopics(topics []string) EntityStoreOption
func WithCipher ¶
func WithCipher(cipher Cipher) EntityStoreOption
func WithDeploymentID ¶ added in v0.7.0
func WithDeploymentID(deploymentID string) EntityStoreOption
func WithMaxDestinationsPerTenant ¶
func WithMaxDestinationsPerTenant(maxDestinationsPerTenant int) EntityStoreOption
type Event ¶
type Event struct {
ID string `json:"id"`
TenantID string `json:"tenant_id"`
DestinationID string `json:"destination_id"`
Topic string `json:"topic"`
EligibleForRetry bool `json:"eligible_for_retry"`
Time time.Time `json:"time"`
Metadata Metadata `json:"metadata"`
Data Data `json:"data"`
Status string `json:"status,omitempty"`
// Telemetry data, must exist to properly trace events between publish receiver & delivery handler
Telemetry *EventTelemetry `json:"telemetry,omitempty"`
}
type EventTelemetry ¶
type Filter ¶ added in v0.10.0
Filter represents a JSON schema filter for event matching. It uses the simplejsonmatch schema syntax for filtering events.
func (*Filter) MarshalBinary ¶ added in v0.10.0
func (*Filter) UnmarshalBinary ¶ added in v0.10.0
type ListDestinationByTenantOpts ¶
type ListDestinationByTenantOpts struct {
Filter *DestinationFilter
}
func WithDestinationFilter ¶
func WithDestinationFilter(filter DestinationFilter) ListDestinationByTenantOpts
type MapStringString ¶
func (*MapStringString) MarshalBinary ¶
func (m *MapStringString) MarshalBinary() ([]byte, error)
func (*MapStringString) UnmarshalBinary ¶
func (m *MapStringString) UnmarshalBinary(data []byte) error
func (*MapStringString) UnmarshalJSON ¶
func (m *MapStringString) UnmarshalJSON(data []byte) error
type Metadata ¶
type Metadata = MapStringString
type Tenant ¶
type Tenant struct {
ID string `json:"id" redis:"id"`
DestinationsCount int `json:"destinations_count" redis:"-"`
Topics []string `json:"topics" redis:"-"`
Metadata Metadata `json:"metadata,omitempty" redis:"-"`
CreatedAt time.Time `json:"created_at" redis:"created_at"`
UpdatedAt time.Time `json:"updated_at" redis:"updated_at"`
}
type Topics ¶
type Topics []string