Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type App ¶ added in v0.12.0
// App is the top-level application configuration. Every field except Proxies
// is a plain string, and each field is mapped to a configuration key by its
// `yaml` struct tag.
type App struct {
// TCP address that the gRPC API server should listen on.
GRPCAddr string `yaml:"grpc_addr"`
// TCP address that the HTTP API server should listen on.
TCPAddr string `yaml:"tcp_addr"`
// Unix domain socket address that the HTTP API server should listen on.
// Listening on a unix domain socket is disabled by default.
UnixAddr string `yaml:"unix_addr"`
// An arbitrary number of proxies to different Kafka/ZooKeeper clusters can
// be configured. Each proxy configuration is identified by a cluster name,
// which is the key of this map.
Proxies map[string]*Proxy `yaml:"proxies"`
// DefaultCluster names the cluster used by API calls whose path does not
// start with the prefix `/clusters/<cluster>`. If it is not explicitly
// provided, then the cluster mentioned first in the `Proxies` section is
// assumed.
DefaultCluster string `yaml:"default_cluster"`
}
App defines Kafka-Pixy application configuration. It mirrors the structure of the YAML configuration file.
func DefaultApp ¶ added in v0.12.0
DefaultApp returns the default application configuration, in which the default proxy is associated with the specified cluster.
func FromYAML ¶ added in v0.12.0
FromYAML parses configuration from a YAML string and performs basic validation of parameters.
func FromYAMLFile ¶ added in v0.12.0
FromYAMLFile parses configuration from a YAML file and performs basic validation of parameters.
type Compression ¶ added in v0.14.0
// Compression is a named type over sarama.CompressionCodec. It is the type of
// the Proxy.Producer.Compression setting and has an UnmarshalText method, so
// it can be decoded from a textual value.
// NOTE(review): the accepted textual values are not visible here — confirm
// against the UnmarshalText implementation.
type Compression sarama.CompressionCodec
func (*Compression) UnmarshalText ¶ added in v0.14.0
func (c *Compression) UnmarshalText(text []byte) error
type KafkaVersion ¶ added in v0.14.0
type KafkaVersion struct {
// contains filtered or unexported fields
}
func (*KafkaVersion) IsAtLeast ¶ added in v0.14.0
func (kv *KafkaVersion) IsAtLeast(v sarama.KafkaVersion) bool
func (*KafkaVersion) Set ¶ added in v0.14.0
func (kv *KafkaVersion) Set(v sarama.KafkaVersion)
func (*KafkaVersion) UnmarshalText ¶ added in v0.14.0
func (kv *KafkaVersion) UnmarshalText(text []byte) error
type PartitionerConstructor ¶ added in v0.16.0
// PartitionerConstructor is a string-typed configuration value used for the
// Proxy.Producer.Partitioner setting. Its ToPartitionerConstructor method
// converts it to a sarama.PartitionerConstructor or returns an error.
// NOTE(review): the set of recognized string values is not visible here —
// confirm against the ToPartitionerConstructor implementation.
type PartitionerConstructor string
func (PartitionerConstructor) ToPartitionerConstructor ¶ added in v0.16.0
func (pc PartitionerConstructor) ToPartitionerConstructor() (sarama.PartitionerConstructor, error)
type Proxy ¶ added in v0.12.0
// Proxy groups the settings of a single proxy into Kafka, ZooKeeper, Net,
// Producer and Consumer sections plus a client ID. Each section is mapped to
// a configuration key by its `yaml` struct tag.
type Proxy struct {
// Unique ID that identifies a Kafka-Pixy instance in both ZooKeeper and
// Kafka. It is automatically generated by default and it is recommended to
// leave it like that.
ClientID string `yaml:"client_id"`
// Kafka cluster connection settings.
Kafka struct {
// List of seed Kafka peers that Kafka-Pixy should access to resolve
// the Kafka cluster topology.
SeedPeers []string `yaml:"seed_peers"`
// Version of the Kafka cluster. Supported versions are 0.10.2.1 - 2.0.0.
//
// NOTE(review): unlike its siblings this field has no explicit yaml
// tag, so its key depends on the YAML library's default field naming
// — presumably `version`; TODO confirm.
Version KafkaVersion
} `yaml:"kafka"`
// ZooKeeper cluster connection settings.
ZooKeeper struct {
// List of seed ZooKeeper peers that Kafka-Pixy should access to
// resolve the ZooKeeper cluster topology.
SeedPeers []string `yaml:"seed_peers"`
// A root directory in ZooKeeper to store consumers data.
Chroot string `yaml:"chroot"`
// ZooKeeper session timeout has to be a minimum of 2 times the
// tickTime (as set in the server configuration) and a maximum of 20
// times the tickTime. The default ZooKeeper tickTime is 2 seconds.
//
// See http://zookeeper.apache.org/doc/trunk/zookeeperProgrammers.html#ch_zkSessions
SessionTimeout time.Duration `yaml:"session_timeout"`
} `yaml:"zoo_keeper"`
// Networking timeouts. These all pass through to sarama's `config.Net`
// field.
Net struct {
// How long to wait for the initial connection.
DialTimeout time.Duration `yaml:"dial_timeout"`
// How long to wait for a response.
ReadTimeout time.Duration `yaml:"read_timeout"`
// How long to wait for a transmit.
WriteTimeout time.Duration `yaml:"write_timeout"`
} `yaml:"net"`
// Settings of the producer module.
Producer struct {
// Size of all buffered channels created by the producer module.
ChannelBufferSize int `yaml:"channel_buffer_size"`
// Maximum size of a message in bytes.
MaxMessageBytes int `yaml:"max_message_bytes"`
// The type of compression to use on messages.
Compression Compression `yaml:"compression"`
// The best-effort number of bytes needed to trigger a flush.
FlushBytes int `yaml:"flush_bytes"`
// The best-effort frequency of flushes.
FlushFrequency time.Duration `yaml:"flush_frequency"`
// How long to wait for the cluster to settle between retries.
RetryBackoff time.Duration `yaml:"retry_backoff"`
// The total number of times to retry sending a message.
RetryMax int `yaml:"retry_max"`
// The level of acknowledgement reliability needed from the broker.
RequiredAcks RequiredAcks `yaml:"required_acks"`
// Period of time that Kafka-Pixy should keep trying to submit buffered
// messages to Kafka. It is recommended to make it large enough to survive
// a ZooKeeper leader election in your setup.
ShutdownTimeout time.Duration `yaml:"shutdown_timeout"`
// How to assign incoming messages to a Kafka partition. Defaults to
// using a hash of the specified message key, or random if the key is
// unspecified.
Partitioner PartitionerConstructor `yaml:"partitioner"`
// The timeout to specify on individual produce requests to the broker.
// The broker will wait for replication to complete up to this duration
// before returning an error.
Timeout time.Duration `yaml:"timeout"`
} `yaml:"producer"`
// Settings of the consumer module.
Consumer struct {
// If set, Kafka-Pixy will not configure a consumer, and any attempts to
// call the consumer APIs will return an error.
Disabled bool `yaml:"disabled"`
// Period of time that Kafka-Pixy should wait for an acknowledgement
// before retrying.
AckTimeout time.Duration `yaml:"ack_timeout"`
// Size of all buffered channels created by the consumer module.
ChannelBufferSize int `yaml:"channel_buffer_size"`
// The number of bytes of messages to attempt to fetch for each
// topic-partition in each fetch request. These bytes will be read into
// memory for each partition, so this helps control the memory used by
// the consumer. The fetch request size must be at least as large as
// the maximum message size the server allows or else it is possible
// for the producer to send messages larger than the consumer can fetch.
FetchMaxBytes int `yaml:"fetch_max_bytes"`
// The maximum amount of time the server will block before answering
// the fetch request if there isn't data immediately available.
FetchMaxWait time.Duration `yaml:"fetch_max_wait"`
// A consume request will wait at most this long for a message from a
// topic to become available before expiring.
LongPollingTimeout time.Duration `yaml:"long_polling_timeout"`
// The maximum number of unacknowledged messages allowed for a
// particular group-topic-partition at a time. When this number is
// reached, subsequent consume requests will return long polling timeout
// errors until some of the pending messages are acknowledged.
MaxPendingMessages int `yaml:"max_pending_messages"`
// The maximum number of retries Kafka-Pixy will make to offer an
// unacknowledged message. Messages that exceeded the number of retries
// are discarded by Kafka-Pixy and acknowledged in Kafka. Zero retries
// means that messages will be offered just once.
//
// If you want Kafka-Pixy to retry indefinitely, then set this
// parameter to -1.
MaxRetries int `yaml:"max_retries"`
// How frequently to commit offsets to Kafka.
OffsetsCommitInterval time.Duration `yaml:"offsets_commit_interval"`
// Kafka-Pixy should wait this long after it gets notification that a
// consumer joined/left a consumer group it is a member of before
// rebalancing.
RebalanceDelay time.Duration `yaml:"rebalance_delay"`
// If a request to Kafka-Pixy fails for any reason, then it should
// wait this long before retrying.
RetryBackoff time.Duration `yaml:"retry_backoff"`
// Period of time that Kafka-Pixy should keep a subscription to
// a topic by a group in the absence of requests from the consumer group.
SubscriptionTimeout time.Duration `yaml:"subscription_timeout"`
} `yaml:"consumer"`
}
Proxy defines configuration of a proxy to a particular Kafka/ZooKeeper cluster.
func DefaultProxy ¶ added in v0.12.0
func DefaultProxy() *Proxy
DefaultProxy returns the proxy configuration used by default.
func (*Proxy) SaramaClientCfg ¶ added in v0.14.0
func (*Proxy) SaramaProducerCfg ¶ added in v0.14.0
SaramaProducerCfg returns a config for sarama producer.
type RequiredAcks ¶ added in v0.14.0
// RequiredAcks is a named type over sarama.RequiredAcks. It is the type of
// the Proxy.Producer.RequiredAcks setting and has an UnmarshalText method, so
// it can be decoded from a textual value.
// NOTE(review): the accepted textual values are not visible here — confirm
// against the UnmarshalText implementation.
type RequiredAcks sarama.RequiredAcks
func (*RequiredAcks) UnmarshalText ¶ added in v0.14.0
func (ra *RequiredAcks) UnmarshalText(text []byte) error
Click to show internal directories.
Click to hide internal directories.