Documentation ¶
Index ¶
- Constants
- Variables
- type App
- type AppStatus
- type BasePaginateable
- type Channel
- type Consumer
- type ConsumerType
- type Cursor
- type DeliveryJob
- type HeadersMap
- type JobStatus
- type Lock
- type Lockable
- type Message
- type MessageStakeholder
- type MsgStatus
- type Paginateable
- type Pagination
- type Producer
- type ScheduledMessage
- type ScheduledMsgStatus
- type Status
- type StatusCount
- type Updateable
- type ValidateableModel
Constants ¶
const (
PullConsumer ConsumerType = iota
PushConsumer
PullConsumerStr = "pull"
PushConsumerStr = "push"
)
const (
// JobQueued is the job status during first attempt
JobQueued JobStatus = iota + 1000
// JobInflight is to signify that the DeliveryJob is in its first attempt
JobInflight
// JobDelivered signifies that the DeliveryJob received 2XX status from consumer
JobDelivered
// JobDead signifies that the maximum number of retries has been exhausted
JobDead
// JobQueuedStr is the string rep of JobQueued
JobQueuedStr = "QUEUED"
// JobInflightStr is the string rep of JobInflight
JobInflightStr = "INFLIGHT"
// JobDeliveredStr is the string rep of JobDelivered
JobDeliveredStr = "DELIVERED"
// JobDeadStr is the string rep of JobDead
JobDeadStr = "DEAD"
)
const (
// MsgStatusAcknowledged represents the state after receiving the message but before it is dispatched
MsgStatusAcknowledged MsgStatus = iota + 100
// MsgStatusDispatched represents the fact that the dispatch jobs have been created for the message
MsgStatusDispatched
// MsgStatusAcknowledgedStr is the string representation of message's acknowledged status
MsgStatusAcknowledgedStr = "ACKNOWLEDGED"
// MsgStatusDispatchedStr is the string representation of message's dispatched status
MsgStatusDispatchedStr = "DISPATCHED"
)
const (
// ScheduledMsgStatusScheduled represents the state where message is waiting to be dispatched
ScheduledMsgStatusScheduled ScheduledMsgStatus = iota + 100
// ScheduledMsgStatusDispatched represents when message has been dispatched
ScheduledMsgStatusDispatched
// ScheduledMsgStatusScheduledStr is the string representation of message's scheduled status
ScheduledMsgStatusScheduledStr = "SCHEDULED"
// ScheduledMsgStatusDispatchedStr is the string representation of message's dispatched status
ScheduledMsgStatusDispatchedStr = "DISPATCHED"
)
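The status constants in this section pair an iota-based integer with a separate string constant. As a minimal sketch of how the two could be tied together, the block below redeclares the JobStatus constants locally and adds a String method. The method body is an assumption for illustration only; the package's own implementation is not shown in this documentation.

```go
package main

import "fmt"

// JobStatus is redeclared locally so the sketch compiles on its own.
type JobStatus int

const (
	JobQueued    JobStatus = iota + 1000 // 1000
	JobInflight                          // 1001
	JobDelivered                         // 1002
	JobDead                              // 1003
)

const (
	JobQueuedStr    = "QUEUED"
	JobInflightStr  = "INFLIGHT"
	JobDeliveredStr = "DELIVERED"
	JobDeadStr      = "DEAD"
)

// String is an assumed mapping from status value to its string constant.
// Unknown values fall back to a numeric form instead of an empty string.
func (s JobStatus) String() string {
	switch s {
	case JobQueued:
		return JobQueuedStr
	case JobInflight:
		return JobInflightStr
	case JobDelivered:
		return JobDeliveredStr
	case JobDead:
		return JobDeadStr
	}
	return fmt.Sprintf("JobStatus(%d)", int(s))
}

func main() {
	for _, s := range []JobStatus{JobQueued, JobInflight, JobDelivered, JobDead} {
		fmt.Printf("%d %s\n", int(s), s)
	}
}
```

Because each group starts at a distinct offset (iota + 1000 for jobs, iota + 100 for messages), the integer values of different status families do not collide with the zero value of the underlying int.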
Variables ¶
var (
// ErrInsufficientInformationForCreating is returned when NewProducer is called with insufficient information
ErrInsufficientInformationForCreating = errors.New("Necessary information missing for persistence")
)
var (
// ErrLockableNil represents the error returned when lockable is nil in NewLock
ErrLockableNil = errors.New("lockable can not be nil")
)
Functions ¶
This section is empty.
Types ¶
type App ¶
type App struct {
// contains filtered or unexported fields
}
App represents this application's state for cross-cluster use
func (*App) GetSeedData ¶
GetSeedData retrieves the current seed data config of the App. In NonInitialized status it can be nil
type BasePaginateable ¶
BasePaginateable provides common functionalities around paginateable objects
func (*BasePaginateable) GetCursor ¶
func (paginateable *BasePaginateable) GetCursor() (cursor *Cursor, err error)
GetCursor returns the cursor value for this object
func (*BasePaginateable) GetLastUpdatedHTTPTimeString ¶
func (paginateable *BasePaginateable) GetLastUpdatedHTTPTimeString() string
GetLastUpdatedHTTPTimeString exposes the string rep of the last modified timestamp for the object
func (*BasePaginateable) QuickFix ¶
func (paginateable *BasePaginateable) QuickFix() bool
QuickFix fixes the base paginateable model's attributes
type Channel ¶
type Channel struct {
MessageStakeholder
ChannelID string
}
Channel is the object that producer broadcasts to and consumer consumes from
func NewChannel ¶
NewChannel creates new Channel
func (*Channel) IsInValidState ¶
IsInValidState returns false if any of channel id or name or token is empty
type Consumer ¶
type Consumer struct {
MessageStakeholder
ConsumerID string
CallbackURL string
ConsumingFrom *Channel
Type ConsumerType
}
Consumer is the object that consumes messages from a Channel
func NewConsumer ¶
func NewConsumer(channel *Channel, consumerID, token string, callbackURL *url.URL, consumerTypeStr string) (*Consumer, error)
NewConsumer creates new Consumer
func (*Consumer) GetChannelIDSafely ¶
GetChannelIDSafely retrieves the channel id, accounting for the fact that ConsumingFrom may be nil
func (*Consumer) IsInValidState ¶
IsInValidState returns false if any of consumer id, name, or token is empty, the channel is nil, or the callback URL is not an absolute URL
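The absolute-URL requirement on the callback can be checked with the standard net/url package. The sketch below is an illustration only: isAbsoluteURL and parseCallback are hypothetical helpers, and the extra non-empty host check is an assumption rather than documented behavior of the package.

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
)

// isAbsoluteURL reports whether u has a scheme and a host.
// Hypothetical helper; a nil URL is treated as not absolute.
func isAbsoluteURL(u *url.URL) bool {
	return u != nil && u.IsAbs() && u.Host != ""
}

// parseCallback parses raw and rejects anything that is not an absolute URL.
func parseCallback(raw string) (*url.URL, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return nil, err
	}
	if !isAbsoluteURL(u) {
		return nil, errors.New("callback URL must be absolute")
	}
	return u, nil
}

func main() {
	for _, raw := range []string{"https://example.com/hooks/orders", "/hooks/orders"} {
		_, err := parseCallback(raw)
		fmt.Printf("%q accepted: %t\n", raw, err == nil)
	}
}
```

A value parsed this way has the *url.URL type that NewConsumer accepts for its callbackURL parameter.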
type ConsumerType ¶
type ConsumerType int
func (ConsumerType) String ¶
func (consumerType ConsumerType) String() string
type Cursor ¶
Cursor represents a string used for pagination
func ParseCursor ¶
ParseCursor creates Cursor from its string representation
type DeliveryJob ¶
type DeliveryJob struct {
BasePaginateable
Message *Message
Listener *Consumer
Status JobStatus
StatusChangedAt time.Time
DispatchReceivedAt time.Time
EarliestNextAttemptAt time.Time
RetryAttemptCount uint
Priority uint
IncrementalTimeout uint // in seconds
}
DeliveryJob represents the DTO object for delivering a Message to a consumer
func NewDeliveryJob ¶
func NewDeliveryJob(msg *Message, consumer *Consumer) (job *DeliveryJob, err error)
NewDeliveryJob creates a new instance of DeliveryJob; returns insufficient info error if parameters are not valid for a new DeliveryJob
func (*DeliveryJob) GetLockID ¶
func (job *DeliveryJob) GetLockID() string
GetLockID retrieves the Lock ID representing this instance of DeliveryJob
func (*DeliveryJob) IsInValidState ¶
func (job *DeliveryJob) IsInValidState() bool
IsInValidState returns false if any of message id, payload, or content type is empty, the channel is nil, the callback URL is not a URL or not an absolute URL, the status is not recognized, or received at and outboxed at are not set properly. Call QuickFix before calling IsInValidState.
func (*DeliveryJob) QuickFix ¶
func (job *DeliveryJob) QuickFix() bool
QuickFix fixes the object state automatically as much as possible
type HeadersMap ¶
func (*HeadersMap) Scan ¶
func (hmap *HeadersMap) Scan(value interface{}) error
type Lockable ¶
type Lockable interface {
GetLockID() string
}
Lockable represents the API necessary to lock an object for distributed MUTEX operation
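To illustrate how a GetLockID-based contract can drive mutual exclusion, here is a minimal sketch that uses an in-memory map as a stand-in for a distributed lock store. The lockRegistry type, its methods, the job type, and the "dj-" ID prefix are all hypothetical; only the Lockable interface and the ErrLockableNil error text come from this documentation.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Lockable and ErrLockableNil are redeclared so the sketch is self-contained.
type Lockable interface {
	GetLockID() string
}

var ErrLockableNil = errors.New("lockable can not be nil")

// lockRegistry is a hypothetical in-memory stand-in for a shared lock store.
type lockRegistry struct {
	mu   sync.Mutex
	held map[string]bool
}

func newLockRegistry() *lockRegistry {
	return &lockRegistry{held: make(map[string]bool)}
}

// tryAcquire returns true only for the first caller holding a given lock ID.
func (r *lockRegistry) tryAcquire(l Lockable) (bool, error) {
	if l == nil {
		return false, ErrLockableNil
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	id := l.GetLockID()
	if r.held[id] {
		return false, nil
	}
	r.held[id] = true
	return true, nil
}

// release frees the lock ID so another caller can acquire it.
func (r *lockRegistry) release(l Lockable) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.held, l.GetLockID())
}

// job is a hypothetical Lockable; the "dj-" prefix is made up for the example.
type job struct{ id string }

func (j job) GetLockID() string { return "dj-" + j.id }

func main() {
	reg := newLockRegistry()
	first, _ := reg.tryAcquire(job{id: "42"})
	second, _ := reg.tryAcquire(job{id: "42"})
	fmt.Println(first, second) // prints "true false"
	reg.release(job{id: "42"})
}
```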
type Message ¶
type Message struct {
BasePaginateable
MessageID string
Payload string
ContentType string
Priority uint
Status MsgStatus
BroadcastedTo *Channel
ProducedBy *Producer
ReceivedAt time.Time
OutboxedAt time.Time
Headers HeadersMap
}
Message represents the main payload of the application to be delivered
func NewMessage ¶
func NewMessage(channel *Channel, producedBy *Producer, payload, contentType string, headers HeadersMap) (*Message, error)
NewMessage creates and returns new instance of message
func (*Message) GetChannelIDSafely ¶
GetChannelIDSafely retrieves the channel id, accounting for the fact that BroadcastedTo may be nil
func (*Message) IsInValidState ¶
IsInValidState returns false if any of message id, payload, or content type is empty, the channel is nil, the callback URL is not a URL or not an absolute URL, the status is not recognized, or received at and outboxed at are not set properly. Call QuickFix before calling IsInValidState.
type MessageStakeholder ¶
type MessageStakeholder struct {
BasePaginateable
Name string
Token string
}
MessageStakeholder represents all objects around a message, for example, Producer, Channel, Consumer
type Paginateable ¶
Paginateable should be implemented by objects that have an xid.ID as the ID field in the DB; it helps obtain a cursor object
type Pagination ¶
Pagination represents a data structure to determine how to traverse a list
func NewPagination ¶
func NewPagination(after Paginateable, before Paginateable) *Pagination
NewPagination returns a new pagination wrapper
type Producer ¶
type Producer struct {
MessageStakeholder
ProducerID string
}
Producer represents generator of messages
func NewProducer ¶
NewProducer creates new Producer
func (*Producer) IsInValidState ¶
IsInValidState returns false if any of producer id or name or token is empty
type ScheduledMessage ¶ added in v0.2.1
type ScheduledMessage struct {
BasePaginateable
MessageID string
Payload string
ContentType string
Priority uint
Status ScheduledMsgStatus
BroadcastedTo *Channel
ProducedBy *Producer
DispatchSchedule time.Time
DispatchedAt time.Time
Headers HeadersMap
}
ScheduledMessage represents a message scheduled for future delivery
func NewScheduledMessage ¶ added in v0.2.1
func NewScheduledMessage(channel *Channel, producedBy *Producer, payload, contentType string, dispatchSchedule time.Time, headers HeadersMap) (*ScheduledMessage, error)
NewScheduledMessage creates and returns new instance of scheduled message
func (*ScheduledMessage) GetChannelIDSafely ¶ added in v0.2.1
func (message *ScheduledMessage) GetChannelIDSafely() (channelID string)
GetChannelIDSafely retrieves the channel id, accounting for the fact that BroadcastedTo may be nil
func (*ScheduledMessage) GetLockID ¶ added in v0.2.1
func (message *ScheduledMessage) GetLockID() string
GetLockID retrieves lock ID for the current instance of scheduled message
func (*ScheduledMessage) IsInValidState ¶ added in v0.2.1
func (message *ScheduledMessage) IsInValidState() bool
IsInValidState returns false if any of message id, payload, or content type is empty, the channel is nil, the status is not recognized, or the dispatch schedule is not set properly. Call QuickFix before calling IsInValidState.
func (*ScheduledMessage) QuickFix ¶ added in v0.2.1
func (message *ScheduledMessage) QuickFix() bool
QuickFix fixes the object state automatically as much as possible
type ScheduledMsgStatus ¶ added in v0.2.1
type ScheduledMsgStatus int
ScheduledMsgStatus represents the state of a ScheduledMessage
func (ScheduledMsgStatus) GetValue ¶ added in v0.2.1
func (status ScheduledMsgStatus) GetValue() int
func (ScheduledMsgStatus) String ¶ added in v0.2.1
func (status ScheduledMsgStatus) String() string
type Status ¶
type Status interface {
String() string
// GetValue returns the underlying status value. This is necessary
// since the String() method might perform formatting.
GetValue() int
}
Status represents a generic status with string conversion.
type StatusCount ¶
type StatusCount[T Status] struct {
Status T `json:"status"`
Count int `json:"count"`
OldestItemTimestamp string `json:"oldestItemTimestamp"`
NewestItemTimestamp string `json:"newestItemTimestamp"`
}
func (StatusCount[T]) String ¶
func (sc StatusCount[T]) String() string
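As a rough sketch of how the generic StatusCount type can be used with any Status implementation, the block below redeclares both types locally and counts items per status. The demoStatus type and the tally function are hypothetical and exist only for this example; how the package itself populates StatusCount values is not shown in this documentation.

```go
package main

import "fmt"

// Status and StatusCount are redeclared so the sketch is self-contained.
type Status interface {
	String() string
	GetValue() int
}

type StatusCount[T Status] struct {
	Status              T      `json:"status"`
	Count               int    `json:"count"`
	OldestItemTimestamp string `json:"oldestItemTimestamp"`
	NewestItemTimestamp string `json:"newestItemTimestamp"`
}

// demoStatus is a hypothetical Status implementation.
type demoStatus int

func (s demoStatus) String() string { return fmt.Sprintf("demo-%d", int(s)) }
func (s demoStatus) GetValue() int  { return int(s) }

// tally groups items by GetValue, keeping first-seen order.
// Hypothetical helper; timestamps are left empty in this sketch.
func tally[T Status](items []T) []StatusCount[T] {
	index := make(map[int]int) // status value -> position in counts
	var counts []StatusCount[T]
	for _, item := range items {
		key := item.GetValue()
		pos, seen := index[key]
		if !seen {
			pos = len(counts)
			index[key] = pos
			counts = append(counts, StatusCount[T]{Status: item})
		}
		counts[pos].Count++
	}
	return counts
}

func main() {
	for _, c := range tally([]demoStatus{100, 101, 100}) {
		fmt.Printf("%s: %d\n", c.Status, c.Count)
	}
}
```

Grouping by GetValue rather than String follows the note on the Status interface that String may perform formatting.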
type Updateable ¶
type Updateable interface {
GetLastUpdatedHTTPTimeString() string
}
Updateable represents interface for objects that expose updated date
type ValidateableModel ¶
ValidateableModel is implemented by models that can be checked for a valid state before write ops. It also allows a quick fix to be applied
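The recommended order, QuickFix first and IsInValidState second, can be sketched as follows. The interface declaration here is an assumption inferred from the QuickFix and IsInValidState methods listed on the model types; the note type, the prepareForWrite helper, and the reading of QuickFix's return value as "something was changed" are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// ValidateableModel is an assumed shape based on the methods documented
// on the model types; the package's actual declaration is not shown.
type ValidateableModel interface {
	QuickFix() bool
	IsInValidState() bool
}

// note is a hypothetical model used only for this sketch.
type note struct {
	ID        string
	CreatedAt time.Time
}

// QuickFix fills in a missing timestamp and reports whether it changed anything.
func (n *note) QuickFix() bool {
	if n.CreatedAt.IsZero() {
		n.CreatedAt = time.Now()
		return true
	}
	return false
}

// IsInValidState requires an ID and a set timestamp.
func (n *note) IsInValidState() bool {
	return n.ID != "" && !n.CreatedAt.IsZero()
}

// prepareForWrite applies the quick fix, then validates before a write op.
func prepareForWrite(m ValidateableModel) error {
	m.QuickFix()
	if !m.IsInValidState() {
		return errors.New("model is not in a valid state")
	}
	return nil
}

func main() {
	fmt.Println(prepareForWrite(&note{ID: "n1"}) == nil) // prints "true"
	fmt.Println(prepareForWrite(&note{}) == nil)         // prints "false"
}
```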