Documentation
¶
Index ¶
Constants ¶
const ( IDHeaderKey = "_clinia_message_id" RetryCountHeaderKey = "_clinia_retry_count" )
const ( TopicSeparator = "." TopicRetrySuffix = TopicSeparator + "retry" )
const ConsumerGroupSeparator = "."
Variables ¶
This section is empty.
Functions ¶
func RenameConsumerGroupWithScope ¶ added in v0.0.130
RenameConsumerGroupWithScope renames the consumer group with the given scope. If the group does not have a valid format, it returns an error.
Types ¶
type ConsumerGroup ¶
type ConsumerGroup string
func ExtractScopeFromConsumerGroup ¶ added in v0.0.130
func ExtractScopeFromConsumerGroup(group string) (scope string, groupWithoutScope ConsumerGroup, err error)
ExtractScopeFromConsumerGroup extracts the scope and the consumer group name from the given consumer group string. It expects the format to be `{scope}.{group}`. If the scope is missing, it returns an error.
func (ConsumerGroup) ConsumerGroup ¶
func (group ConsumerGroup) ConsumerGroup(scope string) string
ConsumerGroup returns the consumer group name with the given scope. If the scope is empty, it returns the consumer group name as is. This should be used when interacting with the concrete pubsubs (e.g. Kafka).
type Message ¶
type Message struct {
ID string
// When specified, Key is used as the Kafka message key.
Key []byte
Metadata MessageMetadata
Payload []byte
// The offset of the message. This is readonly and only set when consuming.
Offset int64
}
Message intentionally has no json marshalling fields as we want to pass by our own kgox.DefaultMarshaler
func NewMessage ¶
func NewMessage(payload []byte, opts ...NewMessageOption) *Message
NewMessage creates a new Message with the given payload and options.
Parameters:
- payload: The payload of the message as a byte slice.
- opts: Optional parameters to customize the creation of the message. By default, a new UUID is generated for the message.
Returns:
- *Message: A pointer to the created Message.
func (*Message) ExtractTraceContext ¶ added in v0.0.82
func (*Message) InjectTraceContext ¶ added in v0.0.82
type MessageMetadata ¶
type MessageReferenceTracer ¶ added in v0.0.114
type MessageReferenceTracer interface {
Message() *Message
ReferenceMsgSpanName() string
// StartMessageProcessingSpan starts a new span with a bidirectional link with
// the original span associated with the creation of the provided Message.
//
// For the created span, a link will be added pointing to the original message's span.
// For the original message's span, a subspan will be created with `${msgReferenceSpanName}`
// which links back to the current span. If `msgReferenceSpanName` is empty, the default
// name "async processing reference" will be used.
//
// This function should be called for any asynchronous processing of the message to enable
// advanced tracing and provide a clear view of the message's lifecycle and its associated
// operations.
StartMessageProcessingSpan(ctx context.Context, name string, opts ...trace.SpanStartOption) (context.Context, trace.Span)
// AttachMessageProcessingSpan creates a bidirectional link between the current span
// and the original span associated with the creation of the provided Message.
// This is useful when the original message's span is already created and you want to
// attach the current span to it.
//
// This function should be called for any asynchronous processing of the message to enable
// advanced tracing and provide a clear view of the message's lifecycle and its associated
// operations.
AttachMessageProcessingSpan(ctx context.Context, span trace.Span)
// contains filtered or unexported methods
}
func NewMessageReferenceTracer ¶ added in v0.0.114
type NewMessageOption ¶ added in v0.0.141
type NewMessageOption func(*newMessageOptions)
func WithID ¶
func WithID(id string) NewMessageOption
WithID sets the ID of the message. A ksuid will be generated if no ID is provided.
func WithKey ¶ added in v0.0.141
func WithKey(key []byte) NewMessageOption
WithKey sets the key of the message.
func WithMetadata ¶
func WithMetadata(m MessageMetadata) NewMessageOption
WithMetadata sets the metadata of the message.
func WithStringKey ¶ added in v0.0.141
func WithStringKey(key string) NewMessageOption
WithStringKey sets the key of the message using a string.
type Topic ¶
type Topic string
func BaseTopicFromName ¶ added in v0.0.80
Expect topic to be format `{scope}.{topic}.{consumer-group}.retry` or `{scope}.{topic}` If the scope is missing, this function will return a wrong result
func ExtractScopeFromTopic ¶ added in v0.0.130
ExtractScopeFromTopic extracts the scope and topic from a topic string. It expects the topic to be in the format `{scope}.{topic}`. If the topic does not have a valid format, it returns an error.
func TopicFromName ¶
func (Topic) GenerateRetryTopic ¶ added in v0.0.80
func (t Topic) GenerateRetryTopic(consumerGroup ConsumerGroup) Topic
type TraceContextPropagator ¶ added in v0.0.81
type TraceContextPropagator struct {
// contains filtered or unexported fields
}
TraceContextPropagator is responsible for injecting and extracting trace context into and from message metadata.
func NewTraceContextPropagator ¶ added in v0.0.81
func NewTraceContextPropagator() TraceContextPropagator
func (*TraceContextPropagator) Extract ¶ added in v0.0.81
func (t *TraceContextPropagator) Extract(ctx context.Context, metadata MessageMetadata) context.Context
Extract extracts the trace context from the message metadata and returns a new context with the extracted trace context.