Documentation ¶
Index ¶
- type ApplyResult
- type BatchEvents
- type ChangeApplier
- type ChangeDecimal
- type DecodedEvent
- type EventProcessor
- func (ep *EventProcessor) Close(ctx context.Context)
- func (ep *EventProcessor) CloseNow(ctx context.Context)
- func (ep *EventProcessor) IsClosed() bool
- func (ep *EventProcessor) IsLeader() bool
- func (ep *EventProcessor) ProcessEvents(ctx context.Context, msgs []mq.ConsumedMessage) error
- func (ep *EventProcessor) ProcessEventsErr(ctx context.Context, msgs []mq.ConsumedMessage, err error)
- func (ep *EventProcessor) PublishApplyResults(ctx context.Context, changedReadBalance map[int64]*model.ReadAccount, ...)
- func (ep *EventProcessor) ReleaseLeader(ctx context.Context)
- func (ep *EventProcessor) SetLeader()
- func (ep *EventProcessor) SyncBalanceToRedisCache(ctx context.Context, changedAccountBalance map[int64]*model.ReadAccount, ...)
- func (ep *EventProcessor) TryToBecomeLeader(ctx context.Context) (partition int64, offset int64, ok bool)
- type EventValidator
- type EventValidatorRepository
- type LeaderElector
- type Params
- type PersistentWorker
- func (pw *PersistentWorker) DequeueResult() []*ApplyResult
- func (pw *PersistentWorker) DoNowAndWait(ctx context.Context) error
- func (pw *PersistentWorker) EnqueueResult(ctx context.Context, result *ApplyResult)
- func (pw *PersistentWorker) GracefulStop()
- func (pw *PersistentWorker) PersistentApplyResult(ctx context.Context, result *ApplyResult) error
- func (pw *PersistentWorker) Start()
- func (pw *PersistentWorker) Stop()
- type ResultPublisher
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ApplyResult ¶
type ApplyResult struct {
ShardID int32
WriteAccounts map[int64]*model.WriteAccount
Logs []*model.EventApplyResult
NewEvents []*DecodedEvent
ProcessedEvents []*DecodedEvent
Ctx context.Context
}
func (*ApplyResult) Merge ¶
func (ap *ApplyResult) Merge(other *ApplyResult)
type BatchEvents ¶
type BatchEvents struct {
NewEvents []*DecodedEvent
ProcessedEvents []*DecodedEvent
Total int
}
BatchEvents holds the result of decoding events.
type ChangeApplier ¶
type ChangeApplier struct {
DBRepo domain.DBRepository
RedisRepo domain.RedisRepository
CacheRepo domain.CacheRepository
}
func (ChangeApplier) ApplyChanges ¶
func (ca ChangeApplier) ApplyChanges(ctx context.Context, events []*DecodedEvent) (map[int64]*model.WriteAccount, map[int64]*model.ReadAccount, []*model.EventApplyResult, error)
func (ChangeApplier) GetReadBalanceMap ¶
func (ca ChangeApplier) GetReadBalanceMap(ctx context.Context, listAccountBalancesOptions model.ListAccountBalancesOptions) (map[int64]*model.ReadAccount, error)
type ChangeDecimal ¶
type ChangeDecimal struct {
ChangeId int64
CurrencyCode string
AvailableDelta decimal.Decimal
FrozenDelta decimal.Decimal
FallbackCurrencyCode string
FallbackAvailableDelta decimal.Decimal
FallbackFrozenDelta decimal.Decimal
ApplyFallback bool
}
func NewChangeDecimal ¶
func NewChangeDecimal(change *eventpb.BalanceChange) (ChangeDecimal, error)
type DecodedEvent ¶
type DecodedEvent struct {
Msg mq.ConsumedMessage
Event *eventpb.BalanceChangeEvent
ChangeDecimals []*ChangeDecimal
}
type EventProcessor ¶
type EventProcessor struct {
*LeaderElector
CacheRepo domain.CacheRepository
DBRepo domain.DBRepository
RedisRepo domain.RedisRepository
EventValidator EventValidator
ChangeApplier ChangeApplier
PersistentWorker *PersistentWorker
ResultPublisher ResultPublisher
WorkerPool utils.WorkerPool
// contains filtered or unexported fields
}
func NewEventProcessor ¶
func NewEventProcessor(params Params) (*EventProcessor, error)
func (*EventProcessor) Close ¶
func (ep *EventProcessor) Close(ctx context.Context)
func (*EventProcessor) CloseNow ¶
func (ep *EventProcessor) CloseNow(ctx context.Context)
func (*EventProcessor) IsClosed ¶
func (ep *EventProcessor) IsClosed() bool
func (*EventProcessor) IsLeader ¶
func (ep *EventProcessor) IsLeader() bool
func (*EventProcessor) ProcessEvents ¶
func (ep *EventProcessor) ProcessEvents(ctx context.Context, msgs []mq.ConsumedMessage) error
func (*EventProcessor) ProcessEventsErr ¶
func (ep *EventProcessor) ProcessEventsErr(ctx context.Context, msgs []mq.ConsumedMessage, err error)
func (*EventProcessor) PublishApplyResults ¶
func (ep *EventProcessor) PublishApplyResults(ctx context.Context, changedReadBalance map[int64]*model.ReadAccount, batchEvent *BatchEvents, eventResults []*model.EventApplyResult)
func (*EventProcessor) ReleaseLeader ¶
func (ep *EventProcessor) ReleaseLeader(ctx context.Context)
func (*EventProcessor) SetLeader ¶
func (ep *EventProcessor) SetLeader()
func (*EventProcessor) SyncBalanceToRedisCache ¶
func (ep *EventProcessor) SyncBalanceToRedisCache(ctx context.Context, changedAccountBalance map[int64]*model.ReadAccount, epoch int64)
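func (*EventProcessor) TryToBecomeLeader ¶
func (ep *EventProcessor) TryToBecomeLeader(ctx context.Context) (partition int64, offset int64, ok bool)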
type EventValidator ¶
type EventValidator struct {
// contains filtered or unexported fields
}
func (EventValidator) DecodeEventsAndGroupByShardID ¶
func (ev EventValidator) DecodeEventsAndGroupByShardID(ctx context.Context, msgs []mq.ConsumedMessage) (shardIDBatchEvents map[int32]*BatchEvents, invalidFormatEvents []*eventpb.BalanceChangeEvent, err error)
type LeaderElector ¶
type LeaderElector struct {
// contains filtered or unexported fields
}
func (*LeaderElector) ClearLeader ¶
func (le *LeaderElector) ClearLeader()
func (*LeaderElector) GetFencingToken ¶
func (le *LeaderElector) GetFencingToken() int64
func (*LeaderElector) GetLeaderSvcID ¶
func (le *LeaderElector) GetLeaderSvcID() string
func (*LeaderElector) GetPartition ¶
func (le *LeaderElector) GetPartition() int32
func (*LeaderElector) IsLeader ¶
func (le *LeaderElector) IsLeader() bool
func (*LeaderElector) SetLeader ¶
func (le *LeaderElector) SetLeader()
type Params ¶
type Params struct {
fx.In
DBRepo domain.DBRepository
RedisRepo domain.RedisRepository
CacheRepo domain.CacheRepository
Producer mq.Producer
WorkerPool utils.WorkerPool
}
type PersistentWorker ¶
type PersistentWorker struct {
*LeaderElector
// contains filtered or unexported fields
}
func NewPersistentWorker ¶
func NewPersistentWorker(dbRepo domain.DBRepository, redisRepo domain.RedisRepository, le *LeaderElector) *PersistentWorker
func (*PersistentWorker) DequeueResult ¶
func (pw *PersistentWorker) DequeueResult() []*ApplyResult
func (*PersistentWorker) DoNowAndWait ¶
func (pw *PersistentWorker) DoNowAndWait(ctx context.Context) error
func (*PersistentWorker) EnqueueResult ¶
func (pw *PersistentWorker) EnqueueResult(ctx context.Context, result *ApplyResult)
func (*PersistentWorker) GracefulStop ¶
func (pw *PersistentWorker) GracefulStop()
func (*PersistentWorker) PersistentApplyResult ¶
func (pw *PersistentWorker) PersistentApplyResult(ctx context.Context, result *ApplyResult) error
func (*PersistentWorker) Start ¶
func (pw *PersistentWorker) Start()
func (*PersistentWorker) Stop ¶
func (pw *PersistentWorker) Stop()
type ResultPublisher ¶
func (*ResultPublisher) PublishResults ¶
func (rp *ResultPublisher) PublishResults(ctx context.Context, changedReadBalance map[int64]*model.ReadAccount, batchEvent *BatchEvents, eventResults []*model.EventApplyResult)
func (*ResultPublisher) PublishStressTestStats ¶
func (rp *ResultPublisher) PublishStressTestStats(ctx context.Context, stats *model.StressTestStats)
Click to show internal directories.
Click to hide internal directories.