Documentation
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewTelemetrySyncTask ¶
func NewTelemetrySyncTask(wrk worker.TelemetryMultiWorker, logger logging.LoggerInterface, period int) *asynctask.AsyncTask
NewTelemetrySyncTask constructs a task that periodically records SDK configs and stats to the Split servers.
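As an illustration of what "periodically" means here, the following is a minimal, self-contained sketch of a tick-driven loop. It does not use the real asynctask package; `runOnTicks` and the placeholder record function are hypothetical names, and the unit of the `period` argument is not stated in this documentation, so the interval below is arbitrary.

```go
package main

import (
	"fmt"
	"time"
)

// runOnTicks is a hypothetical helper (not part of this package). It calls
// record once per received tick until stop is closed, then returns how many
// times record ran. The tick channel is generic so it can be driven by a
// time.Ticker or by a hand-fed channel.
func runOnTicks[T any](ticks <-chan T, stop <-chan struct{}, record func()) int {
	runs := 0
	for {
		select {
		case <-stop:
			return runs
		case <-ticks:
			record()
			runs++
		}
	}
}

func main() {
	// Arbitrary interval chosen for the example only.
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()

	stop := make(chan struct{})
	done := make(chan int)
	go func() {
		done <- runOnTicks(ticker.C, stop, func() {
			fmt.Println("recording telemetry (placeholder)")
		})
	}()

	time.Sleep(35 * time.Millisecond)
	close(stop)
	fmt.Println("runs:", <-done)
}
```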
Types ¶
type Config ¶
type Config struct {
	Name               string
	Logger             logging.LoggerInterface
	Worker             Worker
	InputBufferSize    int
	ProcessConcurrency int
	ProcessBatchSize   int
	PostConcurrency    int
	MaxAccumWait       time.Duration
	HTTPTimeout        time.Duration
}
Config contains the set of options used to set up the eviction component.
type EventWorkerConfig ¶
type EventWorkerConfig struct {
	Logger          logging.LoggerInterface
	Storage         storage.EventMultiSdkConsumer
	EvictionMonitor evcalc.Monitor
	URL             string
	Apikey          string
	FetchSize       int
}
EventWorkerConfig bundles the options accepted by NewEventsWorker.
type EventsPipelineWorker ¶
type EventsPipelineWorker struct {
	// contains filtered or unexported fields
}
EventsPipelineWorker implements all the required methods to work with a pipelined task
func NewEventsWorker ¶
func NewEventsWorker(cfg *EventWorkerConfig) (*EventsPipelineWorker, error)
NewEventsWorker builds a pipeline-suited events worker
func (*EventsPipelineWorker) BuildRequest ¶
func (i *EventsPipelineWorker) BuildRequest(data interface{}) (*http.Request, func(), error)
BuildRequest takes an intermediate object and generates an http request to post events
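The three return values of BuildRequest suggest a request, a cleanup callback, and an error. The sketch below shows one plausible way such a shape can be used, with a pooled buffer that the callback releases once the request has been sent. It is an assumption-laden illustration, not this package's implementation: `buildRequest`, `bufPool`, the URL, and the `Authorization` header format are all hypothetical.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"sync"
)

// bufPool is a hypothetical pool of reusable serialization buffers.
var bufPool = sync.Pool{New: func() interface{} { return new(bytes.Buffer) }}

// buildRequest serializes payload as JSON into a pooled buffer and wraps it
// in a POST request. The returned func releases the buffer and must only be
// called after the request body is no longer needed.
func buildRequest(url, apikey string, payload interface{}) (*http.Request, func(), error) {
	buf := bufPool.Get().(*bytes.Buffer)
	buf.Reset()
	release := func() { bufPool.Put(buf) }

	if err := json.NewEncoder(buf).Encode(payload); err != nil {
		release()
		return nil, nil, fmt.Errorf("error serializing payload: %w", err)
	}

	req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(buf.Bytes()))
	if err != nil {
		release()
		return nil, nil, fmt.Errorf("error building request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")
	// Assumed header format; the real service may expect something different.
	req.Header.Set("Authorization", "Bearer "+apikey)
	return req, release, nil
}

func main() {
	// Placeholder URL and key, for illustration only.
	req, release, err := buildRequest("http://localhost:8080/events", "some-apikey", []string{"event-1"})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	defer release()
	fmt.Println(req.Method, req.URL.String(), req.ContentLength)
}
```

Returning the cleanup function alongside the request lets the caller decide when the body buffer is safe to reuse, which is typically after the HTTP round trip completes.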
func (*EventsPipelineWorker) Fetch ¶
func (i *EventsPipelineWorker) Fetch() ([]string, error)
Fetch fetches raw events. This interface is somewhat inconsistent: what we really want is byte streams to deserialize, but because redis returns strings, we use strings here to avoid making copies. We should eventually revisit the redis client interface and see how feasible it is to return bytes directly.
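Fetch returns `[]string` while Process accepts `[][]byte`, so a conversion has to happen somewhere between the two. How this package performs it is not shown here; the sketch below only illustrates the straightforward conversion, which allocates and copies each item, and `toBytes` is a hypothetical helper name.

```go
package main

import "fmt"

// toBytes converts each fetched string into its own byte slice.
// A plain []byte(s) conversion like this one generally copies the string
// data, which is the cost the note above refers to.
func toBytes(raws []string) [][]byte {
	out := make([][]byte, 0, len(raws))
	for _, s := range raws {
		out = append(out, []byte(s))
	}
	return out
}

func main() {
	converted := toBytes([]string{`{"key":"u1"}`, `{"key":"u2"}`})
	fmt.Println(len(converted), string(converted[1]))
}
```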
func (*EventsPipelineWorker) Process ¶
func (i *EventsPipelineWorker) Process(raws [][]byte, sink chan<- interface{}) error
Process parses the raw data and packages the events
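A rough sketch of a Process-style step follows: it parses raw JSON items, groups them into fixed-size batches, and pushes each batch into a sink channel. The `rawEvent` type, its fields, the batch size parameter, and the choice to skip malformed items are all assumptions made for illustration; the real worker may group and package data differently.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// rawEvent is a hypothetical, simplified event shape.
type rawEvent struct {
	Key       string `json:"key"`
	EventType string `json:"eventType"`
}

// process parses each raw item and sends batches of at most batchSize
// events into sink. Items that fail to parse are skipped.
func process(raws [][]byte, batchSize int, sink chan<- interface{}) error {
	if batchSize <= 0 {
		return errors.New("batch size must be positive")
	}
	batch := make([]rawEvent, 0, batchSize)
	for _, raw := range raws {
		var ev rawEvent
		if err := json.Unmarshal(raw, &ev); err != nil {
			continue // skip malformed items in this sketch
		}
		batch = append(batch, ev)
		if len(batch) == batchSize {
			sink <- batch
			// Allocate a fresh slice so the batch already sent is not overwritten.
			batch = make([]rawEvent, 0, batchSize)
		}
	}
	if len(batch) > 0 {
		sink <- batch
	}
	return nil
}

func main() {
	sink := make(chan interface{}, 4)
	raws := [][]byte{
		[]byte(`{"key":"u1","eventType":"click"}`),
		[]byte(`{"key":"u2","eventType":"view"}`),
		[]byte(`{"key":"u3","eventType":"click"}`),
	}
	if err := process(raws, 2, sink); err != nil {
		fmt.Println("error:", err)
		return
	}
	close(sink)
	for b := range sink {
		fmt.Println(len(b.([]rawEvent)), "event(s) in batch")
	}
}
```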
type ImpressionWorkerConfig ¶
type ImpressionWorkerConfig struct {
	Logger              logging.LoggerInterface
	Storage             storage.ImpressionMultiSdkConsumer
	ImpressionCounter   *provisional.ImpressionsCounter
	ImpressionsListener impressionlistener.ImpressionBulkListener
	Telemetry           storage.TelemetryRuntimeProducer
	EvictionMonitor     evcalc.Monitor
	URL                 string
	Apikey              string
	FetchSize           int
	ImpressionsMode     string
}
ImpressionWorkerConfig bundles the options accepted by NewImpressionWorker.
type ImpressionsPipelineWorker ¶
type ImpressionsPipelineWorker struct {
	// contains filtered or unexported fields
}
ImpressionsPipelineWorker implements all the required methods to work with a pipelined task
func NewImpressionWorker ¶
func NewImpressionWorker(cfg *ImpressionWorkerConfig) (*ImpressionsPipelineWorker, error)
NewImpressionWorker builds a pipeline-suited impressions worker
func (*ImpressionsPipelineWorker) BuildRequest ¶
func (i *ImpressionsPipelineWorker) BuildRequest(data interface{}) (*http.Request, func(), error)
BuildRequest takes an intermediate object and generates an http request to post impressions
func (*ImpressionsPipelineWorker) Fetch ¶
func (i *ImpressionsPipelineWorker) Fetch() ([]string, error)
Fetch fetches raw impressions. This interface is somewhat inconsistent: what we really want is byte streams to deserialize, but because redis returns strings, we use strings here to avoid making copies. We should eventually revisit the redis client interface and see how feasible it is to return bytes directly.
func (*ImpressionsPipelineWorker) Process ¶
func (i *ImpressionsPipelineWorker) Process(raws [][]byte, sink chan<- interface{}) error
Process parses the raw data and packages the impressions
type PipelinedSyncTask ¶
type PipelinedSyncTask struct {
	// contains filtered or unexported fields
}
PipelinedSyncTask implements a buffered fetch-process-evict flow. Decoupling these operations and placing buffers between them allows each step to be scaled individually in order to maximize throughput.
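The idea of decoupled, individually scalable stages can be sketched with buffered channels and independent worker pools. This is a generic illustration rather than the package's code: `runPipeline` and its parameters are hypothetical, and the way they loosely mirror Config fields such as InputBufferSize, ProcessConcurrency, and PostConcurrency is an assumption.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// runPipeline pushes items through two decoupled stages connected by
// buffered channels. Each stage has its own number of goroutines, so the
// processing and posting steps can be scaled independently.
func runPipeline(items []string, process func(string) string, post func(string),
	bufSize, processors, posters int) {

	input := make(chan string, bufSize)     // fetched, not yet processed
	processed := make(chan string, bufSize) // processed, not yet posted

	var procWG, postWG sync.WaitGroup
	for i := 0; i < processors; i++ {
		procWG.Add(1)
		go func() {
			defer procWG.Done()
			for raw := range input {
				processed <- process(raw)
			}
		}()
	}
	for i := 0; i < posters; i++ {
		postWG.Add(1)
		go func() {
			defer postWG.Done()
			for item := range processed {
				post(item)
			}
		}()
	}

	// "Fetch" stage: feed the input buffer, then shut stages down in order.
	for _, it := range items {
		input <- it
	}
	close(input)
	procWG.Wait()
	close(processed)
	postWG.Wait()
}

func main() {
	var mu sync.Mutex
	posted := 0
	runPipeline(
		[]string{"a", "b", "c", "d"},
		strings.ToUpper,
		func(string) { mu.Lock(); posted++; mu.Unlock() },
		2, 2, 3,
	)
	fmt.Println("posted items:", posted)
}
```

Closing the channels stage by stage, and waiting for each pool before closing the next channel, ensures that everything already buffered is flushed before the function returns.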
func NewPipelinedTask ¶
func NewPipelinedTask(config *Config) (*PipelinedSyncTask, error)
NewPipelinedTask constructs a pipelined task
func (*PipelinedSyncTask) IsRunning ¶
func (p *PipelinedSyncTask) IsRunning() bool
IsRunning returns whether the task is running or not
func (*PipelinedSyncTask) Stop ¶
func (p *PipelinedSyncTask) Stop(blocking bool) error
Stop stops the task and drains the pipe.