telemetry

package
v1.0.13 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 18, 2026 License: Apache-2.0 Imports: 20 Imported by: 0

Documentation

Index

Constants

View Source
// Package-level constants: Kratos endpoint URLs and the SSA token refresh time.
const (
	// KratosPipelineSSATokenEndpoint is the URL of the SSA token endpoint for the Kratos pipeline.
	// The trailing nolint directive silences the gosec linter for this literal.
	KratosPipelineSSATokenEndpoint = "https://w6rojyggn16dpp37xuunjjnvczxdhjrkobq393rkkae.ssa.nvidia.com" //nolint:gosec
	// KratosAPIEndpoint is the URL of the Kratos API.
	KratosAPIEndpoint              = "https://api.kratos.nvidia.com"
	// SsaTokenRefreshTime is the SSA token refresh time.
	// NOTE(review): the unit is not visible here (presumably seconds) — confirm.
	SsaTokenRefreshTime            = 850
)

Variables

View Source
// AwsRegion is an AWS region name; its initial value is "us-west-2".
// NOTE(review): presumably the region used when building the S3 client — confirm.
var AwsRegion = "us-west-2"
View Source
// BulkSyncReadScopes is a list of scope names; it initially holds the single scope "bulkuploads-read".
// NOTE(review): presumably the scopes requested for bulk-sync read access — confirm.
var BulkSyncReadScopes = []string{"bulkuploads-read"}

Functions

This section is empty.

Types

type Exporter

// Exporter describes the functionality that is required to send cloud events.
type Exporter interface {
	// SendSync sends a slice of events synchronously.
	SendSync(ctx context.Context, events []cloudevent.Event) error
	// SendAsync submits a single event for asynchronous sending.
	SendAsync(ctx context.Context, event cloudevent.Event) error
	// SendAsyncList submits a slice of events for asynchronous sending.
	SendAsyncList(ctx context.Context, events []cloudevent.Event) error
}

Exporter describes the functionality that is required to send cloud events

type KratosExporter

// KratosExporter is a Kratos client that can post events to the Kratos data platform.
// Instances are created with NewKratosExporter.
type KratosExporter struct {
	// contains filtered or unexported fields
}

KratosExporter is a Kratos client that can post events to the Kratos data platform

func NewKratosExporter

func NewKratosExporter(cfg *KratosExporterConfig) (*KratosExporter, error)

NewKratosExporter creates a new Kratos Telemetry Client

func (*KratosExporter) Close

func (k *KratosExporter) Close()

Close closes the internal buffered channel and waits for all events in the channel to be flushed to the Kratos data collector. When the pod receives a termination signal such as SIGTERM, the client should invoke Close() so that all events in the buffer are flushed out. (SIGKILL cannot be caught, so no flush is possible in that case.)

func (*KratosExporter) SendAsync

func (k *KratosExporter) SendAsync(ctx context.Context, event cloudevent.Event) error

SendAsync adds an event to the buffer, which is flushed periodically. If the buffer is full, the event is dropped with a warning.

func (*KratosExporter) SendAsyncList

func (k *KratosExporter) SendAsyncList(ctx context.Context, events []cloudevent.Event) error

SendAsyncList adds events to the buffer, which is flushed periodically. If the buffer is full, the events are dropped with a warning.

func (*KratosExporter) SendSync

func (k *KratosExporter) SendSync(ctx context.Context, events []cloudevent.Event) error

SendSync sends the events synchronously, with a configurable batching mechanism

type KratosExporterConfig

// KratosExporterConfig holds the configuration needed to create a KratosTelemetry client.
type KratosExporterConfig struct {
	// ClientCfg is the configuration of the base client.
	ClientCfg *clients.BaseClientConfig
	// CollectorID is the ID of the Kratos collector.
	CollectorID string
	// NumRetries is the number of retry attempts, excluding the first request.
	// Zero means no retry; the maximum allowed value is 2.
	// The default value is 2, so the total number of attempts including the first is 3.
	NumRetries int
	// FlushInterval is the interval, in seconds, of the periodic flush operation when sending events asynchronously.
	FlushInterval int
	// MaxBatchSize is the maximum allowed number of events per request sent to the Kratos collector.
	// The default value is 1000; the goal is to reduce the number of requests made to the Kratos collector.
	MaxBatchSize int
	// APIVersion specifies the desired analytics API version.
	APIVersion string
	// BaseDelayInSeconds is the base delay, in seconds, for exponential backoff retries.
	BaseDelayInSeconds int
	// MaxEvents is the maximum number of events that the buffer can hold.
	MaxEvents int
	// MaxWorkers is the number of worker goroutines used to flush periodically.
	MaxWorkers int
	// NumRetriesExponentialBackoff is the number of retry attempts for exponential backoff
	// while sending events to Kratos.
	NumRetriesExponentialBackoff int
	// MaxBufferSize is the maximum buffer size of the task channel.
	MaxBufferSize int
	// CustomHTTPClient allows providing a custom HTTP client to use instead of creating a new one.
	CustomHTTPClient *http.Client
	// SyncMaxRetries is the number of retry attempts for the sync mode.
	SyncMaxRetries int
	// SyncBaseDelayInSeconds is the delay, in seconds, for exponential backoff retries in the sync mode.
	SyncBaseDelayInSeconds int
	// MaxBatchSizeInBytes is the maximum batch size in bytes.
	MaxBatchSizeInBytes int
	// SyncBatchTimeoutMs is the timeout, in milliseconds, for a sync batch.
	SyncBatchTimeoutMs int
	// SyncBatchQueueSize is the initial size of the sync batch queue.
	SyncBatchQueueSize int
}

KratosExporterConfig consists of various config necessary for creating a KratosTelemetry Client

func NewKratosExporterConfig

func NewKratosExporterConfig(kratosSSAHost string, kratosClientID string, kratosClientSecret string, collectorId string, opts ...Option) (*KratosExporterConfig, error)

NewKratosExporterConfig creates a new KratosTelemetry config object with required parameters

func NewKratosExporterConfigWithCredentialsFile

func NewKratosExporterConfigWithCredentialsFile(kratosSSAHost string, credsFile string, collectorId string, opts ...Option) (*KratosExporterConfig, error)

NewKratosExporterConfigWithCredentialsFile creates a new KratosTelemetry config object with the required parameters and a credentials file

func (*KratosExporterConfig) AddCommandFlags

func (cfg *KratosExporterConfig) AddCommandFlags(cmd *cobra.Command) bool

AddCommandFlags is a helper method to add config flags to cobra command

type KratosPipelineClient

// KratosPipelineClient is the Kratos pipeline client created by NewKratosPipelineClient.
// Its GetS3Client method returns an *s3.Client for a given tenant ID and namespace ID.
type KratosPipelineClient struct {
	// contains filtered or unexported fields
}

func NewKratosPipelineClient

func NewKratosPipelineClient(kratosSsaHost string, ssaClientId string, ssaClientSecret string) *KratosPipelineClient

NewKratosPipelineClient creates a Kratos pipeline client.

func (*KratosPipelineClient) GetS3Client

func (p *KratosPipelineClient) GetS3Client(ctx context.Context, tenantId string, namespaceId string) (*s3.Client, error)

GetS3Client creates the S3 session and the S3 client object for the specified tenant ID and namespace ID.

type KratosStsToken

// KratosStsToken maps a JSON object with the keys "status", "accessKeyId",
// "secretAccessKey" and "sessionToken".
// NOTE(review): presumably the temporary credentials used to build the S3 client — confirm.
type KratosStsToken struct {
	// Status is decoded from the "status" key. NOTE(review): presumably an HTTP-style status code — confirm.
	Status          int    `json:"status"`
	// AccessKeyId is decoded from the "accessKeyId" key.
	AccessKeyId     string `json:"accessKeyId"`
	// SecretAccessKey is decoded from the "secretAccessKey" key.
	SecretAccessKey string `json:"secretAccessKey"`
	// SessionToken is decoded from the "sessionToken" key.
	SessionToken    string `json:"sessionToken"`
}

type Option

// Option is a function that receives a *KratosExporterConfig and may return an error.
// Options are passed variadically to NewKratosExporterConfig and
// NewKratosExporterConfigWithCredentialsFile.
type Option func(config *KratosExporterConfig) error

Option provides more configuration for KratosTelemetry

func WithBaseDelayInSeconds

func WithBaseDelayInSeconds(delay int) Option

WithBaseDelayInSeconds configures the base delay while retrying using exponential backoff

func WithEndpoint

func WithEndpoint(endpoint string) Option

func WithFlushInterval

func WithFlushInterval(interval int) Option

WithFlushInterval configures the flush interval used when periodically sending the events stored in the buffer. The value must be larger than zero.

func WithMaxBatchSize

func WithMaxBatchSize(size int) Option

WithMaxBatchSize configures the maximum number of events per request

func WithMaxBatchSizeInBytes

func WithMaxBatchSizeInBytes(size int) Option

WithMaxBatchSizeInBytes configures the maximum batch size in bytes

func WithMaxBufferSize

func WithMaxBufferSize(bufferSize int) Option

WithMaxBufferSize configures the max buffer size of the task channel

func WithMaxEvents

func WithMaxEvents(capacity int) Option

WithMaxEvents configures the maximum size of the buffer

func WithMaxWorkers

func WithMaxWorkers(count int) Option

WithMaxWorkers configures the number of worker go routines which are used to flush periodically

func WithNumRetries

func WithNumRetries(retries int) Option

WithNumRetries configures the retry functionality of the KratosTelemetry client

func WithNumRetriesExponentialBackoff

func WithNumRetriesExponentialBackoff(retryCount int) Option

WithNumRetriesExponentialBackoff configures the number of retry attempts for exponential backoff while sending events to Kratos

func WithRefreshConfig

func WithRefreshConfig(cfg auth.RefreshConfig) Option

WithRefreshConfig adds authn refresh configuration to the KratosTelemetry client

func WithSyncBaseDelayInSeconds

func WithSyncBaseDelayInSeconds(delay int) Option

WithSyncBaseDelayInSeconds configures the base delay in seconds for exponential backoff retries in sync mode

func WithSyncBatchQueueSize

func WithSyncBatchQueueSize(size int) Option

WithSyncBatchQueueSize configures the initial size of the sync batch queue

func WithSyncBatchTimeoutMs

func WithSyncBatchTimeoutMs(timeout int) Option

WithSyncBatchTimeoutMs configures the timeout in milliseconds for sync batch

func WithSyncMaxRetries

func WithSyncMaxRetries(retries int) Option

WithSyncMaxRetries configures the maximum number of retry attempts for sync mode

func WithTLS

func WithTLS(cfg auth.TLSConfigOptions) Option

WithTLS adds TLS configuration to the KratosTelemetry client

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL