messagex

package
v0.0.153 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 23, 2026 License: Apache-2.0 Imports: 7 Imported by: 0

Documentation

Index

Constants

View Source
const (
	IDHeaderKey         = "_clinia_message_id"
	RetryCountHeaderKey = "_clinia_retry_count"
)
View Source
const (
	TopicSeparator   = "."
	TopicRetrySuffix = TopicSeparator + "retry"
)
View Source
const ConsumerGroupSeparator = "."

Variables

This section is empty.

Functions

func RenameConsumerGroupWithScope added in v0.0.130

func RenameConsumerGroupWithScope(group string, scope string) (string, error)

RenameConsumerGroupWithScope renames the consumer group with the given scope. If the group does not have a valid format, it returns an error.

func RenameTopicWithScope added in v0.0.130

func RenameTopicWithScope(topic string, scope string) (string, error)

RenameTopicWithScope renames a topic by replacing the existing scope with the given scope.

Types

type ConsumerGroup

type ConsumerGroup string

func ExtractScopeFromConsumerGroup added in v0.0.130

func ExtractScopeFromConsumerGroup(group string) (scope string, groupWithoutScope ConsumerGroup, err error)

ExtractScopeFromConsumerGroup extracts the scope and the consumer group name from the given consumer group string. It expects the format to be `{scope}.{group}`. If the scope is missing, it returns an error.

func (ConsumerGroup) ConsumerGroup

func (group ConsumerGroup) ConsumerGroup(scope string) string

ConsumerGroup returns the consumer group name with the given scope. If the scope is empty, it returns the consumer group name as is. This should be used when interacting with the concrete pubsubs (e.g. Kafka).

type Message

type Message struct {
	ID string
	// When specified, Key is used as the Kafka message key.
	Key      []byte
	Metadata MessageMetadata
	Payload  []byte
	// The offset of the message. This is readonly and only set when consuming.
	Offset int64
}

Message intentionally has no json marshalling fields as we want to pass by our own kgox.DefaultMarshaler

func NewMessage

func NewMessage(payload []byte, opts ...NewMessageOption) *Message

NewMessage creates a new Message with the given payload and options.

Parameters:

  • payload: The payload of the message as a byte slice.
  • opts: Optional parameters to customize the creation of the message. By default, a new UUID is generated for the message.

Returns:

  • *Message: A pointer to the created Message.

func (*Message) Copy added in v0.0.80

func (m *Message) Copy() *Message

func (*Message) ExtractTraceContext added in v0.0.82

func (m *Message) ExtractTraceContext(ctx context.Context) context.Context

func (*Message) InjectTraceContext added in v0.0.82

func (m *Message) InjectTraceContext(ctx context.Context)

func (*Message) WithSpan added in v0.0.79

func (m *Message) WithSpan(ctx context.Context, tracer trace.Tracer, spanPrefix string, opts ...trace.SpanStartOption) (context.Context, trace.Span)

type MessageMetadata

type MessageMetadata map[string]string

type MessageReferenceTracer added in v0.0.114

type MessageReferenceTracer interface {
	Message() *Message
	ReferenceMsgSpanName() string

	// StartMessageProcessingSpan starts a new span with a bidirectional link with
	// the original span associated with the creation of the provided Message.
	//
	// For the created span, a link will be added pointing to the original message's span.
	// For the original message's span, a subspan will be created with `${msgReferenceSpanName}`
	// which links back to the current span. If `msgReferenceSpanName` is empty, the default
	// name "async processing reference" will be used.
	//
	// This function should be called for any asynchronous processing of the message to enable
	// advanced tracing and provide a clear view of the message's lifecycle and its associated
	// operations.
	StartMessageProcessingSpan(ctx context.Context, name string, opts ...trace.SpanStartOption) (context.Context, trace.Span)
	// AttachMessageProcessingSpan creates a bidirectional link between the current span
	// and the original span associated with the creation of the provided Message.
	// This is useful when the original message's span is already created and you want to
	// attach the current span to it.
	//
	// This function should be called for any asynchronous processing of the message to enable
	// advanced tracing and provide a clear view of the message's lifecycle and its associated
	// operations.
	AttachMessageProcessingSpan(ctx context.Context, span trace.Span)
	// contains filtered or unexported methods
}

func NewMessageReferenceTracer added in v0.0.114

func NewMessageReferenceTracer(msg *Message, referenceMsgSpanName string, tr trace.Tracer) (MessageReferenceTracer, error)

type NewMessageOption added in v0.0.141

type NewMessageOption func(*newMessageOptions)

func WithID

func WithID(id string) NewMessageOption

WithID sets the ID of the message. A ksuid will be generated if no ID is provided.

func WithKey added in v0.0.141

func WithKey(key []byte) NewMessageOption

WithKey sets the key of the message.

func WithMetadata

func WithMetadata(m MessageMetadata) NewMessageOption

WithMetadata sets the metadata of the message.

func WithStringKey added in v0.0.141

func WithStringKey(key string) NewMessageOption

WithStringKey sets the key of the message using a string.

type Topic

type Topic string

func BaseTopicFromName added in v0.0.80

func BaseTopicFromName(topicName string) Topic

Expect topic to be format `{scope}.{topic}.{consumer-group}.retry` or `{scope}.{topic}` If the scope is missing, this function will return a wrong result

func ExtractScopeFromTopic added in v0.0.130

func ExtractScopeFromTopic(topic string) (scope string, topicWithoutScope Topic, err error)

ExtractScopeFromTopic extracts the scope and topic from a topic string. It expects the topic to be in the format `{scope}.{topic}`. If the topic does not have a valid format, it returns an error.

func NewTopic

func NewTopic(topic string) (Topic, error)

func TopicFromName

func TopicFromName(topicName string) Topic

func (Topic) GenerateRetryTopic added in v0.0.80

func (t Topic) GenerateRetryTopic(consumerGroup ConsumerGroup) Topic

func (Topic) TopicName

func (t Topic) TopicName(scope string) string

TopicName returns the topic name with the given scope. If the scope is empty, it returns the topic name as is. This should be used when interacting with the concrete pubsubs (e.g. Kafka).

type TraceContextPropagator added in v0.0.81

type TraceContextPropagator struct {
	// contains filtered or unexported fields
}

TraceContextPropagator is responsible for injecting and extracting trace context into and from message metadata.

func NewTraceContextPropagator added in v0.0.81

func NewTraceContextPropagator() TraceContextPropagator

func (*TraceContextPropagator) Extract added in v0.0.81

Extract extracts the trace context from the message metadata and returns a new context with the extracted trace context.

func (*TraceContextPropagator) Inject added in v0.0.81

func (t *TraceContextPropagator) Inject(ctx context.Context, m *Message)

Inject injects the trace context from the provided context into the message metadata.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL