Documentation ¶
Index ¶
- Constants
- Variables
- func ApplyMetadata(cloudEvent map[string]interface{}, componentFeatures []Feature, ...)
- func ConvertTLSPropertiesToTLSConfig(properties TLSProperties) (*tls.Config, error)
- func FromCloudEvent(cloudEvent []byte, topic, pubsub, traceParent string, traceState string) (map[string]interface{}, error)
- func FromRawPayload(data []byte, topic, pubsub string) map[string]interface{}
- func HasExpired(cloudEvent map[string]interface{}) bool
- func NewCloudEventsEnvelope(id, source, eventType, subject string, topic string, pubsubName string, ...) map[string]interface{}
- func Ping(ctx context.Context, pubsub PubSub) error
- type AppBulkResponse
- type AppBulkResponseEntry
- type AppResponse
- type AppResponseStatus
- type BulkHandler
- type BulkMessage
- type BulkMessageEntry
- type BulkPublishRequest
- type BulkPublishResponse
- type BulkPublishResponseFailedEntry
- type BulkPublisher
- type BulkSubscribeConfig
- type BulkSubscribeResponse
- type BulkSubscribeResponseEntry
- type BulkSubscriber
- type ConcurrencyMode
- type Feature
- type Handler
- type Metadata
- type NewMessage
- type PausableSubscriber
- type PubSub
- type PublishRequest
- type SubscribeRequest
- type TLSProperties
Constants ¶
const (
	// DefaultCloudEventType is the default event type for a Dapr published event.
	DefaultCloudEventType = "com.dapr.event.sent"
	// DefaultBulkEventType is the default bulk event type for a Dapr published event.
	DefaultBulkEventType = "com.dapr.event.sent.bulk"
	// CloudEventsSpecVersion is the specversion used by Dapr for the cloud events implementation.
	CloudEventsSpecVersion = "1.0"
	// DefaultCloudEventSource is the default event source.
	DefaultCloudEventSource = "Dapr"
	// DefaultCloudEventDataContentType is the default content-type for the data attribute.
	DefaultCloudEventDataContentType = "text/plain"
	// traceid, kept for backwards compatibility.
	// ::TODO delete traceid, and keep traceparent.
	TraceIDField         = "traceid"
	TraceParentField     = "traceparent"
	TraceStateField      = "tracestate"
	TopicField           = "topic"
	PubsubField          = "pubsubname"
	ExpirationField      = "expiration"
	DataContentTypeField = "datacontenttype"
	DataField            = "data"
	DataBase64Field      = "data_base64"
	SpecVersionField     = "specversion"
	TypeField            = "type"
	SourceField          = "source"
	IDField              = "id"
	SubjectField         = "subject"
	TimeField            = "time"
)
const (
	// CACert is the metadata key name for the CA certificate.
	CACert = "caCert"
	// ClientCert is the metadata key name for the client certificate.
	ClientCert = "clientCert"
	// ClientKey is the metadata key name for the client key.
	ClientKey = "clientKey"
)
const RuntimeConsumerIDKey = "consumerID"
When the Dapr component does not explicitly specify a consumer group, this value provided by the runtime must be used. This value is specific to each Dapr App. As a result, by default, each Dapr App will receive all messages published to the topic at least once. See https://github.com/dapr/dapr/blob/21566de8d7fdc7d43ae627ffc0698cc073fa71b0/pkg/runtime/runtime.go#L1735-L1739
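The fallback rule above can be sketched as follows. This is only an illustration: the `consumerGroup` metadata key and the `resolveConsumerGroup` helper are hypothetical names, and real components may use a different key for their explicit consumer group.

```go
package main

import "fmt"

// runtimeConsumerIDKey mirrors the documented RuntimeConsumerIDKey constant.
const runtimeConsumerIDKey = "consumerID"

// consumerGroupKey is a hypothetical component-specific metadata key.
const consumerGroupKey = "consumerGroup"

// resolveConsumerGroup prefers an explicitly configured consumer group and
// otherwise falls back to the consumer ID supplied by the runtime.
func resolveConsumerGroup(metadata map[string]string) string {
	if group := metadata[consumerGroupKey]; group != "" {
		return group
	}
	return metadata[runtimeConsumerIDKey]
}

func main() {
	fmt.Println(resolveConsumerGroup(map[string]string{"consumerID": "app-a"}))
	fmt.Println(resolveConsumerGroup(map[string]string{
		"consumerID":    "app-a",
		"consumerGroup": "shared",
	}))
}
```

Because the runtime-provided value is specific to each app, two apps that rely on the fallback end up in different groups, which is why each of them receives every message.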
Variables ¶
var ErrGracefulShutdown = errors.New("pubsub shutdown")
Functions ¶
func ApplyMetadata ¶ added in v1.0.0
func ApplyMetadata(cloudEvent map[string]interface{}, componentFeatures []Feature, metadata map[string]string)
ApplyMetadata will process metadata to modify the cloud event based on the component's feature set.
func ConvertTLSPropertiesToTLSConfig ¶ added in v1.10.1
func ConvertTLSPropertiesToTLSConfig(properties TLSProperties) (*tls.Config, error)
ConvertTLSPropertiesToTLSConfig converts the TLSProperties to a tls.Config.
func FromCloudEvent ¶ added in v1.0.0
func FromCloudEvent(cloudEvent []byte, topic, pubsub, traceParent string, traceState string) (map[string]interface{}, error)
FromCloudEvent returns a map representation of an existing cloudevents JSON.
func FromRawPayload ¶ added in v1.2.0
func FromRawPayload(data []byte, topic, pubsub string) map[string]interface{}
FromRawPayload returns a CloudEvent for a raw payload on subscriber's end.
func HasExpired ¶ added in v1.0.0
func HasExpired(cloudEvent map[string]interface{}) bool
HasExpired determines if the current cloud event has expired.
func NewCloudEventsEnvelope ¶
func NewCloudEventsEnvelope(id, source, eventType, subject string, topic string, pubsubName string, dataContentType string, data []byte, traceParent string, traceState string) map[string]interface{}
NewCloudEventsEnvelope returns a map representation of a cloudevents JSON.
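To make the shape of such a map concrete, here is a simplified sketch built from the field names and defaults listed under Constants. `newEnvelope` is a hypothetical stand-in, not the package function: it takes fewer arguments, handles only text data, and the fallback to defaults for empty arguments is an assumption.

```go
package main

import "fmt"

// Values copied from the documented constants.
const (
	specVersion        = "1.0"
	defaultEventType   = "com.dapr.event.sent"
	defaultEventSource = "Dapr"
	defaultContentType = "text/plain"
)

// newEnvelope is a hypothetical, simplified stand-in that builds a map keyed
// by the documented field names. Assumption: empty arguments fall back to
// the documented defaults.
func newEnvelope(id, source, eventType, topic, pubsubName, contentType, data string) map[string]interface{} {
	if source == "" {
		source = defaultEventSource
	}
	if eventType == "" {
		eventType = defaultEventType
	}
	if contentType == "" {
		contentType = defaultContentType
	}
	return map[string]interface{}{
		"id":              id,
		"specversion":     specVersion,
		"source":          source,
		"type":            eventType,
		"topic":           topic,
		"pubsubname":      pubsubName,
		"datacontenttype": contentType,
		"data":            data,
	}
}

func main() {
	envelope := newEnvelope("evt-1", "", "", "orders", "orders-pubsub", "", "hello")
	fmt.Println(envelope["type"], envelope["source"], envelope["datacontenttype"])
}
```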
Types ¶
type AppBulkResponse ¶ added in v1.10.1
type AppBulkResponse struct {
AppResponses []AppBulkResponseEntry `json:"statuses"`
}
AppBulkResponse is the whole bulk subscribe response sent by the app.
type AppBulkResponseEntry ¶ added in v1.10.1
type AppBulkResponseEntry struct {
EntryId string `json:"entryId"` //nolint:stylecheck
Status AppResponseStatus `json:"status"`
}
AppBulkResponseEntry represents a single response, as part of AppBulkResponse, to be sent by the subscribed app for the corresponding single message during bulk subscribe.
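The JSON tags above determine the wire format of a bulk response. The sketch below redeclares the two structs locally (so it runs without the package) and prints the encoded form. The `encode` helper is a hypothetical name.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local copies of the documented types, including their JSON tags.
type AppResponseStatus string

type AppBulkResponseEntry struct {
	EntryId string            `json:"entryId"`
	Status  AppResponseStatus `json:"status"`
}

type AppBulkResponse struct {
	AppResponses []AppBulkResponseEntry `json:"statuses"`
}

// encode marshals a bulk response to its JSON representation.
func encode(resp AppBulkResponse) (string, error) {
	b, err := json.Marshal(resp)
	return string(b), err
}

func main() {
	out, err := encode(AppBulkResponse{AppResponses: []AppBulkResponseEntry{
		{EntryId: "1", Status: "SUCCESS"},
		{EntryId: "2", Status: "RETRY"},
	}})
	if err != nil {
		panic(err)
	}
	fmt.Println(out)
	// {"statuses":[{"entryId":"1","status":"SUCCESS"},{"entryId":"2","status":"RETRY"}]}
}
```

Note that the Go field is named AppResponses while the JSON key is `statuses`.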
type AppResponse ¶ added in v0.4.0
type AppResponse struct {
Status AppResponseStatus `json:"status"`
}
AppResponse is the object describing the response from user code after a pubsub event.
type AppResponseStatus ¶ added in v0.4.0
type AppResponseStatus string
AppResponseStatus represents a status of a PubSub response.
const (
	// Success means the message is received and processed correctly.
	Success AppResponseStatus = "SUCCESS"
	// Retry means the message is received but could not be processed and must be retried.
	Retry AppResponseStatus = "RETRY"
	// Drop means the message is received but should not be processed.
	Drop AppResponseStatus = "DROP"
)
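One way a caller might act on these statuses is sketched below, using local copies of the types. The `interpret` helper and `errRetry` are hypothetical, and treating an unknown status as an error is a choice of this sketch, not documented behavior.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Local copies of the documented types and constants.
type AppResponseStatus string

const (
	Success AppResponseStatus = "SUCCESS"
	Retry   AppResponseStatus = "RETRY"
	Drop    AppResponseStatus = "DROP"
)

type AppResponse struct {
	Status AppResponseStatus `json:"status"`
}

// errRetry is a hypothetical sentinel signalling that redelivery is wanted.
var errRetry = errors.New("retry requested by app")

// interpret decodes an app response body and returns nil when the message
// can be acknowledged (Success or Drop) and errRetry when it must be retried.
func interpret(body []byte) error {
	var resp AppResponse
	if err := json.Unmarshal(body, &resp); err != nil {
		return err
	}
	switch resp.Status {
	case Success, Drop:
		return nil
	case Retry:
		return errRetry
	default:
		// Assumption of this sketch: unknown statuses are reported as errors.
		return fmt.Errorf("unknown status %q", resp.Status)
	}
}

func main() {
	fmt.Println(interpret([]byte(`{"status":"SUCCESS"}`)))
	fmt.Println(interpret([]byte(`{"status":"RETRY"}`)))
}
```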
type BulkHandler ¶ added in v1.10.1
type BulkHandler func(ctx context.Context, msg *BulkMessage) ([]BulkSubscribeResponseEntry, error)
The returned []BulkSubscribeResponseEntry carries the individual status of each message, in order.
type BulkMessage ¶ added in v1.10.1
type BulkMessage struct {
Entries []BulkMessageEntry `json:"entries"`
Topic string `json:"topic"`
Metadata map[string]string `json:"metadata"`
}
BulkMessage represents bulk message arriving from a message bus instance.
func (BulkMessage) String ¶ added in v1.10.1
func (m BulkMessage) String() string
String implements fmt.Stringer and it's useful for debugging.
type BulkMessageEntry ¶ added in v1.10.1
type BulkMessageEntry struct {
EntryId string `json:"entryId"` //nolint:stylecheck
Event []byte `json:"event"`
ContentType string `json:"contentType,omitempty"`
Metadata map[string]string `json:"metadata"`
}
BulkMessageEntry represents a single message inside a bulk request.
func (BulkMessageEntry) String ¶ added in v1.10.1
func (m BulkMessageEntry) String() string
String implements fmt.Stringer and it's useful for debugging.
type BulkPublishRequest ¶ added in v1.10.1
type BulkPublishRequest struct {
Entries []BulkMessageEntry `json:"entries"`
PubsubName string `json:"pubsubname"`
Topic string `json:"topic"`
Metadata map[string]string `json:"metadata"`
}
BulkPublishRequest is the request to publish multiple messages.
type BulkPublishResponse ¶ added in v1.10.1
type BulkPublishResponse struct {
FailedEntries []BulkPublishResponseFailedEntry `json:"failedEntries"`
}
BulkPublishResponse contains the list of failed entries in a bulk publish request.
func NewBulkPublishResponse ¶ added in v1.10.1
func NewBulkPublishResponse(messages []BulkMessageEntry, err error) BulkPublishResponse
NewBulkPublishResponse returns a BulkPublishResponse in which every entry carries the same error. It is a helper that maps a single error response on BulkPublish to multiple events.
type BulkPublishResponseFailedEntry ¶ added in v1.10.1
type BulkPublishResponseFailedEntry struct {
EntryId string `json:"entryId"` //nolint:stylecheck
Error error `json:"error"`
}
BulkPublishResponseFailedEntry represents a single publish response, as part of BulkPublishResponse, to be sent to the publishing app for the corresponding single message during bulk publish.
type BulkPublisher ¶ added in v1.10.1
type BulkPublisher interface {
BulkPublish(ctx context.Context, req *BulkPublishRequest) (BulkPublishResponse, error)
}
BulkPublish publishes a collection of entries/messages in a BulkPublishRequest to a message bus topic and returns a BulkPublishResponse with failed entries for any failed messages. Error is returned on partial or complete failures. If there are no failures, error is nil.
type BulkSubscribeConfig ¶ added in v1.10.1
type BulkSubscribeConfig struct {
MaxMessagesCount int `json:"maxMessagesCount,omitempty"`
MaxAwaitDurationMs int `json:"maxAwaitDurationMs,omitempty"`
}
BulkSubscribeConfig represents the configuration for bulk subscribe. Whether these settings are supported depends on the specific component.
type BulkSubscribeResponse ¶ added in v1.10.1
type BulkSubscribeResponse struct {
Error error `json:"error"`
Statuses []BulkSubscribeResponseEntry `json:"statuses"`
}
BulkSubscribeResponse is the whole bulk subscribe response sent to the building block.
type BulkSubscribeResponseEntry ¶ added in v1.10.1
type BulkSubscribeResponseEntry struct {
EntryId string `json:"entryId"` //nolint:stylecheck
Error error `json:"error"`
}
BulkSubscribeResponseEntry represents a single subscribe response item, as part of BulkSubscribeResponse, to be sent to the building block for the corresponding single message during bulk subscribe.
type BulkSubscriber ¶ added in v1.10.1
type BulkSubscriber interface {
// BulkSubscribe is used to subscribe to a topic and receive a collection of entries/messages
// from a message bus topic.
// The bulkHandler will be called with a list of messages.
BulkSubscribe(ctx context.Context, req SubscribeRequest, bulkHandler BulkHandler) error
}
BulkSubscriber is the interface that defines BulkSubscribe for message buses.
type ConcurrencyMode ¶ added in v1.0.0
type ConcurrencyMode string
ConcurrencyMode is a pub/sub metadata setting that specifies whether messages are delivered serially or in parallel.
const (
	// ConcurrencyKey is the metadata key name for ConcurrencyMode.
	ConcurrencyKey = "concurrencyMode"
	Single ConcurrencyMode = "single"
	Parallel ConcurrencyMode = "parallel"
)
func Concurrency ¶ added in v1.0.0
func Concurrency(metadata map[string]string) (ConcurrencyMode, error)
Concurrency takes a metadata object and returns the ConcurrencyMode configured. Default is Parallel.
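A sketch of the lookup described above, with local copies of the type and constants. `concurrencyFrom` is a hypothetical stand-in for the package function. The default of Parallel comes from the description, while rejecting unrecognized values with an error is an assumption.

```go
package main

import "fmt"

// Local copies of the documented type and constants.
type ConcurrencyMode string

const (
	ConcurrencyKey                 = "concurrencyMode"
	Single         ConcurrencyMode = "single"
	Parallel       ConcurrencyMode = "parallel"
)

// concurrencyFrom reads the concurrency mode from metadata and defaults to
// Parallel when the key is absent or empty.
func concurrencyFrom(metadata map[string]string) (ConcurrencyMode, error) {
	value, ok := metadata[ConcurrencyKey]
	if !ok || value == "" {
		return Parallel, nil
	}
	switch mode := ConcurrencyMode(value); mode {
	case Single, Parallel:
		return mode, nil
	default:
		// Assumption of this sketch: unknown values are rejected.
		return "", fmt.Errorf("invalid %s value: %q", ConcurrencyKey, value)
	}
}

func main() {
	mode, err := concurrencyFrom(map[string]string{"concurrencyMode": "single"})
	fmt.Println(mode, err)
}
```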
type Feature ¶ added in v1.0.0
type Feature string
Feature names a feature that can be implemented by PubSub components.
const (
	// FeatureMessageTTL is the feature to handle message TTL.
	FeatureMessageTTL Feature = "MESSAGE_TTL"
	// FeatureSubscribeWildcards is the feature to allow subscribing to topics/queues using a wildcard.
	FeatureSubscribeWildcards Feature = "SUBSCRIBE_WILDCARDS"
	FeatureBulkPublish Feature = "BULK_PUBSUB"
)
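A caller typically needs to know whether a component advertises a given feature in the list returned by Features(). A minimal sketch follows. `hasFeature` is a hypothetical helper, and the underlying string type of Feature is inferred from the constants.

```go
package main

import "fmt"

// Local copies of the documented type and constants.
type Feature string

const (
	FeatureMessageTTL         Feature = "MESSAGE_TTL"
	FeatureSubscribeWildcards Feature = "SUBSCRIBE_WILDCARDS"
	FeatureBulkPublish        Feature = "BULK_PUBSUB"
)

// hasFeature reports whether want appears in the component's feature list.
func hasFeature(features []Feature, want Feature) bool {
	for _, f := range features {
		if f == want {
			return true
		}
	}
	return false
}

func main() {
	features := []Feature{FeatureMessageTTL, FeatureBulkPublish}
	fmt.Println(hasFeature(features, FeatureMessageTTL))
	fmt.Println(hasFeature(features, FeatureSubscribeWildcards))
}
```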
type Handler ¶ added in v1.2.0
type Handler func(ctx context.Context, msg *NewMessage) error
Handler is the handler used to invoke the app handler.
type NewMessage ¶
type NewMessage struct {
Data []byte `json:"data"`
Topic string `json:"topic"`
Metadata map[string]string `json:"metadata"`
ContentType *string `json:"contentType,omitempty"`
}
NewMessage is an event arriving from a message bus instance.
func (NewMessage) String ¶ added in v1.10.1
func (m NewMessage) String() string
String implements fmt.Stringer and it's useful for debugging.
type PausableSubscriber ¶ added in v1.17.4
type PausableSubscriber interface {
// Pause stops fetching new messages from the broker for all active
// subscriptions on this component. Already-buffered messages can still
// be processed.
Pause(ctx context.Context) error
// Resume resumes fetching new messages from the broker.
Resume(ctx context.Context) error
}
PausableSubscriber is an optional capability that lets the runtime pause fetching new messages from the broker without tearing down active subscriptions.
While paused, the broker stops delivering new messages but the session, connection, and partition assignments are preserved. Messages already buffered locally can continue to be processed by handlers — this enables a clean drain-during-shutdown pattern: pause, let the in-flight pipeline drain, then close.
Implementations must be safe to call concurrently with Subscribe and must be idempotent.
type PubSub ¶
type PubSub interface {
metadata.ComponentWithMetadata
Init(ctx context.Context, metadata Metadata) error
Features() []Feature
Publish(ctx context.Context, req *PublishRequest) error
Subscribe(ctx context.Context, req SubscribeRequest, handler Handler) error
io.Closer
}
PubSub is the interface for message buses.
type PublishRequest ¶
type PublishRequest struct {
Data []byte `json:"data"`
PubsubName string `json:"pubsubname"`
Topic string `json:"topic"`
Metadata map[string]string `json:"metadata"`
ContentType *string `json:"contentType,omitempty"`
}
PublishRequest is the request to publish a message.
type SubscribeRequest ¶
type SubscribeRequest struct {
Topic string `json:"topic"`
Metadata map[string]string `json:"metadata"`
BulkSubscribeConfig BulkSubscribeConfig `json:"bulkSubscribe,omitempty"`
}
SubscribeRequest is the request to subscribe to a topic.
type TLSProperties ¶ added in v1.10.1
TLSProperties is a struct that contains the TLS properties.