Documentation
¶
Index ¶
- Constants
- Variables
- func GetTLSCertFromFile(path string) (*tls.Certificate, error)
- type AuditLog
- type AuditLogBuilder
- func (auditLogBuilder *AuditLogBuilder) ActionName(actionName string) *AuditLogBuilder
- func (auditLogBuilder *AuditLogBuilder) Actor(actor string) *AuditLogBuilder
- func (auditLogBuilder *AuditLogBuilder) ActorNamespace(actorNamespace string) *AuditLogBuilder
- func (auditLogBuilder *AuditLogBuilder) Build() (*kafka.Message, error)
- func (auditLogBuilder *AuditLogBuilder) Category(category string) *AuditLogBuilder
- func (auditLogBuilder *AuditLogBuilder) ClientID(clientID string) *AuditLogBuilder
- func (auditLogBuilder *AuditLogBuilder) Content(content map[string]interface{}) *AuditLogBuilder
- func (auditLogBuilder *AuditLogBuilder) DeviceID(deviceID string) *AuditLogBuilder
- func (auditLogBuilder *AuditLogBuilder) Diff(diff *AuditLogDiff) *AuditLogBuilder
- func (auditLogBuilder *AuditLogBuilder) IP(ip string) *AuditLogBuilder
- func (auditLogBuilder *AuditLogBuilder) IsActorTypeUser(isActorTypeUser bool) *AuditLogBuilder
- func (auditLogBuilder *AuditLogBuilder) Key(key string) *AuditLogBuilder
- func (auditLogBuilder *AuditLogBuilder) ObjectID(objectID string) *AuditLogBuilder
- func (auditLogBuilder *AuditLogBuilder) ObjectNamespace(objectNamespace string) *AuditLogBuilder
- func (auditLogBuilder *AuditLogBuilder) ObjectType(objectType string) *AuditLogBuilder
- func (auditLogBuilder *AuditLogBuilder) TargetUserID(targetUserID string) *AuditLogBuilder
- type AuditLogDiff
- type AuditLogPayload
- type BlackholeClient
- func (client *BlackholeClient) GetMetadata(_ string, _ time.Duration) (*Metadata, error)
- func (client *BlackholeClient) Publish(publishBuilder *PublishBuilder) error
- func (client *BlackholeClient) PublishAuditLog(auditLogBuilder *AuditLogBuilder) error
- func (client *BlackholeClient) PublishSync(publishBuidler *PublishBuilder) error
- func (client *BlackholeClient) Register(subscribeBuilder *SubscribeBuilder) error
- type BrokerConfig
- type BrokerMetadata
- type Client
- type Event
- type KafkaClient
- func (client *KafkaClient) GetMetadata(topic string, timeout time.Duration) (*Metadata, error)
- func (client *KafkaClient) GetReaderStats() statistics.Stats
- func (client *KafkaClient) GetWriterStats() statistics.Stats
- func (client *KafkaClient) Publish(publishBuilder *PublishBuilder) error
- func (client *KafkaClient) PublishAuditLog(auditLogBuilder *AuditLogBuilder) error
- func (client *KafkaClient) PublishSync(publishBuilder *PublishBuilder) error
- func (client *KafkaClient) Register(subscribeBuilder *SubscribeBuilder) error
- type Metadata
- type PublishBuilder
- func (p *PublishBuilder) AdditionalFields(additionalFields map[string]interface{}) *PublishBuilder
- func (p *PublishBuilder) ClientID(clientID string) *PublishBuilder
- func (p *PublishBuilder) ClientIDs(clientIDs []string) *PublishBuilder
- func (p *PublishBuilder) Context(ctx context.Context) *PublishBuilder
- func (p *PublishBuilder) EventID(eventID int) *PublishBuilder
- func (p *PublishBuilder) EventLevel(eventLevel int) *PublishBuilder
- func (p *PublishBuilder) EventName(eventName string) *PublishBuilder
- func (p *PublishBuilder) EventType(eventType int) *PublishBuilder
- func (p *PublishBuilder) ID(id string) *PublishBuilder
- func (p *PublishBuilder) Key(key string) *PublishBuilder
- func (p *PublishBuilder) Namespace(namespace string) *PublishBuilder
- func (p *PublishBuilder) ParentNamespace(parentNamespace string) *PublishBuilder
- func (p *PublishBuilder) Payload(payload map[string]interface{}) *PublishBuilder
- func (p *PublishBuilder) Privacy(privacy bool) *PublishBuilder
- func (p *PublishBuilder) ServiceName(serviceName string) *PublishBuilder
- func (p *PublishBuilder) SessionID(sessionID string) *PublishBuilder
- func (p *PublishBuilder) SpanContext(spanID string) *PublishBuilder
- func (p *PublishBuilder) TargetNamespace(targetNamespace string) *PublishBuilder
- func (p *PublishBuilder) TargetUserIDs(targetUserIDs []string) *PublishBuilder
- func (p *PublishBuilder) Timeout(timeout time.Duration) *PublishBuilder (deprecated)
- func (p *PublishBuilder) Topic(topic string) *PublishBuilder
- func (p *PublishBuilder) TraceID(traceID string) *PublishBuilder
- func (p *PublishBuilder) UnionNamespace(unionNamespace string) *PublishBuilder
- func (p *PublishBuilder) UserID(userID string) *PublishBuilder
- func (p *PublishBuilder) Version(version int) *PublishBuilder
- type PublishErrorCallbackFunc
- type SecurityConfig
- type StdoutClient
- func (client *StdoutClient) GetMetadata(_ string, _ time.Duration) (*Metadata, error)
- func (client *StdoutClient) Publish(publishBuilder *PublishBuilder) error
- func (client *StdoutClient) PublishAuditLog(auditLogBuilder *AuditLogBuilder) error
- func (client *StdoutClient) PublishSync(publishBuilder *PublishBuilder) error
- func (client *StdoutClient) Register(subscribeBuilder *SubscribeBuilder) error
- type SubscribeBuilder
- func (s *SubscribeBuilder) AsyncCommitMessage(async bool) *SubscribeBuilder
- func (s *SubscribeBuilder) Callback(callback func(ctx context.Context, event *Event, err error) error) *SubscribeBuilder
- func (s *SubscribeBuilder) CallbackRaw(f func(ctx context.Context, msgValue []byte, err error) error) *SubscribeBuilder
- func (s *SubscribeBuilder) Context(ctx context.Context) *SubscribeBuilder
- func (s *SubscribeBuilder) EventName(eventName string) *SubscribeBuilder
- func (s *SubscribeBuilder) GroupID(groupID string) *SubscribeBuilder
- func (s *SubscribeBuilder) GroupInstanceID(groupInstanceID string) *SubscribeBuilder
- func (s *SubscribeBuilder) Offset(offset int64) *SubscribeBuilder
- func (s *SubscribeBuilder) SendErrorDLQ(dlq bool) *SubscribeBuilder
- func (s *SubscribeBuilder) Slug() string
- func (s *SubscribeBuilder) Topic(topic string) *SubscribeBuilder
Constants ¶
const ( OffLevel = "off" InfoLevel = "info" DebugLevel = "debug" WarnLevel = "warn" ErrorLevel = "error" )
log level
const (
DefaultSSLCertPath = "/etc/ssl/certs/ca-certificates.crt" // Alpine certificate path
)
const TopicEventPattern = "^[a-zA-Z0-9]+((['_.-][a-zA-Z0-9])?[a-zA-Z0-9]*)*$"
Variables ¶
var ( NotificationEventNamePath = "name" FreeformNotificationUserIDsPath = []string{"payload", "userIds"} )
var (
ErrMessageTooLarge = errors.New("message to large")
)
Functions ¶
func GetTLSCertFromFile ¶
func GetTLSCertFromFile(path string) (*tls.Certificate, error)
GetTLSCertFromFile reads the file at path and splits its contents into a key and certificates.
Types ¶
type AuditLog ¶
type AuditLog struct {
ID string `json:"_id" valid:"required"`
Category string `json:"category" valid:"required"`
ActionName string `json:"actionName" valid:"required"`
Timestamp int64 `json:"timestamp" valid:"required"`
IP string `json:"ip,omitempty" valid:"optional"`
Actor string `json:"actor" valid:"uuid4WithoutHyphens,required"`
ActorType string `json:"actorType" valid:"required~actorType values: USER CLIENT"`
ClientID string `json:"clientId" valid:"uuid4WithoutHyphens,required"`
ActorNamespace string `json:"actorNamespace" valid:"required"`
ObjectID string `json:"objectId,omitempty" valid:"optional"`
ObjectType string `json:"objectType,omitempty" valid:"optional"`
ObjectNamespace string `json:"objectNamespace" valid:"required~use publisher namespace if resource has no namespace"`
TargetUserID string `json:"targetUserId,omitempty" valid:"uuid4WithoutHyphens,optional"`
DeviceID string `json:"deviceId,omitempty" valid:"optional"`
Payload AuditLogPayload `json:"payload" valid:"required"`
}
type AuditLogBuilder ¶
type AuditLogBuilder struct {
// contains filtered or unexported fields
}
func NewAuditLogBuilder ¶
func NewAuditLogBuilder() *AuditLogBuilder
NewAuditLogBuilder creates a new AuditLogBuilder instance.
func (*AuditLogBuilder) ActionName ¶
func (auditLogBuilder *AuditLogBuilder) ActionName(actionName string) *AuditLogBuilder
func (*AuditLogBuilder) Actor ¶
func (auditLogBuilder *AuditLogBuilder) Actor(actor string) *AuditLogBuilder
func (*AuditLogBuilder) ActorNamespace ¶
func (auditLogBuilder *AuditLogBuilder) ActorNamespace(actorNamespace string) *AuditLogBuilder
func (*AuditLogBuilder) Build ¶
func (auditLogBuilder *AuditLogBuilder) Build() (*kafka.Message, error)
func (*AuditLogBuilder) Category ¶
func (auditLogBuilder *AuditLogBuilder) Category(category string) *AuditLogBuilder
func (*AuditLogBuilder) ClientID ¶
func (auditLogBuilder *AuditLogBuilder) ClientID(clientID string) *AuditLogBuilder
func (*AuditLogBuilder) Content ¶
func (auditLogBuilder *AuditLogBuilder) Content(content map[string]interface{}) *AuditLogBuilder
func (*AuditLogBuilder) DeviceID ¶
func (auditLogBuilder *AuditLogBuilder) DeviceID(deviceID string) *AuditLogBuilder
func (*AuditLogBuilder) Diff ¶
func (auditLogBuilder *AuditLogBuilder) Diff(diff *AuditLogDiff) *AuditLogBuilder
Diff: if diff is not nil, please make sure diff.Before and diff.After are both not nil.
func (*AuditLogBuilder) IP ¶
func (auditLogBuilder *AuditLogBuilder) IP(ip string) *AuditLogBuilder
func (*AuditLogBuilder) IsActorTypeUser ¶
func (auditLogBuilder *AuditLogBuilder) IsActorTypeUser(isActorTypeUser bool) *AuditLogBuilder
func (*AuditLogBuilder) Key ¶
func (auditLogBuilder *AuditLogBuilder) Key(key string) *AuditLogBuilder
func (*AuditLogBuilder) ObjectID ¶
func (auditLogBuilder *AuditLogBuilder) ObjectID(objectID string) *AuditLogBuilder
func (*AuditLogBuilder) ObjectNamespace ¶
func (auditLogBuilder *AuditLogBuilder) ObjectNamespace(objectNamespace string) *AuditLogBuilder
func (*AuditLogBuilder) ObjectType ¶
func (auditLogBuilder *AuditLogBuilder) ObjectType(objectType string) *AuditLogBuilder
func (*AuditLogBuilder) TargetUserID ¶
func (auditLogBuilder *AuditLogBuilder) TargetUserID(targetUserID string) *AuditLogBuilder
type AuditLogDiff ¶
type AuditLogPayload ¶
type AuditLogPayload struct {
Content map[string]interface{} `json:"content"`
Diff AuditLogDiff `json:"diff"`
}
type BlackholeClient ¶
type BlackholeClient struct{}
BlackholeClient satisfies the publisher for mocking
func (*BlackholeClient) GetMetadata ¶ added in v4.1.0
func (*BlackholeClient) Publish ¶
func (client *BlackholeClient) Publish(publishBuilder *PublishBuilder) error
func (*BlackholeClient) PublishAuditLog ¶
func (client *BlackholeClient) PublishAuditLog(auditLogBuilder *AuditLogBuilder) error
func (*BlackholeClient) PublishSync ¶
func (client *BlackholeClient) PublishSync(publishBuidler *PublishBuilder) error
func (*BlackholeClient) Register ¶
func (client *BlackholeClient) Register(subscribeBuilder *SubscribeBuilder) error
type BrokerConfig ¶
type BrokerConfig struct {
// Disable the auto commit that happens on consumer polls once the AutoCommitInterval has elapsed.
// It's recommended to enable auto commit as manual commit per message is much slower.
// Default: false (auto commit is enabled)
DisableAutoCommit bool
// Interval between auto commits. This will only take effect when auto commit is enabled.
// Assigning zero value will be overridden by the default value.
// Default: 1 second
AutoCommitInterval time.Duration
// Enable committing the message offset right after consumer polls and before the message is processed.
// Otherwise, the message offset will be committed after it is processed. When auto commit is enabled,
// it will store the offset to be committed by auto-committer later.
// Default: false
CommitBeforeProcessing bool
// The maximum time duration the client may use to deliver a message, including retries
// Assigning zero value will be overridden by the default value.
// Default: 60 seconds
PublishTimeout time.Duration
// BaseConfig is a map to store key-value configuration of a broker.
// It will override other configs that have been set using other BrokerConfig options.
// Only Kafka broker is supported.
// List of supported Kafka configuration: https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md
BaseConfig map[string]interface{}
StrictValidation bool
CACertFile string
DialTimeout time.Duration
SecurityConfig *SecurityConfig
MetricsRegistry prometheus.Registerer // optional registry to report metrics to prometheus (used for kafka stats)
}
BrokerConfig is custom configuration for message broker
type BrokerMetadata ¶ added in v4.1.0
type Client ¶
type Client interface {
Publish(publishBuilder *PublishBuilder) error
PublishSync(publishBuilder *PublishBuilder) error
Register(subscribeBuilder *SubscribeBuilder) error
PublishAuditLog(auditLogBuilder *AuditLogBuilder) error
GetMetadata(topic string, timeout time.Duration) (*Metadata, error)
}
Client is an interface for event stream functionality
type Event ¶
type Event struct {
ID string `json:"id,omitempty"`
EventName string `json:"name,omitempty"`
Namespace string `json:"namespace,omitempty"`
ParentNamespace string `json:"parentNamespace,omitempty"`
UnionNamespace string `json:"unionNamespace,omitempty"`
ClientID string `json:"clientId,omitempty"`
TraceID string `json:"traceId,omitempty"`
SpanContext string `json:"spanContext,omitempty"`
UserID string `json:"userId,omitempty"`
SessionID string `json:"sessionId,omitempty"`
Timestamp string `json:"timestamp,omitempty"`
Version int `json:"version,omitempty"`
EventID int `json:"event_id,omitempty"`
EventType int `json:"event_type,omitempty"`
EventLevel int `json:"event_level,omitempty"`
ServiceName string `json:"service,omitempty"`
ClientIDs []string `json:"client_ids,omitempty"`
TargetUserIDs []string `json:"target_user_ids,omitempty"`
TargetNamespace string `json:"target_namespace,omitempty"`
Privacy bool `json:"privacy,omitempty"`
Topic string `json:"topic,omitempty"`
AdditionalFields map[string]interface{} `json:"additional_fields,omitempty"`
Payload map[string]interface{} `json:"payload,omitempty"`
Partition int `json:",omitempty"`
Offset int64 `json:",omitempty"`
Key string `json:",omitempty"`
}
Event defines the structure of event
func ConstructEvent ¶
func ConstructEvent(publishBuilder *PublishBuilder) (*kafka.Message, *Event, error)
ConstructEvent constructs the event message.
type KafkaClient ¶
type KafkaClient struct {
// mutex to avoid runtime races to access subscribers map
ReadersLock sync.RWMutex
// contains filtered or unexported fields
}
KafkaClient wraps client's functionality for Kafka
func (*KafkaClient) GetMetadata ¶ added in v4.1.0
func (*KafkaClient) GetReaderStats ¶ added in v4.1.3
func (client *KafkaClient) GetReaderStats() statistics.Stats
GetReaderStats returns the latest internal statistics of brokers, topics, and partitions of consumers. The stats values are refreshed at a fixed interval which can be configured by setting the `statistics.interval.ms` config
func (*KafkaClient) GetWriterStats ¶ added in v4.1.3
func (client *KafkaClient) GetWriterStats() statistics.Stats
GetWriterStats returns the latest internal statistics of brokers, topics, and partitions of producers. The stats values are refreshed at a fixed interval which can be configured by setting the `statistics.interval.ms` config
func (*KafkaClient) Publish ¶
func (client *KafkaClient) Publish(publishBuilder *PublishBuilder) error
Publish sends an event to a single topic or to multiple topics, with exponential backoff retry.
func (*KafkaClient) PublishAuditLog ¶
func (client *KafkaClient) PublishAuditLog(auditLogBuilder *AuditLogBuilder) error
PublishAuditLog sends an audit log message.
func (*KafkaClient) PublishSync ¶
func (client *KafkaClient) PublishSync(publishBuilder *PublishBuilder) error
PublishSync sends an event synchronously (blocking).
func (*KafkaClient) Register ¶
func (client *KafkaClient) Register(subscribeBuilder *SubscribeBuilder) error
Register registers a callback function and then subscribes to the topic.
type Metadata ¶ added in v4.1.0
type Metadata struct {
Brokers []BrokerMetadata
}
type PublishBuilder ¶
type PublishBuilder struct {
// contains filtered or unexported fields
}
PublishBuilder defines the structure of message which is sent through message broker
func (*PublishBuilder) AdditionalFields ¶
func (p *PublishBuilder) AdditionalFields(additionalFields map[string]interface{}) *PublishBuilder
AdditionalFields sets the AdditionalFields of the publisher event.
func (*PublishBuilder) ClientID ¶
func (p *PublishBuilder) ClientID(clientID string) *PublishBuilder
ClientID set clientID of publisher event
func (*PublishBuilder) ClientIDs ¶
func (p *PublishBuilder) ClientIDs(clientIDs []string) *PublishBuilder
ClientIDs set clientIDs of publisher event
func (*PublishBuilder) Context ¶
func (p *PublishBuilder) Context(ctx context.Context) *PublishBuilder
Context defines the client context used when publishing an event. Default: context.Background()
func (*PublishBuilder) EventID ¶
func (p *PublishBuilder) EventID(eventID int) *PublishBuilder
EventID set eventID of publisher event
func (*PublishBuilder) EventLevel ¶
func (p *PublishBuilder) EventLevel(eventLevel int) *PublishBuilder
EventLevel set eventLevel of publisher event
func (*PublishBuilder) EventName ¶
func (p *PublishBuilder) EventName(eventName string) *PublishBuilder
EventName set name of published event
func (*PublishBuilder) EventType ¶
func (p *PublishBuilder) EventType(eventType int) *PublishBuilder
EventType set eventType of publisher event
func (*PublishBuilder) ID ¶ added in v4.2.0
func (p *PublishBuilder) ID(id string) *PublishBuilder
ID sets the ID of the publisher event.
func (*PublishBuilder) Key ¶
func (p *PublishBuilder) Key(key string) *PublishBuilder
Key is a message key that is used to determine the partition of the topic if the client requires strong ordering of the events.
func (*PublishBuilder) Namespace ¶
func (p *PublishBuilder) Namespace(namespace string) *PublishBuilder
Namespace set namespace of published event
func (*PublishBuilder) ParentNamespace ¶
func (p *PublishBuilder) ParentNamespace(parentNamespace string) *PublishBuilder
func (*PublishBuilder) Payload ¶
func (p *PublishBuilder) Payload(payload map[string]interface{}) *PublishBuilder
Payload is an event payload that will be published.
func (*PublishBuilder) Privacy ¶
func (p *PublishBuilder) Privacy(privacy bool) *PublishBuilder
Privacy set privacy of publisher event
func (*PublishBuilder) ServiceName ¶
func (p *PublishBuilder) ServiceName(serviceName string) *PublishBuilder
ServiceName set serviceName of publisher event
func (*PublishBuilder) SessionID ¶
func (p *PublishBuilder) SessionID(sessionID string) *PublishBuilder
SessionID set sessionID of publisher event
func (*PublishBuilder) SpanContext ¶
func (p *PublishBuilder) SpanContext(spanID string) *PublishBuilder
SpanContext set jaeger spanContext of publisher event
func (*PublishBuilder) TargetNamespace ¶
func (p *PublishBuilder) TargetNamespace(targetNamespace string) *PublishBuilder
TargetNamespace set targetNamespace of publisher event
func (*PublishBuilder) TargetUserIDs ¶
func (p *PublishBuilder) TargetUserIDs(targetUserIDs []string) *PublishBuilder
TargetUserIDs set targetUserIDs of publisher event
func (*PublishBuilder) Timeout
deprecated
func (p *PublishBuilder) Timeout(timeout time.Duration) *PublishBuilder
Timeout is an upper bound on the time to report success or failure after a call to send() returns. The value of this config should be greater than or equal to the sum of request.timeout.ms and linger.ms.
Deprecated: This config is deprecated. It will only take effect for the first publisher of the client. Configure PublishTimeout from the BrokerConfig instead.
Default value: 60000 ms
func (*PublishBuilder) Topic ¶
func (p *PublishBuilder) Topic(topic string) *PublishBuilder
Topic set channel / topic name
func (*PublishBuilder) TraceID ¶
func (p *PublishBuilder) TraceID(traceID string) *PublishBuilder
TraceID set traceID of publisher event
func (*PublishBuilder) UnionNamespace ¶
func (p *PublishBuilder) UnionNamespace(unionNamespace string) *PublishBuilder
Parent namespace for AGS Starter, leave it empty for AGS Premium
func (*PublishBuilder) UserID ¶
func (p *PublishBuilder) UserID(userID string) *PublishBuilder
UserID set userID of publisher event
func (*PublishBuilder) Version ¶
func (p *PublishBuilder) Version(version int) *PublishBuilder
Version set event schema version
type SecurityConfig ¶
SecurityConfig contains security configuration for message broker
type StdoutClient ¶
type StdoutClient struct {
// contains filtered or unexported fields
}
StdoutClient satisfies the publisher for mocking
func (*StdoutClient) GetMetadata ¶ added in v4.1.0
func (*StdoutClient) Publish ¶
func (client *StdoutClient) Publish(publishBuilder *PublishBuilder) error
Publish prints the event to the console.
func (*StdoutClient) PublishAuditLog ¶
func (client *StdoutClient) PublishAuditLog(auditLogBuilder *AuditLogBuilder) error
func (*StdoutClient) PublishSync ¶
func (client *StdoutClient) PublishSync(publishBuilder *PublishBuilder) error
func (*StdoutClient) Register ¶
func (client *StdoutClient) Register(subscribeBuilder *SubscribeBuilder) error
Register prints the event to the console.
type SubscribeBuilder ¶
type SubscribeBuilder struct {
// contains filtered or unexported fields
}
SubscribeBuilder defines the structure of message which is sent through message broker
func NewSubscribe ¶
func NewSubscribe() *SubscribeBuilder
NewSubscribe creates a new SubscribeBuilder instance.
func (*SubscribeBuilder) AsyncCommitMessage ¶
func (s *SubscribeBuilder) AsyncCommitMessage(async bool) *SubscribeBuilder
AsyncCommitMessage sets whether to commit the message offset asynchronously. This setting will be overridden by AutoCommitInterval on BrokerConfig.
func (*SubscribeBuilder) Callback ¶
func (s *SubscribeBuilder) Callback( callback func(ctx context.Context, event *Event, err error) error, ) *SubscribeBuilder
Callback sets the callback to invoke when an event is received.
func (*SubscribeBuilder) CallbackRaw ¶
func (s *SubscribeBuilder) CallbackRaw( f func(ctx context.Context, msgValue []byte, err error) error, ) *SubscribeBuilder
CallbackRaw sets a callback that receives the undecoded payload.
func (*SubscribeBuilder) Context ¶
func (s *SubscribeBuilder) Context(ctx context.Context) *SubscribeBuilder
Context defines the client context used when subscribing to an event. Default: context.Background()
func (*SubscribeBuilder) EventName ¶
func (s *SubscribeBuilder) EventName(eventName string) *SubscribeBuilder
EventName sets the event name that will be subscribed to.
func (*SubscribeBuilder) GroupID ¶
func (s *SubscribeBuilder) GroupID(groupID string) *SubscribeBuilder
GroupID set subscriber groupID. A random groupID will be generated by default.
func (*SubscribeBuilder) GroupInstanceID ¶
func (s *SubscribeBuilder) GroupInstanceID(groupInstanceID string) *SubscribeBuilder
GroupInstanceID set subscriber group instance ID
func (*SubscribeBuilder) Offset ¶
func (s *SubscribeBuilder) Offset(offset int64) *SubscribeBuilder
Offset sets the offset of the event to start from.
func (*SubscribeBuilder) SendErrorDLQ ¶
func (s *SubscribeBuilder) SendErrorDLQ(dlq bool) *SubscribeBuilder
SendErrorDLQ sets whether to send error messages to the DLQ topic. DLQ topic: 'topic' + '-dlq'
func (*SubscribeBuilder) Slug ¶
func (s *SubscribeBuilder) Slug() string
Slug is a string describing a unique subscriber (topic, eventName, groupID)
func (*SubscribeBuilder) Topic ¶
func (s *SubscribeBuilder) Topic(topic string) *SubscribeBuilder
Topic sets the topic that will be subscribed to.