data

package
v0.2.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 28, 2025 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Index

Constants

View Source
const (
	PullConsumer ConsumerType = iota
	PushConsumer
	PullConsumerStr = "pull"
	PushConsumerStr = "push"
)
View Source
const (

	// JobQueued is the job status during first attempt
	JobQueued JobStatus = iota + 1000
	// JobInflight is to signify that the DeliveryJob is in its first attempt
	JobInflight
	// JobDelivered signifies that the DeliveryJob received 2XX status from consumer
	JobDelivered
	// JobDead signifies that the maximum number of retries has been exhausted
	JobDead
	// JobQueuedStr is the string rep of JobQueued
	JobQueuedStr = "QUEUED"
	// JobInflightStr is the string rep of JobInflight
	JobInflightStr = "INFLIGHT"
	// JobDeliveredStr is the string rep of JobDelivered
	JobDeliveredStr = "DELIVERED"
	// JobDeadStr is the string rep of JobDead
	JobDeadStr = "DEAD"
)
View Source
const (

	// MsgStatusAcknowledged represents the state after receiving the message but before it is dispatched
	MsgStatusAcknowledged MsgStatus = iota + 100
	// MsgStatusDispatched represents the fact that the dispatch jobs have been created for the message
	MsgStatusDispatched
	// MsgStatusAcknowledgedStr is the string representation of message's acknowledged status
	MsgStatusAcknowledgedStr = "ACKNOWLEDGED"
	// MsgStatusDispatchedStr is the string representation of message's dispatched status
	MsgStatusDispatchedStr = "DISPATCHED"
)
View Source
const (

	// ScheduledMsgStatusScheduled represents the state where message is waiting to be dispatched
	ScheduledMsgStatusScheduled ScheduledMsgStatus = iota + 100
	// ScheduledMsgStatusDispatched represents when message has been dispatched
	ScheduledMsgStatusDispatched
	// ScheduledMsgStatusScheduledStr is the string representation of message's scheduled status
	ScheduledMsgStatusScheduledStr = "SCHEDULED"
	// ScheduledMsgStatusDispatchedStr is the string representation of message's dispatched status
	ScheduledMsgStatusDispatchedStr = "DISPATCHED"
)

Variables

View Source
var (
	// ErrInsufficientInformationForCreating is returned when NewProducer is called with insufficient information
	ErrInsufficientInformationForCreating = errors.New("Necessary information missing for persistence")
)
View Source
var (
	// ErrLockableNil represents the error returned when lockable is nil in NewLock
	ErrLockableNil = errors.New("lockable can not be nil")
)

Functions

This section is empty.

Types

type App

type App struct {
	// contains filtered or unexported fields
}

App represents this application state for cross cluster use

func NewApp

func NewApp(seedData *config.SeedData, status AppStatus) *App

NewApp initializes a new App instance

func (*App) GetSeedData

func (app *App) GetSeedData() *config.SeedData

GetSeedData retrieves the current seed data config of the App. In NonInitialized status it can be nil

func (*App) GetStatus

func (app *App) GetStatus() AppStatus

GetStatus retrieves the current status of the App

type AppStatus

type AppStatus int

AppStatus represents the status of this App

const (
	// NotInitialized is when the App is just started and no initialization ever happened
	NotInitialized AppStatus = iota + 1
	// Initializing is when App has started to run the initializing process
	Initializing
	// Initialized is when init process is completed for the App
	Initialized
)

type BasePaginateable

type BasePaginateable struct {
	ID        xid.ID
	CreatedAt time.Time
	UpdatedAt time.Time
}

BasePaginateable provides common functionalities around paginateable objects

func (*BasePaginateable) GetCursor

func (paginateable *BasePaginateable) GetCursor() (cursor *Cursor, err error)

GetCursor returns the cursor value for this paginateable object

func (*BasePaginateable) GetLastUpdatedHTTPTimeString

func (paginateable *BasePaginateable) GetLastUpdatedHTTPTimeString() string

GetLastUpdatedHTTPTimeString exposes the string rep of the last modified timestamp for the object

func (*BasePaginateable) QuickFix

func (paginateable *BasePaginateable) QuickFix() bool

QuickFix fixes base paginate-able model's attribute

type Channel

type Channel struct {
	MessageStakeholder
	ChannelID string
}

Channel is the object that producer broadcasts to and consumer consumes from

func NewChannel

func NewChannel(channelID string, token string) (*Channel, error)

NewChannel creates a new Channel

func (*Channel) IsInValidState

func (channel *Channel) IsInValidState() bool

IsInValidState returns false if any of channel id or name or token is empty

func (*Channel) QuickFix

func (channel *Channel) QuickFix() bool

QuickFix fixes the model to set default ID, name same as channel id, created and updated at to current time.

type Consumer

type Consumer struct {
	MessageStakeholder
	ConsumerID    string
	CallbackURL   string
	ConsumingFrom *Channel
	Type          ConsumerType
}

Consumer is the object that consumes messages from a Channel

func NewConsumer

func NewConsumer(channel *Channel, consumerID, token string, callbackURL *url.URL, consumerTypeStr string) (*Consumer, error)

NewConsumer creates new Consumer

func (*Consumer) GetChannelIDSafely

func (consumer *Consumer) GetChannelIDSafely() (channelID string)

GetChannelIDSafely retrieves the channel id, accounting for the fact that ConsumingFrom may be nil

func (*Consumer) IsInValidState

func (consumer *Consumer) IsInValidState() bool

IsInValidState returns false if any of consumer id, name or token is empty, the channel is nil, or the callback URL is not an absolute URL

func (*Consumer) QuickFix

func (consumer *Consumer) QuickFix() bool

QuickFix fixes the model to set default ID, name same as consumer id, created and updated at to current time.

type ConsumerType

type ConsumerType int

func (ConsumerType) String

func (consumerType ConsumerType) String() string

type Cursor

type Cursor struct {
	ID        string
	Timestamp time.Time
}

Cursor represents a string used for pagination

func ParseCursor

func ParseCursor(encodedCursorString string) (cursor *Cursor, err error)

ParseCursor creates Cursor from its string representation

func (*Cursor) String

func (c *Cursor) String() string

type DeliveryJob

type DeliveryJob struct {
	BasePaginateable
	Message               *Message
	Listener              *Consumer
	Status                JobStatus
	StatusChangedAt       time.Time
	DispatchReceivedAt    time.Time
	EarliestNextAttemptAt time.Time
	RetryAttemptCount     uint
	Priority              uint
	IncrementalTimeout    uint // in seconds
}

DeliveryJob represents the DTO object for delivering a Message to a consumer

func NewDeliveryJob

func NewDeliveryJob(msg *Message, consumer *Consumer) (job *DeliveryJob, err error)

NewDeliveryJob creates a new instance of DeliveryJob; returns insufficient info error if parameters are not valid for a new DeliveryJob

func (*DeliveryJob) GetLockID

func (job *DeliveryJob) GetLockID() string

GetLockID retrieves the Lock ID representing this instance of DeliveryJob

func (*DeliveryJob) IsInValidState

func (job *DeliveryJob) IsInValidState() bool

IsInValidState returns false if any of message id or payload or content type is empty, channel is nil, callback URL is not url or not absolute URL, status not recognized, received at and outboxed at not set properly. Call QuickFix before IsInValidState is called.

func (*DeliveryJob) QuickFix

func (job *DeliveryJob) QuickFix() bool

QuickFix fixes the object state automatically as much as possible

type HeadersMap

type HeadersMap map[string]string

func (*HeadersMap) Scan

func (hmap *HeadersMap) Scan(value interface{}) error

func (HeadersMap) Value

func (hmap HeadersMap) Value() (driver.Value, error)

type JobStatus

type JobStatus int

JobStatus represents the delivery job status

func (JobStatus) GetValue

func (status JobStatus) GetValue() int

func (JobStatus) String

func (status JobStatus) String() string

type Lock

type Lock struct {
	LockID     string
	AttainedAt time.Time
}

Lock represents the construct for lock information

func NewLock

func NewLock(lockable Lockable) (lock *Lock, err error)

NewLock returns a new instance of lock from the lockable

type Lockable

type Lockable interface {
	GetLockID() string
}

Lockable represents the API necessary to lock an object for distributed MUTEX operation

type Message

type Message struct {
	BasePaginateable
	MessageID     string
	Payload       string
	ContentType   string
	Priority      uint
	Status        MsgStatus
	BroadcastedTo *Channel
	ProducedBy    *Producer
	ReceivedAt    time.Time
	OutboxedAt    time.Time
	Headers       HeadersMap
}

Message represents the main payload of the application to be delivered

func NewMessage

func NewMessage(channel *Channel, producedBy *Producer, payload, contentType string, headers HeadersMap) (*Message, error)

NewMessage creates and returns new instance of message

func (*Message) GetChannelIDSafely

func (message *Message) GetChannelIDSafely() (channelID string)

GetChannelIDSafely retrieves the channel id, accounting for the fact that BroadcastedTo may be nil

func (*Message) GetLockID

func (message *Message) GetLockID() string

GetLockID retrieves lock ID for the current instance of message

func (*Message) IsInValidState

func (message *Message) IsInValidState() bool

IsInValidState returns false if any of message id or payload or content type is empty, channel is nil, callback URL is not url or not absolute URL, status not recognized, received at and outboxed at not set properly. Call QuickFix before IsInValidState is called.

func (*Message) QuickFix

func (message *Message) QuickFix() bool

QuickFix fixes the object state automatically as much as possible

type MessageStakeholder

type MessageStakeholder struct {
	BasePaginateable
	Name  string
	Token string
}

MessageStakeholder represents all objects around a message, for example, Producer, Channel, Consumer

type MsgStatus

type MsgStatus int

MsgStatus represents the state of a Msg.

func (MsgStatus) GetValue

func (status MsgStatus) GetValue() int

func (MsgStatus) String

func (status MsgStatus) String() string

type Paginateable

type Paginateable interface {
	GetCursor() (*Cursor, error)
}

Paginateable should be implemented by objects having xid.ID as field ID in DB and helps get cursor object

type Pagination

type Pagination struct {
	Next     *Cursor
	Previous *Cursor
}

Pagination represents a data structure to determine how to traverse a list

func NewPagination

func NewPagination(after Paginateable, before Paginateable) *Pagination

NewPagination returns a new pagination wrapper

type Producer

type Producer struct {
	MessageStakeholder
	ProducerID string
}

Producer represents generator of messages

func NewProducer

func NewProducer(producerID string, token string) (*Producer, error)

NewProducer creates new Producer

func (*Producer) IsInValidState

func (prod *Producer) IsInValidState() bool

IsInValidState returns false if any of producer id or name or token is empty

func (*Producer) QuickFix

func (prod *Producer) QuickFix() bool

QuickFix fixes the model to set default ID, name same as producer id, created and updated at to current time.

type ScheduledMessage added in v0.2.1

type ScheduledMessage struct {
	BasePaginateable
	MessageID        string
	Payload          string
	ContentType      string
	Priority         uint
	Status           ScheduledMsgStatus
	BroadcastedTo    *Channel
	ProducedBy       *Producer
	DispatchSchedule time.Time
	DispatchedAt     time.Time
	Headers          HeadersMap
}

ScheduledMessage represents a message scheduled for future delivery

func NewScheduledMessage added in v0.2.1

func NewScheduledMessage(channel *Channel, producedBy *Producer, payload, contentType string, dispatchSchedule time.Time, headers HeadersMap) (*ScheduledMessage, error)

NewScheduledMessage creates and returns new instance of scheduled message

func (*ScheduledMessage) GetChannelIDSafely added in v0.2.1

func (message *ScheduledMessage) GetChannelIDSafely() (channelID string)

GetChannelIDSafely retrieves the channel id, accounting for the fact that BroadcastedTo may be nil

func (*ScheduledMessage) GetLockID added in v0.2.1

func (message *ScheduledMessage) GetLockID() string

GetLockID retrieves lock ID for the current instance of scheduled message

func (*ScheduledMessage) IsInValidState added in v0.2.1

func (message *ScheduledMessage) IsInValidState() bool

IsInValidState returns false if any of message id or payload or content type is empty, channel is nil, status not recognized, dispatch schedule not set properly. Call QuickFix before IsInValidState is called.

func (*ScheduledMessage) QuickFix added in v0.2.1

func (message *ScheduledMessage) QuickFix() bool

QuickFix fixes the object state automatically as much as possible

type ScheduledMsgStatus added in v0.2.1

type ScheduledMsgStatus int

ScheduledMsgStatus represents the state of a ScheduledMessage

func (ScheduledMsgStatus) GetValue added in v0.2.1

func (status ScheduledMsgStatus) GetValue() int

func (ScheduledMsgStatus) String added in v0.2.1

func (status ScheduledMsgStatus) String() string

type Status

type Status interface {
	String() string
	// GetValue returns the underlying status value.  This is necessary
	// since the String() method might perform formatting.
	GetValue() int
}

Status represents a generic status with string conversion.

type StatusCount

type StatusCount[T Status] struct {
	Status              T      `json:"status"`
	Count               int    `json:"count"`
	OldestItemTimestamp string `json:"oldestItemTimestamp"`
	NewestItemTimestamp string `json:"newestItemTimestamp"`
}

func (StatusCount[T]) String

func (sc StatusCount[T]) String() string

type Updateable

type Updateable interface {
	GetLastUpdatedHTTPTimeString() string
}

Updateable represents interface for objects that expose updated date

type ValidateableModel

type ValidateableModel interface {
	QuickFix() bool
	IsInValidState() bool
}

ValidateableModel is implemented by models that can be checked for a valid state before write operations and that allow a quick fix to be applied

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL