Documentation ¶
Constants ¶
const (
	IDHeaderKey         = "_clinia_message_id"
	RetryCountHeaderKey = "_clinia_retry_count"
)
const (
	TopicSeparator   = "."
	TopicRetrySuffix = TopicSeparator + "retry"
)
Variables ¶
This section is empty.
Functions ¶
func WithID ¶
func WithID(id string) newMessageOption
WithID sets the ID of the message. A ksuid will be generated if no ID is provided.
func WithMetadata ¶
func WithMetadata(m MessageMetadata) newMessageOption
WithMetadata sets the metadata of the message.
Types ¶
type ConsumerGroup ¶
type ConsumerGroup string
func (ConsumerGroup) ConsumerGroup ¶
func (group ConsumerGroup) ConsumerGroup(scope string) string
ConsumerGroup returns the consumer group name with the given scope. If the scope is empty, it returns the consumer group name as is. This should be used when interacting with the concrete pubsubs (e.g. Kafka).
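The scoping rule described above can be sketched as follows. This is a minimal illustration, not the package's implementation: the `{scope}.{group}` layout and the use of "." as the joining separator are assumptions borrowed from the documented TopicSeparator and topic format.

```go
package main

import "fmt"

// ConsumerGroup mirrors the documented type.
type ConsumerGroup string

// separator is an assumption for this sketch; the package documents "." as TopicSeparator.
const separator = "."

// ConsumerGroup returns the group name prefixed with the scope,
// or the group name unchanged when the scope is empty.
// The "{scope}.{group}" layout is assumed, not taken from the package source.
func (group ConsumerGroup) ConsumerGroup(scope string) string {
	if scope == "" {
		return string(group)
	}
	return scope + separator + string(group)
}

func main() {
	group := ConsumerGroup("billing")
	fmt.Println(group.ConsumerGroup(""))        // unscoped name
	fmt.Println(group.ConsumerGroup("staging")) // scoped name under the assumed layout
}
```

The scoped string, rather than the raw ConsumerGroup value, is what would be handed to the concrete pubsub client.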
type Message ¶
type Message struct {
	ID       string
	Metadata MessageMetadata
	Payload  []byte
}
Message intentionally has no JSON marshalling tags on its fields, because marshalling is meant to go through our own kgox.DefaultMarshaler.
func NewMessage ¶
NewMessage creates a new Message with the given payload and options.
Parameters:
- payload: The payload of the message as a byte slice.
- opts: Optional parameters to customize the creation of the message. By default, a new ID is generated for the message (a ksuid, according to WithID).
Returns:
- *Message: A pointer to the created Message.
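The parameters and options above follow the functional options pattern, which could look roughly like the sketch below. Several things here are assumptions made for illustration: the NewMessage signature is inferred from the parameter list, newMessageOption is taken to be `func(*Message)`, MessageMetadata is taken to be a string map, and randomID is a hypothetical stand-in that produces a random hex string rather than a ksuid.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// MessageMetadata is assumed to be a string map for this sketch.
type MessageMetadata map[string]string

// Message mirrors the documented struct.
type Message struct {
	ID       string
	Metadata MessageMetadata
	Payload  []byte
}

// newMessageOption is assumed to be a function that mutates the message being built.
type newMessageOption func(*Message)

// WithID sets the ID of the message.
func WithID(id string) newMessageOption {
	return func(m *Message) { m.ID = id }
}

// WithMetadata sets the metadata of the message.
func WithMetadata(md MessageMetadata) newMessageOption {
	return func(m *Message) { m.Metadata = md }
}

// randomID is a hypothetical stand-in for the package's ID generator.
func randomID() string {
	b := make([]byte, 16)
	if _, err := rand.Read(b); err != nil {
		panic(err)
	}
	return hex.EncodeToString(b)
}

// NewMessage applies the options in order, then fills in an ID if none was set.
func NewMessage(payload []byte, opts ...newMessageOption) *Message {
	m := &Message{Metadata: MessageMetadata{}, Payload: payload}
	for _, opt := range opts {
		opt(m)
	}
	if m.ID == "" {
		m.ID = randomID()
	}
	return m
}

func main() {
	explicit := NewMessage([]byte("hello"), WithID("order-42"))
	generated := NewMessage([]byte("hello"))
	fmt.Println(explicit.ID)
	fmt.Println(generated.ID != "")
}
```

Checking for an empty ID after the options run means that both omitting WithID and passing an empty string end up with a generated ID, which matches the documented behaviour of WithID.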
func (*Message) ExtractTraceContext ¶ added in v0.0.82
func (*Message) InjectTraceContext ¶ added in v0.0.82
type MessageMetadata ¶
type MessageReferenceTracer ¶ added in v0.0.114
type MessageReferenceTracer interface {
	Message() *Message
	ReferenceMsgSpanName() string
	// StartMessageProcessingSpan starts a new span with a bidirectional link with
	// the original span associated with the creation of the provided Message.
	//
	// For the created span, a link will be added pointing to the original message's span.
	// For the original message's span, a subspan will be created with `${msgReferenceSpanName}`
	// which links back to the current span. If `msgReferenceSpanName` is empty, the default
	// name "async processing reference" will be used.
	//
	// This function should be called for any asynchronous processing of the message to enable
	// advanced tracing and provide a clear view of the message's lifecycle and its associated
	// operations.
	StartMessageProcessingSpan(ctx context.Context, name string, opts ...trace.SpanStartOption) (context.Context, trace.Span)
	// AttachMessageProcessingSpan creates a bidirectional link between the current span
	// and the original span associated with the creation of the provided Message.
	// This is useful when the original message's span is already created and you want to
	// attach the current span to it.
	//
	// This function should be called for any asynchronous processing of the message to enable
	// advanced tracing and provide a clear view of the message's lifecycle and its associated
	// operations.
	AttachMessageProcessingSpan(ctx context.Context, span trace.Span)
	// contains filtered or unexported methods
}
func NewMessageReferenceTracer ¶ added in v0.0.114
type Topic ¶
type Topic string
func BaseTopicFromName ¶ added in v0.0.80
BaseTopicFromName expects the topic name to be in the format `{scope}.{topic}.{consumer-group}.retry` or `{scope}.{topic}`. If the scope is missing, this function returns an incorrect result.
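To make the two accepted formats concrete, here is a rough sketch of how such a name could be reduced to its base topic. The helper baseTopic is hypothetical and is not the package's function. It assumes that the result is the bare `{topic}` part and that the scope and consumer group contain no separator.

```go
package main

import (
	"fmt"
	"strings"
)

const (
	TopicSeparator   = "."
	TopicRetrySuffix = TopicSeparator + "retry"
)

// baseTopic is a hypothetical helper for this sketch. It strips the retry
// suffix and consumer group (when present) and then the leading scope.
func baseTopic(name string) string {
	// Retry topics: drop ".retry", then the trailing consumer-group segment.
	if strings.HasSuffix(name, TopicRetrySuffix) {
		name = strings.TrimSuffix(name, TopicRetrySuffix)
		if i := strings.LastIndex(name, TopicSeparator); i >= 0 {
			name = name[:i]
		}
	}
	// Drop the leading scope segment. Without a scope, this removes part of the topic.
	if i := strings.Index(name, TopicSeparator); i >= 0 {
		return name[i+len(TopicSeparator):]
	}
	return name
}

func main() {
	fmt.Println(baseTopic("prod.orders.billing.retry")) // orders
	fmt.Println(baseTopic("prod.orders"))               // orders
	fmt.Println(baseTopic("orders.created"))            // created (scope missing, so the result is wrong)
}
```

The last call shows why a missing scope is a problem: the first segment is always treated as the scope, so an unscoped name loses part of its topic.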
func TopicFromName ¶
func (Topic) GenerateRetryTopic ¶ added in v0.0.80
func (t Topic) GenerateRetryTopic(consumerGroup ConsumerGroup) Topic
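Given the documented constants and the `{topic}.{consumer-group}.retry` layout, a retry topic name could be assembled as in the sketch below. The body of the method is an assumption based on those constants, not the package's actual code.

```go
package main

import "fmt"

// ConsumerGroup and Topic mirror the documented types.
type ConsumerGroup string
type Topic string

const (
	TopicSeparator   = "."
	TopicRetrySuffix = TopicSeparator + "retry"
)

// GenerateRetryTopic appends the consumer group and the retry suffix to the topic.
// This composition is assumed from the documented constants and name format.
func (t Topic) GenerateRetryTopic(consumerGroup ConsumerGroup) Topic {
	return Topic(string(t) + TopicSeparator + string(consumerGroup) + TopicRetrySuffix)
}

func main() {
	fmt.Println(Topic("orders").GenerateRetryTopic("billing")) // orders.billing.retry
}
```

Including the consumer group in the name would give each group its own retry topic, so one group's retries are not redelivered to the others.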
type TraceContextPropagator ¶ added in v0.0.81
type TraceContextPropagator struct {
	// contains filtered or unexported fields
}
TraceContextPropagator is responsible for injecting and extracting trace context into and from message metadata.
func NewTraceContextPropagator ¶ added in v0.0.81
func NewTraceContextPropagator() TraceContextPropagator
func (*TraceContextPropagator) Extract ¶ added in v0.0.81
func (t *TraceContextPropagator) Extract(ctx context.Context, metadata MessageMetadata) context.Context
Extract extracts the trace context from the message metadata and returns a new context with the extracted trace context.