redpanda

Version: v0.0.0-...-701900d

Note: This package is not in the latest version of its module.
Published: Nov 11, 2023 License: Apache-2.0 Imports: 5 Imported by: 0

Documentation

Overview

Package redpanda is the SDK for Redpanda's inline Data Transforms, based on WebAssembly.

This library provides a framework for transforming records written within Redpanda from an input to an output topic.

Example (IdentityTransform)

This example shows the basic usage of the package: a "transform" that does nothing but copy the same data to a new topic.

package main

import (
	"github.com/redpanda-data/redpanda/src/go/transform-sdk"
)

// This example shows the basic usage of the package:
// a "transform" that does nothing but copy the same data to a new
// topic.
func main() {
	// Other setup can happen here, such as setting up lookup tables,
	// initializing reusable buffers, reading environment variables, etc.

	// Make sure to register your callback so Redpanda knows which
	// function to invoke when records are written
	redpanda.OnRecordWritten(mirrorTransform)
}

// This will be called for each record in the source topic.
//
// The output records returned will be written to the destination topic.
func mirrorTransform(e redpanda.WriteEvent) ([]redpanda.Record, error) {
	return []redpanda.Record{e.Record()}, nil
}

Example (RegularExpressionFilter)

This example shows a filter that uses a regexp to filter records from one topic into another. The filter can be determined when the transform is deployed by using environment variables to specify the pattern.

package main

import (
	"os"
	"regexp"

	"github.com/redpanda-data/redpanda/src/go/transform-sdk"
)

var (
	re         *regexp.Regexp = nil
	checkValue bool           = false
)

// This example shows a filter that uses a regexp to filter records from
// one topic into another. The filter can be determined when the transform
// is deployed by using environment variables to specify the pattern.
func main() {
	// setup the regexp
	pattern, ok := os.LookupEnv("PATTERN")
	if !ok {
		panic("Missing PATTERN variable")
	}
	re = regexp.MustCompile(pattern)
	mk, ok := os.LookupEnv("MATCH_VALUE")
	checkValue = ok && mk == "1"

	redpanda.OnRecordWritten(doRegexFilter)
}

func doRegexFilter(e redpanda.WriteEvent) ([]redpanda.Record, error) {
	var b []byte
	if checkValue {
		b = e.Record().Value
	} else {
		b = e.Record().Key
	}
	if b == nil {
		return nil, nil
	}
	// Forward the record only when the pattern matches; otherwise drop it.
	if re.Match(b) {
		return []redpanda.Record{e.Record()}, nil
	}
	return nil, nil
}

Example (Transcoding)

This example shows a transform that converts CSV into JSON.

package main

import (
	"bytes"
	"encoding/csv"
	"encoding/json"
	"errors"
	"io"
	"strconv"

	"github.com/redpanda-data/redpanda/src/go/transform-sdk"
)

// This example shows a transform that converts CSV into JSON.
func main() {
	redpanda.OnRecordWritten(csvToJsonTransform)
}

type Foo struct {
	A string `json:"a"`
	B int    `json:"b"`
}

func csvToJsonTransform(e redpanda.WriteEvent) ([]redpanda.Record, error) {
	// The input data is CSV (without a header row) with the columns:
	// key, a, b
	// This transform emits each row of that CSV as JSON.
	reader := csv.NewReader(bytes.NewReader(e.Record().Value))
	// Improve performance by reusing the result slice.
	reader.ReuseRecord = true
	output := []redpanda.Record{}
	for {
		row, err := reader.Read()
		if err == io.EOF {
			break
		} else if err != nil {
			return nil, err
		}
		if len(row) != 3 {
			return nil, errors.New("unexpected number of columns")
		}
		// Convert the last column into an int
		b, err := strconv.Atoi(row[2])
		if err != nil {
			return nil, err
		}
		// Marshal our JSON value
		f := Foo{
			A: row[1],
			B: b,
		}
		v, err := json.Marshal(&f)
		if err != nil {
			return nil, err
		}
		// Add our output record using the first column as the key.
		output = append(output, redpanda.Record{
			Key:   []byte(row[0]),
			Value: v,
		})

	}
	return output, nil
}
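
The per-row conversion in the transcoding example can be tried without a broker. The following is a minimal sketch that uses only the standard library; `rowToJSON` is a hypothetical helper name that is not part of the SDK, and the sample input is made up for illustration.

```go
package main

import (
	"encoding/csv"
	"encoding/json"
	"fmt"
	"strconv"
	"strings"
)

// Foo mirrors the JSON shape used in the transcoding example.
type Foo struct {
	A string `json:"a"`
	B int    `json:"b"`
}

// rowToJSON is a hypothetical helper (not part of the SDK). It converts one
// CSV row of the form (key, a, b) into a record key and a JSON value.
func rowToJSON(row []string) (key string, value []byte, err error) {
	if len(row) != 3 {
		return "", nil, fmt.Errorf("expected 3 columns, got %d", len(row))
	}
	// The last column must parse as an int.
	b, err := strconv.Atoi(row[2])
	if err != nil {
		return "", nil, err
	}
	value, err = json.Marshal(Foo{A: row[1], B: b})
	return row[0], value, err
}

func main() {
	// Illustrative input: two rows, no header row.
	input := "k1,hello,1\nk2,world,2\n"
	rows, err := csv.NewReader(strings.NewReader(input)).ReadAll()
	if err != nil {
		panic(err)
	}
	for _, row := range rows {
		k, v, err := rowToJSON(row)
		if err != nil {
			panic(err)
		}
		fmt.Printf("%s => %s\n", k, v)
	}
}
```

Keeping the row logic in a plain function like this makes it possible to test it separately from the registered callback.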

Functions

func OnRecordWritten

func OnRecordWritten(fn OnRecordWrittenCallback)

OnRecordWritten registers a callback to be fired when a record is written to the input topic.

OnRecordWritten should be called in a package's `main` function to register the transform function that will be applied.

Types

type OnRecordWrittenCallback

type OnRecordWrittenCallback func(e WriteEvent) ([]Record, error)

OnRecordWrittenCallback is a callback to transform records after a write event happens in the input topic.

type Record

type Record struct {
	// Key is an optional field.
	Key []byte
	// Value is the blob of data that is written to Redpanda.
	Value []byte
	// Headers are client specified key/value pairs that are
	// attached to a record.
	Headers []RecordHeader
	// Attrs is the attributes of a record.
	//
	// Output records should leave these unset.
	Attrs RecordAttrs
	// The timestamp associated with this record.
	//
	// For output records this can be left unset as it will
	// always be the same value as the input record.
	Timestamp time.Time
	// The offset of this record in the partition.
	//
	// For output records this field is left unset,
	// as it will be set by Redpanda.
	Offset int64
}

Record is a record that has been written to Redpanda.
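
The field comments above suggest that an output record only needs Key, Value, and Headers, with Attrs, Timestamp, and Offset left unset. A minimal sketch of that pattern follows. It assumes local stand-in types that mirror the documented fields so that it compiles without the SDK (Attrs is omitted because its fields are unexported), and `newOutput` is a hypothetical helper name.

```go
package main

import (
	"fmt"
	"time"
)

// RecordHeader and Record are local stand-ins that mirror the exported
// fields documented above. They are assumptions for illustration, not the
// SDK's own types.
type RecordHeader struct {
	Key   []byte
	Value []byte
}

type Record struct {
	Key       []byte
	Value     []byte
	Headers   []RecordHeader
	Timestamp time.Time
	Offset    int64
}

// newOutput is a hypothetical helper. It builds an output record that keeps
// the input's key and headers, replaces the value, and leaves Timestamp and
// Offset at their zero values, as the field comments recommend.
func newOutput(in Record, value []byte) Record {
	return Record{
		Key:     in.Key,
		Value:   value,
		Headers: in.Headers,
	}
}

func main() {
	in := Record{
		Key:       []byte("k"),
		Value:     []byte("old"),
		Timestamp: time.Unix(1700000000, 0),
		Offset:    42,
	}
	out := newOutput(in, []byte("new"))
	fmt.Printf("key=%s value=%s offset=%d zeroTime=%t\n",
		out.Key, out.Value, out.Offset, out.Timestamp.IsZero())
}
```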

type RecordAttrs

type RecordAttrs struct {
	// contains filtered or unexported fields
}

type RecordHeader

type RecordHeader struct {
	Key   []byte
	Value []byte
}

Headers are optional key/value pairs that are passed along with records.
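
Because headers are stored as a slice rather than a map, looking one up means scanning the slice. The sketch below shows one way to do that. It assumes a local stand-in for RecordHeader so that it compiles on its own, and `headerValue` is a hypothetical helper that the SDK is not documented to provide.

```go
package main

import "fmt"

// RecordHeader is a local stand-in mirroring the documented type.
type RecordHeader struct {
	Key   []byte
	Value []byte
}

// headerValue is a hypothetical helper. It returns the value of the first
// header whose key equals key, and reports whether such a header was found.
func headerValue(headers []RecordHeader, key string) ([]byte, bool) {
	for _, h := range headers {
		if string(h.Key) == key {
			return h.Value, true
		}
	}
	return nil, false
}

func main() {
	headers := []RecordHeader{
		{Key: []byte("content-type"), Value: []byte("text/csv")},
	}
	if v, ok := headerValue(headers, "content-type"); ok {
		fmt.Printf("content-type: %s\n", v)
	}
}
```

Returning the first match is a design choice for this sketch; a caller that expects repeated header keys would need to collect all matches instead.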

type WriteEvent

type WriteEvent interface {
	// Access the record associated with this event
	Record() Record
}

WriteEvent contains information about the write that took place, namely the record that was written.
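
Since WriteEvent is documented as an interface with a single Record method, a callback can plausibly be exercised with a hand-written fake event. The sketch below assumes local stand-ins for Record and WriteEvent so that it runs without the SDK; `fakeEvent` is a hypothetical test double, and `mirrorTransform` repeats the identity transform from the first example.

```go
package main

import "fmt"

// Record and WriteEvent are local stand-ins mirroring the documented types.
type Record struct {
	Key   []byte
	Value []byte
}

type WriteEvent interface {
	Record() Record
}

// fakeEvent is a hypothetical test double that satisfies WriteEvent.
type fakeEvent struct {
	rec Record
}

func (f fakeEvent) Record() Record { return f.rec }

// mirrorTransform copies the input record to the output unchanged.
func mirrorTransform(e WriteEvent) ([]Record, error) {
	return []Record{e.Record()}, nil
}

func main() {
	ev := fakeEvent{rec: Record{Key: []byte("k"), Value: []byte("v")}}
	out, err := mirrorTransform(ev)
	if err != nil {
		panic(err)
	}
	fmt.Printf("records=%d key=%s value=%s\n", len(out), out[0].Key, out[0].Value)
}
```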

Directories

Path Synopsis
internal
rwbuf
A buffer with a reader and writer index that gives access to the underlying array.
Package sr is a schema registry client for Redpanda for usage within inline Data Transforms.
