Documentation
¶
Index ¶
- Constants
- func Convert2Avro(value []byte, schema string) ([]byte, error)
- func ConvertFromAvro(binary []byte, schema string) (string, error)
- func CreateMessage(message []byte, schemaID int) ([]byte, error)
- func GetMessageAvroID(messageValue []byte) ([]byte, int)
- func New() venom.Executor
- type Executor
- type Message
- type MessageJSON
- type Result
- type SchemaRegistry
Constants ¶
const (
// Name of executor
Name = "kafka"
)
Variables ¶
This section is empty.
Functions ¶
func Convert2Avro ¶ added in v1.0.0
Convert2Avro will convert value to Avro encoded binary with help of schema
func ConvertFromAvro ¶ added in v1.0.0
ConvertFromAvro will convert value from Avro encoded binary with help of schema to string
func CreateMessage ¶ added in v1.0.0
CreateMessage will convert Avro message to one, which can be sent to Kafka
func GetMessageAvroID ¶ added in v1.0.0
GetMessageAvroID will try to get encoded message Avro ID
Types ¶
type Executor ¶
type Executor struct {
Addrs []string `json:"addrs,omitempty" yaml:"addrs,omitempty"`
// Registry schema address
SchemaRegistryAddr string `json:"schema_registry_addr,omitempty" yaml:"schemaRegistryAddr,omitempty"`
WithAVRO bool `json:"with_avro,omitempty" yaml:"withAVRO,omitempty"`
WithTLS bool `json:"with_tls,omitempty" yaml:"withTLS,omitempty"`
WithSASL bool `json:"with_sasl,omitempty" yaml:"withSASL,omitempty"`
WithSASLHandshaked bool `json:"with_sasl_handshaked,omitempty" yaml:"withSASLHandshaked,omitempty"`
User string `json:"user,omitempty" yaml:"user,omitempty"`
Password string `json:"password,omitempty" yaml:"password,omitempty"`
// TLS Config
InsecureTLS bool `json:"insecure_tls,omitempty" yaml:"insecure_tls,omitempty"`
// ClientType must be "consumer" or "producer"
ClientType string `json:"client_type,omitempty" yaml:"clientType,omitempty"`
// Used when ClientType is consumer
GroupID string `json:"group_id,omitempty" yaml:"groupID,omitempty"`
Topics []string `json:"topics,omitempty" yaml:"topics,omitempty"`
// Represents the timeout for reading messages. In Seconds. Default 5
Timeout int `json:"timeout,omitempty" yaml:"timeout,omitempty"`
// WaitFor represents the time for reading messages without marking the test as failure.
WaitFor int `json:"wait_for,omitempty" yaml:"waitFor,omitempty"`
// Represents the limit of message will be read. After limit, consumer stop read message
MessageLimit int `json:"message_limit,omitempty" yaml:"messageLimit,omitempty"`
// InitialOffset represents the initial offset for the consumer. Possible value : newest, oldest. default: newest
InitialOffset string `json:"initial_offset,omitempty" yaml:"initialOffset,omitempty"`
// MarkOffset allows to mark offset when consuming message
MarkOffset bool `json:"mark_offset,omitempty" yaml:"markOffset,omitempty"`
// KeyFilter determines the key to filter from
KeyFilter string `json:"key_filter,omitempty" yaml:"keyFilter,omitempty"`
// Only one of JSON or Avro are currently supported
ConsumerEncoding string `json:"consumer_encoding,omitempty" yaml:"consumerEncoding,omitempty"`
// Used when ClientType is producer
// Messages represents the message sended by producer
Messages []Message `json:"messages,omitempty" yaml:"messages,omitempty"`
// MessagesFile represents the messages into the file sended by producer (messages field would be ignored)
MessagesFile string `json:"messages_file,omitempty" yaml:"messages_file,omitempty"`
// Kafka version, default is 0.10.2.0
KafkaVersion string `json:"kafka_version,omitempty" yaml:"kafka_version,omitempty"`
// contains filtered or unexported fields
}
Executor represents a Test Exec
func (Executor) GetDefaultAssertions ¶
func (Executor) GetDefaultAssertions() *venom.StepAssertions
GetDefaultAssertions return default assertions for type exec
func (Executor) ZeroValueResult ¶
func (Executor) ZeroValueResult() interface{}
ZeroValueResult return an empty implementation of this executor result
type Message ¶
type Message struct {
Topic string `json:"topic" yaml:"topic"`
Key string `json:"key" yaml:"key"`
Headers map[string]string `json:"headers,omitempty" yaml:"headers,omitempty"`
Value string `json:"value,omitempty" yaml:"value,omitempty"`
ValueFile string `json:"valueFile,omitempty" yaml:"valueFile,omitempty"`
AvroSchemaFile string `json:"avroSchemaFile,omitempty" yaml:"avroSchemaFile,omitempty"`
}
Message represents the object sended or received from kafka
type MessageJSON ¶
type MessageJSON struct {
Topic string
Key interface{}
Value interface{}
}
MessageJSON represents the object sended or received from kafka
type Result ¶
type Result struct {
TimeSeconds float64 `json:"timeseconds,omitempty" yaml:"timeSeconds,omitempty"`
Messages []Message `json:"messages,omitempty" yaml:"messages,omitempty"`
MessagesJSON []interface{} `json:"messagesjson,omitempty" yaml:"messagesJSON,omitempty"`
Err string `json:"err" yaml:"error"`
}
Result represents a step result.
type SchemaRegistry ¶ added in v1.0.0
type SchemaRegistry interface {
GetSchemaByID(id int) (string, error)
RegisterNewSchema(subject, schema string) (int, error)
GetLatestSchema(subject string) (int, string, error)
}
SchemaRegistry will provide interface to SchemaRegistry implementation
func NewSchemaRegistry ¶ added in v1.0.0
func NewSchemaRegistry(schemaRegistryHost string) (SchemaRegistry, error)
NewSchemaRegistry will create new Schema Registry interface
func NewWithClient ¶ added in v1.0.0
func NewWithClient(schemaRegistryHost string, httpClient *http.Client) (SchemaRegistry, error)
NewWithClient will add SchemaRegistry with client