Documentation
Index
- Constants
- Variables
- type KafkaWriter
- type ToHTTPOpSpec
- type ToHTTPProcedureSpec
- type ToHTTPTransformation
- func (t *ToHTTPTransformation) Finish(id execute.DatasetID, err error)
- func (t *ToHTTPTransformation) Process(id execute.DatasetID, tbl flux.Table) error
- func (t *ToHTTPTransformation) RetractTable(id execute.DatasetID, key flux.GroupKey) error
- func (t *ToHTTPTransformation) UpdateProcessingTime(id execute.DatasetID, pt execute.Time) error
- func (t *ToHTTPTransformation) UpdateWatermark(id execute.DatasetID, pt execute.Time) error
- type ToKafkaOpSpec
- type ToKafkaProcedureSpec
- type ToKafkaTransformation
- func (t *ToKafkaTransformation) Finish(id execute.DatasetID, err error)
- func (t *ToKafkaTransformation) Process(id execute.DatasetID, tbl flux.Table) (err error)
- func (t *ToKafkaTransformation) RetractTable(id execute.DatasetID, key flux.GroupKey) error
- func (t *ToKafkaTransformation) UpdateProcessingTime(id execute.DatasetID, pt execute.Time) error
- func (t *ToKafkaTransformation) UpdateWatermark(id execute.DatasetID, pt execute.Time) error
Constants
const (
	ToHTTPKind           = "toHTTP"
	DefaultToHTTPTimeout = 1 * time.Second
)
const (
// ToKafkaKind is the Kind for the ToKafka Flux function
ToKafkaKind = "toKafka"
)
Variables
var DefaultKafkaWriterFactory = func(conf kafka.WriterConfig) KafkaWriter { return kafka.NewWriter(conf) }
DefaultKafkaWriterFactory constructs the KafkaWriter used by toKafka; it is a package variable so that a mock writer can be injected for testing.
var DefaultToHTTPUserAgent = "fluxd/dev"
DefaultToHTTPUserAgent is the default User-Agent header used by toHTTP.
Functions
This section is empty.
Types
type KafkaWriter
KafkaWriter is an interface describing what is needed from the writers produced by DefaultKafkaWriterFactory.
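Because DefaultKafkaWriterFactory is an exported package variable, a test can replace it with a factory that returns an in-memory writer. The sketch below is illustrative only: it assumes the KafkaWriter interface is satisfied by the WriteMessages and Close methods that kafka-go's *kafka.Writer provides, and that the test file lives in the same package; neither detail is stated in this documentation.

// Hypothetical in-package test file; the package name is a placeholder.
package functions

import (
	"context"
	"testing"

	kafka "github.com/segmentio/kafka-go"
)

// fakeKafkaWriter records messages in memory. It assumes the KafkaWriter
// interface requires no more than WriteMessages and Close, the methods
// exposed by the *kafka.Writer that DefaultKafkaWriterFactory returns.
type fakeKafkaWriter struct {
	msgs []kafka.Message
}

func (w *fakeKafkaWriter) WriteMessages(ctx context.Context, msgs ...kafka.Message) error {
	w.msgs = append(w.msgs, msgs...)
	return nil
}

func (w *fakeKafkaWriter) Close() error { return nil }

func TestToKafkaWithInjectedWriter(t *testing.T) {
	orig := DefaultKafkaWriterFactory
	defer func() { DefaultKafkaWriterFactory = orig }()

	fake := &fakeKafkaWriter{}
	DefaultKafkaWriterFactory = func(conf kafka.WriterConfig) KafkaWriter { return fake }

	// ... construct and drive a ToKafkaTransformation here, then assert on fake.msgs.
}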
type ToHTTPOpSpec
type ToHTTPOpSpec struct {
	URL          string            `json:"url"`
	Method       string            `json:"method"` // defaults to POST
	Name         string            `json:"name"`
	NameColumn   string            `json:"nameColumn"` // either name or name_column must be set; if neither is set, the "_measurement" column is used
	Headers      map[string]string `json:"headers"`   // TODO: implement Headers after the bug with keys and arrays and objects is fixed (new parser implemented, with string literals as keys)
	URLParams    map[string]string `json:"urlParams"` // TODO: implement URLParams after the bug with keys and arrays and objects is fixed (new parser implemented, with string literals as keys)
	Timeout      time.Duration     `json:"timeout"` // defaults to something reasonable if zero
	NoKeepAlive  bool              `json:"noKeepAlive"`
	TimeColumn   string            `json:"timeColumn"`
	TagColumns   []string          `json:"tagColumns"`
	ValueColumns []string          `json:"valueColumns"`
}
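For code that builds the spec directly (rather than via Flux arguments), a minimal sketch, assuming it is compiled inside the package that defines ToHTTPOpSpec; all values shown are examples:

spec := &ToHTTPOpSpec{
	URL:          "http://localhost:8086/write", // example endpoint
	Method:       "POST",
	Name:         "cpu", // or set NameColumn instead
	Timeout:      DefaultToHTTPTimeout,
	TimeColumn:   "_time",
	ValueColumns: []string{"_value"},
}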
func (ToHTTPOpSpec) Kind
func (ToHTTPOpSpec) Kind() flux.OperationKind
func (*ToHTTPOpSpec) ReadArgs
func (o *ToHTTPOpSpec) ReadArgs(args flux.Arguments) error
ReadArgs loads a flux.Arguments into a ToHTTPOpSpec and sets several default values: if the HTTP method isn't set, it defaults to POST (the method is also uppercased); if the time_column isn't set, it defaults to execute.TimeColLabel; and if the value_column isn't set, it defaults to []string{execute.DefaultValueColLabel}.
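These defaults amount to something like the following sketch (illustrative only, not the package's implementation; it assumes the strings and execute packages are imported and uses the column-label constants named above):

// applyToHTTPDefaults is a hypothetical helper mirroring the defaults described above.
func applyToHTTPDefaults(o *ToHTTPOpSpec) {
	if o.Method == "" {
		o.Method = "POST" // default method
	}
	o.Method = strings.ToUpper(o.Method) // the method is always uppercased
	if o.TimeColumn == "" {
		o.TimeColumn = execute.TimeColLabel
	}
	if len(o.ValueColumns) == 0 {
		o.ValueColumns = []string{execute.DefaultValueColLabel}
	}
}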
func (*ToHTTPOpSpec) UnmarshalJSON
func (o *ToHTTPOpSpec) UnmarshalJSON(b []byte) (err error)
UnmarshalJSON unmarshals JSON into a ToHTTPOpSpec and validates it.
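A minimal sketch of decoding a spec from JSON, using the field names declared by the struct tags above. The package name is a placeholder and the values are examples; the exact validation rules UnmarshalJSON applies are not spelled out in this documentation.

package functions // placeholder for the package that defines ToHTTPOpSpec

import (
	"encoding/json"
	"fmt"
)

func ExampleToHTTPOpSpec_UnmarshalJSON() {
	src := []byte(`{
		"url": "http://localhost:8086/write",
		"method": "POST",
		"name": "cpu",
		"timeColumn": "_time",
		"valueColumns": ["_value"]
	}`)

	var spec ToHTTPOpSpec
	if err := json.Unmarshal(src, &spec); err != nil {
		fmt.Println("invalid spec:", err)
		return
	}
	fmt.Println(spec.URL, spec.Method)
}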
type ToHTTPProcedureSpec
type ToHTTPProcedureSpec struct {
Spec *ToHTTPOpSpec
}
func (*ToHTTPProcedureSpec) Copy
func (o *ToHTTPProcedureSpec) Copy() plan.ProcedureSpec
func (*ToHTTPProcedureSpec) Kind
func (o *ToHTTPProcedureSpec) Kind() plan.ProcedureKind
type ToHTTPTransformation
type ToHTTPTransformation struct {
// contains filtered or unexported fields
}
func NewToHTTPTransformation
func NewToHTTPTransformation(d execute.Dataset, cache execute.TableBuilderCache, spec *ToHTTPProcedureSpec) *ToHTTPTransformation
func (*ToHTTPTransformation) Finish
func (t *ToHTTPTransformation) Finish(id execute.DatasetID, err error)
func (*ToHTTPTransformation) Process
func (t *ToHTTPTransformation) Process(id execute.DatasetID, tbl flux.Table) error
func (*ToHTTPTransformation) RetractTable
func (t *ToHTTPTransformation) RetractTable(id execute.DatasetID, key flux.GroupKey) error
func (*ToHTTPTransformation) UpdateProcessingTime
func (t *ToHTTPTransformation) UpdateProcessingTime(id execute.DatasetID, pt execute.Time) error
func (*ToHTTPTransformation) UpdateWatermark
func (t *ToHTTPTransformation) UpdateWatermark(id execute.DatasetID, pt execute.Time) error
type ToKafkaOpSpec
type ToKafkaOpSpec struct {
	Brokers      []string `json:"brokers"`
	Topic        string   `json:"topic"`
	Balancer     string   `json:"balancer"`
	Name         string   `json:"name"`
	NameColumn   string   `json:"nameColumn"` // either name or name_column must be set; if neither is set, the "_measurement" column is used
	TimeColumn   string   `json:"timeColumn"`
	TagColumns   []string `json:"tagColumns"`
	ValueColumns []string `json:"valueColumns"`
	MsgBufSize   int      `json:"msgBufferSize"` // the maximum number of messages to buffer before sending to Kafka; the underlying library defaults to 100
}
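As with ToHTTPOpSpec, the spec can also be populated directly in Go. A minimal sketch, assuming an in-package context; the broker address, topic, and column names are example values:

spec := &ToKafkaOpSpec{
	Brokers:      []string{"localhost:9092"}, // example broker
	Topic:        "metrics",
	NameColumn:   "_measurement",
	TimeColumn:   "_time",
	ValueColumns: []string{"_value"},
	MsgBufSize:   100, // the library default noted above
}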
func (ToKafkaOpSpec) Kind
func (ToKafkaOpSpec) Kind() flux.OperationKind
func (*ToKafkaOpSpec) ReadArgs
func (o *ToKafkaOpSpec) ReadArgs(args flux.Arguments) error
ReadArgs loads a flux.Arguments into a ToKafkaOpSpec and sets several default values: if the time_column isn't set, it defaults to execute.TimeColLabel, and if the value_column isn't set, it defaults to []string{execute.DefaultValueColLabel}.
type ToKafkaProcedureSpec
type ToKafkaProcedureSpec struct {
Spec *ToKafkaOpSpec
// contains filtered or unexported fields
}
func (*ToKafkaProcedureSpec) Copy
func (o *ToKafkaProcedureSpec) Copy() plan.ProcedureSpec
func (*ToKafkaProcedureSpec) Kind
func (o *ToKafkaProcedureSpec) Kind() plan.ProcedureKind
type ToKafkaTransformation
type ToKafkaTransformation struct {
// contains filtered or unexported fields
}
func NewToKafkaTransformation
func NewToKafkaTransformation(d execute.Dataset, cache execute.TableBuilderCache, spec *ToKafkaProcedureSpec) *ToKafkaTransformation
func (*ToKafkaTransformation) Finish
func (t *ToKafkaTransformation) Finish(id execute.DatasetID, err error)