subscriber

package
v0.0.0-...-c42fbe7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 19, 2026 License: Apache-2.0 Imports: 35 Imported by: 0

Documentation

Index

Constants

View Source
const SLSFlusherConfigForAPMGateway = `` /* 861-byte string literal not displayed */
View Source
const SLSFlusherConfigTemplate = `` /* 204-byte string literal not displayed */

Variables

This section is empty.

Functions

func InitSubscriber

func InitSubscriber(ctx context.Context, name string, cfgStr string) (context.Context, error)

func RegisterCreator

func RegisterCreator(name string, creator Creator)

func TryReplacePhysicalAddress

func TryReplacePhysicalAddress(addr string) (string, error)

Types

type ClickHouseSubscriber

// ClickHouseSubscriber groups the ClickHouse connection settings (address and
// credentials) with the database and table that queries read from. The
// mapstructure tag on each field names its configuration key.
//
// NOTE(review): CreateTable is keyed "create_db" and its comment tag talks
// about creating the database, while DorisSubscriber.CreateTable is keyed
// "create_table" — confirm which behavior and key are intended here.
type ClickHouseSubscriber struct {
	Address     string `mapstructure:"address" comment:"the clickhouse address"`
	Username    string `mapstructure:"username" comment:"the clickhouse username"`
	Password    string `mapstructure:"password" comment:"the clickhouse password"`
	Database    string `mapstructure:"database" comment:"the clickhouse database name to query from"`
	Table       string `mapstructure:"table" comment:"the clickhouse table name to query from"`
	CreateTable bool   `mapstructure:"create_db" comment:"if create the database, default is true"`
	// contains filtered or unexported fields
}

func (*ClickHouseSubscriber) Description

func (i *ClickHouseSubscriber) Description() string

func (*ClickHouseSubscriber) FlusherConfig

func (i *ClickHouseSubscriber) FlusherConfig() string

func (*ClickHouseSubscriber) GetData

func (i *ClickHouseSubscriber) GetData(sql string, startTime int32) ([]*protocol.LogGroup, error)

func (*ClickHouseSubscriber) Name

func (i *ClickHouseSubscriber) Name() string

func (*ClickHouseSubscriber) Stop

func (i *ClickHouseSubscriber) Stop() error

type Creator

// Creator is the factory signature for subscribers: it receives the spec as a
// string-keyed map and returns either a Subscriber or an error.
// RegisterCreator accepts a Creator together with a name.
type Creator func(spec map[string]interface{}) (Subscriber, error)

Creator creates a new subscriber instance according to the spec.

type DorisSubscriber

// DorisSubscriber groups the Doris connection settings with the database and
// table that queries read from. The mapstructure tag on each field names its
// configuration key.
type DorisSubscriber struct {
	// Address is the Doris FE address; the tag documents the expected form as
	// http://host:port.
	Address     string `mapstructure:"address" comment:"the doris FE address (format: http://host:port)"`
	Username    string `mapstructure:"username" comment:"the doris username"`
	Password    string `mapstructure:"password" comment:"the doris password"`
	Database    string `mapstructure:"database" comment:"the doris database name to query from"`
	Table       string `mapstructure:"table" comment:"the doris table name to query from"`
	CreateTable bool   `mapstructure:"create_table" comment:"if create the table, default is true"`
	// contains filtered or unexported fields
}

func (*DorisSubscriber) Description

func (d *DorisSubscriber) Description() string

func (*DorisSubscriber) FlusherConfig

func (d *DorisSubscriber) FlusherConfig() string

func (*DorisSubscriber) GetData

func (d *DorisSubscriber) GetData(sqlStr string, startTime int32) ([]*protocol.LogGroup, error)

func (*DorisSubscriber) Name

func (d *DorisSubscriber) Name() string

func (*DorisSubscriber) Stop

func (d *DorisSubscriber) Stop() error

type ElasticSearchSubscriber

// ElasticSearchSubscriber groups the Elasticsearch connection settings with
// the index that queries read from. The mapstructure tag on each field names
// its configuration key.
type ElasticSearchSubscriber struct {
	Address  string `mapstructure:"address" comment:"the elasticsearch address"`
	Username string `mapstructure:"username" comment:"the elasticsearch username"`
	Password string `mapstructure:"password" comment:"the elasticsearch password"`
	Index    string `mapstructure:"index" comment:"the elasticsearch index name to query from"`
	// contains filtered or unexported fields
}

func (*ElasticSearchSubscriber) Description

func (i *ElasticSearchSubscriber) Description() string

func (*ElasticSearchSubscriber) FlusherConfig

func (i *ElasticSearchSubscriber) FlusherConfig() string

func (*ElasticSearchSubscriber) GetData

func (i *ElasticSearchSubscriber) GetData(sql string, startTime int32) ([]*protocol.LogGroup, error)

func (*ElasticSearchSubscriber) Name

func (i *ElasticSearchSubscriber) Name() string

func (*ElasticSearchSubscriber) Stop

func (i *ElasticSearchSubscriber) Stop() error

type GRPCService

// GRPCService embeds protocol.UnimplementedLogReportServiceServer, so it
// inherits that type's method set; a Collect method is additionally declared
// on *GRPCService.
// NOTE(review): presumably this is the server-side handler backing
// GrpcSubscriber — confirm against the implementation.
type GRPCService struct {
	protocol.UnimplementedLogReportServiceServer
	// contains filtered or unexported fields
}

func (*GRPCService) Collect

type GrpcSubscriber

// GrpcSubscriber holds the listen settings of the gRPC subscriber. The
// mapstructure tag on each field names its configuration key, and the comment
// tags record the documented defaults (":9000" for Address, "tcp" for
// Network).
type GrpcSubscriber struct {
	Address    string `mapstructure:"address" comment:"the gRPC server address, default value is :9000"`
	Network    string `mapstructure:"network" comment:"the gRPC server network, default value is tcp"`
	// DelayStart is a duration written as a string (the tag gives "5s" as an
	// example) and is described as a start delay for fault injection.
	DelayStart string `mapstructure:"delay_start" comment:"the delay start time duration for fault injection, such as 5s"`
	// contains filtered or unexported fields
}

func (*GrpcSubscriber) Description

func (g *GrpcSubscriber) Description() string

func (*GrpcSubscriber) FlusherConfig

func (g *GrpcSubscriber) FlusherConfig() string

func (*GrpcSubscriber) GetData

func (g *GrpcSubscriber) GetData(string, int32) ([]*protocol.LogGroup, error)

func (*GrpcSubscriber) Name

func (g *GrpcSubscriber) Name() string

func (*GrpcSubscriber) Start

func (g *GrpcSubscriber) Start() error

func (*GrpcSubscriber) Stop

func (g *GrpcSubscriber) Stop() error

func (*GrpcSubscriber) SubscribeChan

func (g *GrpcSubscriber) SubscribeChan() <-chan *protocol.LogGroup

type InfluxdbSubscriber

// InfluxdbSubscriber groups the InfluxDB connection settings with the
// database and measurement that queries read from. The mapstructure tag on
// each field names its configuration key.
type InfluxdbSubscriber struct {
	DbHost      string `mapstructure:"db_host" comment:"the influxdb host address"`
	DbUsername  string `mapstructure:"db_username" comment:"the influxdb username"`
	DbPassword  string `mapstructure:"db_password" comment:"the influxdb password"`
	DbName      string `mapstructure:"db_name" comment:"the influxdb database name to query from"`
	Measurement string `mapstructure:"measurement" comment:"the measurement to query from"`
	CreateDb    bool   `mapstructure:"create_db" comment:"if create the database, default is true"`
	// contains filtered or unexported fields
}

func (*InfluxdbSubscriber) Description

func (i *InfluxdbSubscriber) Description() string

func (*InfluxdbSubscriber) FlusherConfig

func (i *InfluxdbSubscriber) FlusherConfig() string

func (*InfluxdbSubscriber) GetData

func (i *InfluxdbSubscriber) GetData(_ string, startTime int32) ([]*protocol.LogGroup, error)

func (*InfluxdbSubscriber) Name

func (i *InfluxdbSubscriber) Name() string

func (*InfluxdbSubscriber) Stop

func (i *InfluxdbSubscriber) Stop() error

type KafkaSubscriber

// KafkaSubscriber holds the Kafka consumer settings: the broker list, the
// topic to consume from, the consumer group id and an optional broker
// version. The mapstructure tag on each field names its configuration key.
type KafkaSubscriber struct {
	Brokers []string `mapstructure:"brokers" comment:"list of kafka brokers"`
	Topic   string   `mapstructure:"topic" comment:"kafka topic to consume from"`
	GroupID string   `mapstructure:"group_id" comment:"kafka consumer group id"`
	// Version is optional; the tag gives "0.10.2.0" as an example value.
	Version string   `mapstructure:"version" comment:"kafka broker version, e.g. 0.10.2.0 (optional)"`
}

func (*KafkaSubscriber) Description

func (k *KafkaSubscriber) Description() string

func (*KafkaSubscriber) FlusherConfig

func (k *KafkaSubscriber) FlusherConfig() string

func (*KafkaSubscriber) GetData

func (k *KafkaSubscriber) GetData(sql string, startTime int32) ([]*protocol.LogGroup, error)

func (*KafkaSubscriber) Name

func (k *KafkaSubscriber) Name() string

func (*KafkaSubscriber) Stop

func (k *KafkaSubscriber) Stop() error

type LokiSubscriber

// LokiSubscriber groups the Loki address, the target label set and the tenant
// id. The mapstructure tag on each field names its configuration key.
type LokiSubscriber struct {
	Address      string            `mapstructure:"address" comment:"the loki address"`
	// TargetLabels is a label-name to label-value map. Its comment tag used to
	// describe a query interval, which did not match this field.
	// NOTE(review): presumably these labels form the stream selector of the
	// Loki query — confirm against GetData.
	TargetLabels map[string]string `mapstructure:"target_labels" comment:"the labels used to select records from loki"`
	TenantID     string            `mapstructure:"tenant_id" comment:"tenant id of loki"`
	// contains filtered or unexported fields
}

func (*LokiSubscriber) Description

func (l *LokiSubscriber) Description() string

func (*LokiSubscriber) FlusherConfig

func (l *LokiSubscriber) FlusherConfig() string

func (*LokiSubscriber) GetData

func (l *LokiSubscriber) GetData(sql string, startTime int32) ([]*protocol.LogGroup, error)

func (*LokiSubscriber) Name

func (l *LokiSubscriber) Name() string

func (*LokiSubscriber) Stop

func (l *LokiSubscriber) Stop() error

type QueryData

// QueryData is the Data member of QueryResponse; it holds the list of
// QueryResult entries.
// NOTE(review): presumably mirrors the "data" object of a Loki query
// response — confirm against the decoder that fills it.
type QueryData struct {
	Result []QueryResult
}

type QueryResponse

// QueryResponse pairs a Status string with the QueryData payload.
// NOTE(review): presumably the top-level body of a Loki query response —
// confirm against the decoder that fills it.
type QueryResponse struct {
	Status string
	Data   QueryData
}

type QueryResult

// QueryResult is one element of QueryData.Result: a string-to-string Stream
// map plus Values, a list of string lists.
// NOTE(review): presumably Stream is the label set and each Values entry is a
// [timestamp, line] pair — confirm against the producing API.
type QueryResult struct {
	Stream map[string]string
	Values [][]string
}

type SLSSubscriber

// SLSSubscriber groups the SLS-related settings: account and region
// identifiers, endpoints, the project/logstore pair, and the APM project,
// workspace and logstores.
//
// Unlike the other subscriber structs in this package, none of these fields
// carries a mapstructure tag.
// NOTE(review): how the fields get populated is not visible here — confirm
// against the registered creator.
type SLSSubscriber struct {
	TelemetryType     string
	Aliuid            string
	Region            string
	Endpoint          string
	QueryEndpoint     string
	Project           string
	Logstore          string
	Scenario          string
	APMProject        string
	APMWorkspace      string
	APMTraceLogstore  string
	APMMetricLogstore string
	// contains filtered or unexported fields
}

func (*SLSSubscriber) ApplyConfig

func (s *SLSSubscriber) ApplyConfig(configName, machineGroup string) error

func (*SLSSubscriber) Description

func (s *SLSSubscriber) Description() string

func (*SLSSubscriber) FlusherConfig

func (s *SLSSubscriber) FlusherConfig() string

func (*SLSSubscriber) GetData

func (s *SLSSubscriber) GetData(query string, startTime int32) ([]*protocol.LogGroup, error)

func (*SLSSubscriber) Name

func (s *SLSSubscriber) Name() string

func (*SLSSubscriber) RemoveConfig

func (s *SLSSubscriber) RemoveConfig(configName, machineGroup string) error

func (*SLSSubscriber) Stop

func (s *SLSSubscriber) Stop() error

func (*SLSSubscriber) UpdateConfig

func (s *SLSSubscriber) UpdateConfig(configName, configYaml string) error

type Subscriber

// Subscriber is the contract every subscriber type in this package exposes:
// the embedded doc.Doc method set plus Name, Stop, GetData and FlusherConfig.
type Subscriber interface {
	doc.Doc
	// Name returns the name of the subscriber.
	Name() string
	// Stop stops the subscriber and reports any error encountered.
	Stop() error
	// GetData returns the log groups obtained for the given query string.
	// NOTE(review): startTime presumably bounds the start of the query
	// window as a Unix timestamp in seconds — confirm units with the
	// implementations.
	GetData(sql string, startTime int32) ([]*protocol.LogGroup, error)
	// FlusherConfig returns the default flusher config that the
	// loongcollector container uses to transfer the received or
	// self-telemetry data.
	FlusherConfig() string
}

Subscriber receives the logs transferred by loongcollector.

var TestSubscriber Subscriber

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL