Documentation
¶
Index ¶
- Constants
- Variables
- type Event
- type EventAction
- type EventHandler
- type GroupRemoved
- type OrgCreated
- type OrgRemoved
- type ProfileCreated
- type ProfileRemoved
- type ProfileUpdated
- type Publisher
- type PublisherConfig
- type Subscriber
- type SubscriberConfig
- type ThingCreated
- type ThingGroupAndProfileUpdated
- type ThingRemoved
- type ThingUpdated
Constants ¶
const ( // DefStreamMaxLen is the default approx number of messages in the Redis event stream. DefStreamMaxLen = 100000 // DefBufferSize is the default capacity of the publish // buffer channel. DefBufferSize = 10000 // DefDrainInitialInterval is the initial sleep duration between XADD retries when // emitting an event fails. DefDrainInitialInterval = time.Second // DefDrainMaxBackoff is the maximum backoff duration between XADD retries. DefDrainMaxBackoff = 30 * time.Second // DefShutdownDrainTimeout is the maximum duration that Close() can spends trying to drain // the remaining buffered events. DefShutdownDrainTimeout = 5 * time.Second )
const ( // Event operation strings ThingCreate = thingPrefix + "create" ThingUpdate = thingPrefix + "update" ThingUpdateGroupAndProfile = thingPrefix + "update_group_and_profile" ThingRemove = thingPrefix + "remove" ProfileCreate = profilePrefix + "create" ProfileUpdate = profilePrefix + "update" ProfileRemove = profilePrefix + "remove" GroupRemove = groupPrefix + "remove" OrgCreate = orgPrefix + "create" OrgRemove = orgPrefix + "remove" // Redis event streams ThingsStream = mainfluxPrefix + "things" AuthStream = mainfluxPrefix + "auth" )
Variables ¶
var ( // ErrEmptyStream is returned when stream name is empty. ErrEmptyStream = errors.New("stream name cannot be empty") // ErrEmptyName is returned when the subscriber name is empty. ErrEmptyName = errors.New("subscriber name cannot be empty") )
Functions ¶
This section is empty.
Types ¶
type Event ¶
type Event struct {
// The specific EventAction that occurred.
Action EventAction
// The identity of the user whose action triggered the event.
JWTUserIdentity domain.Identity
// The ID of the organization to which the event belongs. May be empty.
OrgID string
// The ID of the group to which the event belongs. May be empty.
GroupID string
OccurredAt time.Time
}
Event is the type representing a platform event.
type EventAction ¶ added in v0.41.0
type EventAction interface {
// Encode returns a map representation of the event action.
Encode() map[string]any
// Operation returns the string representation of the event action operation.
Operation() string
}
EventAction represents an action occurring on the platform.
type EventHandler ¶
EventHandler reacts to a single event delivered by a Subscriber.
type GroupRemoved ¶ added in v0.41.0
GroupRemoved signals that a group has been removed.
func (GroupRemoved) Encode ¶ added in v0.41.0
func (e GroupRemoved) Encode() map[string]any
func (GroupRemoved) Operation ¶ added in v0.41.0
func (e GroupRemoved) Operation() string
type OrgCreated ¶ added in v0.41.0
type OrgCreated struct {
ID string
}
OrgCreated signals the creation of an organization.
func (OrgCreated) Encode ¶ added in v0.41.0
func (e OrgCreated) Encode() map[string]any
func (OrgCreated) Operation ¶ added in v0.41.0
func (e OrgCreated) Operation() string
type OrgRemoved ¶ added in v0.41.0
type OrgRemoved struct {
ID string
}
OrgRemoved signals that an organization has been removed.
func (OrgRemoved) Encode ¶ added in v0.41.0
func (e OrgRemoved) Encode() map[string]any
func (OrgRemoved) Operation ¶ added in v0.41.0
func (e OrgRemoved) Operation() string
type ProfileCreated ¶ added in v0.41.0
ProfileCreated signals the creation of a profile.
func (ProfileCreated) Encode ¶ added in v0.41.0
func (e ProfileCreated) Encode() map[string]any
func (ProfileCreated) Operation ¶ added in v0.41.0
func (e ProfileCreated) Operation() string
type ProfileRemoved ¶ added in v0.41.0
type ProfileRemoved struct {
ID string
}
ProfileRemoved signals that a profile has been removed.
func (ProfileRemoved) Encode ¶ added in v0.41.0
func (e ProfileRemoved) Encode() map[string]any
func (ProfileRemoved) Operation ¶ added in v0.41.0
func (e ProfileRemoved) Operation() string
type ProfileUpdated ¶ added in v0.41.0
type ProfileUpdated struct {
ID string
Name string
Config *domain.ProfileConfig
Metadata map[string]any
}
ProfileUpdated signals a profile update.
func (ProfileUpdated) Encode ¶ added in v0.41.0
func (e ProfileUpdated) Encode() map[string]any
func (ProfileUpdated) Operation ¶ added in v0.41.0
func (e ProfileUpdated) Operation() string
type Publisher ¶ added in v0.41.0
type Publisher interface {
// Publish queues the passed event for delivery by sending it to the buffer
// channel. It's asynchronous, meaning that it doesn't block the caller
// and as such returns no errors. If the underlying event buffer is full,
// the oldest queued event is dropped to make space.
Publish(ctx context.Context, e Event)
// Close stops the event drainer goroutine, attempts a final drain,
// and closes the underlying Redis client.
Close() error
}
Publisher delivers Events to the Redis event store asynchronously. It uses an in-memory buffer to queue and retry failed events.
func NewPublisher ¶ added in v0.41.0
func NewPublisher(cfg PublisherConfig, log logger.Logger) (Publisher, error)
NewPublisher returns a Publisher that buffers events in process and drains them to Redis in the background.
type PublisherConfig ¶ added in v0.41.0
type PublisherConfig struct {
URL string
Stream string
// MaxLen is the approximate stream retention (XADD MAXLEN ~).
MaxLen int64
// BufferSize is the capacity of the buffer channel holding events.
BufferSize int
// DrainInitialInterval is the initial XADD retry sleep on failure.
DrainInitialInterval time.Duration
// DrainMaxBackoff caps the exponential retry backoff.
DrainMaxBackoff time.Duration
// ShutdownDrainTimeout bounds the final drain in Close().
ShutdownDrainTimeout time.Duration
}
type Subscriber ¶
type Subscriber interface {
// Subscribe subscribes to the event stream and consumes events.
Subscribe(ctx context.Context, handler EventHandler) error
// Close gracefully closes event subscriber's connection.
Close() error
}
Subscriber specifies event subscription API.
func NewSubscriber ¶ added in v0.34.0
func NewSubscriber(cfg SubscriberConfig, log logger.Logger) (Subscriber, error)
NewSubscriber returns a Subscriber that reads from a Redis stream using XRead and tracks its position via a per-subscriber Redis key.
type SubscriberConfig ¶ added in v0.41.0
type SubscriberConfig struct {
// URL is the Redis connection URL (e.g. redis://host:6379/0).
URL string
// Stream is the name of the Redis stream to read from.
Stream string
// Name identifies this subscriber. Used in the cursor key so
// independent subscribers maintain independent cursors on the same
// stream.
Name string
}
SubscriberConfig holds the parameters for creating a Subscriber.
type ThingCreated ¶ added in v0.41.0
type ThingCreated struct {
ID string
GroupID string
ProfileID string
Name string
Metadata map[string]any
}
ThingCreated signals the creation of a thing.
func (ThingCreated) Encode ¶ added in v0.41.0
func (e ThingCreated) Encode() map[string]any
func (ThingCreated) Operation ¶ added in v0.41.0
func (e ThingCreated) Operation() string
type ThingGroupAndProfileUpdated ¶ added in v0.41.0
ThingGroupAndProfileUpdated signals a thing's group/profile reassignment.
func (ThingGroupAndProfileUpdated) Encode ¶ added in v0.41.0
func (e ThingGroupAndProfileUpdated) Encode() map[string]any
func (ThingGroupAndProfileUpdated) Operation ¶ added in v0.41.0
func (e ThingGroupAndProfileUpdated) Operation() string
type ThingRemoved ¶ added in v0.41.0
type ThingRemoved struct {
ID string
}
ThingRemoved signals that a thing has been removed.
func (ThingRemoved) Encode ¶ added in v0.41.0
func (e ThingRemoved) Encode() map[string]any
func (ThingRemoved) Operation ¶ added in v0.41.0
func (e ThingRemoved) Operation() string
type ThingUpdated ¶ added in v0.41.0
ThingUpdated signals a thing update.
func (ThingUpdated) Encode ¶ added in v0.41.0
func (e ThingUpdated) Encode() map[string]any
func (ThingUpdated) Operation ¶ added in v0.41.0
func (e ThingUpdated) Operation() string