Documentation
¶
Index ¶
- Constants
- Variables
- func Consume(ctx context.Context, topic string, since time.Time, ...)
- func InitializeKafkaBroker(ctx context.Context) error
- func InitializeStubBroker(bufferSize int) error
- func InitializeTopicRequests(ctx context.Context)
- func Send(ctx context.Context, messages ...*GenericMessage) error
- type AvailabilityStatusMessage
- type Broker
- type GenericHeader
- type GenericMessage
- type NativeMessage
- type NotificationContext
- type NotificationError
- type NotificationEvent
- type NotificationMessage
- type SourceResult
- type StatusType
Constants ¶
View Source
const ( NotificationSuccessEventType = "launch-success" NotificationFailureEventType = "launch-failed" )
Variables ¶
View Source
var ( ErrDifferentTopic = errors.New("messages in batch have different topics") ErrUnknownSASLMechanism = errors.New("unknown SASL mechanism") )
View Source
var ( AvailabilityStatusRequestTopic string SourcesStatusTopic string NotificationTopic string )
topics after clowder mapping
Functions ¶
func InitializeKafkaBroker ¶
func InitializeStubBroker ¶
func InitializeTopicRequests ¶
InitializeTopicRequests performs clowder mapping of topics.
Types ¶
type AvailabilityStatusMessage ¶
type AvailabilityStatusMessage struct {
SourceID string `json:"source_id"`
}
func NewAvailabilityStatusMessage ¶
func NewAvailabilityStatusMessage(msg *GenericMessage) (*AvailabilityStatusMessage, error)
func (AvailabilityStatusMessage) GenericMessage ¶
func (m AvailabilityStatusMessage) GenericMessage(ctx context.Context) (GenericMessage, error)
type Broker ¶
type Broker interface {
// Send one or more messages to the kafka
Send(ctx context.Context, messages ...*GenericMessage) error
// Consume messages of a single topic in a loop. Blocking call, use context cancellation to stop.
Consume(ctx context.Context, topic string, since time.Time, handler func(ctx context.Context, message *GenericMessage))
}
func NewStubBroker ¶
type GenericHeader ¶
func GenericHeaders ¶
func GenericHeaders(args ...string) []GenericHeader
GenericHeaders returns slice of headers
type GenericMessage ¶
type GenericMessage struct {
// Topic of the message. Some producers already have associated topic, in that case Topic from the message will be ignored.
Topic string
// Key is used for topic partitioning. Can be nil.
Key []byte
// Value is the payload. Typically, a JSON marshaled data.
Value []byte
// List of key-value pairs for each message.
Headers []GenericHeader
}
GenericMessage is a platform independent message.
func NewMessageFromKafka ¶
func NewMessageFromKafka(km *kafka.Message) *GenericMessage
NewMessageFromKafka converts generic message to native message
func (GenericMessage) Header ¶
func (m GenericMessage) Header(name string) string
func (GenericMessage) KafkaMessage ¶
func (m GenericMessage) KafkaMessage() kafka.Message
KafkaMessage converts from generic to native message.
type NativeMessage ¶
type NativeMessage interface {
// GenericMessage returns a generic message that is platform independent.
GenericMessage(ctx context.Context) (GenericMessage, error)
}
NativeMessage represents a native (kafka) message. It can be converted to GenericMessage.
type NotificationContext ¶
type NotificationError ¶
type NotificationError struct {
Error string `json:"error"`
}
type NotificationEvent ¶
type NotificationEvent struct {
Payload json.RawMessage `json:"payload"`
}
type NotificationMessage ¶
type NotificationMessage struct {
Version string `json:"version"`
Bundle string `json:"bundle"`
Application string `json:"application"`
EventType string `json:"event_type"`
Timestamp string `json:"timestamp"`
AccountID string `json:"account_id"`
OrgId string `json:"org_id"`
Context interface{} `json:"context"`
Events []NotificationEvent `json:"events"`
Recipients []notificationRecipients `json:"recipients"`
ID string `json:"id"`
}
func (NotificationMessage) GenericMessage ¶
func (m NotificationMessage) GenericMessage(ctx context.Context) (GenericMessage, error)
type SourceResult ¶
type SourceResult struct {
MessageContext context.Context `json:"-"` // Carries logger and identity
ResourceID string `json:"resource_id"`
ResourceType string `json:"resource_type"`
Status StatusType `json:"status"`
UserError string `json:"error"`
Err error `json:"-"` // Sources do not support error field
MissingPermissions []string `json:"-"` // Sources do not support reason field
}
func (SourceResult) GenericMessage ¶
func (sr SourceResult) GenericMessage(ctx context.Context) (GenericMessage, error)
type StatusType ¶
type StatusType string
const ( StatusAvailable StatusType = "available" )
func (StatusType) String ¶
func (st StatusType) String() string
Click to show internal directories.
Click to hide internal directories.