Documentation
¶
Overview ¶
Package retry provides retry mechanisms for the forwarder.
Index ¶
- Variables
- type DiskUsageLimit
- type EndpointProto
- func (*EndpointProto) Descriptor() ([]byte, []int)deprecated
- func (x *EndpointProto) GetName() string
- func (x *EndpointProto) GetRoute() []byte
- func (*EndpointProto) ProtoMessage()
- func (x *EndpointProto) ProtoReflect() protoreflect.Message
- func (x *EndpointProto) Reset()
- func (x *EndpointProto) String() string
- type FileRemovalPolicy
- type FileRemovalPolicyTelemetry
- type HTTPTransactionsSerializer
- type HeaderValuesProto
- type HttpTransactionProto
- func (*HttpTransactionProto) Descriptor() ([]byte, []int)deprecated
- func (x *HttpTransactionProto) GetCreatedAt() int64
- func (x *HttpTransactionProto) GetDestination() TransactionDestinationProto
- func (x *HttpTransactionProto) GetDomain() string
- func (x *HttpTransactionProto) GetEndpoint() *EndpointProto
- func (x *HttpTransactionProto) GetErrorCount() int64
- func (x *HttpTransactionProto) GetHeaders() map[string]*HeaderValuesProto
- func (x *HttpTransactionProto) GetPayload() []byte
- func (x *HttpTransactionProto) GetPointCount() int32
- func (x *HttpTransactionProto) GetPriority() TransactionPriorityProto
- func (x *HttpTransactionProto) GetRetryable() bool
- func (*HttpTransactionProto) ProtoMessage()
- func (x *HttpTransactionProto) ProtoReflect() protoreflect.Message
- func (x *HttpTransactionProto) Reset()
- func (x *HttpTransactionProto) String() string
- type HttpTransactionProtoCollection
- func (*HttpTransactionProtoCollection) Descriptor() ([]byte, []int)deprecated
- func (x *HttpTransactionProtoCollection) GetValues() []*HttpTransactionProto
- func (x *HttpTransactionProtoCollection) GetVersion() int32
- func (*HttpTransactionProtoCollection) ProtoMessage()
- func (x *HttpTransactionProtoCollection) ProtoReflect() protoreflect.Message
- func (x *HttpTransactionProtoCollection) Reset()
- func (x *HttpTransactionProtoCollection) String() string
- type PointCountTelemetry
- type QueueCapacityStats
- type QueueDurationCapacity
- type TransactionDestinationProto
- func (TransactionDestinationProto) Descriptor() protoreflect.EnumDescriptor
- func (x TransactionDestinationProto) Enum() *TransactionDestinationProto
- func (TransactionDestinationProto) EnumDescriptor() ([]byte, []int)deprecated
- func (x TransactionDestinationProto) Number() protoreflect.EnumNumber
- func (x TransactionDestinationProto) String() string
- func (TransactionDestinationProto) Type() protoreflect.EnumType
- type TransactionDiskStorage
- type TransactionPriorityProto
- func (TransactionPriorityProto) Descriptor() protoreflect.EnumDescriptor
- func (x TransactionPriorityProto) Enum() *TransactionPriorityProto
- func (TransactionPriorityProto) EnumDescriptor() ([]byte, []int)deprecated
- func (x TransactionPriorityProto) Number() protoreflect.EnumNumber
- func (x TransactionPriorityProto) String() string
- func (TransactionPriorityProto) Type() protoreflect.EnumType
- type TransactionPrioritySorter
- type TransactionRetryQueue
- func (tc *TransactionRetryQueue) Add(t transaction.Transaction) (int, error)
- func (tc *TransactionRetryQueue) ExtractTransactions() ([]transaction.Transaction, error)
- func (tc *TransactionRetryQueue) FlushToDisk() error
- func (tc *TransactionRetryQueue) GetDiskSpaceUsed() int64
- func (tc *TransactionRetryQueue) GetMaxMemSizeInBytes() int
- func (tc *TransactionRetryQueue) GetTransactionCount() int
- type TransactionRetryQueueTelemetry
Constants ¶
This section is empty.
Variables ¶
var ( TransactionPriorityProto_name = map[int32]string{ 0: "NORMAL", 1: "HIGH", } TransactionPriorityProto_value = map[string]int32{ "NORMAL": 0, "HIGH": 1, } )
Enum value maps for TransactionPriorityProto.
var ( TransactionDestinationProto_name = map[int32]string{ 0: "ALL_REGIONS", 1: "PRIMARY_ONLY", 2: "SECONDARY_ONLY", } TransactionDestinationProto_value = map[string]int32{ "ALL_REGIONS": 0, "PRIMARY_ONLY": 1, "SECONDARY_ONLY": 2, } )
Enum value maps for TransactionDestinationProto.
var File_HttpTransactionProto_proto protoreflect.FileDescriptor
Functions ¶
This section is empty.
Types ¶
type DiskUsageLimit ¶
type DiskUsageLimit struct {
// contains filtered or unexported fields
}
DiskUsageLimit provides `computeAvailableSpace` which returns the amount of disk space that can be used to store transactions.
func NewDiskUsageLimit ¶
func NewDiskUsageLimit( diskPath string, disk diskUsageRetriever, maxSizeInBytes int64, maxDiskRatio float64) *DiskUsageLimit
NewDiskUsageLimit creates a new instance of NewDiskUsageLimit
type EndpointProto ¶
type EndpointProto struct {
// Using bytes instead of string to allow non-UTF-8 placeholder characters for API key scrubbing
Route []byte `protobuf:"bytes,1,opt,name=route,proto3" json:"route,omitempty"`
Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"`
// contains filtered or unexported fields
}
func (*EndpointProto) Descriptor
deprecated
func (*EndpointProto) Descriptor() ([]byte, []int)
Deprecated: Use EndpointProto.ProtoReflect.Descriptor instead.
func (*EndpointProto) GetName ¶
func (x *EndpointProto) GetName() string
func (*EndpointProto) GetRoute ¶
func (x *EndpointProto) GetRoute() []byte
func (*EndpointProto) ProtoMessage ¶
func (*EndpointProto) ProtoMessage()
func (*EndpointProto) ProtoReflect ¶ added in v0.76.0
func (x *EndpointProto) ProtoReflect() protoreflect.Message
func (*EndpointProto) Reset ¶
func (x *EndpointProto) Reset()
func (*EndpointProto) String ¶
func (x *EndpointProto) String() string
type FileRemovalPolicy ¶
type FileRemovalPolicy struct {
// contains filtered or unexported fields
}
FileRemovalPolicy handles the removal policy for `.retry` files.
func NewFileRemovalPolicy ¶
func NewFileRemovalPolicy( rootPath string, outdatedFileDayCount int, telemetry FileRemovalPolicyTelemetry) (*FileRemovalPolicy, error)
NewFileRemovalPolicy creates a new instance of FileRemovalPolicy
func (*FileRemovalPolicy) RegisterDomain ¶
func (p *FileRemovalPolicy) RegisterDomain(domainName string) (string, error)
RegisterDomain registers a domain name.
func (*FileRemovalPolicy) RemoveOutdatedFiles ¶
func (p *FileRemovalPolicy) RemoveOutdatedFiles() ([]string, error)
RemoveOutdatedFiles removes the outdated files when a file is older than outDatedFileDayCount days.
func (*FileRemovalPolicy) RemoveUnknownDomains ¶
func (p *FileRemovalPolicy) RemoveUnknownDomains() ([]string, error)
RemoveUnknownDomains remove unknown domains.
type FileRemovalPolicyTelemetry ¶
type FileRemovalPolicyTelemetry struct{}
FileRemovalPolicyTelemetry handles the telemetry for FileRemovalPolicy.
type HTTPTransactionsSerializer ¶
type HTTPTransactionsSerializer struct {
// contains filtered or unexported fields
}
HTTPTransactionsSerializer serializes Transaction instances. To support a new Transaction implementation, add a new method `func (s *HTTPTransactionsSerializer) Add(transaction NEW_TYPE) error {`
func NewHTTPTransactionsSerializer ¶
func NewHTTPTransactionsSerializer(log log.Component, resolver resolver.DomainResolver) *HTTPTransactionsSerializer
NewHTTPTransactionsSerializer creates a new instance of HTTPTransactionsSerializer
func (*HTTPTransactionsSerializer) Add ¶
func (s *HTTPTransactionsSerializer) Add(transaction *transaction.HTTPTransaction) error
Add adds a transaction to the serializer. This function uses references on HTTPTransaction.Payload and HTTPTransaction.Headers and so the transaction must not be updated until a call to `GetBytesAndReset`.
func (*HTTPTransactionsSerializer) Deserialize ¶
func (s *HTTPTransactionsSerializer) Deserialize(bytes []byte) ([]transaction.Transaction, int, error)
Deserialize deserializes from bytes.
func (*HTTPTransactionsSerializer) GetBytesAndReset ¶
func (s *HTTPTransactionsSerializer) GetBytesAndReset() ([]byte, error)
GetBytesAndReset returns as bytes the serialized transactions and reset the transaction collection.
type HeaderValuesProto ¶
type HeaderValuesProto struct {
// Using bytes instead of string to allow non-UTF-8 placeholder characters for API key scrubbing
Values [][]byte `protobuf:"bytes,1,rep,name=values,proto3" json:"values,omitempty"`
// contains filtered or unexported fields
}
func (*HeaderValuesProto) Descriptor
deprecated
func (*HeaderValuesProto) Descriptor() ([]byte, []int)
Deprecated: Use HeaderValuesProto.ProtoReflect.Descriptor instead.
func (*HeaderValuesProto) GetValues ¶
func (x *HeaderValuesProto) GetValues() [][]byte
func (*HeaderValuesProto) ProtoMessage ¶
func (*HeaderValuesProto) ProtoMessage()
func (*HeaderValuesProto) ProtoReflect ¶ added in v0.76.0
func (x *HeaderValuesProto) ProtoReflect() protoreflect.Message
func (*HeaderValuesProto) Reset ¶
func (x *HeaderValuesProto) Reset()
func (*HeaderValuesProto) String ¶
func (x *HeaderValuesProto) String() string
type HttpTransactionProto ¶
type HttpTransactionProto struct {
Domain string `protobuf:"bytes,1,opt,name=Domain,proto3" json:"Domain,omitempty"`
Endpoint *EndpointProto `protobuf:"bytes,2,opt,name=Endpoint,proto3" json:"Endpoint,omitempty"`
Headers map[string]*HeaderValuesProto `` /* 141-byte string literal not displayed */
Payload []byte `protobuf:"bytes,4,opt,name=Payload,proto3" json:"Payload,omitempty"`
ErrorCount int64 `protobuf:"varint,5,opt,name=ErrorCount,proto3" json:"ErrorCount,omitempty"`
CreatedAt int64 `protobuf:"varint,6,opt,name=CreatedAt,proto3" json:"CreatedAt,omitempty"`
Retryable bool `protobuf:"varint,7,opt,name=Retryable,proto3" json:"Retryable,omitempty"`
Priority TransactionPriorityProto `protobuf:"varint,8,opt,name=priority,proto3,enum=retry.TransactionPriorityProto" json:"priority,omitempty"`
PointCount int32 `protobuf:"varint,9,opt,name=PointCount,proto3" json:"PointCount,omitempty"`
Destination TransactionDestinationProto `protobuf:"varint,10,opt,name=Destination,proto3,enum=retry.TransactionDestinationProto" json:"Destination,omitempty"`
// contains filtered or unexported fields
}
func (*HttpTransactionProto) Descriptor
deprecated
func (*HttpTransactionProto) Descriptor() ([]byte, []int)
Deprecated: Use HttpTransactionProto.ProtoReflect.Descriptor instead.
func (*HttpTransactionProto) GetCreatedAt ¶
func (x *HttpTransactionProto) GetCreatedAt() int64
func (*HttpTransactionProto) GetDestination ¶ added in v0.56.0
func (x *HttpTransactionProto) GetDestination() TransactionDestinationProto
func (*HttpTransactionProto) GetDomain ¶
func (x *HttpTransactionProto) GetDomain() string
func (*HttpTransactionProto) GetEndpoint ¶
func (x *HttpTransactionProto) GetEndpoint() *EndpointProto
func (*HttpTransactionProto) GetErrorCount ¶
func (x *HttpTransactionProto) GetErrorCount() int64
func (*HttpTransactionProto) GetHeaders ¶
func (x *HttpTransactionProto) GetHeaders() map[string]*HeaderValuesProto
func (*HttpTransactionProto) GetPayload ¶
func (x *HttpTransactionProto) GetPayload() []byte
func (*HttpTransactionProto) GetPointCount ¶
func (x *HttpTransactionProto) GetPointCount() int32
func (*HttpTransactionProto) GetPriority ¶
func (x *HttpTransactionProto) GetPriority() TransactionPriorityProto
func (*HttpTransactionProto) GetRetryable ¶
func (x *HttpTransactionProto) GetRetryable() bool
func (*HttpTransactionProto) ProtoMessage ¶
func (*HttpTransactionProto) ProtoMessage()
func (*HttpTransactionProto) ProtoReflect ¶ added in v0.76.0
func (x *HttpTransactionProto) ProtoReflect() protoreflect.Message
func (*HttpTransactionProto) Reset ¶
func (x *HttpTransactionProto) Reset()
func (*HttpTransactionProto) String ¶
func (x *HttpTransactionProto) String() string
type HttpTransactionProtoCollection ¶
type HttpTransactionProtoCollection struct {
Version int32 `protobuf:"varint,1,opt,name=version,proto3" json:"version,omitempty"`
Values []*HttpTransactionProto `protobuf:"bytes,2,rep,name=values,proto3" json:"values,omitempty"`
// contains filtered or unexported fields
}
func (*HttpTransactionProtoCollection) Descriptor
deprecated
func (*HttpTransactionProtoCollection) Descriptor() ([]byte, []int)
Deprecated: Use HttpTransactionProtoCollection.ProtoReflect.Descriptor instead.
func (*HttpTransactionProtoCollection) GetValues ¶
func (x *HttpTransactionProtoCollection) GetValues() []*HttpTransactionProto
func (*HttpTransactionProtoCollection) GetVersion ¶
func (x *HttpTransactionProtoCollection) GetVersion() int32
func (*HttpTransactionProtoCollection) ProtoMessage ¶
func (*HttpTransactionProtoCollection) ProtoMessage()
func (*HttpTransactionProtoCollection) ProtoReflect ¶ added in v0.76.0
func (x *HttpTransactionProtoCollection) ProtoReflect() protoreflect.Message
func (*HttpTransactionProtoCollection) Reset ¶
func (x *HttpTransactionProtoCollection) Reset()
func (*HttpTransactionProtoCollection) String ¶
func (x *HttpTransactionProtoCollection) String() string
type PointCountTelemetry ¶
type PointCountTelemetry struct {
// contains filtered or unexported fields
}
PointCountTelemetry sends the number of points successfully sent and the number of points dropped.
func NewPointCountTelemetry ¶
func NewPointCountTelemetry(domain string) *PointCountTelemetry
NewPointCountTelemetry creates a new instance of PointCountTelemetry.
func (*PointCountTelemetry) OnPointDropped ¶
func (t *PointCountTelemetry) OnPointDropped(count int)
OnPointDropped increases the telemetry that counts the number of points droppped
func (*PointCountTelemetry) OnPointSuccessfullySent ¶
func (t *PointCountTelemetry) OnPointSuccessfullySent(count int)
OnPointSuccessfullySent increases the telemetry that counts the number of points successfully sent.
type QueueCapacityStats ¶
QueueCapacityStats represents statistics about the capacity of the retry queue.
type QueueDurationCapacity ¶
type QueueDurationCapacity struct {
// contains filtered or unexported fields
}
QueueDurationCapacity provides a method to know how much data (express as a duration) the in-memory retry queue and the disk storage retry queue can store. For each domain, the capacity in bytes is the sum of: - the in-memory retry queue capacity. We assume there is enough memory for all in-memory retry queues (one by domain) - the available disk storage * `domain relative speed` where `domain relative speed` is the number of bytes per second for this domain, divided by the total number of bytes per second for all domains. If a domain receives twice the traffic compared to anoter one, twice disk storage capacity is allocated to this domain. Disk storage is shared across domain. If there is no traffic during the time period for a domain, no statistic is reported.
func NewQueueDurationCapacity ¶
func NewQueueDurationCapacity( historyDuration time.Duration, bucketDuration time.Duration, maxMemSizeInBytes int, optionalDiskSpace diskSpace) *QueueDurationCapacity
NewQueueDurationCapacity creates a new instance of *QueueDurationCapacity. if `optionalDiskSpace` is not nil, the capacity also use the storage on disk. `historyDuration` is the duration used to compute the number of bytes received per second. `bucketDuration` is the size of a bucket.
func (*QueueDurationCapacity) ComputeCapacity ¶
func (r *QueueDurationCapacity) ComputeCapacity(t time.Time) (map[string]QueueCapacityStats, error)
ComputeCapacity computes the capacity of the retry queue express as a duration. Return statistics by domain name.
func (*QueueDurationCapacity) OnTransaction ¶
func (r *QueueDurationCapacity) OnTransaction( transaction *transaction.HTTPTransaction, mainDomain string, now time.Time) error
OnTransaction must be called for each transaction. Note: because of alternateDomains, `mainDomain` is not necessary the same as transaction.Domain.
type TransactionDestinationProto ¶ added in v0.56.0
type TransactionDestinationProto int32
const ( TransactionDestinationProto_ALL_REGIONS TransactionDestinationProto = 0 TransactionDestinationProto_PRIMARY_ONLY TransactionDestinationProto = 1 TransactionDestinationProto_SECONDARY_ONLY TransactionDestinationProto = 2 )
func (TransactionDestinationProto) Descriptor ¶ added in v0.76.0
func (TransactionDestinationProto) Descriptor() protoreflect.EnumDescriptor
func (TransactionDestinationProto) Enum ¶ added in v0.76.0
func (x TransactionDestinationProto) Enum() *TransactionDestinationProto
func (TransactionDestinationProto) EnumDescriptor
deprecated
added in
v0.56.0
func (TransactionDestinationProto) EnumDescriptor() ([]byte, []int)
Deprecated: Use TransactionDestinationProto.Descriptor instead.
func (TransactionDestinationProto) Number ¶ added in v0.76.0
func (x TransactionDestinationProto) Number() protoreflect.EnumNumber
func (TransactionDestinationProto) String ¶ added in v0.56.0
func (x TransactionDestinationProto) String() string
func (TransactionDestinationProto) Type ¶ added in v0.76.0
func (TransactionDestinationProto) Type() protoreflect.EnumType
type TransactionDiskStorage ¶
type TransactionDiskStorage interface {
Store([]transaction.Transaction) error
ExtractLast() ([]transaction.Transaction, error)
GetDiskSpaceUsed() int64
}
TransactionDiskStorage is an interface to store and load transactions from disk
type TransactionPriorityProto ¶
type TransactionPriorityProto int32
const ( TransactionPriorityProto_NORMAL TransactionPriorityProto = 0 TransactionPriorityProto_HIGH TransactionPriorityProto = 1 )
func (TransactionPriorityProto) Descriptor ¶ added in v0.76.0
func (TransactionPriorityProto) Descriptor() protoreflect.EnumDescriptor
func (TransactionPriorityProto) Enum ¶ added in v0.76.0
func (x TransactionPriorityProto) Enum() *TransactionPriorityProto
func (TransactionPriorityProto) EnumDescriptor
deprecated
func (TransactionPriorityProto) EnumDescriptor() ([]byte, []int)
Deprecated: Use TransactionPriorityProto.Descriptor instead.
func (TransactionPriorityProto) Number ¶ added in v0.76.0
func (x TransactionPriorityProto) Number() protoreflect.EnumNumber
func (TransactionPriorityProto) String ¶
func (x TransactionPriorityProto) String() string
func (TransactionPriorityProto) Type ¶ added in v0.76.0
func (TransactionPriorityProto) Type() protoreflect.EnumType
type TransactionPrioritySorter ¶
type TransactionPrioritySorter interface {
Sort([]transaction.Transaction)
}
TransactionPrioritySorter is an interface to sort transactions.
type TransactionRetryQueue ¶
type TransactionRetryQueue struct {
// contains filtered or unexported fields
}
TransactionRetryQueue stores transactions in memory and flush them to disk when the memory limit is exceeded.
func BuildTransactionRetryQueue ¶
func BuildTransactionRetryQueue( log log.Component, maxMemSizeInBytes int, flushToStorageRatio float64, optionalDomainFolderPath string, optionalDiskUsageLimit *DiskUsageLimit, dropPrioritySorter TransactionPrioritySorter, resolver resolver.DomainResolver, pointCountTelemetry *PointCountTelemetry) *TransactionRetryQueue
BuildTransactionRetryQueue builds a new instance of TransactionRetryQueue
func NewTransactionRetryQueue ¶
func NewTransactionRetryQueue( dropPrioritySorter TransactionPrioritySorter, optionalTransactionStorage TransactionDiskStorage, maxMemSizeInBytes int, flushToStorageRatio float64, telemetry TransactionRetryQueueTelemetry, pointCountTelemetry *PointCountTelemetry) *TransactionRetryQueue
NewTransactionRetryQueue creates a new instance of NewTransactionRetryQueue
func (*TransactionRetryQueue) Add ¶
func (tc *TransactionRetryQueue) Add(t transaction.Transaction) (int, error)
Add adds a new transaction and flush transactions to disk if the memory limit is exceeded. The amount of transactions flushed to disk is control by `flushToStorageRatio` which is the ratio of the transactions to be flushed. Consider the following payload sizes 10, 20, 30, 40, 15 with `maxMemSizeInBytes=100` and `flushToStorageRatio=0.6` When adding the last payload `15`, the buffer becomes full (10+20+30+40+15 > 100) and 100*0.6=60 bytes must be flushed on disk. The first 3 transactions are flushed to the disk as 10 + 20 + 30 >= 60 If disk serialization failed or is not enabled, remove old transactions such as `currentMemSizeInBytes` <= `maxMemSizeInBytes`
func (*TransactionRetryQueue) ExtractTransactions ¶
func (tc *TransactionRetryQueue) ExtractTransactions() ([]transaction.Transaction, error)
ExtractTransactions extracts transactions from the container. If some transactions exist in memory extract them otherwise extract transactions from the disk. No transactions are in memory after calling this method.
func (*TransactionRetryQueue) FlushToDisk ¶
func (tc *TransactionRetryQueue) FlushToDisk() error
FlushToDisk is called on shutdown and persists all transactions to disk. The normal limits on capacity still apply, and the same rules are followed as during normal operation in terms of which transactions are dropped and which are persisted.
func (*TransactionRetryQueue) GetDiskSpaceUsed ¶
func (tc *TransactionRetryQueue) GetDiskSpaceUsed() int64
GetDiskSpaceUsed returns the current disk space used for storing transactions.
func (*TransactionRetryQueue) GetMaxMemSizeInBytes ¶
func (tc *TransactionRetryQueue) GetMaxMemSizeInBytes() int
GetMaxMemSizeInBytes gets the maximum memory usage for storing transactions
func (*TransactionRetryQueue) GetTransactionCount ¶
func (tc *TransactionRetryQueue) GetTransactionCount() int
GetTransactionCount gets the number of transactions in the container
type TransactionRetryQueueTelemetry ¶
type TransactionRetryQueueTelemetry struct {
// contains filtered or unexported fields
}
TransactionRetryQueueTelemetry handles the telemetry for TransactionRetryQueue
func NewTransactionRetryQueueTelemetry ¶
func NewTransactionRetryQueueTelemetry(domainName string) TransactionRetryQueueTelemetry
NewTransactionRetryQueueTelemetry creates a new TransactionRetryQueueTelemetry

