transport

package
v0.0.75 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 27, 2026 License: Apache-2.0 Imports: 23 Imported by: 0

Documentation

Overview

internal/transport/dakr_client.go

internal/transport/interface.go

internal/transport/sender.go

internal/transport/telemetry_sender.go

Index

Constants

View Source
const (
	DefaultOperatorType = "zxporter"

	// Header keys for client identification in transport layer
	HeaderClient          = "X-Client"
	HeaderOperatorType    = "X-Operator-Type"
	HeaderOperatorVersion = "X-Operator-Version"
	HeaderOperatorGitSHA  = "X-Operator-Git-SHA"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type ClientHeaders added in v0.0.63

type ClientHeaders struct {
	// contains filtered or unexported fields
}

ClientHeaders holds the operator identification information for transport headers. This struct is used by the interceptor to access current client information.

func NewClientHeaders added in v0.0.63

func NewClientHeaders(clusterToken string) *ClientHeaders

NewClientHeaders creates a new ClientHeaders instance

func (*ClientHeaders) AttachToRequest added in v0.0.63

func (h *ClientHeaders) AttachToRequest(header http.Header)

AttachToRequest adds all client identification headers to a Connect request. This is the core method that sets: - Authorization: Bearer token for authentication - X-Client: Combined operator type and version (e.g., "zxporter/1.0.0") - X-Operator-Type: The operator type identifier - X-Operator-Version: The operator version - X-Operator-Git-SHA: The git commit SHA of the operator build

func (*ClientHeaders) GetClusterToken added in v0.0.63

func (h *ClientHeaders) GetClusterToken() string

GetClusterToken returns the current cluster token (thread-safe)

func (*ClientHeaders) SetClusterToken added in v0.0.63

func (h *ClientHeaders) SetClusterToken(token string)

SetClusterToken updates the cluster token (thread-safe)

type DakrClient

type DakrClient interface {
	// SendResource sends any resource data to Dakr
	SendResource(ctx context.Context, resource collector.CollectedResource) (string, error)
	// SendResourceBatch sends a batch of resources of the same type to Dakr
	SendResourceBatch(
		ctx context.Context,
		resources []collector.CollectedResource,
		resourceType collector.ResourceType,
	) (string, error)
	// SendTelemetryMetrics sends telemetry metrics to Dakr
	SendTelemetryMetrics(ctx context.Context, metrics []*dto.MetricFamily) (int32, error)
	// SendClusterSnapshotStream sends cluster snapshot data via streaming for large payloads
	SendClusterSnapshotStream(
		ctx context.Context,
		snapshot *gen.ClusterSnapshot,
		snapshotID string,
		timestamp time.Time,
	) (string, *gen.ClusterSnapshot, error)
	// telemetry_logger.TelemetryLogSender sends a batch of log entries to Dakr
	telemetry_logger.TelemetryLogSender
	// ExchangePATForClusterToken exchanges a PAT token for a cluster token
	ExchangePATForClusterToken(
		ctx context.Context,
		patToken, clusterName, k8sProvider string,
	) (string, string, error)

	// SendNetworkTrafficMetrics pushes network traffic metrics from a node
	SendNetworkTrafficMetrics(
		ctx context.Context,
		req *gen.SendNetworkTrafficMetricsRequest,
	) (*gen.SendNetworkTrafficMetricsResponse, error)

	// ReportHealth
	ReportHealth(ctx context.Context, req *gen.ReportHealthRequest) error
}

DakrClient defines methods for sending data to Dakr

func NewDakrClient

func NewDakrClient(dakrBaseURL string, clusterToken string, logger logr.Logger) DakrClient

NewDakrClient creates a new client for Dakr service

func NewSimpleDakrClient

func NewSimpleDakrClient(logger logr.Logger) DakrClient

NewSimpleDakrClient creates a new simple Dakr client for development/testing

type DirectDakrSender

type DirectDakrSender struct {
	// contains filtered or unexported fields
}

DirectDakrSender sends resources directly to Dakr without buffering

func (*DirectDakrSender) Send

Send transmits a resource directly to Dakr

func (*DirectDakrSender) SendBatch

func (s *DirectDakrSender) SendBatch(
	ctx context.Context,
	resources []collector.CollectedResource,
	resourceType collector.ResourceType,
) (string, error)

SendBatch transmits a batch of resources of the same type to Dakr

func (*DirectDakrSender) SendClusterSnapshotStream

func (s *DirectDakrSender) SendClusterSnapshotStream(
	ctx context.Context,
	snapshot *gen.ClusterSnapshot,
	snapshotID string,
	timestamp time.Time,
) (string, *gen.ClusterSnapshot, error)

Update sender.go with fallback logic

func (*DirectDakrSender) SetClusterID

func (s *DirectDakrSender) SetClusterID(clusterID string)

func (*DirectDakrSender) SetTeamID

func (s *DirectDakrSender) SetTeamID(teamID string)

func (*DirectDakrSender) Start

func (s *DirectDakrSender) Start(ctx context.Context) error

Start initializes the sender (no-op for direct sender)

func (*DirectDakrSender) Stop

func (s *DirectDakrSender) Stop() error

Stop cleans up resources (no-op for direct sender)

type DirectSender

type DirectSender interface {
	// SendBatch transmits a batch of resources of the same type to the target system
	SendBatch(
		ctx context.Context,
		resource []collector.CollectedResource,
		resourceType collector.ResourceType,
	) (string, error)

	// Send transmits a resource to the target system
	Send(ctx context.Context, resource collector.CollectedResource) (string, error)

	// SendClusterSnapshotStream sends large cluster snapshot data using streaming
	SendClusterSnapshotStream(
		ctx context.Context,
		snapshot *gen.ClusterSnapshot,
		snapshotID string,
		timestamp time.Time,
	) (string, *gen.ClusterSnapshot, error)

	// telemetry_logger.TelemetryLogSender sends a batch of log entries.
	telemetry_logger.TelemetryLogSender
}

DirectSender defines methods for sending data to external systems directly

func NewDirectSender

func NewDirectSender(dakrClient DakrClient, logger logr.Logger) DirectSender

NewDirectSender creates a new DirectSender. It expects a non-nil DakrClient.

type RealDakrClient

type RealDakrClient struct {
	// contains filtered or unexported fields
}

RealDakrClient implements communication with Dakr service

func (*RealDakrClient) ExchangePATForClusterToken

func (c *RealDakrClient) ExchangePATForClusterToken(
	ctx context.Context,
	patToken, clusterName, k8sProvider string,
) (string, string, error)

ExchangePATForClusterToken exchanges a PAT token for a cluster token

func (*RealDakrClient) ReportHealth added in v0.0.64

func (c *RealDakrClient) ReportHealth(ctx context.Context, req *gen.ReportHealthRequest) error

ReportHealth reports the health status of the operator to Dakr

func (*RealDakrClient) SendClusterSnapshotStream

func (c *RealDakrClient) SendClusterSnapshotStream(
	ctx context.Context,
	snapshot *gen.ClusterSnapshot,
	snapshotID string,
	timestamp time.Time,
) (string, *gen.ClusterSnapshot, error)

SendClusterSnapshotStream sends cluster snapshot data in chunks via streaming

func (*RealDakrClient) SendNetworkTrafficMetrics

SendNetworkTrafficMetrics pushes network traffic metrics from a node

func (*RealDakrClient) SendResource

func (c *RealDakrClient) SendResource(
	ctx context.Context,
	resource collector.CollectedResource,
) (string, error)

SendResource sends the resource to Dakr through gRPC

func (*RealDakrClient) SendResourceBatch

func (c *RealDakrClient) SendResourceBatch(
	ctx context.Context,
	resources []collector.CollectedResource,
	resourceType collector.ResourceType,
) (string, error)

SendResourceBatch sends a batch of resources to Dakr through gRPC

func (*RealDakrClient) SendTelemetryLogs

SendTelemetryLogs sends a batch of log entries to the Dakr service.

func (*RealDakrClient) SendTelemetryMetrics

func (c *RealDakrClient) SendTelemetryMetrics(
	ctx context.Context,
	metrics []*dto.MetricFamily,
) (int32, error)

SendTelemetryMetrics sends telemetry metrics to Dakr

type RetryPolicy

type RetryPolicy struct {
	MaxAttempts    int
	InitialBackoff time.Duration
	MaxBackoff     time.Duration
	IsRetryable    func(err error) bool
}

RetryPolicy defines the parameters for retrying.

type Sender

type Sender interface {
	// Send transmits a resource to the target system
	Send(ctx context.Context, resource collector.CollectedResource) (string, error)

	// Start initializes the sender (establishing connections, etc.)
	Start(ctx context.Context) error

	// Stop cleans up resources
	Stop() error
}

Sender defines methods for sending data to external systems

func NewDirectDakrSender

func NewDirectDakrSender(dakrClient DakrClient, logger logr.Logger) Sender

type SimpleDakrClient

type SimpleDakrClient struct {
	// contains filtered or unexported fields
}

SimpleDakrClient is a placeholder implementation of DakrClient

func (*SimpleDakrClient) ExchangePATForClusterToken

func (c *SimpleDakrClient) ExchangePATForClusterToken(
	ctx context.Context,
	patToken, clusterName, k8sProvider string,
) (string, string, error)

ExchangePATForClusterToken implements SimpleDakrClient.

func (*SimpleDakrClient) ReportHealth added in v0.0.64

func (c *SimpleDakrClient) ReportHealth(ctx context.Context, req *gen.ReportHealthRequest) error

ReportHealth implements SimpleDakrClient.

func (*SimpleDakrClient) SendClusterSnapshotStream

func (c *SimpleDakrClient) SendClusterSnapshotStream(
	ctx context.Context,
	snapshot *gen.ClusterSnapshot,
	snapshotID string,
	timestamp time.Time,
) (string, *gen.ClusterSnapshot, error)

Update sender.go with fallback logic

func (*SimpleDakrClient) SendNetworkTrafficMetrics

SendNetworkTrafficMetrics implements SimpleDakrClient.

func (*SimpleDakrClient) SendResource

func (c *SimpleDakrClient) SendResource(
	ctx context.Context,
	resource collector.CollectedResource,
) (string, error)

SendResource logs the resource data (for development/testing)

func (*SimpleDakrClient) SendResourceBatch

func (c *SimpleDakrClient) SendResourceBatch(
	ctx context.Context,
	resources []collector.CollectedResource,
	resourceType collector.ResourceType,
) (string, error)

SendResourceBatch logs the batch resource data (for development/testing)

func (*SimpleDakrClient) SendTelemetryLogs

SendTelemetryLogs implements SimpleDakrClient.

func (*SimpleDakrClient) SendTelemetryMetrics

func (c *SimpleDakrClient) SendTelemetryMetrics(
	ctx context.Context,
	metrics []*dto.MetricFamily,
) (int32, error)

SendTelemetryMetrics logs the telemetry metrics data (for development/testing)

type TelemetrySender

type TelemetrySender struct {
	// contains filtered or unexported fields
}

TelemetrySender is responsible for periodically sending metrics to the DAKR server

func NewTelemetrySender

func NewTelemetrySender(
	logger logr.Logger,
	dakrClient DakrClient,
	metrics *collector.TelemetryMetrics,
	interval time.Duration,
	healthManager *health.HealthManager,
) *TelemetrySender

NewTelemetrySender creates a new TelemetrySender

func (*TelemetrySender) Start

func (s *TelemetrySender) Start(ctx context.Context) error

Start begins the periodic sending of metrics

func (*TelemetrySender) Stop

func (s *TelemetrySender) Stop() error

Stop halts the periodic sending of metrics

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL