processor

package
v0.0.0-...-031a617 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 8, 2025 · License: MIT · Imports: 20 · Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ApplyResult

// ApplyResult is the value that PersistentWorker enqueues, dequeues and
// persists (see EnqueueResult, DequeueResult and PersistentApplyResult).
// Two results can be combined with Merge.
type ApplyResult struct {
	// ShardID identifies the shard this result belongs to.
	ShardID         int32
	// WriteAccounts maps an int64 key to a write-side account.
	// presumably the key is the account ID — TODO confirm.
	WriteAccounts   map[int64]*model.WriteAccount
	// Logs holds the per-event apply results.
	Logs            []*model.EventApplyResult
	// NewEvents and ProcessedEvents are two separate lists of decoded events;
	// the same pair of fields appears on BatchEvents.
	NewEvents       []*DecodedEvent
	ProcessedEvents []*DecodedEvent
	// NOTE(review): the context package documentation advises against storing
	// a Context inside a struct; confirm this is intentional and document
	// which operation the context is scoped to.
	Ctx             context.Context
}

func (*ApplyResult) Merge

func (ap *ApplyResult) Merge(other *ApplyResult)

type BatchEvents

// BatchEvents holds the result of decoding events.
// EventValidator.DecodeEventsAndGroupByShardID returns one *BatchEvents per
// shard ID.
type BatchEvents struct {
	// NewEvents and ProcessedEvents are two separate lists of decoded events.
	// NOTE(review): presumably split by whether the event had already been
	// handled before — confirm against the decoder.
	NewEvents       []*DecodedEvent
	ProcessedEvents []*DecodedEvent
	// Total is a count associated with the batch.
	// presumably the number of events decoded — TODO confirm what it counts.
	Total           int
}

BatchEvents holds the result of decoding events.

type ChangeApplier

// ChangeApplier bundles the DB, Redis and cache repositories. Its methods are
// ApplyChanges and GetReadBalanceMap, both declared on the value receiver.
type ChangeApplier struct {
	// DBRepo is the database repository.
	DBRepo    domain.DBRepository
	// RedisRepo is the Redis repository.
	RedisRepo domain.RedisRepository
	// CacheRepo is the cache repository.
	CacheRepo domain.CacheRepository
}

func (ChangeApplier) ApplyChanges

func (ChangeApplier) GetReadBalanceMap

func (ca ChangeApplier) GetReadBalanceMap(ctx context.Context, listAccountBalancesOptions model.ListAccountBalancesOptions) (map[int64]*model.ReadAccount, error)

type ChangeDecimal

// ChangeDecimal is produced from an *eventpb.BalanceChange by
// NewChangeDecimal and carries its deltas as decimal.Decimal values.
type ChangeDecimal struct {
	// NOTE(review): Go naming convention spells this initialism ChangeID;
	// renaming the exported field would break callers, so it is left as is.
	ChangeId       int64
	// CurrencyCode names the currency the two deltas below are expressed in.
	CurrencyCode   string
	// AvailableDelta and FrozenDelta are the deltas for the available and
	// frozen amounts.
	AvailableDelta decimal.Decimal
	FrozenDelta    decimal.Decimal

	// The Fallback* fields mirror the three fields above for a second currency.
	FallbackCurrencyCode   string
	FallbackAvailableDelta decimal.Decimal
	FallbackFrozenDelta    decimal.Decimal
	// presumably selects the Fallback* values instead of the primary ones —
	// TODO confirm against the code that reads this flag.
	ApplyFallback          bool
}

func NewChangeDecimal

func NewChangeDecimal(change *eventpb.BalanceChange) (ChangeDecimal, error)

type DecodedEvent

// DecodedEvent groups a consumed message with its balance-change event and a
// list of ChangeDecimal values.
type DecodedEvent struct {
	// Msg is the consumed queue message.
	Msg            mq.ConsumedMessage
	// Event is the balance-change event.
	// presumably decoded from Msg — TODO confirm.
	Event          *eventpb.BalanceChangeEvent
	// ChangeDecimals holds the decimal form of balance changes.
	// presumably one entry per change in Event — TODO confirm.
	ChangeDecimals []*ChangeDecimal
}

type EventProcessor

// EventProcessor is constructed by NewEventProcessor and exposes
// ProcessEvents, which takes a slice of consumed messages.
type EventProcessor struct {
	// The embedded *LeaderElector promotes its methods onto EventProcessor.
	// EventProcessor also declares its own IsLeader and SetLeader, which take
	// precedence over the promoted methods of the same name.
	*LeaderElector

	// CacheRepo, DBRepo and RedisRepo are the cache, database and Redis
	// repositories.
	CacheRepo        domain.CacheRepository
	DBRepo           domain.DBRepository
	RedisRepo        domain.RedisRepository
	// EventValidator and ChangeApplier are held by value.
	EventValidator   EventValidator
	ChangeApplier    ChangeApplier
	// PersistentWorker is held by pointer; it also embeds a *LeaderElector.
	// NOTE(review): whether both share the same elector is not visible here.
	PersistentWorker *PersistentWorker
	// ResultPublisher is held by value although its methods use a pointer
	// receiver.
	ResultPublisher  ResultPublisher
	// WorkerPool is the worker pool.
	WorkerPool       utils.WorkerPool
	// contains filtered or unexported fields
}

func NewEventProcessor

func NewEventProcessor(params Params) (*EventProcessor, error)

func (*EventProcessor) Close

func (ep *EventProcessor) Close(ctx context.Context)

func (*EventProcessor) CloseNow

func (ep *EventProcessor) CloseNow(ctx context.Context)

func (*EventProcessor) IsClosed

func (ep *EventProcessor) IsClosed() bool

func (*EventProcessor) IsLeader

func (ep *EventProcessor) IsLeader() bool

func (*EventProcessor) ProcessEvents

func (ep *EventProcessor) ProcessEvents(ctx context.Context, msgs []mq.ConsumedMessage) error

func (*EventProcessor) ProcessEventsErr

func (ep *EventProcessor) ProcessEventsErr(ctx context.Context, msgs []mq.ConsumedMessage, err error)

func (*EventProcessor) PublishApplyResults

func (ep *EventProcessor) PublishApplyResults(ctx context.Context, changedReadBalance map[int64]*model.ReadAccount, batchEvent *BatchEvents, eventResults []*model.EventApplyResult)

func (*EventProcessor) ReleaseLeader

func (ep *EventProcessor) ReleaseLeader(ctx context.Context)

func (*EventProcessor) SetLeader

func (ep *EventProcessor) SetLeader()

func (*EventProcessor) SyncBalanceToRedisCache

func (ep *EventProcessor) SyncBalanceToRedisCache(ctx context.Context, changedAccountBalance map[int64]*model.ReadAccount, epoch int64)

func (*EventProcessor) TryToBecomeLeader

func (ep *EventProcessor) TryToBecomeLeader(ctx context.Context) (partition int64, offset int64, ok bool)

type EventValidator

// EventValidator exposes DecodeEventsAndGroupByShardID. It has no exported
// fields.
type EventValidator struct {
	// contains filtered or unexported fields
}

func (EventValidator) DecodeEventsAndGroupByShardID

func (ev EventValidator) DecodeEventsAndGroupByShardID(ctx context.Context, msgs []mq.ConsumedMessage) (shardIDBatchEvents map[int32]*BatchEvents, invalidFormatEvents []*eventpb.BalanceChangeEvent, err error)

type EventValidatorRepository

// EventValidatorRepository is the single-method lookup interface for
// idempotency-key status.
// NOTE(review): presumably the dependency EventValidator uses — confirm.
type EventValidatorRepository interface {
	// GetIdempotencyKeysStatus takes a list of keys and returns a map from
	// string to model.ChangeLogStatus, or an error.
	// presumably the map is keyed by the keys passed in; how unknown keys are
	// represented is not visible here — TODO confirm.
	GetIdempotencyKeysStatus(ctx context.Context, keys []string) (map[string]model.ChangeLogStatus, error)
}

type LeaderElector

// LeaderElector exposes leader state through IsLeader, SetLeader and
// ClearLeader, plus the accessors GetFencingToken, GetLeaderSvcID and
// GetPartition. It is embedded by pointer in EventProcessor and
// PersistentWorker and has no exported fields.
type LeaderElector struct {
	// contains filtered or unexported fields
}

func (*LeaderElector) ClearLeader

func (le *LeaderElector) ClearLeader()

func (*LeaderElector) GetFencingToken

func (le *LeaderElector) GetFencingToken() int64

func (*LeaderElector) GetLeaderSvcID

func (le *LeaderElector) GetLeaderSvcID() string

func (*LeaderElector) GetPartition

func (le *LeaderElector) GetPartition() int32

func (*LeaderElector) IsLeader

func (le *LeaderElector) IsLeader() bool

func (*LeaderElector) SetLeader

func (le *LeaderElector) SetLeader()

type Params

// Params is the argument struct accepted by NewEventProcessor.
type Params struct {
	// fx.In is embedded.
	// presumably go.uber.org/fx's marker that makes this a parameter object
	// whose fields are filled by the container — TODO confirm the import path.
	fx.In
	// DBRepo, RedisRepo and CacheRepo are the database, Redis and cache
	// repositories.
	DBRepo     domain.DBRepository
	RedisRepo  domain.RedisRepository
	CacheRepo  domain.CacheRepository
	// Producer is the message-queue producer.
	Producer   mq.Producer
	// WorkerPool is the worker pool.
	WorkerPool utils.WorkerPool
}

type PersistentWorker

// PersistentWorker is constructed by NewPersistentWorker from a DB repository,
// a Redis repository and a *LeaderElector. It accepts *ApplyResult values via
// EnqueueResult, hands them back via DequeueResult, and persists one via
// PersistentApplyResult. Its lifecycle methods are Start, Stop, GracefulStop
// and DoNowAndWait.
type PersistentWorker struct {
	// The embedded *LeaderElector promotes its methods onto PersistentWorker.
	*LeaderElector
	// contains filtered or unexported fields
}

func NewPersistentWorker

func NewPersistentWorker(dbRepo domain.DBRepository, redisRepo domain.RedisRepository, le *LeaderElector) *PersistentWorker

func (*PersistentWorker) DequeueResult

func (pw *PersistentWorker) DequeueResult() []*ApplyResult

func (*PersistentWorker) DoNowAndWait

func (pw *PersistentWorker) DoNowAndWait(ctx context.Context) error

func (*PersistentWorker) EnqueueResult

func (pw *PersistentWorker) EnqueueResult(ctx context.Context, result *ApplyResult)

func (*PersistentWorker) GracefulStop

func (pw *PersistentWorker) GracefulStop()

func (*PersistentWorker) PersistentApplyResult

func (pw *PersistentWorker) PersistentApplyResult(ctx context.Context, result *ApplyResult) error

func (*PersistentWorker) Start

func (pw *PersistentWorker) Start()

func (*PersistentWorker) Stop

func (pw *PersistentWorker) Stop()

type ResultPublisher

// ResultPublisher wraps a message-queue producer. Its methods are
// PublishResults and PublishStressTestStats, both on the pointer receiver.
type ResultPublisher struct {
	// Producer is the message-queue producer.
	// presumably the one the publish methods send through — TODO confirm.
	Producer mq.Producer
}

func (*ResultPublisher) PublishResults

func (rp *ResultPublisher) PublishResults(ctx context.Context, changedReadBalance map[int64]*model.ReadAccount, batchEvent *BatchEvents, eventResults []*model.EventApplyResult)

func (*ResultPublisher) PublishStressTestStats

func (rp *ResultPublisher) PublishStressTestStats(ctx context.Context, stats *model.StressTestStats)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL