messaging

package
v1.0.2 Latest
Warning

This package is not in the latest version of its module.

Published: May 22, 2026 License: MIT Imports: 18 Imported by: 1

README

Messaging Package

The messaging package provides a publish-subscribe messaging system for asynchronous communication.

Features

  • Pub/Sub Pattern: Publish messages to topics, subscribe with consumers
  • Redis Streaming: Default implementation using Redis streams
  • GORM Integration: Sync service for database message persistence
  • Message Processing: Pluggable processor functions

Main Components

MessagingService

Core messaging interface:

  • Pub(ctx, topic, payload): Publish message to topic
  • Sub(ctx, topic, consumer, processor): Subscribe to topic with consumer group

Processor

Function type for message processing:

  • Receives topic, consumer, and payload
  • Returns error on processing failure

Implementations

  • Redis Streaming: High-performance Redis-based messaging
  • RAM: In-memory messaging for testing
  • GORM Sync Service: Database-backed message synchronization

Usage

// Publish a message
err := msgService.Pub(ctx, "orders", orderData)

// Subscribe; the callback must match the Processor type
err = msgService.Sub(ctx, "orders", "consumer-1", func(ctx context.Context, topic, consumer string, payload []byte) error {
    // Process message
    return nil
})

Dependencies

  • Redis for streaming implementation
  • GORM for database sync
  • Zap for logging

Documentation

Constants

const (
	DefaultMsgLimit          = math.MaxInt16
	DefaultAttKey            = "payload"
	DefaultSchedule          = "@every 30m"
	DefaultDeadLetterDurtion = 8 * time.Hour // a message pending longer than this duration is moved to the dead letter
)
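
To make two of these values concrete, here is a minimal sketch that prints them and applies the dead-letter threshold. The constants are copied locally for illustration, and isDeadLetter is a hypothetical helper, not part of the package; how the package itself applies the threshold is not shown in this documentation.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// Local copies of two package constants, reproduced only for illustration.
const (
	DefaultMsgLimit          = math.MaxInt16
	DefaultDeadLetterDurtion = 8 * time.Hour
)

// isDeadLetter is a hypothetical helper: it reports whether a message that
// has been pending for the given duration exceeds the dead-letter threshold.
func isDeadLetter(pending time.Duration) bool {
	return pending > DefaultDeadLetterDurtion
}

func main() {
	fmt.Println(DefaultMsgLimit)             // 32767
	fmt.Println(DefaultDeadLetterDurtion)    // 8h0m0s
	fmt.Println(isDeadLetter(9 * time.Hour)) // true
}
```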

Variables

var (
	AbandonedChan       chan any
	GormCallbackEnabled = false
)
var (
	DefaultGormToipc     = "scm.gorm.saved"
	GormMessagingEnabled = false // disabled until fixed: enabling it was found to make GORM fail without any warning or error
)
var ConsumerName string
var ResetTopics []string
var SyncPageSize = 1000

Functions

func GetMapping added in v0.10.15

func GetMapping() map[string]reflect.Type

func GetPayloadID added in v0.10.10

func GetPayloadID(payload any) (uint, bool)

GetPayloadID tries to read the payload's ID value as a uint. It returns false if the payload has no ID field.
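
The package's implementation is not shown here. As a rough sketch of how such a lookup could work, the hypothetical payloadID below uses reflection to find a field named ID on a struct or a pointer to a struct. Order and Note are illustrative types, and accepting only unsigned integer fields is an assumption.

```go
package main

import (
	"fmt"
	"reflect"
)

// Order and Note are hypothetical payload types used only in this sketch.
type Order struct {
	ID   uint
	Item string
}

type Note struct {
	Text string
}

// payloadID is a hypothetical stand-in for GetPayloadID. It returns the
// value of an unsigned integer field named ID, or false if there is none.
func payloadID(payload any) (uint, bool) {
	v := reflect.ValueOf(payload)
	// Follow pointers so that both T and *T are accepted.
	for v.Kind() == reflect.Ptr {
		if v.IsNil() {
			return 0, false
		}
		v = v.Elem()
	}
	if v.Kind() != reflect.Struct {
		return 0, false
	}
	f := v.FieldByName("ID")
	if !f.IsValid() {
		return 0, false
	}
	switch f.Kind() {
	case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
		return uint(f.Uint()), true
	}
	return 0, false
}

func main() {
	fmt.Println(payloadID(Order{ID: 7}))      // 7 true
	fmt.Println(payloadID(&Order{ID: 9}))     // 9 true
	fmt.Println(payloadID(Note{Text: "hi"})) // 0 false
}
```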

func GetRegisted

func GetRegisted() []any

GetRegisted returns the list of registered entities.

func PubEntitiesSince added in v0.10.7

func PubEntitiesSince(ctx context.Context, keys []string, since time.Time, to time.Time) error

func QueryEntities added in v0.10.7

func QueryEntities[T any](ctx context.Context, db *gorm.DB, logger *zap.Logger, since time.Time, to time.Time, index int, queryDeleted bool) ([]any, error)

func Reg

func Reg[T any](payload T)

Reg registers the type of payload. Never pass a value of type any to this function.
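
The documentation does not say why any must be avoided. One plausible reason, sketched below under that assumption, is that a generic registry keyed by the static type T learns nothing useful when T is inferred as an interface type. The names reg, mapping, and Order are hypothetical; only the map[string]reflect.Type shape is borrowed from GetMapping's signature.

```go
package main

import (
	"fmt"
	"reflect"
)

// Order is a hypothetical entity type used only in this sketch.
type Order struct {
	ID uint
}

// mapping is a hypothetical registry keyed by type name, shaped like the
// map[string]reflect.Type that GetMapping returns.
var mapping = map[string]reflect.Type{}

// reg is a hypothetical stand-in for Reg. It records the static type T and
// reports false when T is an interface type such as any, because the static
// type then carries no information about the concrete entity.
func reg[T any](payload T) bool {
	t := reflect.TypeOf((*T)(nil)).Elem()
	if t.Kind() == reflect.Interface {
		return false
	}
	mapping[t.String()] = t
	return true
}

func main() {
	fmt.Println(reg(Order{})) // true: T is inferred as Order

	var v any = Order{}
	fmt.Println(reg(v)) // false: T is inferred as any

	for name := range mapping {
		fmt.Println(name)
	}
}
```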

Types

type DefaultMessgingService

type DefaultMessgingService struct {
	Logger          *zap.Logger
	Client          *redis.Client
	PendingSchedule string
	Settings        map[string]int64 // stream limit settings; default 10000
}

func (*DefaultMessgingService) ProcessPendings

func (msg *DefaultMessgingService) ProcessPendings(ctx context.Context, topic, group string, processor Processor)

func (*DefaultMessgingService) Pub

func (msg *DefaultMessgingService) Pub(ctx context.Context, topic string, payload any) error

func (*DefaultMessgingService) Sub

func (msg *DefaultMessgingService) Sub(ctx context.Context, topic, group string, processor Processor) error

type DispathFn added in v0.10.14

type DispathFn func(ctx context.Context, topic, consumer string, kp *GormPayload, payload any, tt reflect.Type) error

type GormAction

type GormAction string
const (
	GormActionSave   GormAction = "save"
	GormActionDelete GormAction = "delete"
)

type GormObjSyncService

type GormObjSyncService struct {
	MessageService MessagingService
	DB             *gorm.DB
	Logger         *zap.Logger
	Sharding       Sharding
	Dispath        DispathFn
}

func NewGormObjSyncService

func NewGormObjSyncService(ms MessagingService, logger *zap.Logger, db *gorm.DB) *GormObjSyncService

func (*GormObjSyncService) Abandoned added in v0.10.14

func (ss *GormObjSyncService) Abandoned(kp *GormPayload, errCode, topic, consumer string)

func (*GormObjSyncService) ProcessGormObject added in v0.10.14

func (ss *GormObjSyncService) ProcessGormObject(ctx context.Context, topic, consumer string, kp *GormPayload, payload any, tt reflect.Type) error

func (*GormObjSyncService) ReceiveGormObjectSaved

func (ss *GormObjSyncService) ReceiveGormObjectSaved(ctx context.Context, topic, consumer string, raw []byte) error

type GormPayload

type GormPayload struct {
	Key     string
	Action  GormAction
	Payload string
	SynctAt time.Time
}

func ToKeyAndPayload added in v0.10.14

func ToKeyAndPayload(raw []byte) (*GormPayload, error)
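
The wire format of raw is not documented here. Assuming it is JSON with the struct's field names as keys, decoding and branching on the action might look like the sketch below. decode and describe are hypothetical helpers, and the types are copied locally so the example stands alone.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"time"
)

// Local copies of the package types, reproduced only for illustration.
type GormAction string

const (
	GormActionSave   GormAction = "save"
	GormActionDelete GormAction = "delete"
)

type GormPayload struct {
	Key     string
	Action  GormAction
	Payload string
	SynctAt time.Time
}

// decode is a hypothetical stand-in for ToKeyAndPayload. JSON encoding and
// the rejection of an empty key are both assumptions of this sketch.
func decode(raw []byte) (*GormPayload, error) {
	var kp GormPayload
	if err := json.Unmarshal(raw, &kp); err != nil {
		return nil, err
	}
	if kp.Key == "" {
		return nil, errors.New("gorm payload has no key")
	}
	return &kp, nil
}

// describe shows how a consumer might branch on the action.
func describe(kp *GormPayload) string {
	switch kp.Action {
	case GormActionSave:
		return "upsert " + kp.Key
	case GormActionDelete:
		return "delete " + kp.Key
	default:
		return "unknown action for " + kp.Key
	}
}

func main() {
	raw := []byte(`{"Key":"orders","Action":"save","Payload":"{\"ID\":1}","SynctAt":"2024-01-02T03:04:05Z"}`)
	kp, err := decode(raw)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(describe(kp)) // upsert orders
}
```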

type MessagingAdaptor added in v0.11.0

type MessagingAdaptor[T any] struct {
	ChanAdaaptor *core.ChanAdaptor[T]
	Topic        string
}

func (*MessagingAdaptor[T]) Adaptor added in v0.11.0

func (r *MessagingAdaptor[T]) Adaptor(ctx context.Context, topic, consumer string, payload []byte) error

func (*MessagingAdaptor[T]) AsBridge added in v0.11.0

func (r *MessagingAdaptor[T]) AsBridge(service MessagingService)
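
The core.ChanAdaptor type is not documented on this page, so the following is only a simplified analogue of the idea: a Processor-shaped method that decodes each payload into T and forwards it to a Go channel. The bridge type, the JSON encoding, the Order type, and the forwardOne helper are all assumptions made for illustration.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// Order is a hypothetical payload type used only in this sketch.
type Order struct {
	ID   uint
	Item string
}

// bridge is a hypothetical, simplified analogue of MessagingAdaptor[T]:
// it forwards decoded messages to a plain channel.
type bridge[T any] struct {
	Out   chan T
	Topic string
}

// Adaptor has the same shape as a Processor. It decodes payload as JSON
// (an assumption) and sends the value to Out unless ctx is cancelled first.
func (b *bridge[T]) Adaptor(ctx context.Context, topic, consumer string, payload []byte) error {
	var v T
	if err := json.Unmarshal(payload, &v); err != nil {
		return err
	}
	select {
	case b.Out <- v:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// forwardOne pushes one payload through a fresh bridge and reads it back.
func forwardOne(payload []byte) (Order, error) {
	b := &bridge[Order]{Out: make(chan Order, 1), Topic: "orders"}
	if err := b.Adaptor(context.Background(), b.Topic, "consumer-1", payload); err != nil {
		return Order{}, err
	}
	return <-b.Out, nil
}

func main() {
	o, err := forwardOne([]byte(`{"ID":1,"Item":"book"}`))
	fmt.Println(o, err) // {1 book} <nil>
}
```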

type MessagingService

type MessagingService interface {
	Pub(ctx context.Context, topic string, payload any) error
	Sub(ctx context.Context, topic, consumer string, processor Processor) error
}

MessagingService is the core messaging interface. The default implementation uses Redis streaming.
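
To illustrate what satisfying this interface involves, here is a minimal in-memory sketch. It is not the package's RAM implementation: memService, newMemService, and roundTrip are hypothetical, payloads are assumed to be JSON-encoded, and delivery is synchronous inside Pub, which a Redis-backed service would not do.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"sync"
)

// Local copies of the package types, reproduced only for illustration.
type Processor func(ctx context.Context, topic, consumer string, payload []byte) error

type MessagingService interface {
	Pub(ctx context.Context, topic string, payload any) error
	Sub(ctx context.Context, topic, consumer string, processor Processor) error
}

type subscriber struct {
	consumer  string
	processor Processor
}

// memService is a hypothetical in-memory MessagingService for tests.
type memService struct {
	mu   sync.Mutex
	subs map[string][]subscriber
}

func newMemService() *memService {
	return &memService{subs: map[string][]subscriber{}}
}

// Pub encodes payload as JSON (an assumption) and calls every subscriber of
// the topic in registration order, stopping at the first error.
func (m *memService) Pub(ctx context.Context, topic string, payload any) error {
	raw, err := json.Marshal(payload)
	if err != nil {
		return err
	}
	m.mu.Lock()
	subs := append([]subscriber(nil), m.subs[topic]...)
	m.mu.Unlock()
	for _, s := range subs {
		if err := s.processor(ctx, topic, s.consumer, raw); err != nil {
			return err
		}
	}
	return nil
}

// Sub registers the processor and returns immediately.
func (m *memService) Sub(ctx context.Context, topic, consumer string, processor Processor) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.subs[topic] = append(m.subs[topic], subscriber{consumer: consumer, processor: processor})
	return nil
}

// Compile-time check that memService satisfies the interface.
var _ MessagingService = (*memService)(nil)

// roundTrip subscribes one consumer, publishes one payload, and returns the
// bytes the consumer received.
func roundTrip(svc MessagingService, payload any) (string, error) {
	ctx := context.Background()
	var got string
	err := svc.Sub(ctx, "orders", "consumer-1", func(ctx context.Context, topic, consumer string, raw []byte) error {
		got = string(raw)
		return nil
	})
	if err != nil {
		return "", err
	}
	if err := svc.Pub(ctx, "orders", payload); err != nil {
		return "", err
	}
	return got, nil
}

func main() {
	got, err := roundTrip(newMemService(), map[string]int{"id": 1})
	fmt.Println(got, err) // {"id":1} <nil>
}
```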

type MessagnePending added in v0.10.10

type MessagnePending struct {
	Topic    string
	Group    string
	Schedule string
	MaxBatch int
	Limit    int
}

type Processor

type Processor func(ctx context.Context, topic, consumer string, payload []byte) error
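
A Processor receives the raw payload bytes and signals failure by returning an error. The sketch below shows one way to write one, assuming JSON-encoded payloads. Order, newOrderProcessor, and processOnce are hypothetical names, and what the service does with a returned error (retry, pending list, dead letter) is not specified on this page.

```go
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
)

// Local copy of the package type, reproduced only for illustration.
type Processor func(ctx context.Context, topic, consumer string, payload []byte) error

// Order is a hypothetical payload type used only in this sketch.
type Order struct {
	ID   uint   `json:"id"`
	Item string `json:"item"`
}

// newOrderProcessor returns a Processor that decodes the payload as JSON
// (an assumption) and hands the order to handle. Decode and validation
// failures are returned as errors.
func newOrderProcessor(handle func(Order) error) Processor {
	return func(ctx context.Context, topic, consumer string, payload []byte) error {
		var o Order
		if err := json.Unmarshal(payload, &o); err != nil {
			return fmt.Errorf("%s/%s: decode order: %w", topic, consumer, err)
		}
		if o.ID == 0 {
			return errors.New("order has no id")
		}
		return handle(o)
	}
}

// processOnce runs the processor against a single payload and returns the
// order it saw, if any.
func processOnce(payload []byte) (Order, error) {
	var seen Order
	p := newOrderProcessor(func(o Order) error {
		seen = o
		return nil
	})
	err := p(context.Background(), "orders", "consumer-1", payload)
	return seen, err
}

func main() {
	o, err := processOnce([]byte(`{"id":1,"item":"book"}`))
	fmt.Println(o, err) // {1 book} <nil>

	_, err = processOnce([]byte(`not json`))
	fmt.Println(err != nil) // true
}
```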

type QueryFn added in v0.10.7

type QueryFn func(ctx context.Context, db *gorm.DB, logger *zap.Logger, since time.Time, to time.Time, index int, queryDeleted bool) ([]any, error)

type Sharding

type Sharding func(tx *gorm.DB, key string, payload any) (tablename string, err error)
