Documentation
¶
Overview ¶
Package redpanda is the SDK for Redpanda's inline Data Transforms, based on WebAssembly.
This library provides a framework for transforming records written within Redpanda from an input to an output topic.
Example (IdentityTransform) ¶
This example shows the basic usage of the package: This is a "transform" that does nothing but copies the same data to an new topic.
package main
import (
"github.com/redpanda-data/redpanda/src/go/transform-sdk"
)
// This example shows the basic usage of the package:
// This is a "transform" that does nothing but copies the same data to an new
// topic.
func main() {
// Other setup can happen here, such as setting up lookup tables,
// initializing reusable buffers, reading environment variables, etc.
// Make sure to register your callback so Redpanda knows which
// function to invoke when records are written
redpanda.OnRecordWritten(mirrorTransform)
}
// This will be called for each record in the source topic.
//
// The output records returned will be written to the destination topic.
func mirrorTransform(e redpanda.WriteEvent) ([]redpanda.Record, error) {
return []redpanda.Record{e.Record()}, nil
}
Example (RegularExpressionFilter) ¶
This example shows a filter that uses a regexp to filter records from one topic into another. The filter can be determined when the transform is deployed by using environment variables to specify the pattern.
package main
import (
"os"
"regexp"
"github.com/redpanda-data/redpanda/src/go/transform-sdk"
)
var (
re *regexp.Regexp = nil
checkValue bool = false
)
// This example shows a filter that uses a regexp to filter records from
// one topic into another. The filter can be determined when the transform
// is deployed by using environment variables to specify the pattern.
func main() {
// setup the regexp
pattern, ok := os.LookupEnv("PATTERN")
if !ok {
panic("Missing PATTERN variable")
}
re = regexp.MustCompile(pattern)
mk, ok := os.LookupEnv("MATCH_VALUE")
checkValue = ok && mk == "1"
redpanda.OnRecordWritten(doRegexFilter)
}
func doRegexFilter(e redpanda.WriteEvent) ([]redpanda.Record, error) {
var b []byte
if checkValue {
b = e.Record().Value
} else {
b = e.Record().Key
}
if b == nil {
return nil, nil
}
pass := re.Match(b)
if pass {
return []redpanda.Record{e.Record()}, nil
} else {
return nil, nil
}
}
Example (Transcoding) ¶
This example shows a transform that converts CSV into JSON.
package main
import (
"bytes"
"encoding/csv"
"encoding/json"
"errors"
"io"
"strconv"
"github.com/redpanda-data/redpanda/src/go/transform-sdk"
)
// This example shows a transform that converts CSV into JSON.
func main() {
redpanda.OnRecordWritten(csvToJsonTransform)
}
type Foo struct {
A string `json:"a"`
B int `json:"b"`
}
func csvToJsonTransform(e redpanda.WriteEvent) ([]redpanda.Record, error) {
// The input data is a CSV (without a header row) that is the structure of:
// key, a, b
// This transform emits each row in that CSV as JSON.
reader := csv.NewReader(bytes.NewReader(e.Record().Value))
// Improve performance by reusing the result slice.
reader.ReuseRecord = true
output := []redpanda.Record{}
for {
row, err := reader.Read()
if err == io.EOF {
break
} else if err != nil {
return nil, err
}
if len(row) != 3 {
return nil, errors.New("unexpected number of rows")
}
// Convert the last column into an int
b, err := strconv.Atoi(row[2])
if err != nil {
return nil, err
}
// Marshal our JSON value
f := Foo{
A: row[1],
B: b,
}
v, err := json.Marshal(&f)
if err != nil {
return nil, err
}
// Add our output record using the first column as the key.
output = append(output, redpanda.Record{
Key: []byte(row[0]),
Value: v,
})
}
return output, nil
}
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func OnRecordWritten ¶
func OnRecordWritten(fn OnRecordWrittenCallback)
OnRecordWritten registers a callback to be fired when a record is written to the input topic.
OnRecordWritten should be called in a package's `main` function to register the transform function that will be applied.
Types ¶
type OnRecordWrittenCallback ¶
type OnRecordWrittenCallback func(e WriteEvent) ([]Record, error)
OnRecordWrittenCallback is a callback to transform records after a write event happens in the input topic.
type Record ¶
type Record struct {
// Key is an optional field.
Key []byte
// Value is the blob of data that is written to Redpanda.
Value []byte
// Headers are client specified key/value pairs that are
// attached to a record.
Headers []RecordHeader
// Attrs is the attributes of a record.
//
// Output records should leave these unset.
Attrs RecordAttrs
// The timestamp associated with this record.
//
// For output records this can be left unset as it will
// always be the same value as the input record.
Timestamp time.Time
// The offset of this record in the partition.
//
// For output records this field is left unset,
// as it will be set by Redpanda.
Offset int64
}
Record is a record that has been written to Redpanda.
type RecordAttrs ¶
type RecordAttrs struct {
// contains filtered or unexported fields
}
type RecordHeader ¶
Headers are optional key/value pairs that are passed along with records.
type WriteEvent ¶
type WriteEvent interface {
// Access the record associated with this event
Record() Record
}
WriteEvent contains information about the write that took place, namely it contains the record that was written.
Directories
¶
| Path | Synopsis |
|---|---|
|
internal
|
|
|
rwbuf
A buffer with a reader and writer index that gives access to the underlying array.
|
A buffer with a reader and writer index that gives access to the underlying array. |
|
Package sr is a schema registry client for Redpanda for usage within inline Data Transforms.
|
Package sr is a schema registry client for Redpanda for usage within inline Data Transforms. |