defaultforwarderimpl

package
v0.81.0-rc.3
Published: Jun 19, 2026 License: Apache-2.0 Imports: 46 Imported by: 0

Documentation

Overview

Package defaultforwarderimpl implements the default forwarder component.

Constants

const (
	// Stopped represents the internal state of an unstarted Forwarder.
	Stopped uint32 = iota
	// Started represents the internal state of a started Forwarder.
	Started
	// Disabled represents the internal state of a disabled Forwarder.
	Disabled
)
const (
	// CoreFeatures bitmask to enable specific core features
	CoreFeatures = defaultforwarderdef.CoreFeatures
	// TraceFeatures bitmask to enable specific trace features
	TraceFeatures = defaultforwarderdef.TraceFeatures
	// ProcessFeatures bitmask to enable specific process features
	ProcessFeatures = defaultforwarderdef.ProcessFeatures
	// SysProbeFeatures bitmask to enable specific system-probe features
	SysProbeFeatures = defaultforwarderdef.SysProbeFeatures
)

Variables

This section is empty.

Functions

func HasFeature

func HasFeature(features, flag Features) bool

HasFeature reports whether a specific feature flag is set in a feature set.

func NewForwarder

func NewForwarder(config config.Component, log log.Component, lc compdef.Lifecycle, ignoreLifeCycleError bool, options *Options) provides

NewForwarder returns a new forwarder component.

func NewForwarderFromDeps

func NewForwarderFromDeps(dep dependencies) (provides, error)

NewForwarderFromDeps is an exported wrapper around newForwarder for use with fx.

func NewHTTPClient

func NewHTTPClient(config config.Component, numberOfWorkers int, log log.Component) *http.Client

NewHTTPClient creates a new http.Client

func NewHTTPTransport

func NewHTTPTransport(config config.Component, numberOfWorkers int, log log.Component) *http.Transport

NewHTTPTransport creates a new http.Transport

func NewMockForwarder

func NewMockForwarder(config config.Component, log log.Component, secrets secrets.Component) provides

NewMockForwarder provides a mock forwarder component for use with fx.

Types

type DefaultForwarder

type DefaultForwarder struct {

	// NumberOfWorkers is the number of concurrent HTTP requests made by the DefaultForwarder (default 4).
	NumberOfWorkers int
	// contains filtered or unexported fields
}

DefaultForwarder is the default implementation of the defaultforwarderdef.Forwarder.

func NewDefaultForwarder

func NewDefaultForwarder(config config.Component, log log.Component, options *Options) *DefaultForwarder

NewDefaultForwarder returns a new DefaultForwarder. TODO: (components) Remove this method and other exported methods in comp/forwarder.

func (*DefaultForwarder) GetDomainResolvers

func (f *DefaultForwarder) GetDomainResolvers() []pkgresolver.DomainResolver

GetDomainResolvers returns the list of resolvers used by this forwarder.

func (*DefaultForwarder) Start

func (f *DefaultForwarder) Start() error

Start initializes and runs the forwarder.

func (*DefaultForwarder) State

func (f *DefaultForwarder) State() uint32

State returns the internal state of the forwarder (Started or Stopped)
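The state values above are plain uint32 constants, which suggests they are read and written atomically. The following is a minimal sketch of that idea, not the package's implementation: the `lifecycle` type and its transition rules are assumptions made for illustration, and only the constant names come from this page.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// Local copies of the documented constants so the sketch is self-contained.
const (
	Stopped uint32 = iota
	Started
	Disabled
)

// lifecycle is a hypothetical holder for the state word; it is not a type
// from this package.
type lifecycle struct {
	state atomic.Uint32 // zero value is Stopped
}

// Start moves Stopped -> Started. Any other current state is reported as an
// error (an assumption of this sketch).
func (l *lifecycle) Start() error {
	if !l.state.CompareAndSwap(Stopped, Started) {
		return errors.New("forwarder is already started or is disabled")
	}
	return nil
}

// Stop moves Started -> Stopped and does nothing in any other state.
func (l *lifecycle) Stop() {
	l.state.CompareAndSwap(Started, Stopped)
}

// State returns the current state value.
func (l *lifecycle) State() uint32 { return l.state.Load() }

func main() {
	var l lifecycle
	fmt.Println(l.State() == Stopped) // true
	_ = l.Start()
	fmt.Println(l.State() == Started) // true
	fmt.Println(l.Start() != nil)     // true: a second Start is rejected
	l.Stop()
	fmt.Println(l.State() == Stopped) // true
}
```

Using compare-and-swap keeps concurrent Start and Stop calls from both believing they performed the transition.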

func (*DefaultForwarder) Stop

func (f *DefaultForwarder) Stop()

Stop stops all the components of the forwarder and frees resources.

func (*DefaultForwarder) SubmitAgentChecksMetadata

func (f *DefaultForwarder) SubmitAgentChecksMetadata(payload transaction.BytesPayloads, extra http.Header) error

SubmitAgentChecksMetadata will send an agentchecks_metadata tag type payload to Datadog backend.

func (*DefaultForwarder) SubmitConnectionChecks

func (f *DefaultForwarder) SubmitConnectionChecks(payload transaction.BytesPayloads, extra http.Header) (chan defaultforwarderdef.Response, error)

SubmitConnectionChecks sends connection checks

func (*DefaultForwarder) SubmitContainerChecks

func (f *DefaultForwarder) SubmitContainerChecks(payload transaction.BytesPayloads, extra http.Header) (chan defaultforwarderdef.Response, error)

SubmitContainerChecks sends container checks

func (*DefaultForwarder) SubmitHostMetadata

func (f *DefaultForwarder) SubmitHostMetadata(payload transaction.BytesPayloads, extra http.Header) error

SubmitHostMetadata will send a host_metadata tag type payload to Datadog backend.

func (*DefaultForwarder) SubmitMetadata

func (f *DefaultForwarder) SubmitMetadata(payload transaction.BytesPayloads, extra http.Header) error

SubmitMetadata will send a metadata type payload to Datadog backend.

func (*DefaultForwarder) SubmitOrchestratorChecks

func (f *DefaultForwarder) SubmitOrchestratorChecks(payload transaction.BytesPayloads, extra http.Header, payloadType int) error

SubmitOrchestratorChecks sends orchestrator checks

func (*DefaultForwarder) SubmitOrchestratorManifests

func (f *DefaultForwarder) SubmitOrchestratorManifests(payload transaction.BytesPayloads, extra http.Header) error

SubmitOrchestratorManifests sends orchestrator manifests

func (*DefaultForwarder) SubmitProcessChecks

func (f *DefaultForwarder) SubmitProcessChecks(payload transaction.BytesPayloads, extra http.Header) (chan defaultforwarderdef.Response, error)

SubmitProcessChecks sends process checks

func (*DefaultForwarder) SubmitProcessDiscoveryChecks

func (f *DefaultForwarder) SubmitProcessDiscoveryChecks(payload transaction.BytesPayloads, extra http.Header) (chan defaultforwarderdef.Response, error)

SubmitProcessDiscoveryChecks sends process discovery checks

func (*DefaultForwarder) SubmitRTContainerChecks

func (f *DefaultForwarder) SubmitRTContainerChecks(payload transaction.BytesPayloads, extra http.Header) (chan defaultforwarderdef.Response, error)

SubmitRTContainerChecks sends real time container checks

func (*DefaultForwarder) SubmitRTProcessChecks

func (f *DefaultForwarder) SubmitRTProcessChecks(payload transaction.BytesPayloads, extra http.Header) (chan defaultforwarderdef.Response, error)

SubmitRTProcessChecks sends real time process checks

func (*DefaultForwarder) SubmitSeries

func (f *DefaultForwarder) SubmitSeries(payloads transaction.BytesPayloads, extra http.Header) error

SubmitSeries will send timeseries to the v2 endpoint

func (*DefaultForwarder) SubmitSketchSeries

func (f *DefaultForwarder) SubmitSketchSeries(payload transaction.BytesPayloads, extra http.Header) error

SubmitSketchSeries will send payloads to Datadog backend - PROTOTYPE FOR PERCENTILE

func (*DefaultForwarder) SubmitTransaction

func (f *DefaultForwarder) SubmitTransaction(t *transaction.HTTPTransaction) error

SubmitTransaction adds a transaction to the queue for sending.

func (*DefaultForwarder) SubmitV1CheckRuns

func (f *DefaultForwarder) SubmitV1CheckRuns(payload transaction.BytesPayloads, extra http.Header) error

SubmitV1CheckRuns will send service checks to v1 endpoint (this will be removed once the backend handles v2 endpoints).

func (*DefaultForwarder) SubmitV1Intake

func (f *DefaultForwarder) SubmitV1Intake(payload transaction.BytesPayloads, kind transaction.Kind, extra http.Header) error

SubmitV1Intake will send payloads to the universal `/intake/` endpoint used by Agent v5.

func (*DefaultForwarder) SubmitV1IntakeDirect

func (f *DefaultForwarder) SubmitV1IntakeDirect(ctx context.Context, payload transaction.BytesPayloads, kind transaction.Kind, extra http.Header) error

SubmitV1IntakeDirect sends payloads to the universal `/intake/` endpoint synchronously, bypassing the forwarder worker queue.

This is intended for bounded one-shot lifecycle telemetry during shutdown. It reuses the normal HTTPTransaction construction path, but avoids the worker queue so the caller can know whether the request completed before returning.
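Because the call is synchronous, the caller is the one who bounds it. Below is a minimal sketch of wrapping such a call in a deadline during shutdown. `sendFunc`, `sendBounded`, and the two simulated senders are hypothetical names introduced here; they stand in for a call like SubmitV1IntakeDirect and assume the callee honors context cancellation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// sendFunc is a hypothetical stand-in for a synchronous submit call.
// It is assumed to return once ctx is cancelled or expires.
type sendFunc func(ctx context.Context) error

// sendBounded runs send under a deadline so a shutdown path cannot block
// for longer than timeout.
func sendBounded(parent context.Context, timeout time.Duration, send sendFunc) error {
	ctx, cancel := context.WithTimeout(parent, timeout)
	defer cancel()
	return send(ctx)
}

// slowSend simulates a backend that never answers: it returns only when
// the context ends.
func slowSend(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

// fastSend simulates a request that completes immediately.
func fastSend(ctx context.Context) error { return ctx.Err() }

// timedOut reports whether a bounded call to slowSend hit its deadline.
func timedOut() bool {
	err := sendBounded(context.Background(), 20*time.Millisecond, slowSend)
	return errors.Is(err, context.DeadlineExceeded)
}

// completed reports whether a bounded call to fastSend finished without error.
func completed() bool {
	return sendBounded(context.Background(), time.Second, fastSend) == nil
}

func main() {
	fmt.Println("slow send timed out:", timedOut())
	fmt.Println("fast send completed:", completed())
}
```

The returned error tells the caller whether the request finished before the deadline, which is the property the queue-based submit methods cannot offer.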

func (*DefaultForwarder) SubmitV1Series

func (f *DefaultForwarder) SubmitV1Series(payloads transaction.BytesPayloads, extra http.Header) error

SubmitV1Series will send time series to the v1 endpoint (this will be removed once the backend handles v2 endpoints).

type Features

type Features = defaultforwarderdef.Features

Features is a bitmask to enable specific forwarder features. This is a type alias for defaultforwarderdef.Features.

func ClearFeature

func ClearFeature(features, flag Features) Features

ClearFeature clears forwarder features from a feature set

func SetFeature

func SetFeature(features, flag Features) Features

SetFeature sets forwarder features in a feature set

func ToggleFeature

func ToggleFeature(features, flag Features) Features

ToggleFeature toggles forwarder features in a feature set
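The four helpers (HasFeature, SetFeature, ClearFeature, ToggleFeature) follow the usual bitmask pattern. The sketch below shows one plausible way they behave. The underlying type of `Features` and the flag values are not shown on this page, so `uint8` and `1 << iota` are assumptions, as are the function bodies.

```go
package main

import "fmt"

// Features is a local stand-in. The real type is an alias for
// defaultforwarderdef.Features; uint8 is an assumption.
type Features uint8

// Illustrative flag values only; the real ones live in defaultforwarderdef.
const (
	CoreFeatures Features = 1 << iota
	TraceFeatures
	ProcessFeatures
	SysProbeFeatures
)

// SetFeature turns on every bit of flag.
func SetFeature(features, flag Features) Features { return features | flag }

// ClearFeature turns off every bit of flag.
func ClearFeature(features, flag Features) Features { return features &^ flag }

// ToggleFeature flips every bit of flag.
func ToggleFeature(features, flag Features) Features { return features ^ flag }

// HasFeature reports whether any bit of flag is set in features.
func HasFeature(features, flag Features) bool { return features&flag != 0 }

func main() {
	var f Features
	f = SetFeature(f, CoreFeatures)
	f = SetFeature(f, ProcessFeatures)
	fmt.Println(HasFeature(f, CoreFeatures), HasFeature(f, TraceFeatures)) // true false

	f = ClearFeature(f, CoreFeatures)
	f = ToggleFeature(f, TraceFeatures)
	fmt.Println(HasFeature(f, CoreFeatures), HasFeature(f, TraceFeatures)) // false true
}
```

Each helper returns a new value rather than mutating its argument, which matches the signatures above: the caller must assign the result back.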

type Forwarder

type Forwarder = defaultforwarderdef.Forwarder

Forwarder interface allows packages to send payload to the backend. This is a type alias for defaultforwarderdef.Forwarder.

type Options

type Options struct {
	NumberOfWorkers                int
	RetryQueuePayloadsTotalMaxSize int
	DisableAPIKeyChecking          bool
	EnabledFeatures                Features
	APIKeyValidationInterval       time.Duration
	DomainResolvers                map[string]pkgresolver.DomainResolver
	ConnectionResetInterval        time.Duration
	Secrets                        secrets.Component
	// contains filtered or unexported fields
}

Options contains the configuration options for the DefaultForwarder.

func NewOptions

func NewOptions(config config.Component, log log.Component, keysPerDomain map[string][]utils.APIKeys) (*Options, error)

NewOptions creates a configuration for the forwarder with OPW enabled.

Deprecated: use NewOptionsWithOPW instead.

func NewOptionsWithOPW

func NewOptionsWithOPW(config config.Component, log log.Component, eds utils.EndpointDescriptorSet) (*Options, error)

NewOptionsWithOPW creates a configuration for the forwarder with OPW enabled.

func NewOptionsWithResolvers

func NewOptionsWithResolvers(config config.Component, log log.Component, domainResolvers map[string]pkgresolver.DomainResolver) *Options

NewOptionsWithResolvers creates new Options with default values

func (*Options) SetEnabledFeatures

func (o *Options) SetEnabledFeatures(features []Features)

SetEnabledFeatures sets the features enabled
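Since SetEnabledFeatures takes a slice while EnabledFeatures is a single bitmask, the natural reading is that the slice entries are combined into one mask. A minimal sketch under that assumption follows; `options` is a hypothetical trimmed-down stand-in for Options, and the flag values are illustrative.

```go
package main

import "fmt"

// Features is a local stand-in; uint8 is an assumption.
type Features uint8

// Illustrative flag values only.
const (
	CoreFeatures Features = 1 << iota
	TraceFeatures
	ProcessFeatures
)

// options is a hypothetical, trimmed-down stand-in for Options.
type options struct {
	EnabledFeatures Features
}

// SetEnabledFeatures ORs every entry of features into the mask
// (assumed behavior, not confirmed by this page).
func (o *options) SetEnabledFeatures(features []Features) {
	for _, f := range features {
		o.EnabledFeatures |= f
	}
}

func main() {
	o := &options{}
	o.SetEnabledFeatures([]Features{CoreFeatures, ProcessFeatures})
	fmt.Printf("%03b\n", o.EnabledFeatures) // 101
}
```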

type PointCountTelemetry

type PointCountTelemetry interface {
	OnPointSuccessfullySent(count int)
	OnPointDropped(count int)
}

PointCountTelemetry tracks the number of points that were either successfully delivered or dropped by the forwarder.
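Any type with these two methods satisfies the interface. As an illustration, here is a minimal sketch of a counter-based implementation. `counterTelemetry` is a hypothetical type, and the use of atomic counters assumes the callbacks may be invoked from several workers at once.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// PointCountTelemetry is copied from the interface documented above so the
// sketch compiles on its own.
type PointCountTelemetry interface {
	OnPointSuccessfullySent(count int)
	OnPointDropped(count int)
}

// counterTelemetry is a hypothetical implementation that keeps running totals.
type counterTelemetry struct {
	sent    atomic.Int64
	dropped atomic.Int64
}

// OnPointSuccessfullySent adds count to the delivered total.
func (c *counterTelemetry) OnPointSuccessfullySent(count int) { c.sent.Add(int64(count)) }

// OnPointDropped adds count to the dropped total.
func (c *counterTelemetry) OnPointDropped(count int) { c.dropped.Add(int64(count)) }

// Compile-time check that counterTelemetry satisfies the interface.
var _ PointCountTelemetry = (*counterTelemetry)(nil)

func main() {
	c := &counterTelemetry{}
	var t PointCountTelemetry = c
	t.OnPointSuccessfullySent(120)
	t.OnPointDropped(5)
	t.OnPointSuccessfullySent(30)
	fmt.Println(c.sent.Load(), c.dropped.Load()) // 150 5
}
```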

type Response

type Response = defaultforwarderdef.Response

Response contains the response details of a successfully posted transaction. This is a type alias for defaultforwarderdef.Response.

type SharedConnection

type SharedConnection struct {
	// contains filtered or unexported fields
}

SharedConnection holds a shared http.Client that is used by each worker. Access to the client is protected by an RWMutex.

func NewSharedConnection

func NewSharedConnection(
	log log.Component,
	isLocal bool,
	numberOfWorkers int,
	config config.Component,
	transport http.RoundTripper,
) *SharedConnection

NewSharedConnection creates a new shared connection with the given http.Client.

func (*SharedConnection) GetClient

func (sc *SharedConnection) GetClient() *http.Client

GetClient returns the http.Client.

func (*SharedConnection) ResetClient

func (sc *SharedConnection) ResetClient()

ResetClient replaces the client with a newly created one.
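The pattern described here (many readers fetching a client, an occasional writer swapping it) can be sketched with sync.RWMutex as follows. `sharedClient`, `newSharedClient`, and `defaultFactory` are hypothetical names, and closing idle connections on reset is an assumption of this sketch rather than documented behavior.

```go
package main

import (
	"fmt"
	"net/http"
	"sync"
	"time"
)

// sharedClient is a hypothetical stand-in for SharedConnection.
type sharedClient struct {
	mu        sync.RWMutex
	client    *http.Client
	newClient func() *http.Client
}

// newSharedClient builds the first client eagerly and remembers the factory
// so that later resets can build a replacement.
func newSharedClient(factory func() *http.Client) *sharedClient {
	return &sharedClient{client: factory(), newClient: factory}
}

// GetClient takes the read lock, so many workers can fetch the client
// concurrently.
func (s *sharedClient) GetClient() *http.Client {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.client
}

// ResetClient takes the write lock and swaps in a fresh client. Closing the
// old client's idle connections first is an assumption of this sketch.
func (s *sharedClient) ResetClient() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.client.CloseIdleConnections()
	s.client = s.newClient()
}

// defaultFactory returns a client with an illustrative timeout.
func defaultFactory() *http.Client {
	return &http.Client{Timeout: 10 * time.Second}
}

func main() {
	sc := newSharedClient(defaultFactory)
	before := sc.GetClient()
	fmt.Println(before == sc.GetClient()) // true: same client until a reset
	sc.ResetClient()
	fmt.Println(before == sc.GetClient()) // false: a new client was installed
}
```

Workers that already hold the old pointer keep using it for their in-flight request; only later GetClient calls see the replacement.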

type SyncForwarder

type SyncForwarder struct {
	// contains filtered or unexported fields
}

SyncForwarder is a very simple Forwarder synchronously sending the data to the intake.

func NewSyncForwarder

func NewSyncForwarder(config config.Component, log log.Component, secrets secrets.Component, endpoints utils.EndpointDescriptorSet, timeout time.Duration) (*SyncForwarder, error)

NewSyncForwarder returns a new synchronous forwarder.

func (*SyncForwarder) GetDomainResolvers

func (f *SyncForwarder) GetDomainResolvers() []resolver.DomainResolver

GetDomainResolvers returns the list of resolvers used by this forwarder.

func (*SyncForwarder) Start

func (f *SyncForwarder) Start() error

Start starts the sync forwarder: nothing to do.

func (*SyncForwarder) Stop

func (f *SyncForwarder) Stop()

Stop stops the sync forwarder: nothing to do.

func (*SyncForwarder) SubmitAgentChecksMetadata

func (f *SyncForwarder) SubmitAgentChecksMetadata(payload transaction.BytesPayloads, extra http.Header) error

SubmitAgentChecksMetadata will send an agentchecks_metadata tag type payload to Datadog backend.

func (*SyncForwarder) SubmitConnectionChecks

func (f *SyncForwarder) SubmitConnectionChecks(payload transaction.BytesPayloads, extra http.Header) (chan Response, error)

SubmitConnectionChecks sends connection checks

func (*SyncForwarder) SubmitContainerChecks

func (f *SyncForwarder) SubmitContainerChecks(payload transaction.BytesPayloads, extra http.Header) (chan Response, error)

SubmitContainerChecks sends container checks

func (*SyncForwarder) SubmitHostMetadata

func (f *SyncForwarder) SubmitHostMetadata(payload transaction.BytesPayloads, extra http.Header) error

SubmitHostMetadata will send a host_metadata tag type payload to Datadog backend.

func (*SyncForwarder) SubmitMetadata

func (f *SyncForwarder) SubmitMetadata(payload transaction.BytesPayloads, extra http.Header) error

SubmitMetadata will send a metadata type payload to Datadog backend.

func (*SyncForwarder) SubmitOrchestratorChecks

func (f *SyncForwarder) SubmitOrchestratorChecks(payload transaction.BytesPayloads, extra http.Header, payloadType int) error

SubmitOrchestratorChecks sends orchestrator checks

func (*SyncForwarder) SubmitOrchestratorManifests

func (f *SyncForwarder) SubmitOrchestratorManifests(payload transaction.BytesPayloads, extra http.Header) error

SubmitOrchestratorManifests sends orchestrator manifests

func (*SyncForwarder) SubmitProcessChecks

func (f *SyncForwarder) SubmitProcessChecks(payload transaction.BytesPayloads, extra http.Header) (chan Response, error)

SubmitProcessChecks sends process checks

func (*SyncForwarder) SubmitProcessDiscoveryChecks

func (f *SyncForwarder) SubmitProcessDiscoveryChecks(payload transaction.BytesPayloads, extra http.Header) (chan Response, error)

SubmitProcessDiscoveryChecks sends process discovery checks

func (*SyncForwarder) SubmitRTContainerChecks

func (f *SyncForwarder) SubmitRTContainerChecks(payload transaction.BytesPayloads, extra http.Header) (chan Response, error)

SubmitRTContainerChecks sends real time container checks

func (*SyncForwarder) SubmitRTProcessChecks

func (f *SyncForwarder) SubmitRTProcessChecks(payload transaction.BytesPayloads, extra http.Header) (chan Response, error)

SubmitRTProcessChecks sends real time process checks

func (*SyncForwarder) SubmitTransaction

func (f *SyncForwarder) SubmitTransaction(txn *transaction.HTTPTransaction) error

SubmitTransaction adds a transaction to the queue for sending.

func (*SyncForwarder) SubmitV1CheckRuns

func (f *SyncForwarder) SubmitV1CheckRuns(payload transaction.BytesPayloads, extra http.Header) error

SubmitV1CheckRuns will send service checks to v1 endpoint (this will be removed once the backend handles v2 endpoints).

func (*SyncForwarder) SubmitV1Intake

func (f *SyncForwarder) SubmitV1Intake(payload transaction.BytesPayloads, kind transaction.Kind, extra http.Header) error

SubmitV1Intake will send payloads to the universal `/intake/` endpoint used by Agent v5.

func (*SyncForwarder) SubmitV1IntakeDirect

func (f *SyncForwarder) SubmitV1IntakeDirect(_ context.Context, payload transaction.BytesPayloads, kind transaction.Kind, extra http.Header) error

SubmitV1IntakeDirect sends payloads synchronously to the universal `/intake/` endpoint.

func (*SyncForwarder) SubmitV1Series

func (f *SyncForwarder) SubmitV1Series(payload transaction.BytesPayloads, extra http.Header) error

SubmitV1Series will send time series to the v1 endpoint (this will be removed once the backend handles v2 endpoints).

type Worker

type Worker struct {

	// Client is the HTTP client used to process transactions.
	Client *SharedConnection
	// HighPrio is the channel used to receive high-priority transactions from the Forwarder.
	HighPrio <-chan transaction.Transaction
	// LowPrio is the channel used to receive low-priority transactions from the Forwarder.
	LowPrio <-chan transaction.Transaction
	// RequeueChan is the channel used to send failed transactions back to the Forwarder.
	RequeueChan chan<- transaction.Transaction
	// contains filtered or unexported fields
}

Worker consumes transactions from the Forwarder and processes them. If a transaction fails to be processed, the Worker sends it back to the Forwarder to be retried later.
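To make the interplay of the three channels concrete, here is a minimal sketch of a loop that prefers the high-priority channel and requeues failures. It is an illustration only: `task`, `process`, `drain`, and `demo` are hypothetical, and unlike a real worker, which would block waiting for work, `drain` returns once both queues are empty so that its behavior is deterministic.

```go
package main

import (
	"errors"
	"fmt"
)

// task is a hypothetical stand-in for transaction.Transaction.
type task struct {
	name string
	fail bool // when true, process reports an error
}

// process simulates sending one transaction.
func process(t task) error {
	if t.fail {
		return errors.New("send failed")
	}
	return nil
}

// drain handles everything currently queued, always taking from high before
// low, and pushes failed tasks to requeue. It returns the task names in
// processing order.
func drain(high, low <-chan task, requeue chan<- task) []string {
	var order []string
	for {
		var t task
		select {
		case t = <-high: // high priority wins whenever it has work
		default:
			select {
			case t = <-low:
			default:
				return order // both queues are empty
			}
		}
		order = append(order, t.name)
		if err := process(t); err != nil {
			requeue <- t // hand the failure back for a later retry
		}
	}
}

// demo queues one low-priority and two high-priority tasks and reports the
// processing order plus the number of requeued tasks.
func demo() ([]string, int) {
	high := make(chan task, 4)
	low := make(chan task, 4)
	requeue := make(chan task, 8)
	low <- task{name: "low-1"}
	high <- task{name: "high-1", fail: true}
	high <- task{name: "high-2"}
	order := drain(high, low, requeue)
	return order, len(requeue)
}

func main() {
	order, requeued := demo()
	fmt.Println(order, requeued) // [high-1 high-2 low-1] 1
}
```

The nested select is what gives the high-priority channel precedence: the low-priority channel is consulted only when the outer select finds nothing ready on the high-priority one.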

func NewWorker

func NewWorker(
	config config.Component,
	log log.Component,
	secrets secrets.Component,
	highPrioChan <-chan transaction.Transaction,
	lowPrioChan <-chan transaction.Transaction,
	requeueChan chan<- transaction.Transaction,
	blocked *blockedEndpoints,
	pointCountTelemetry PointCountTelemetry,
	httpClient *SharedConnection,
) *Worker

NewWorker returns a new worker that consumes transactions from highPrioChan and lowPrioChan and pushes failed ones back into requeueChan.

func (*Worker) Start

func (w *Worker) Start()

Start starts a Worker.

func (*Worker) Stop

func (w *Worker) Stop(purgeHighPrio bool)

Stop stops the worker.
