events

package
v0.41.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 26, 2026 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// DefStreamMaxLen is the default approx number of messages in the Redis event stream.
	DefStreamMaxLen = 100000

	// DefBufferSize is the default capacity of the publish
	// buffer channel.
	DefBufferSize = 10000

	// DefDrainInitialInterval is the initial sleep duration between XADD retries when
	// emitting an event fails.
	DefDrainInitialInterval = time.Second

	// DefDrainMaxBackoff is the maximum backoff duration between XADD retries.
	DefDrainMaxBackoff = 30 * time.Second

	// DefShutdownDrainTimeout is the maximum duration that Close() can spends trying to drain
	// the remaining buffered events.
	DefShutdownDrainTimeout = 5 * time.Second
)
View Source
const (

	// Event operation strings
	ThingCreate                = thingPrefix + "create"
	ThingUpdate                = thingPrefix + "update"
	ThingUpdateGroupAndProfile = thingPrefix + "update_group_and_profile"
	ThingRemove                = thingPrefix + "remove"

	ProfileCreate = profilePrefix + "create"
	ProfileUpdate = profilePrefix + "update"
	ProfileRemove = profilePrefix + "remove"

	GroupRemove = groupPrefix + "remove"

	OrgCreate = orgPrefix + "create"
	OrgRemove = orgPrefix + "remove"

	// Redis event streams
	ThingsStream = mainfluxPrefix + "things"
	AuthStream   = mainfluxPrefix + "auth"
)

Variables

View Source
var (
	// ErrEmptyStream is returned when stream name is empty.
	ErrEmptyStream = errors.New("stream name cannot be empty")

	// ErrEmptyName is returned when the subscriber name is empty.
	ErrEmptyName = errors.New("subscriber name cannot be empty")
)

Functions

This section is empty.

Types

type Event

type Event struct {
	// The specific EventAction that occurred.
	Action EventAction
	// The identity of the user whose action triggered the event.
	JWTUserIdentity domain.Identity
	// The ID of the organization to which the event belongs. May be empty.
	OrgID string
	// The ID of the group to which the event belongs. May be empty.
	GroupID    string
	OccurredAt time.Time
}

Event is the type representing a platform event.

func (Event) Encode

func (e Event) Encode() redisEvent

Encode turns an Event into a redisEvent

type EventAction added in v0.41.0

type EventAction interface {
	// Encode returns a map representation of the event action.
	Encode() map[string]any

	// Operation returns the string representation of the event action operation.
	Operation() string
}

EventAction represents an action occurring on the platform.

type EventHandler

type EventHandler interface {
	Handle(ctx context.Context, event Event) error
}

EventHandler reacts to a single event delivered by a Subscriber.

type GroupRemoved added in v0.41.0

type GroupRemoved struct {
	ID       string
	ThingIDs []string
}

GroupRemoved signals that a group has been removed.

func (GroupRemoved) Encode added in v0.41.0

func (e GroupRemoved) Encode() map[string]any

func (GroupRemoved) Operation added in v0.41.0

func (e GroupRemoved) Operation() string

type OrgCreated added in v0.41.0

type OrgCreated struct {
	ID string
}

OrgCreated signals the creation of an organization.

func (OrgCreated) Encode added in v0.41.0

func (e OrgCreated) Encode() map[string]any

func (OrgCreated) Operation added in v0.41.0

func (e OrgCreated) Operation() string

type OrgRemoved added in v0.41.0

type OrgRemoved struct {
	ID string
}

OrgRemoved signals that an organization has been removed.

func (OrgRemoved) Encode added in v0.41.0

func (e OrgRemoved) Encode() map[string]any

func (OrgRemoved) Operation added in v0.41.0

func (e OrgRemoved) Operation() string

type ProfileCreated added in v0.41.0

type ProfileCreated struct {
	ID       string
	GroupID  string
	Name     string
	Metadata map[string]any
}

ProfileCreated signals the creation of a profile.

func (ProfileCreated) Encode added in v0.41.0

func (e ProfileCreated) Encode() map[string]any

func (ProfileCreated) Operation added in v0.41.0

func (e ProfileCreated) Operation() string

type ProfileRemoved added in v0.41.0

type ProfileRemoved struct {
	ID string
}

ProfileRemoved signals that a profile has been removed.

func (ProfileRemoved) Encode added in v0.41.0

func (e ProfileRemoved) Encode() map[string]any

func (ProfileRemoved) Operation added in v0.41.0

func (e ProfileRemoved) Operation() string

type ProfileUpdated added in v0.41.0

type ProfileUpdated struct {
	ID       string
	Name     string
	Config   *domain.ProfileConfig
	Metadata map[string]any
}

ProfileUpdated signals a profile update.

func (ProfileUpdated) Encode added in v0.41.0

func (e ProfileUpdated) Encode() map[string]any

func (ProfileUpdated) Operation added in v0.41.0

func (e ProfileUpdated) Operation() string

type Publisher added in v0.41.0

type Publisher interface {
	// Publish queues the passed event for delivery by sending it to the buffer
	// channel. It's asynchronous, meaning that it doesn't block the caller
	// and as such returns no errors. If the underlying event buffer is full,
	// the oldest queued event is dropped to make space.
	Publish(ctx context.Context, e Event)

	// Close stops the event drainer goroutine, attempts a final drain,
	// and closes the underlying Redis client.
	Close() error
}

Publisher delivers Events to the Redis event store asynchronously. It uses an in-memory buffer to queue and retry failed events.

func NewPublisher added in v0.41.0

func NewPublisher(cfg PublisherConfig, log logger.Logger) (Publisher, error)

NewPublisher returns a Publisher that buffers events in process and drains them to Redis in the background.

type PublisherConfig added in v0.41.0

type PublisherConfig struct {
	URL    string
	Stream string

	// MaxLen is the approximate stream retention (XADD MAXLEN ~).
	MaxLen int64

	// BufferSize is the capacity of the buffer channel holding events.
	BufferSize int

	// DrainInitialInterval is the initial XADD retry sleep on failure.
	DrainInitialInterval time.Duration

	// DrainMaxBackoff caps the exponential retry backoff.
	DrainMaxBackoff time.Duration

	// ShutdownDrainTimeout bounds the final drain in Close().
	ShutdownDrainTimeout time.Duration
}

type Subscriber

type Subscriber interface {
	// Subscribe subscribes to the event stream and consumes events.
	Subscribe(ctx context.Context, handler EventHandler) error

	// Close gracefully closes event subscriber's connection.
	Close() error
}

Subscriber specifies event subscription API.

func NewSubscriber added in v0.34.0

func NewSubscriber(cfg SubscriberConfig, log logger.Logger) (Subscriber, error)

NewSubscriber returns a Subscriber that reads from a Redis stream using XRead and tracks its position via a per-subscriber Redis key.

type SubscriberConfig added in v0.41.0

type SubscriberConfig struct {
	// URL is the Redis connection URL (e.g. redis://host:6379/0).
	URL string
	// Stream is the name of the Redis stream to read from.
	Stream string
	// Name identifies this subscriber. Used in the cursor key so
	// independent subscribers maintain independent cursors on the same
	// stream.
	Name string
}

SubscriberConfig holds the parameters for creating a Subscriber.

type ThingCreated added in v0.41.0

type ThingCreated struct {
	ID        string
	GroupID   string
	ProfileID string
	Name      string
	Metadata  map[string]any
}

ThingCreated signals the creation of a thing.

func (ThingCreated) Encode added in v0.41.0

func (e ThingCreated) Encode() map[string]any

func (ThingCreated) Operation added in v0.41.0

func (e ThingCreated) Operation() string

type ThingGroupAndProfileUpdated added in v0.41.0

type ThingGroupAndProfileUpdated struct {
	ID        string
	ProfileID string
	GroupID   string
}

ThingGroupAndProfileUpdated signals a thing's group/profile reassignment.

func (ThingGroupAndProfileUpdated) Encode added in v0.41.0

func (e ThingGroupAndProfileUpdated) Encode() map[string]any

func (ThingGroupAndProfileUpdated) Operation added in v0.41.0

func (e ThingGroupAndProfileUpdated) Operation() string

type ThingRemoved added in v0.41.0

type ThingRemoved struct {
	ID string
}

ThingRemoved signals that a thing has been removed.

func (ThingRemoved) Encode added in v0.41.0

func (e ThingRemoved) Encode() map[string]any

func (ThingRemoved) Operation added in v0.41.0

func (e ThingRemoved) Operation() string

type ThingUpdated added in v0.41.0

type ThingUpdated struct {
	ID        string
	ProfileID string
	Name      string
	Metadata  map[string]any
}

ThingUpdated signals a thing update.

func (ThingUpdated) Encode added in v0.41.0

func (e ThingUpdated) Encode() map[string]any

func (ThingUpdated) Operation added in v0.41.0

func (e ThingUpdated) Operation() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL