dataproxy

package
v1.0.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 2, 2025 License: Apache-2.0, BSD-3-Clause, MIT Imports: 29 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors exported by the package. Each is a distinct value built
// with errors.New, so callers can match a returned error with errors.Is.
var (
	// ErrInvalidGroupID carries the message "invalid group ID".
	ErrInvalidGroupID = errors.New("invalid group ID")

	// ErrInvalidURL carries the message "invalid URL".
	ErrInvalidURL = errors.New("invalid URL")

	// ErrNoEndpoint carries the message "service has no endpoints".
	ErrNoEndpoint = errors.New("service has no endpoints")

	// ErrNoAvailableWorker carries the message "no available worker".
	ErrNoAvailableWorker = errors.New("no available worker")

	// ErrInvalidMessage describes a message whose GroupID, StreamID or
	// Payload is empty or contains illegal characters.
	ErrInvalidMessage = errors.New("invalid message, GroupID/StreamID/Payload is empty or contains illegal characters")
)

variables

View Source
// DefaultURL is the Manager URL used by default to discover the DataProxy
// cluster. It targets the Manager's getIpList endpoint on 127.0.0.1:8083.
var DefaultURL = "http://127.0.0.1:8083/inlong/manager/openapi/dataproxy/getIpList"

Functions

func NewDiscoverer

func NewDiscoverer(url, groupID string, lookupInterval time.Duration, log logger.Logger, auth Auth) (discoverer.Discoverer, error)

NewDiscoverer creates a new DataProxy discoverer.

Types

type Auth

// Auth is the authentication interface for DataProxy.
type Auth interface {
	// GetToken takes a context and a group ID and returns a key, a token,
	// and an error.
	// NOTE(review): how the key and token are consumed is not visible
	// here — confirm against the client implementation.
	GetToken(ctx context.Context, groupID string) (key string, token string, err error)
}

Auth is the DataProxy authentication interface.

type Callback

// Callback is the callback function signature of the DataProxy producer.
// It receives the message together with an error value.
// NOTE(review): presumably err is nil when the send succeeded — confirm
// against the producer implementation.
type Callback func(message Message, err error)

Callback is the callback function signature of the DataProxy producer

type Client

// Client is the interface of a DataProxy client.
type Client interface {
	// Send sends a message and waits for the result.
	Send(ctx context.Context, msg Message) error
	// SendAsync sends a message asynchronously; the callback is called once
	// the message has been sent or has timed out.
	SendAsync(ctx context.Context, msg Message, callback Callback)
	// Close flushes all pending messages to the server, waits for the
	// results or a timeout, and then closes the producer.
	Close()
}

Client is the interface of a DataProxy client

func NewClient

func NewClient(opts ...Option) (Client, error)

NewClient creates a dataproxy-go client instance.

type Message

// Message is the message to send.
type Message struct {
	GroupID  string            // InLong group ID
	StreamID string            // InLong stream ID
	Payload  []byte            // the content of the message
	Headers  map[string]string // message headers; currently not sent to the server
	MetaData interface{}       // arbitrary user data; not sent to the server, but available in the callback
}

Message is the message to send

func (*Message) IsValid added in v1.0.1

func (m *Message) IsValid() bool

IsValid checks if the message is valid

type Option

// Option is the Options helper: a function that is handed a pointer to an
// Options value. NewClient accepts a variadic list of Option values.
type Option func(*Options)

Option is the Options helper.

func WithAddColumns

func WithAddColumns(cols map[string]string) Option

WithAddColumns sets AddColumns

func WithAuth

func WithAuth(auth Auth) Option

WithAuth sets Auth

func WithBatchingMaxMessages

func WithBatchingMaxMessages(n int) Option

WithBatchingMaxMessages sets BatchingMaxMessages

func WithBatchingMaxPublishDelay

func WithBatchingMaxPublishDelay(t time.Duration) Option

WithBatchingMaxPublishDelay sets BatchingMaxPublishDelay

func WithBatchingMaxSize

func WithBatchingMaxSize(n int) Option

WithBatchingMaxSize sets BatchingMaxSize

func WithBlockIfQueueIsFull

func WithBlockIfQueueIsFull(b bool) Option

WithBlockIfQueueIsFull sets BlockIfQueueIsFull

func WithBufferPool

func WithBufferPool(bp bufferpool.BufferPool) Option

WithBufferPool sets BufferPool

func WithBufferPoolSize

func WithBufferPoolSize(n int) Option

WithBufferPoolSize sets BufferPoolSize

func WithBytePool

func WithBytePool(bp bufferpool.BytePool) Option

WithBytePool sets BytePool

func WithBytePoolSize

func WithBytePoolSize(n int) Option

WithBytePoolSize sets BytePoolSize

func WithBytePoolWidth

func WithBytePoolWidth(n int) Option

WithBytePoolWidth sets BytePoolWidth

func WithConnTimeout

func WithConnTimeout(t time.Duration) Option

WithConnTimeout sets ConnTimeout

func WithGroupID

func WithGroupID(g string) Option

WithGroupID sets GroupID

func WithLogger

func WithLogger(log logger.Logger) Option

WithLogger sets Logger

func WithMaxConnLifetime

func WithMaxConnLifetime(lifetime time.Duration) Option

WithMaxConnLifetime sets MaxConnLifetime

func WithMaxPendingMessages

func WithMaxPendingMessages(n int) Option

WithMaxPendingMessages sets MaxPendingMessages

func WithMaxRetries

func WithMaxRetries(n int) Option

WithMaxRetries sets MaxRetries

func WithMetricsName

func WithMetricsName(name string) Option

WithMetricsName sets MetricsName

func WithMetricsRegistry

func WithMetricsRegistry(reg prometheus.Registerer) Option

WithMetricsRegistry sets MetricsRegistry

func WithReadBufferSize

func WithReadBufferSize(n int) Option

WithReadBufferSize sets ReadBufferSize

func WithSendTimeout

func WithSendTimeout(t time.Duration) Option

WithSendTimeout sets SendTimeout

func WithSocketRecvBufferSize

func WithSocketRecvBufferSize(n int) Option

WithSocketRecvBufferSize sets SocketRecvBufferSize

func WithSocketSendBufferSize

func WithSocketSendBufferSize(n int) Option

WithSocketSendBufferSize sets SocketSendBufferSize

func WithURL

func WithURL(u string) Option

WithURL sets URL

func WithUpdateInterval

func WithUpdateInterval(u time.Duration) Option

WithUpdateInterval sets UpdateInterval

func WithWorkerNum

func WithWorkerNum(n int) Option

WithWorkerNum sets WorkerNum

func WithWriteBufferSize

func WithWriteBufferSize(n int) Option

WithWriteBufferSize sets WriteBufferSize

type Options

// Options is the configuration of the DataProxy Go client. The default
// values quoted on each field are those stated by the field comments.
type Options struct {
	GroupID                 string                // InLong group ID
	URL                     string                // the Manager URL for discovering the DataProxy cluster
	UpdateInterval          time.Duration         // interval to refresh the endpoint list, default: 5m
	ConnTimeout             time.Duration         // connection timeout, default: 3000ms
	WriteBufferSize         int                   // write buffer size in bytes, default: 8M
	ReadBufferSize          int                   // read buffer size in bytes, default: 1M
	SocketSendBufferSize    int                   // socket send buffer size in bytes, default: 8M
	SocketRecvBufferSize    int                   // socket receive buffer size in bytes, default: 1M
	BufferPool              bufferpool.BufferPool // encoding/decoding buffer pool; if not given, the SDK will init a new one
	BytePool                bufferpool.BytePool   // encoding/decoding byte pool; if not given, the SDK will init a new one
	BufferPoolSize          int                   // buffer pool size, default: 409600
	BytePoolSize            int                   // byte pool size, default: 409600
	BytePoolWidth           int                   // byte pool width, default: equal to BatchingMaxSize
	Logger                  logger.Logger         // debug logger, default: stdout
	MetricsName             string                // the unique metrics name of this SDK, used to isolate metrics when more than one client is initialized in one process
	MetricsRegistry         prometheus.Registerer // metrics registry, default: prometheus.DefaultRegisterer
	WorkerNum               int                   // worker number, default: 8
	SendTimeout             time.Duration         // send timeout, default: 30000ms
	MaxRetries              int                   // max retry count, default: 2
	BatchingMaxPublishDelay time.Duration         // the time period within which the messages sent will be batched, default: 20ms
	BatchingMaxMessages     int                   // the maximum number of messages permitted in a batch, default: 50
	BatchingMaxSize         int                   // the maximum number of bytes permitted in a batch, default: 40K
	MaxPendingMessages      int                   // the max size of the queue holding the messages pending to receive an acknowledgment from the broker, default: 204800
	BlockIfQueueIsFull      bool                  // whether Send and SendAsync block if the producer's message queue is full, default: false
	AddColumns              map[string]string     // additional columns to add to the message, for example: __addcol1__worldid=xxx&__addcol2__ip=yyy adds 2 more columns, worldid=xxx and ip=yyy, to every message

	Auth            Auth          // dataproxy authentication interface
	MaxConnLifetime time.Duration // connection max lifetime, default: 0; set to 5m/10m when the servers provide service through CLBs (Cloud Load Balancers)
	// contains filtered or unexported fields
}

Options is the DataProxy go client configs

func (*Options) ValidateAndSetDefault

func (options *Options) ValidateAndSetDefault() error

ValidateAndSetDefault validates the options and sets up the default values.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL