Documentation
¶
Index ¶
- Constants
- func BatchedIOBackendTestSuite(t *testing.T, factory func() BatchedIOBackend)
- func EvictWholeProtosessionStrategy(hit *hits.Hit, conflict *IdentifierConflictResponse, ...)
- func GetIsolatedClientID(hit *hits.Hit) hits.ClientID
- func GetIsolatedSessionStamp(hit *hits.Hit) (string, bool)
- func GetIsolatedUserIDStamp(hit *hits.Hit) (string, bool)
- func GetOriginalAuthoritativeClientID(hit *hits.Hit) (hits.ClientID, bool)
- func Handler(ctx context.Context, backend BatchedIOBackend, ...) func(_ map[string]string, h *hits.HitProcessingTask) *worker.Error
- func IsMarkedForEviction(hit *hits.Hit) bool
- func MarkForEviction(hit *hits.Hit, targetClientID hits.ClientID)
- func RewriteIDAndUpdateInPlaceStrategy(hit *hits.Hit, conflict *IdentifierConflictResponse, ...)
- func SendPingWithTime(f func(_ map[string]string, h *hits.HitProcessingTask) *worker.Error, ...) error
- func SetIsolatedClientID(hit *hits.Hit, isolatedID hits.ClientID)
- func SetIsolatedSessionStamp(hit *hits.Hit, stamp string)
- func SetIsolatedUserIDStamp(hit *hits.Hit, stamp string)
- type AppendHitsToProtoSessionRequest
- type AppendHitsToProtoSessionResponse
- type BatchedIOBackend
- type BucketProcessorFunc
- type Closer
- type EvictionStrategy
- type GetAllProtosessionsForBucketRequest
- type GetAllProtosessionsForBucketResponse
- type GetProtoSessionHitsRequest
- type GetProtoSessionHitsResponse
- type IdentifierConflictRequest
- type IdentifierConflictResponse
- type IdentifierIsolationGuard
- type IdentifierIsolationGuardFactory
- type MarkProtoSessionClosingForGivenBucketRequest
- type MarkProtoSessionClosingForGivenBucketResponse
- type Middleware
- type Orchestrator
- type OrchestratorOptionsFunc
- type PerBucketMutexes
- type ProtosessionError
- type RemoveAllHitRelatedMetadataRequest
- type RemoveAllHitRelatedMetadataResponse
- type RemoveBucketMetadataRequest
- type RemoveBucketMetadataResponse
- type RemoveProtoSessionHitsRequest
- type RemoveProtoSessionHitsResponse
- type TestBatchedIOBackend
- func (t *TestBatchedIOBackend) Cleanup(_ context.Context, hitsRequests []*RemoveProtoSessionHitsRequest, ...) ([]*RemoveProtoSessionHitsResponse, []*RemoveAllHitRelatedMetadataResponse, ...)
- func (t *TestBatchedIOBackend) GetAllProtosessionsForBucket(_ context.Context, requests []*GetAllProtosessionsForBucketRequest) []*GetAllProtosessionsForBucketResponse
- func (t *TestBatchedIOBackend) GetIdentifierConflicts(_ context.Context, requests []*IdentifierConflictRequest) []*IdentifierConflictResponse
- func (t *TestBatchedIOBackend) HandleBatch(_ context.Context, appendHitsRequests []*AppendHitsToProtoSessionRequest, ...) ([]*AppendHitsToProtoSessionResponse, []*GetProtoSessionHitsResponse, ...)
- func (t *TestBatchedIOBackend) Stop(_ context.Context) error
- type TestBatchedIOBackendOption
- func WithAppendHitsHandler(h func(*AppendHitsToProtoSessionRequest) *AppendHitsToProtoSessionResponse) TestBatchedIOBackendOption
- func WithCleanupMachineryHandler(handler func() error) TestBatchedIOBackendOption
- func WithGetAllProtosessionsForBucketHandler(...) TestBatchedIOBackendOption
- func WithGetProtoSessionHitsHandler(h func(*GetProtoSessionHitsRequest) *GetProtoSessionHitsResponse) TestBatchedIOBackendOption
- func WithIdentifierConflictHandler(h func(*IdentifierConflictRequest) *IdentifierConflictResponse) TestBatchedIOBackendOption
- func WithMarkProtoSessionClosingHandler(...) TestBatchedIOBackendOption
- func WithRemoveAllHitRelatedMetadataHandler(...) TestBatchedIOBackendOption
- func WithRemoveBucketMetadataHandler(h func(*RemoveBucketMetadataRequest) *RemoveBucketMetadataResponse) TestBatchedIOBackendOption
- func WithRemoveProtoSessionHitsHandler(h func(*RemoveProtoSessionHitsRequest) *RemoveProtoSessionHitsResponse) TestBatchedIOBackendOption
- type TestCloser
- type TestCloserOption
- type TimingWheel
- type TimingWheelStateBackend
Constants ¶
const ( MetaOriginalAuthoritativeClientIDKey = "original_authoritative_client_id" MetaMarkedForEvictionKey = "marked_for_eviction" MetaIsolatedClientIDKey = "isolated_client_id" MetaIsolatedSessionStampKey = "isolated_session_stamp" MetaIsolatedUserIDStampKey = "isolated_user_id_stamp" )
const (
// NextBucketKeyPrefix is the key prefix for storing the next bucket to process.
NextBucketKeyPrefix = "timingwheel.next"
)
Variables ¶
This section is empty.
Functions ¶
func BatchedIOBackendTestSuite ¶ added in v0.13.0
func BatchedIOBackendTestSuite(t *testing.T, factory func() BatchedIOBackend)
BatchedIOBackendTestSuite tests BatchedIOBackend implementations
func EvictWholeProtosessionStrategy ¶ added in v0.13.0
func EvictWholeProtosessionStrategy( hit *hits.Hit, conflict *IdentifierConflictResponse, hitsToBeSaved *[]*hits.Hit, protosessionsForEviction map[hits.ClientID][]*hits.Hit, )
EvictWholeProtosessionStrategy evicts entire protosession and re-queues it
func GetIsolatedSessionStamp ¶ added in v0.18.3
func GetIsolatedUserIDStamp ¶ added in v0.18.3
func GetOriginalAuthoritativeClientID ¶ added in v0.13.0
func Handler ¶
func Handler( ctx context.Context, backend BatchedIOBackend, tickerStateBackend TimingWheelStateBackend, closer Closer, requeuer receiver.Storage, settingsRegistry properties.SettingsRegistry, options ...OrchestratorOptionsFunc, ) func(_ map[string]string, h *hits.HitProcessingTask) *worker.Error
Handler returns a function that processes hit processing tasks.
func IsMarkedForEviction ¶ added in v0.13.0
func MarkForEviction ¶ added in v0.13.0
func RewriteIDAndUpdateInPlaceStrategy ¶ added in v0.13.0
func RewriteIDAndUpdateInPlaceStrategy( hit *hits.Hit, conflict *IdentifierConflictResponse, hitsToBeSaved *[]*hits.Hit, protosessionsForEviction map[hits.ClientID][]*hits.Hit, )
RewriteIDAndUpdateInPlaceStrategy rewrites the hit's ID and updates it in place
func SendPingWithTime ¶ added in v0.13.0
func SendPingWithTime( f func(_ map[string]string, h *hits.HitProcessingTask) *worker.Error, t time.Time, ) error
SendPingWithTime sends a ping with the given time, advancing the worker time.
func SetIsolatedClientID ¶ added in v0.18.3
func SetIsolatedSessionStamp ¶ added in v0.18.3
func SetIsolatedUserIDStamp ¶ added in v0.18.3
Types ¶
type AppendHitsToProtoSessionRequest ¶ added in v0.13.0
AppendHitsToProtoSessionRequest represents a request to append hits to a proto-session
func NewAppendHitsToProtoSessionRequest ¶ added in v0.13.0
func NewAppendHitsToProtoSessionRequest( protoSessionID hits.ClientID, hits []*hits.Hit, ) *AppendHitsToProtoSessionRequest
NewAppendHitsToProtoSessionRequest creates a new append hits request
type AppendHitsToProtoSessionResponse ¶ added in v0.13.0
type AppendHitsToProtoSessionResponse struct {
Err error
}
AppendHitsToProtoSessionResponse represents the result of appending hits
func NewAppendHitsToProtoSessionResponse ¶ added in v0.13.0
func NewAppendHitsToProtoSessionResponse(err error) *AppendHitsToProtoSessionResponse
NewAppendHitsToProtoSessionResponse creates a new append hits response
type BatchedIOBackend ¶ added in v0.13.0
type BatchedIOBackend interface {
GetIdentifierConflicts(
ctx context.Context,
requests []*IdentifierConflictRequest,
) []*IdentifierConflictResponse
HandleBatch(
ctx context.Context,
appendHitsRequests []*AppendHitsToProtoSessionRequest,
getProtoSessionHitsRequests []*GetProtoSessionHitsRequest,
markProtoSessionClosingForGivenBucketRequests []*MarkProtoSessionClosingForGivenBucketRequest,
) (
[]*AppendHitsToProtoSessionResponse,
[]*GetProtoSessionHitsResponse,
[]*MarkProtoSessionClosingForGivenBucketResponse,
)
GetAllProtosessionsForBucket(
ctx context.Context,
requests []*GetAllProtosessionsForBucketRequest,
) []*GetAllProtosessionsForBucketResponse
Cleanup(
ctx context.Context,
hitsRequests []*RemoveProtoSessionHitsRequest,
metadataRequests []*RemoveAllHitRelatedMetadataRequest,
bucketMetadataRequests []*RemoveBucketMetadataRequest,
) (
[]*RemoveProtoSessionHitsResponse,
[]*RemoveAllHitRelatedMetadataResponse,
[]*RemoveBucketMetadataResponse,
)
// Stops the backend, terminates all the gorutines, frees all the resources
Stop(context.Context) error
}
BatchedIOBackend provides batched I/O operations for proto-sessions
func NewDeduplicatingBatchedIOBackend ¶ added in v0.13.0
func NewDeduplicatingBatchedIOBackend(underlying BatchedIOBackend) BatchedIOBackend
NewDeduplicatingBatchedIOBackend wraps a BatchedIOBackend and deduplicates requests
func NewTestBatchedIOBackend ¶ added in v0.13.0
func NewTestBatchedIOBackend(opts ...TestBatchedIOBackendOption) BatchedIOBackend
NewTestBatchedIOBackend creates a test backend with success-like defaults
type BucketProcessorFunc ¶ added in v0.13.0
BucketProcessorFunc processes a single bucket. Returns nil on success (wheel advances), or an error on failure (wheel does not advance).
type Closer ¶
Closer defines an interface for closing and processing hit sessions
func NewPrintingCloser ¶
func NewPrintingCloser() Closer
NewPrintingCloser creates a new Closer implementation that prints the hits to stdout
func NewShardingCloser ¶ added in v0.13.0
NewShardingCloser distributes batches of proto-sessions across N child Closers
func NewTestCloser ¶ added in v0.13.0
func NewTestCloser(opts ...TestCloserOption) Closer
NewTestCloser creates a test closer with success-like defaults
type EvictionStrategy ¶ added in v0.13.0
type EvictionStrategy func( hit *hits.Hit, conflict *IdentifierConflictResponse, hitsToBeSaved *[]*hits.Hit, protosessionsForEviction map[hits.ClientID][]*hits.Hit, )
EvictionStrategy processes conflicting hits and determines eviction behavior
type GetAllProtosessionsForBucketRequest ¶ added in v0.13.0
type GetAllProtosessionsForBucketRequest struct {
BucketID int64
}
GetAllProtosessionsForBucketRequest represents a request to get all proto-sessions for a bucket
func NewGetAllProtosessionsForBucketRequest ¶ added in v0.13.0
func NewGetAllProtosessionsForBucketRequest(bucketID int64) *GetAllProtosessionsForBucketRequest
NewGetAllProtosessionsForBucketRequest creates a new get all protosessions for bucket request
type GetAllProtosessionsForBucketResponse ¶ added in v0.13.0
GetAllProtosessionsForBucketResponse represents all proto-sessions for a bucket
func NewGetAllProtosessionsForBucketResponse ¶ added in v0.13.0
func NewGetAllProtosessionsForBucketResponse( protoSessions [][]*hits.Hit, err error, ) *GetAllProtosessionsForBucketResponse
NewGetAllProtosessionsForBucketResponse creates a new get all protosessions for bucket response
type GetProtoSessionHitsRequest ¶ added in v0.13.0
GetProtoSessionHitsRequest represents a request to get hits for a proto-session
func NewGetProtoSessionHitsRequest ¶ added in v0.13.0
func NewGetProtoSessionHitsRequest(protoSessionID hits.ClientID) *GetProtoSessionHitsRequest
NewGetProtoSessionHitsRequest creates a new get proto-session hits request
type GetProtoSessionHitsResponse ¶ added in v0.13.0
GetProtoSessionHitsResponse represents the result of getting proto-session hits
func NewGetProtoSessionHitsResponse ¶ added in v0.13.0
func NewGetProtoSessionHitsResponse(hits []*hits.Hit, err error) *GetProtoSessionHitsResponse
NewGetProtoSessionHitsResponse creates a new get proto-session hits response
type IdentifierConflictRequest ¶ added in v0.13.0
type IdentifierConflictRequest struct {
Hit *hits.Hit
IdentifierType string
ExtractIdentifier func(*hits.Hit) string
}
IdentifierConflictRequest represents a request to check for identifier conflicts
func GetConflictCheckRequests ¶ added in v0.13.0
func GetConflictCheckRequests( hit *hits.Hit, settings *properties.Settings, ) []*IdentifierConflictRequest
GetConflictCheckRequests returns a list of identifier conflict requests for a given hit and settings
func NewIdentifierConflictRequest ¶ added in v0.13.0
func NewIdentifierConflictRequest( hit *hits.Hit, identifierType string, extractIdentifier func(*hits.Hit) string, ) *IdentifierConflictRequest
NewIdentifierConflictRequest creates a new identifier conflict request
type IdentifierConflictResponse ¶ added in v0.13.0
type IdentifierConflictResponse struct {
Err error
HasConflict bool
ConflictsWith hits.ClientID
Request *IdentifierConflictRequest
}
IdentifierConflictResponse represents the result of identifier conflict check
func NewIdentifierConflictResponse ¶ added in v0.13.0
func NewIdentifierConflictResponse( request *IdentifierConflictRequest, err error, hasConflict bool, conflictsWith hits.ClientID, ) *IdentifierConflictResponse
NewIdentifierConflictResponse creates a new identifier conflict response
type IdentifierIsolationGuard ¶ added in v0.18.3
type IdentifierIsolationGuard interface {
IsolatedClientID(hit *hits.Hit) hits.ClientID
IsolatedSessionStamp(hit *hits.Hit) string
IsolatedUserID(hit *hits.Hit) string
}
func NewNoIsolationGuard ¶ added in v0.18.3
func NewNoIsolationGuard() IdentifierIsolationGuard
type IdentifierIsolationGuardFactory ¶ added in v0.18.3
type IdentifierIsolationGuardFactory interface {
New(settings *properties.Settings) IdentifierIsolationGuard
}
func NewDefaultIdentifierIsolationGuardFactory ¶ added in v0.18.3
func NewDefaultIdentifierIsolationGuardFactory() IdentifierIsolationGuardFactory
func NewNoIsolationGuardFactory ¶ added in v0.18.3
func NewNoIsolationGuardFactory() IdentifierIsolationGuardFactory
NewNoIsolationGuardFactory creates a new no isolation guard factory - DANGER - mixes data from different properties in the same proto-session!
type MarkProtoSessionClosingForGivenBucketRequest ¶ added in v0.13.0
type MarkProtoSessionClosingForGivenBucketRequest struct {
ProtoSessionID hits.ClientID
BucketID int64
}
MarkProtoSessionClosingForGivenBucketRequest marks a proto-session for closing in a bucket
func NewMarkProtoSessionClosingForGivenBucketRequest ¶ added in v0.13.0
func NewMarkProtoSessionClosingForGivenBucketRequest( protoSessionID hits.ClientID, bucketID int64, ) *MarkProtoSessionClosingForGivenBucketRequest
NewMarkProtoSessionClosingForGivenBucketRequest creates a new mark closing request
type MarkProtoSessionClosingForGivenBucketResponse ¶ added in v0.13.0
type MarkProtoSessionClosingForGivenBucketResponse struct {
Err error
}
MarkProtoSessionClosingForGivenBucketResponse represents the result of marking closing
func NewMarkProtoSessionClosingForGivenBucketResponse ¶ added in v0.13.0
func NewMarkProtoSessionClosingForGivenBucketResponse(err error) *MarkProtoSessionClosingForGivenBucketResponse
NewMarkProtoSessionClosingForGivenBucketResponse creates a new mark closing response
type Middleware ¶
Middleware defines an interface for task processing middleware
type Orchestrator ¶ added in v0.13.0
type Orchestrator struct {
// contains filtered or unexported fields
}
func NewOrchestrator ¶ added in v0.13.0
func NewOrchestrator( ctx context.Context, backend BatchedIOBackend, tickerStateBackend TimingWheelStateBackend, closer Closer, requeuer receiver.Storage, settingsRegistry properties.SettingsRegistry, options ...OrchestratorOptionsFunc, ) *Orchestrator
type OrchestratorOptionsFunc ¶ added in v0.18.3
type OrchestratorOptionsFunc func(*Orchestrator)
func WithEvictionStrategy ¶ added in v0.18.3
func WithEvictionStrategy( evictionStrategy EvictionStrategy, ) OrchestratorOptionsFunc
func WithIdentifierIsolationGuardFactory ¶ added in v0.18.3
func WithIdentifierIsolationGuardFactory( identifierIsolationGuardFactory IdentifierIsolationGuardFactory, ) OrchestratorOptionsFunc
type PerBucketMutexes ¶ added in v0.13.0
func (PerBucketMutexes) Drop ¶ added in v0.13.0
func (m PerBucketMutexes) Drop(bucketNumber int64)
func (PerBucketMutexes) Lock ¶ added in v0.13.0
func (m PerBucketMutexes) Lock(bucketNumber int64)
func (PerBucketMutexes) TryLock ¶ added in v0.13.0
func (m PerBucketMutexes) TryLock(bucketNumber int64) bool
type ProtosessionError ¶ added in v0.13.0
func NewErrorCausingTaskDrop ¶ added in v0.17.0
func NewErrorCausingTaskDrop(err error) ProtosessionError
func NewErrorCausingTaskRetry ¶ added in v0.17.0
func NewErrorCausingTaskRetry(err error) ProtosessionError
type RemoveAllHitRelatedMetadataRequest ¶ added in v0.13.0
type RemoveAllHitRelatedMetadataRequest struct {
IdentifierType string
ExtractIdentifier func(*hits.Hit) string
Hit *hits.Hit
}
RemoveAllHitRelatedMetadataRequest represents a request to remove all hit-related metadata
func GetRemoveHitRelatedMetadataRequests ¶ added in v0.13.0
func GetRemoveHitRelatedMetadataRequests( protoSession []*hits.Hit, settings *properties.Settings, ) []*RemoveAllHitRelatedMetadataRequest
GetRemoveHitRelatedMetadataRequests returns a list of remove all hit related metadata requests for a given proto session and settings
func NewRemoveAllHitRelatedMetadataRequest ¶ added in v0.13.0
func NewRemoveAllHitRelatedMetadataRequest( hit *hits.Hit, identifierType string, extractIdentifier func(*hits.Hit) string, ) *RemoveAllHitRelatedMetadataRequest
NewRemoveAllHitRelatedMetadataRequest creates a new remove hit-related metadata request
type RemoveAllHitRelatedMetadataResponse ¶ added in v0.13.0
type RemoveAllHitRelatedMetadataResponse struct {
Err error
}
RemoveAllHitRelatedMetadataResponse represents the result of removing all hit-related metadata
func NewRemoveAllHitRelatedMetadataResponse ¶ added in v0.13.0
func NewRemoveAllHitRelatedMetadataResponse(err error) *RemoveAllHitRelatedMetadataResponse
NewRemoveAllHitRelatedMetadataResponse creates a new remove hit-related metadata response
type RemoveBucketMetadataRequest ¶ added in v0.13.0
type RemoveBucketMetadataRequest struct {
BucketID int64
}
RemoveBucketMetadataRequest represents a request to remove bucket metadata
func NewRemoveBucketMetadataRequest ¶ added in v0.13.0
func NewRemoveBucketMetadataRequest(bucketID int64) *RemoveBucketMetadataRequest
NewRemoveBucketMetadataRequest creates a new remove bucket metadata request
type RemoveBucketMetadataResponse ¶ added in v0.13.0
type RemoveBucketMetadataResponse struct {
Err error
}
RemoveBucketMetadataResponse represents the result of removing bucket metadata
func NewRemoveBucketMetadataResponse ¶ added in v0.13.0
func NewRemoveBucketMetadataResponse(err error) *RemoveBucketMetadataResponse
NewRemoveBucketMetadataResponse creates a new remove bucket metadata response
type RemoveProtoSessionHitsRequest ¶ added in v0.13.0
RemoveProtoSessionHitsRequest represents a request to remove proto-session hits
func NewRemoveProtoSessionHitsRequest ¶ added in v0.13.0
func NewRemoveProtoSessionHitsRequest(protoSessionID hits.ClientID) *RemoveProtoSessionHitsRequest
NewRemoveProtoSessionHitsRequest creates a new remove proto-session hits request
type RemoveProtoSessionHitsResponse ¶ added in v0.13.0
type RemoveProtoSessionHitsResponse struct {
Err error
}
RemoveProtoSessionHitsResponse represents the result of removing proto-session hits
func NewRemoveProtoSessionHitsResponse ¶ added in v0.13.0
func NewRemoveProtoSessionHitsResponse(err error) *RemoveProtoSessionHitsResponse
NewRemoveProtoSessionHitsResponse creates a new remove proto-session hits response
type TestBatchedIOBackend ¶ added in v0.13.0
type TestBatchedIOBackend struct {
// contains filtered or unexported fields
}
TestBatchedIOBackend is a configurable test implementation of BatchedIOBackend
func (*TestBatchedIOBackend) Cleanup ¶ added in v0.13.0
func (t *TestBatchedIOBackend) Cleanup( _ context.Context, hitsRequests []*RemoveProtoSessionHitsRequest, metadataRequests []*RemoveAllHitRelatedMetadataRequest, bucketMetadataRequests []*RemoveBucketMetadataRequest, ) ( []*RemoveProtoSessionHitsResponse, []*RemoveAllHitRelatedMetadataResponse, []*RemoveBucketMetadataResponse, )
Cleanup implements BatchedIOBackend
func (*TestBatchedIOBackend) GetAllProtosessionsForBucket ¶ added in v0.13.0
func (t *TestBatchedIOBackend) GetAllProtosessionsForBucket( _ context.Context, requests []*GetAllProtosessionsForBucketRequest, ) []*GetAllProtosessionsForBucketResponse
GetAllProtosessionsForBucket implements BatchedIOBackend
func (*TestBatchedIOBackend) GetIdentifierConflicts ¶ added in v0.13.0
func (t *TestBatchedIOBackend) GetIdentifierConflicts( _ context.Context, requests []*IdentifierConflictRequest, ) []*IdentifierConflictResponse
GetIdentifierConflicts implements BatchedIOBackend
func (*TestBatchedIOBackend) HandleBatch ¶ added in v0.13.0
func (t *TestBatchedIOBackend) HandleBatch( _ context.Context, appendHitsRequests []*AppendHitsToProtoSessionRequest, getProtoSessionHitsRequests []*GetProtoSessionHitsRequest, markProtoSessionClosingForGivenBucketRequests []*MarkProtoSessionClosingForGivenBucketRequest, ) ( []*AppendHitsToProtoSessionResponse, []*GetProtoSessionHitsResponse, []*MarkProtoSessionClosingForGivenBucketResponse, )
HandleBatch implements BatchedIOBackend
type TestBatchedIOBackendOption ¶ added in v0.13.0
type TestBatchedIOBackendOption func(*TestBatchedIOBackend)
TestBatchedIOBackendOption configures TestBatchedIOBackend
func WithAppendHitsHandler ¶ added in v0.13.0
func WithAppendHitsHandler( h func(*AppendHitsToProtoSessionRequest) *AppendHitsToProtoSessionResponse, ) TestBatchedIOBackendOption
WithAppendHitsHandler sets custom handler for append hits requests
func WithCleanupMachineryHandler ¶ added in v0.13.0
func WithCleanupMachineryHandler(handler func() error) TestBatchedIOBackendOption
WithCleanupMachineryHandler sets custom handler for cleanup
func WithGetAllProtosessionsForBucketHandler ¶ added in v0.13.0
func WithGetAllProtosessionsForBucketHandler( h func(*GetAllProtosessionsForBucketRequest) *GetAllProtosessionsForBucketResponse, ) TestBatchedIOBackendOption
WithGetAllProtosessionsForBucketHandler sets custom handler for bucket proto sessions requests
func WithGetProtoSessionHitsHandler ¶ added in v0.13.0
func WithGetProtoSessionHitsHandler( h func(*GetProtoSessionHitsRequest) *GetProtoSessionHitsResponse, ) TestBatchedIOBackendOption
WithGetProtoSessionHitsHandler sets custom handler for get proto session hits requests
func WithIdentifierConflictHandler ¶ added in v0.13.0
func WithIdentifierConflictHandler( h func(*IdentifierConflictRequest) *IdentifierConflictResponse, ) TestBatchedIOBackendOption
WithIdentifierConflictHandler sets custom handler for identifier conflict requests
func WithMarkProtoSessionClosingHandler ¶ added in v0.13.0
func WithMarkProtoSessionClosingHandler( h func(*MarkProtoSessionClosingForGivenBucketRequest) *MarkProtoSessionClosingForGivenBucketResponse, ) TestBatchedIOBackendOption
WithMarkProtoSessionClosingHandler sets custom handler for mark closing requests
func WithRemoveAllHitRelatedMetadataHandler ¶ added in v0.13.0
func WithRemoveAllHitRelatedMetadataHandler( h func(*RemoveAllHitRelatedMetadataRequest) *RemoveAllHitRelatedMetadataResponse, ) TestBatchedIOBackendOption
WithRemoveAllHitRelatedMetadataHandler sets custom handler for remove metadata requests
func WithRemoveBucketMetadataHandler ¶ added in v0.13.0
func WithRemoveBucketMetadataHandler( h func(*RemoveBucketMetadataRequest) *RemoveBucketMetadataResponse, ) TestBatchedIOBackendOption
WithRemoveBucketMetadataHandler sets custom handler for remove bucket metadata requests
func WithRemoveProtoSessionHitsHandler ¶ added in v0.13.0
func WithRemoveProtoSessionHitsHandler( h func(*RemoveProtoSessionHitsRequest) *RemoveProtoSessionHitsResponse, ) TestBatchedIOBackendOption
WithRemoveProtoSessionHitsHandler sets custom handler for remove hits requests
type TestCloser ¶ added in v0.13.0
type TestCloser struct {
// contains filtered or unexported fields
}
TestCloser is a configurable test implementation of Closer
type TestCloserOption ¶ added in v0.13.0
type TestCloserOption func(*TestCloser)
TestCloserOption configures TestCloser
func WithCloseHandler ¶ added in v0.13.0
func WithCloseHandler(handler func([][]*hits.Hit) error) TestCloserOption
WithCloseHandler sets custom handler for closing proto-sessions
type TimingWheel ¶ added in v0.13.0
type TimingWheel struct {
// contains filtered or unexported fields
}
TimingWheel implements a timing wheel for scheduling protosession closures.
func NewTimingWheel ¶ added in v0.13.0
func NewTimingWheel( backend TimingWheelStateBackend, tickInterval time.Duration, processor BucketProcessorFunc, ) *TimingWheel
NewTimingWheel creates a timing wheel with the given tick interval.
func (*TimingWheel) BucketNumber ¶ added in v0.13.0
func (tw *TimingWheel) BucketNumber(theTime time.Time) int64
func (*TimingWheel) CurrentTime ¶ added in v0.13.0
func (tw *TimingWheel) CurrentTime() time.Time
CurrentTime returns the timing wheel's current time.
func (*TimingWheel) Start ¶ added in v0.13.0
func (tw *TimingWheel) Start(ctx context.Context)
Start begins the timing wheel loop in a goroutine.
func (*TimingWheel) Stop ¶ added in v0.13.0
func (tw *TimingWheel) Stop()
Stop signals the timing wheel to stop and waits for it to finish.
func (*TimingWheel) UpdateTime ¶ added in v0.13.0
func (tw *TimingWheel) UpdateTime(t time.Time)
UpdateTime updates the timing wheel's current time if the new time is after the existing time.
type TimingWheelStateBackend ¶ added in v0.13.0
type TimingWheelStateBackend interface {
// GetNextBucket returns the next bucket to process.
// Returns -1 if no bucket has been processed yet (first run).
GetNextBucket(ctx context.Context) (int64, error)
// SaveNextBucket persists the next bucket number to process.
SaveNextBucket(ctx context.Context, bucketNumber int64) error
}
TimingWheelStateBackend provides abstract storage for timing wheel state.
func NewGenericKVTimingWheelBackend ¶ added in v0.13.0
func NewGenericKVTimingWheelBackend( name string, kv storage.KV, ) TimingWheelStateBackend
NewGenericKVTimingWheelBackend creates a TickerStateBackend using generic storage interfaces.