pipelaner

package module
v1.3.4 Latest
Published: Nov 13, 2025 License: Apache-2.0 Imports: 14 Imported by: 0

README

Pipelaner

Pipelaner is a high-performance framework and agent for building data pipelines. Pipelines are described following the Configuration as Code concept, using Apple's Pkl configuration language.

Pipelaner manages data streams through three key entities: Generator, Transform and Sink.



📌 Core Entities

Generator

The component responsible for creating or retrieving source data for the pipeline. Generators can produce messages, events, or retrieve data from various sources such as files, databases, or APIs.

  • Example use case:
    Reading data from a file or receiving events via webhooks.

Transform

The component that processes data within the pipeline. Transforms perform operations such as filtering, aggregation, format conversion, or cleaning to prepare the data for the next stage.

  • Example use case:
    Filtering records based on specific conditions or converting data format from JSON to CSV.

Sink

The final destination for the data stream. Sinks send processed data to a target system, such as a database, API, or message queue.

  • Example use case:
    Saving data to PostgreSQL or sending it to a Kafka topic.
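
The three roles can be illustrated with plain Go channels. This is only a minimal sketch of the Generator, Transform, and Sink idea, not Pipelaner's actual interfaces; the function names `generate`, `transform`, `sink`, and `runPipeline` are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// generate emits each input line on a channel and closes it when done
// (the Generator role).
func generate(lines []string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for _, l := range lines {
			out <- l
		}
	}()
	return out
}

// transform drops blank lines and upper-cases the rest (the Transform role).
func transform(in <-chan string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for l := range in {
			if strings.TrimSpace(l) == "" {
				continue
			}
			out <- strings.ToUpper(l)
		}
	}()
	return out
}

// sink drains the stream into a slice (the Sink role). A real sink would
// write to a database, an API, or a message queue instead.
func sink(in <-chan string) []string {
	var got []string
	for l := range in {
		got = append(got, l)
	}
	return got
}

// runPipeline wires the three stages together.
func runPipeline(lines []string) []string {
	return sink(transform(generate(lines)))
}

func main() {
	fmt.Println(runPipeline([]string{"a", "", "b"})) // prints [A B]
}
```

Each stage owns its output channel and closes it when its input is exhausted, so shutdown propagates from the generator down to the sink.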

Basic Parameters

| Parameter | Type | Description |
|---|---|---|
| name | String | Unique name of the pipeline element. |
| threads | Int | Number of threads for processing messages. Defaults to the value of GOMAXPROCS. |
| outputBufferSize | Int | Size of the output buffer. Not applicable to Sink components. |
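
To make the effect of `threads` and `outputBufferSize` concrete, the sketch below runs a worker pool whose size falls back to `runtime.GOMAXPROCS(0)` and whose output channel is buffered. The `elementConfig` struct and the helper functions are hypothetical; how Pipelaner applies these parameters internally is not described in this document.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// elementConfig mirrors the three basic parameters. The struct itself is an
// assumption made for this sketch.
type elementConfig struct {
	Name             string
	Threads          int
	OutputBufferSize int
}

// workerCount returns Threads, falling back to GOMAXPROCS when it is unset.
func workerCount(cfg elementConfig) int {
	if cfg.Threads > 0 {
		return cfg.Threads
	}
	return runtime.GOMAXPROCS(0)
}

// process squares every input using workerCount(cfg) goroutines and returns
// the sum of the results. Workers write into a channel buffered to
// OutputBufferSize, so they can run ahead of a slow consumer.
func process(cfg elementConfig, inputs []int) int {
	in := make(chan int)
	out := make(chan int, cfg.OutputBufferSize)

	var wg sync.WaitGroup
	for i := 0; i < workerCount(cfg); i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range in {
				out <- v * v
			}
		}()
	}

	// Feed the workers, then close the output once all of them have finished.
	go func() {
		for _, v := range inputs {
			in <- v
		}
		close(in)
		wg.Wait()
		close(out)
	}()

	sum := 0
	for v := range out {
		sum += v
	}
	return sum
}

func main() {
	cfg := elementConfig{Name: "square", Threads: 2, OutputBufferSize: 4}
	fmt.Println(process(cfg, []int{1, 2, 3})) // prints 14
}
```

A larger buffer smooths out bursts at the cost of memory; a buffer of zero makes every worker wait until the next stage is ready to receive.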

📦 Built-in Pipeline Elements

Generators
| Name | Description |
|---|---|
| cmd | Reads the output of a command, e.g., "/usr/bin/log" "stream --style ndjson". |
| kafka | Apache Kafka consumer that streams message values into the pipeline. |
| pipelaner | Server that streams values via gRPC. |

Transforms
| Name | Description |
|---|---|
| batch | Forms batches of data with a specified size. |
| chunks | Splits incoming data into chunks. |
| debounce | Eliminates "bounce" (frequent repeats) in data. |
| filter | Filters data based on specified conditions. |
| remap | Reassigns fields or transforms the data structure. |
| throttling | Limits data processing rate. |
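
As an illustration of what a size-based batching transform does, here is a minimal sketch over Go channels. It is not the built-in `batch` implementation, which may also flush on other conditions; `batch` and `batchAll` are hypothetical names, and `size` is assumed to be positive.

```go
package main

import "fmt"

// batch groups values from in into slices of at most size elements.
// A final partial batch is flushed when the input channel closes.
func batch(in <-chan int, size int) <-chan []int {
	out := make(chan []int)
	go func() {
		defer close(out)
		buf := make([]int, 0, size)
		for v := range in {
			buf = append(buf, v)
			if len(buf) == size {
				out <- buf
				// Start a fresh slice so the emitted batch is not overwritten.
				buf = make([]int, 0, size)
			}
		}
		if len(buf) > 0 {
			out <- buf
		}
	}()
	return out
}

// batchAll feeds a slice through batch and collects the resulting batches.
func batchAll(values []int, size int) [][]int {
	in := make(chan int)
	go func() {
		defer close(in)
		for _, v := range values {
			in <- v
		}
	}()
	var got [][]int
	for b := range batch(in, size) {
		got = append(got, b)
	}
	return got
}

func main() {
	fmt.Println(batchAll([]int{1, 2, 3, 4, 5}, 2)) // prints [[1 2] [3 4] [5]]
}
```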

Sinks
| Name | Description |
|---|---|
| clickhouse | Sends data to a ClickHouse database. |
| console | Outputs data to the console. |
| http | Sends data to a specified HTTP endpoint. |
| kafka | Publishes data to Apache Kafka. |
| pipelaner | Streams data via gRPC to other Pipelaner nodes. |

🌐 Scalability

Single-Node Deployment

For operation on a single host:
[Diagram: Single Node]


Multi-Node Deployment

For distributed data processing across multiple hosts:
[Diagram: Multi-Node]

For distributed interaction between nodes, you can use:

  1. gRPC: via generators and sinks with the parameter sourceName: "pipelaner".
  2. Apache Kafka: for reading/writing data via topics.

Example configuration using Kafka:

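// Generator on the receiving node: consumes from the shared topic.
new Inputs.Kafka {
    ...
    common {
        ...
        topics {
            "kafka-topic"
        }
    }
}

// Sink on the sending node: publishes to the same topic.
new Sinks.Kafka {
    ...
    common {
        ...
        topics {
            "kafka-topic"
        }
    }
}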

🚀 Examples

| Example | Description |
|---|---|
| Basic Pipeline | A simple example illustrating the creation of a basic pipeline with prebuilt components. |
| Custom Components | An advanced example showing how to create and integrate custom Generators, Transforms, and Sinks. |

Overview
  1. 🌟 Basic Pipeline
    Learn the fundamentals of creating a pipeline with minimal configuration using ready-to-use components.

  2. 🛠 Custom Components
    Extend Pipelaner's functionality by developing your own Generators, Transforms, and Sinks.


Each example includes clear configuration files and explanations to help you get started quickly.

💡 Tip: Use these examples as templates to customize and build your own pipelines efficiently.

🤝 Support

If you have questions, suggestions, or encounter any issues, please create an Issue in the repository.
You can also participate in discussions in the Discussions section.


📜 License

This project is licensed under the Apache 2.0 license.
You are free to use, modify, and distribute the code under the terms of the license.

Documentation

Types

type Agent

type Agent struct {
	// contains filtered or unexported fields
}

func NewAgent

func NewAgent(file string) (*Agent, error)

func (*Agent) Serve

func (a *Agent) Serve(ctx context.Context) error

func (*Agent) Shutdown (added in v1.0.0)

func (a *Agent) Shutdown(ctx context.Context) error
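
The `Serve` and `Shutdown` signatures above suggest a familiar lifecycle: run `Serve` until the context is cancelled, then call `Shutdown` with a bounded context. The sketch below shows that pattern against a small local interface so it runs without the real package. The `server` interface, `serveUntilDone`, and `fakeAgent` are hypothetical, and the assumption that `Serve` blocks until `Shutdown` is called is not confirmed by this page.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// server captures the two Agent methods listed above, so the helper can be
// exercised without importing the real package.
type server interface {
	Serve(ctx context.Context) error
	Shutdown(ctx context.Context) error
}

// serveUntilDone runs Serve in the background. If Serve stops on its own,
// its error is returned. If ctx is cancelled first, Shutdown is called with
// a fresh context limited to the grace period.
func serveUntilDone(ctx context.Context, s server, grace time.Duration) error {
	errCh := make(chan error, 1) // buffered so the goroutine never blocks
	go func() { errCh <- s.Serve(ctx) }()

	select {
	case err := <-errCh:
		return err
	case <-ctx.Done():
	}

	shutdownCtx, cancel := context.WithTimeout(context.Background(), grace)
	defer cancel()
	return s.Shutdown(shutdownCtx)
}

// fakeAgent is a stand-in used only in this sketch. Its Serve blocks until
// Shutdown is called, which is an assumed behavior.
type fakeAgent struct {
	stop    chan struct{}
	stopped bool
}

func (f *fakeAgent) Serve(ctx context.Context) error {
	<-f.stop
	return nil
}

func (f *fakeAgent) Shutdown(ctx context.Context) error {
	f.stopped = true
	close(f.stop)
	return nil
}

// demo cancels the context shortly after start and reports whether
// Shutdown ran.
func demo() bool {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()

	a := &fakeAgent{stop: make(chan struct{})}
	if err := serveUntilDone(ctx, a, time.Second); err != nil {
		return false
	}
	return a.stopped
}

func main() {
	fmt.Println(demo()) // prints true
}
```

With the real package, the value returned by `NewAgent(file)` would take the place of `fakeAgent`, and the context would typically be tied to OS signals (for example via `signal.NotifyContext`).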

type Pipelaner (added in v1.0.0)

type Pipelaner struct {
	Pipelines []pipeline
	// contains filtered or unexported fields
}

func NewPipelaner (added in v1.0.0)

func NewPipelaner(
	configs []components.Pipeline,
	loggerCfg logCfg.Config,
	metricsEnabled, gcAfterProcess bool,
) (*Pipelaner, error)

func (*Pipelaner) Run (added in v1.0.0)

func (p *Pipelaner) Run(ctx context.Context) error

Directories

Path Synopsis
examples
basic command
custom command
custom/gen/custom
Code generated from Pkl module `pipelaner.source.examples.custom`.
gen
components
Code generated from Pkl module `com.pipelaner.source.components`.
pipelaner
Code generated from Pkl module `com.pipelaner`.
settings
Code generated from Pkl module `com.pipelaner.settings.settings`.
settings/healthcheck
Code generated from Pkl module `com.pipelaner.settings.healthcheck.config`.
settings/logger
Code generated from Pkl module `com.pipelaner.settings.logger.config`.
settings/logger/logformat
Code generated from Pkl module `com.pipelaner.settings.logger.config`.
settings/logger/loglevel
Code generated from Pkl module `com.pipelaner.settings.logger.config`.
settings/metrics
Code generated from Pkl module `com.pipelaner.settings.metrics.config`.
settings/migrations
Code generated from Pkl module `com.pipelaner.settings.migrations.config`.
settings/migrations/driver
Code generated from Pkl module `com.pipelaner.settings.migrations.config`.
settings/pprof
Code generated from Pkl module `com.pipelaner.settings.pprof.config`.
source/common
Code generated from Pkl module `com.pipelaner.source.common`.
source/common/saslmechanism
Code generated from Pkl module `com.pipelaner.source.common`.
source/input
Code generated from Pkl module `com.pipelaner.source.inputs`.
source/input/autooffsetreset
Code generated from Pkl module `com.pipelaner.source.inputs`.
source/input/commitstrategy
Code generated from Pkl module `com.pipelaner.source.inputs`.
source/input/connectiontype
Code generated from Pkl module `com.pipelaner.source.inputs`.
source/input/isolationlevel
Code generated from Pkl module `com.pipelaner.source.inputs`.
source/input/strategy
Code generated from Pkl module `com.pipelaner.source.inputs`.
source/sink
Code generated from Pkl module `com.pipelaner.source.sinks`.
source/sink/method
Code generated from Pkl module `com.pipelaner.source.sinks`.
source/transform
Code generated from Pkl module `com.pipelaner.source.transforms`.
source/transform/flushtimerstrategy
Code generated from Pkl module `com.pipelaner.source.transforms`.
internal
migrator
nolint
nolint
