pubsub

package
v1.16.0-rc.5
Warning

This package is not in the latest version of its module.

Published: Aug 28, 2025 License: Apache-2.0 Imports: 17 Imported by: 101

README

Pub Sub

Pub Sub components provide a common way to interact with different message bus implementations to achieve reliable, high-scale scenarios based on event-driven async communications, while allowing users to opt-in to advanced capabilities using defined metadata.

Implementing a new Pub Sub

A compliant pub sub needs to implement the PubSub interface included in the pubsub.go file.

Message TTL (or Time To Live)

Message time to live (TTL) is implemented by default in Dapr. A publishing application can set the expiration of an individual message by publishing it with the ttlInSeconds metadata. Components that support message TTL should parse this metadata attribute. For components that do not implement this feature, the Dapr runtime automatically populates the expiration attribute in the CloudEvent object if ttlInSeconds is present. In this case, the runtime drops an expired message when a Dapr subscriber is about to consume it, without invoking the subscriber's endpoint. The runtime handles the expiration attribute as a convenience to subscribers. Subscriber applications that don't use Dapr need to handle this attribute and implement the expiration logic themselves.

If the pub sub component implementation can handle message TTL natively without relying on Dapr, consume the ttlInSeconds metadata in the component implementation for the Publish function. Also, implement the Features() function so the Dapr runtime knows that it should not add the expiration attribute to events.

Example:

import (
	"context"

	contribMetadata "github.com/dapr/components-contrib/metadata"
	"github.com/dapr/components-contrib/pubsub"
)

//...

func (c *MyComponent) Publish(ctx context.Context, req *pubsub.PublishRequest) error {
	//...
	// TryGetTTL reports whether ttlInSeconds is set and fails if it is malformed.
	ttl, hasTTL, err := contribMetadata.TryGetTTL(req.Metadata)
	if err != nil {
		return err
	}
	if hasTTL {
		//... handle ttl for component.
		_ = ttl
	}
	//...
	return nil
}

func (c *MyComponent) Features() []pubsub.Feature {
	// Tip: cache this list into a private property.
	// Simply return nil if the component does not implement any additional features.
	return []pubsub.Feature{pubsub.FeatureMessageTTL}
}

For pub sub components that support TTL per topic or queue but not per message, there are some design choices:

  • Configure the TTL for the topic or queue as usual. Optionally, implement topic or queue provisioning in the Init() method, using the component configuration's metadata to determine the topic or queue TTL.
  • Let Dapr runtime handle ttlInSeconds for messages that want to expire earlier than the topic's or queue's TTL. So, applications can still benefit from TTL per message via Dapr for this scenario.

Note: as per the CloudEvent spec, timestamps (like expiration) are formatted using RFC3339.

Documentation


Constants

const (
	// DefaultCloudEventType is the default event type for a Dapr published event.
	DefaultCloudEventType = "com.dapr.event.sent"
	// DefaultBulkEventType is the default bulk event type for a Dapr published event.
	DefaultBulkEventType = "com.dapr.event.sent.bulk"
	// CloudEventsSpecVersion is the specversion used by Dapr for the cloud events implementation.
	CloudEventsSpecVersion = "1.0"
	// DefaultCloudEventSource is the default event source.
	DefaultCloudEventSource = "Dapr"
	// DefaultCloudEventDataContentType is the default content-type for the data attribute.
	DefaultCloudEventDataContentType = "text/plain"
	// traceid is kept for backwards compatibility.
	// ::TODO delete traceid, and keep traceparent.
	TraceIDField         = "traceid"
	TraceParentField     = "traceparent"
	TraceStateField      = "tracestate"
	TopicField           = "topic"
	PubsubField          = "pubsubname"
	ExpirationField      = "expiration"
	DataContentTypeField = "datacontenttype"
	DataField            = "data"
	DataBase64Field      = "data_base64"
	SpecVersionField     = "specversion"
	TypeField            = "type"
	SourceField          = "source"
	IDField              = "id"
	SubjectField         = "subject"
	TimeField            = "time"
)
const (
	// CACert is the metadata key name for the CA certificate.
	CACert = "caCert"
	// ClientCert is the metadata key name for the client certificate.
	ClientCert = "clientCert"
	// ClientKey is the metadata key name for the client key.
	ClientKey = "clientKey"
)
const RuntimeConsumerIDKey = "consumerID"

When the Dapr component does not explicitly specify a consumer group, this value provided by the runtime must be used. This value is specific to each Dapr App. As a result, by default, each Dapr App will receive all messages published to the topic at least once. See https://github.com/dapr/dapr/blob/21566de8d7fdc7d43ae627ffc0698cc073fa71b0/pkg/runtime/runtime.go#L1735-L1739
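The fallback described above could look roughly like the sketch below. The "consumerGroup" metadata key and the resolveConsumerGroup helper are hypothetical; each component defines its own key for an explicit consumer group, and only the "consumerID" key comes from this package.

```go
package main

import (
	"errors"
	"fmt"
)

// runtimeConsumerIDKey mirrors the value of RuntimeConsumerIDKey.
const runtimeConsumerIDKey = "consumerID"

// explicitGroupKey is a hypothetical, component-specific metadata key.
const explicitGroupKey = "consumerGroup"

// resolveConsumerGroup prefers an explicitly configured consumer group and
// falls back to the consumer ID supplied by the runtime.
func resolveConsumerGroup(metadata map[string]string) (string, error) {
	if group := metadata[explicitGroupKey]; group != "" {
		return group, nil
	}
	if id := metadata[runtimeConsumerIDKey]; id != "" {
		return id, nil
	}
	return "", errors.New("no consumer group configured and no consumerID provided by the runtime")
}

func main() {
	group, err := resolveConsumerGroup(map[string]string{runtimeConsumerIDKey: "orders-app"})
	fmt.Println(group, err)
}
```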

Variables

var ErrGracefulShutdown = errors.New("pubsub shutdown")

Functions

func ApplyMetadata added in v1.0.0

func ApplyMetadata(cloudEvent map[string]interface{}, componentFeatures []Feature, metadata map[string]string)

ApplyMetadata will process metadata to modify the cloud event based on the component's feature set.

func ConvertTLSPropertiesToTLSConfig added in v1.10.1

func ConvertTLSPropertiesToTLSConfig(properties TLSProperties) (*tls.Config, error)

ConvertTLSPropertiesToTLSConfig converts the TLSProperties to a tls.Config.
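For readers unfamiliar with this kind of conversion, here is a minimal standard-library sketch of turning PEM strings into a tls.Config. The local tlsProperties type and the toTLSConfig function are stand-ins written for this example; the validation rules and the minimum TLS version are assumptions and may differ from what ConvertTLSPropertiesToTLSConfig actually does.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
)

// tlsProperties is a local stand-in for TLSProperties; all values are PEM text.
type tlsProperties struct {
	CACert     string
	ClientCert string
	ClientKey  string
}

// toTLSConfig is a hypothetical sketch of the conversion.
func toTLSConfig(p tlsProperties) (*tls.Config, error) {
	// Assumption: TLS 1.2 is the minimum acceptable version.
	cfg := &tls.Config{MinVersion: tls.VersionTLS12}

	if p.CACert != "" {
		pool := x509.NewCertPool()
		if !pool.AppendCertsFromPEM([]byte(p.CACert)) {
			return nil, errors.New("unable to parse CA certificate PEM")
		}
		cfg.RootCAs = pool
	}

	// Assumption: a client certificate and its key must be supplied together.
	if p.ClientCert != "" || p.ClientKey != "" {
		if p.ClientCert == "" || p.ClientKey == "" {
			return nil, errors.New("client certificate and client key must both be set")
		}
		pair, err := tls.X509KeyPair([]byte(p.ClientCert), []byte(p.ClientKey))
		if err != nil {
			return nil, fmt.Errorf("unable to load client key pair: %w", err)
		}
		cfg.Certificates = []tls.Certificate{pair}
	}
	return cfg, nil
}

func main() {
	cfg, err := toTLSConfig(tlsProperties{})
	fmt.Println(cfg != nil, err)
}
```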

func FromCloudEvent added in v1.0.0

func FromCloudEvent(cloudEvent []byte, topic, pubsub, traceParent string, traceState string) (map[string]interface{}, error)

FromCloudEvent returns a map representation of an existing cloudevents JSON.

func FromRawPayload added in v1.2.0

func FromRawPayload(data []byte, topic, pubsub string) map[string]interface{}

FromRawPayload returns a CloudEvent for a raw payload on subscriber's end.

func HasExpired added in v1.0.0

func HasExpired(cloudEvent map[string]interface{}) bool

HasExpired determines if the current cloud event has expired.

func NewCloudEventsEnvelope

func NewCloudEventsEnvelope(id, source, eventType, subject string, topic string, pubsubName string,
	dataContentType string, data []byte, traceParent string, traceState string,
) map[string]interface{}

NewCloudEventsEnvelope returns a map representation of a cloudevents JSON.
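To give a feel for the shape of such a map, here is a simplified sketch that uses the field names and default values from the constants listed earlier. The newEnvelope function is hypothetical: it takes fewer parameters than NewCloudEventsEnvelope, always stores the payload as a string, and its rule of substituting defaults for empty arguments is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Values copied from the package constants.
const (
	defaultCloudEventType            = "com.dapr.event.sent"
	cloudEventsSpecVersion           = "1.0"
	defaultCloudEventSource          = "Dapr"
	defaultCloudEventDataContentType = "text/plain"
)

// newEnvelope is a simplified, hypothetical stand-in for NewCloudEventsEnvelope.
func newEnvelope(id, source, eventType, topic, pubsubName, dataContentType string, data []byte) map[string]interface{} {
	// Assumption: empty arguments fall back to the documented defaults.
	if source == "" {
		source = defaultCloudEventSource
	}
	if eventType == "" {
		eventType = defaultCloudEventType
	}
	if dataContentType == "" {
		dataContentType = defaultCloudEventDataContentType
	}
	return map[string]interface{}{
		"id":              id,
		"specversion":     cloudEventsSpecVersion,
		"source":          source,
		"type":            eventType,
		"datacontenttype": dataContentType,
		"topic":           topic,
		"pubsubname":      pubsubName,
		"data":            string(data), // simplification: payload kept as text
	}
}

func main() {
	event := newEnvelope("a1", "", "", "orders", "mypubsub", "", []byte("hello"))
	out, err := json.Marshal(event)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```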

func Ping added in v1.8.0

func Ping(ctx context.Context, pubsub PubSub) error

Types

type AppBulkResponse added in v1.10.1

type AppBulkResponse struct {
	AppResponses []AppBulkResponseEntry `json:"statuses"`
}

AppBulkResponse is the whole bulk subscribe response sent by the app.

type AppBulkResponseEntry added in v1.10.1

type AppBulkResponseEntry struct {
	EntryId string            `json:"entryId"` //nolint:stylecheck
	Status  AppResponseStatus `json:"status"`
}

AppBulkResponseEntry represents a single response, as part of AppBulkResponse, sent by the subscribed app for the corresponding message during bulk subscribe.

type AppResponse added in v0.4.0

type AppResponse struct {
	Status AppResponseStatus `json:"status"`
}

AppResponse is the object describing the response from user code after a pubsub event.

type AppResponseStatus added in v0.4.0

type AppResponseStatus string

AppResponseStatus represents a status of a PubSub response.

const (
	// Success means the message is received and processed correctly.
	Success AppResponseStatus = "SUCCESS"
	// Retry means the message is received but could not be processed and must be retried.
	Retry AppResponseStatus = "RETRY"
	// Drop means the message is received but should not be processed.
	Drop AppResponseStatus = "DROP"
)
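As a rough illustration of how a caller might act on these statuses, the sketch below decodes an app's JSON response and decides whether the message should be redelivered. The local types mirror AppResponse and AppResponseStatus; the shouldRedeliver helper and its handling of unknown or empty statuses as errors are assumptions, not behavior defined by this package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local mirrors of AppResponseStatus and its values.
type appResponseStatus string

const (
	success appResponseStatus = "SUCCESS"
	retry   appResponseStatus = "RETRY"
	drop    appResponseStatus = "DROP"
)

// appResponse mirrors AppResponse.
type appResponse struct {
	Status appResponseStatus `json:"status"`
}

// shouldRedeliver is a hypothetical helper: it returns true only for RETRY.
func shouldRedeliver(body []byte) (bool, error) {
	var resp appResponse
	if err := json.Unmarshal(body, &resp); err != nil {
		return false, fmt.Errorf("invalid app response: %w", err)
	}
	switch resp.Status {
	case success, drop:
		return false, nil
	case retry:
		return true, nil
	default:
		// Assumption: unknown or empty statuses are reported as errors.
		return false, fmt.Errorf("unknown status %q", resp.Status)
	}
}

func main() {
	redeliver, err := shouldRedeliver([]byte(`{"status":"RETRY"}`))
	fmt.Println(redeliver, err)
}
```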

type BulkHandler added in v1.10.1

type BulkHandler func(ctx context.Context, msg *BulkMessage) ([]BulkSubscribeResponseEntry, error)

BulkHandler handles a BulkMessage. The returned []BulkSubscribeResponseEntry carries the individual status of each message, in order.

type BulkMessage added in v1.10.1

type BulkMessage struct {
	Entries  []BulkMessageEntry `json:"entries"`
	Topic    string             `json:"topic"`
	Metadata map[string]string  `json:"metadata"`
}

BulkMessage represents bulk message arriving from a message bus instance.

func (BulkMessage) String added in v1.10.1

func (m BulkMessage) String() string

String implements fmt.Stringer and it's useful for debugging.

type BulkMessageEntry added in v1.10.1

type BulkMessageEntry struct {
	EntryId     string            `json:"entryId"` //nolint:stylecheck
	Event       []byte            `json:"event"`
	ContentType string            `json:"contentType,omitempty"`
	Metadata    map[string]string `json:"metadata"`
}

BulkMessageEntry represents a single message inside a bulk request.

func (BulkMessageEntry) String added in v1.10.1

func (m BulkMessageEntry) String() string

String implements fmt.Stringer and it's useful for debugging.

type BulkPublishRequest added in v1.10.1

type BulkPublishRequest struct {
	Entries    []BulkMessageEntry `json:"entries"`
	PubsubName string             `json:"pubsubname"`
	Topic      string             `json:"topic"`
	Metadata   map[string]string  `json:"metadata"`
}

BulkPublishRequest is the request to publish multiple messages.

type BulkPublishResponse added in v1.10.1

type BulkPublishResponse struct {
	FailedEntries []BulkPublishResponseFailedEntry `json:"failedEntries"`
}

BulkPublishResponse contains the list of failed entries in a bulk publish request.

func NewBulkPublishResponse added in v1.10.1

func NewBulkPublishResponse(messages []BulkMessageEntry, err error) BulkPublishResponse

NewBulkPublishResponse returns a BulkPublishResponse with each entry having the same error. This method is a helper method to map a single error response on BulkPublish to multiple events.

type BulkPublishResponseFailedEntry added in v1.10.1

type BulkPublishResponseFailedEntry struct {
	EntryId string `json:"entryId"` //nolint:stylecheck
	Error   error  `json:"error"`
}

BulkPublishResponseFailedEntry represents a single publish response, as part of BulkPublishResponse, sent to the publishing app for the corresponding message during bulk publish.

type BulkPublisher added in v1.10.1

type BulkPublisher interface {
	BulkPublish(ctx context.Context, req *BulkPublishRequest) (BulkPublishResponse, error)
}

BulkPublish publishes a collection of entries/messages in a BulkPublishRequest to a message bus topic and returns a BulkPublishResponse with failed entries for any failed messages. Error is returned on partial or complete failures. If there are no failures, error is nil.

type BulkSubscribeConfig added in v1.10.1

type BulkSubscribeConfig struct {
	MaxMessagesCount   int `json:"maxMessagesCount,omitempty"`
	MaxAwaitDurationMs int `json:"maxAwaitDurationMs,omitempty"`
}

BulkSubscribeConfig represents the configuration for bulk subscribe. Support for these settings depends on the specific component.

type BulkSubscribeResponse added in v1.10.1

type BulkSubscribeResponse struct {
	Error    error                        `json:"error"`
	Statuses []BulkSubscribeResponseEntry `json:"statuses"`
}

BulkSubscribeResponse is the whole bulk subscribe response sent to the building block.

type BulkSubscribeResponseEntry added in v1.10.1

type BulkSubscribeResponseEntry struct {
	EntryId string `json:"entryId"` //nolint:stylecheck
	Error   error  `json:"error"`
}

BulkSubscribeResponseEntry represents a single subscribe response item, as part of BulkSubscribeResponse, sent to the building block for the corresponding message during bulk subscribe.

type BulkSubscriber added in v1.10.1

type BulkSubscriber interface {
	// BulkSubscribe is used to subscribe to a topic and receive a collection of entries/messages
	// from a message bus topic.
	// The bulkHandler will be called with a list of messages.
	BulkSubscribe(ctx context.Context, req SubscribeRequest, bulkHandler BulkHandler) error
}

BulkSubscriber is the interface that defines BulkSubscribe for message buses.

type ConcurrencyMode added in v1.0.0

type ConcurrencyMode string

ConcurrencyMode is a pub/sub metadata setting that specifies whether messages are delivered serially or in parallel.

const (
	// ConcurrencyKey is the metadata key name for ConcurrencyMode.
	ConcurrencyKey                 = "concurrencyMode"
	Single         ConcurrencyMode = "single"
	Parallel       ConcurrencyMode = "parallel"
)

func Concurrency added in v1.0.0

func Concurrency(metadata map[string]string) (ConcurrencyMode, error)

Concurrency takes a metadata object and returns the ConcurrencyMode configured. Default is Parallel.
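The lookup with its Parallel default could be sketched as below, using local copies of the type and constants. Treating an empty value as unset and rejecting unrecognized values with an error are assumptions about the behavior, not statements taken from the package source.

```go
package main

import "fmt"

// Local mirrors of ConcurrencyMode and its constants.
type concurrencyMode string

const (
	concurrencyKey                 = "concurrencyMode"
	single         concurrencyMode = "single"
	parallel       concurrencyMode = "parallel"
)

// concurrencyFrom reads the concurrency mode from metadata, defaulting to parallel.
func concurrencyFrom(metadata map[string]string) (concurrencyMode, error) {
	value, ok := metadata[concurrencyKey]
	if !ok || value == "" {
		return parallel, nil
	}
	switch concurrencyMode(value) {
	case single:
		return single, nil
	case parallel:
		return parallel, nil
	default:
		// Assumption: unrecognized values are rejected rather than ignored.
		return "", fmt.Errorf("invalid %s %q", concurrencyKey, value)
	}
}

func main() {
	mode, err := concurrencyFrom(map[string]string{concurrencyKey: "single"})
	fmt.Println(mode, err)
}
```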

type Feature added in v1.0.0

type Feature = features.Feature[PubSub]

Feature names a feature that can be implemented by PubSub components.

const (
	// FeatureMessageTTL is the feature to handle message TTL.
	FeatureMessageTTL Feature = "MESSAGE_TTL"
	// FeatureSubscribeWildcards is the feature to allow subscribing to topics/queues using a wildcard.
	FeatureSubscribeWildcards Feature = "SUBSCRIBE_WILDCARDS"
	FeatureBulkPublish        Feature = "BULK_PUBSUB"
)

type Handler added in v1.2.0

type Handler func(ctx context.Context, msg *NewMessage) error

Handler is the handler used to invoke the app handler.

type Metadata

type Metadata struct {
	metadata.Base `json:",inline"`
}

Metadata represents a set of message-bus specific properties.

type NewMessage

type NewMessage struct {
	Data        []byte            `json:"data"`
	Topic       string            `json:"topic"`
	Metadata    map[string]string `json:"metadata"`
	ContentType *string           `json:"contentType,omitempty"`
}

NewMessage is an event arriving from a message bus instance.

func (NewMessage) String added in v1.10.1

func (m NewMessage) String() string

String implements fmt.Stringer and it's useful for debugging.

type PubSub

type PubSub interface {
	metadata.ComponentWithMetadata

	Init(ctx context.Context, metadata Metadata) error
	Features() []Feature
	Publish(ctx context.Context, req *PublishRequest) error
	Subscribe(ctx context.Context, req SubscribeRequest, handler Handler) error
	io.Closer
}

PubSub is the interface for message buses.

type PublishRequest

type PublishRequest struct {
	Data        []byte            `json:"data"`
	PubsubName  string            `json:"pubsubname"`
	Topic       string            `json:"topic"`
	Metadata    map[string]string `json:"metadata"`
	ContentType *string           `json:"contentType,omitempty"`
}

PublishRequest is the request to publish a message.

type SubscribeRequest

type SubscribeRequest struct {
	Topic               string              `json:"topic"`
	Metadata            map[string]string   `json:"metadata"`
	BulkSubscribeConfig BulkSubscribeConfig `json:"bulkSubscribe,omitempty"`
}

SubscribeRequest is the request to subscribe to a topic.

type TLSProperties added in v1.10.1

type TLSProperties struct {
	CACert     string
	ClientCert string
	ClientKey  string
}

TLSProperties is a struct that contains the TLS properties.

func TLS added in v1.10.1

func TLS(metadata map[string]string) (TLSProperties, error)

TLS takes a metadata object and returns the TLSProperties configured.

Directories

Path Synopsis
aws
azure
gcp
solace
