pipeline

package
v0.12.13 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 30, 2025 License: MIT Imports: 42 Imported by: 3

Documentation

Index

Constants

View Source
const (
	ReaderCompress Type = "rcompress"
	WriterCompress Type = "wcompress"

	CompressTypeNop    CompressType = "nop"
	CompressTypeGzip   CompressType = "gzip"
	CompressTypeSnappy CompressType = "snappy"
	CompressTypeZstd   CompressType = "zstd"
	CompressTypeLz4    CompressType = "lz4"

	CompressLevelFast   CompressLevel = "fast"
	CompressLevelBetter CompressLevel = "better"
	CompressLevelBest   CompressLevel = "best"
)
View Source
const PluginTypePipeline plugin.Type = "ppl"

Variables

View Source
var (
	ErrWrapTwice   = errors.New("wrap twice")
	ErrWrapNil     = errors.New("wrap nil")
	ErrNotWrapper  = errors.New("not wrapper")
	ErrInvalidWrap = errors.New("invalid wrap")
)
View Source
var CreateReader = newBaseReader
View Source
var CreateWorker = newBaseWorker
View Source
var CreateWriter = newBaseWriter

Functions

func NewGzipReader

func NewGzipReader(r io.Reader, cfg *CompressCfg) io.ReadCloser

func NewGzipWritter

func NewGzipWritter(w io.Writer, cfg *CompressCfg) io.WriteCloser

func NewLz4Reader added in v0.12.7

func NewLz4Reader(r io.Reader, cfg *CompressCfg) io.ReadCloser

func NewLz4Writer added in v0.12.7

func NewLz4Writer(w io.Writer, cfg *CompressCfg) io.WriteCloser

func NewSnappyReader

func NewSnappyReader(r io.Reader, _ *CompressCfg) io.ReadCloser

func NewSnappyWriter

func NewSnappyWriter(w io.Writer, cfg *CompressCfg) io.WriteCloser

func NewZstdReader

func NewZstdReader(r io.Reader, cfg *CompressCfg) io.ReadCloser

func NewZstdWriter

func NewZstdWriter(w io.Writer, cfg *CompressCfg) io.WriteCloser

Types

type BaseReader added in v0.11.0

type BaseReader struct {
	runner.Runner
	io.Reader
	// contains filtered or unexported fields
}

func (*BaseReader) Close added in v0.11.0

func (b *BaseReader) Close() error

func (*BaseReader) DirectReader added in v0.11.26

func (b *BaseReader) DirectReader() io.Reader

func (*BaseReader) Init added in v0.11.0

func (b *BaseReader) Init() error

func (*BaseReader) Size added in v0.11.0

func (b *BaseReader) Size() int64

func (*BaseReader) WithOptions added in v0.11.0

func (b *BaseReader) WithOptions(opts ...Option)

func (*BaseReader) WrapReader added in v0.11.0

func (b *BaseReader) WrapReader(r io.Reader)

func (*BaseReader) WriteTo added in v0.11.0

func (b *BaseReader) WriteTo(w io.Writer) (int64, error)

type BaseWorker added in v0.11.0

type BaseWorker struct {
	runner.Runner
	// contains filtered or unexported fields
}

func (*BaseWorker) Close added in v0.11.0

func (b *BaseWorker) Close() error

func (*BaseWorker) Init added in v0.11.0

func (b *BaseWorker) Init() error

func (*BaseWorker) LastReader added in v0.11.0

func (b *BaseWorker) LastReader() io.Reader

func (*BaseWorker) LastWriter added in v0.11.0

func (b *BaseWorker) LastWriter() io.Writer

func (*BaseWorker) ReadFrom added in v0.11.0

func (b *BaseWorker) ReadFrom(r ...io.Reader)

func (*BaseWorker) Reader added in v0.11.0

func (b *BaseWorker) Reader() io.Reader

func (*BaseWorker) Readers added in v0.11.0

func (b *BaseWorker) Readers() []io.Reader

func (*BaseWorker) Result added in v0.11.0

func (b *BaseWorker) Result() *WorkerResult

func (*BaseWorker) Start added in v0.11.0

func (b *BaseWorker) Start() error

func (*BaseWorker) Stop added in v0.11.0

func (b *BaseWorker) Stop() error

func (*BaseWorker) WithOptions added in v0.11.0

func (b *BaseWorker) WithOptions(...Option)

func (*BaseWorker) WriteTo added in v0.11.0

func (b *BaseWorker) WriteTo(w ...io.Writer)

func (*BaseWorker) Writer added in v0.11.0

func (b *BaseWorker) Writer() io.Writer

func (*BaseWorker) Writers added in v0.11.0

func (b *BaseWorker) Writers() []io.Writer

type BaseWriter added in v0.11.0

type BaseWriter struct {
	runner.Runner
	io.Writer
	// contains filtered or unexported fields
}

func (*BaseWriter) Close added in v0.11.0

func (b *BaseWriter) Close() error

func (*BaseWriter) DirectWriter added in v0.11.26

func (b *BaseWriter) DirectWriter() io.Writer

func (*BaseWriter) Init added in v0.11.0

func (b *BaseWriter) Init() error

func (*BaseWriter) ReadFrom added in v0.11.0

func (b *BaseWriter) ReadFrom(r io.Reader) (int64, error)

func (*BaseWriter) WithOptions added in v0.11.0

func (b *BaseWriter) WithOptions(opts ...Option)

func (*BaseWriter) WrapWriter added in v0.11.0

func (b *BaseWriter) WrapWriter(w io.Writer)

type Cfg

type Cfg struct {
	Workers []*WorkerCfg `json:"workers" yaml:"workers"`
}

func NewCfg

func NewCfg() *Cfg

func (*Cfg) Add added in v0.0.8

func (c *Cfg) Add(typ Type, cfg any, opt *CommonOption) *WorkerCfg

func (*Cfg) AddWorker added in v0.11.2

func (c *Cfg) AddWorker(w *WorkerCfg) *Cfg

type Cmd added in v0.11.0

type Cmd struct {
	Worker
	*cmd.Cfg
}

func NewCmd added in v0.11.0

func NewCmd() *Cmd

func (*Cmd) SetCfg added in v0.11.0

func (c *Cmd) SetCfg(cfg any)

func (*Cmd) Start added in v0.11.0

func (c *Cmd) Start() error

func (*Cmd) Stop added in v0.11.0

func (c *Cmd) Stop() error

type Common added in v0.11.0

type Common interface {
	io.Closer
	runner.Runner
	plugin.Plugin
	// contains filtered or unexported methods
}

type CommonCfg added in v0.11.0

type CommonCfg struct {
	Type Type `json:"type" yaml:"type"`
	Cfg  any  `json:"cfg" yaml:"cfg"`
}

func (*CommonCfg) UnmarshalJSON added in v0.11.0

func (c *CommonCfg) UnmarshalJSON(data []byte) error

func (*CommonCfg) UnmarshalYAML added in v0.11.0

func (c *CommonCfg) UnmarshalYAML(data []byte) error

type CommonCfgWithOption added in v0.11.0

type CommonCfgWithOption struct {
	*CommonCfg
	*CommonOption
}

func (*CommonCfgWithOption) UnmarshalJSON added in v0.11.0

func (cc *CommonCfgWithOption) UnmarshalJSON(data []byte) error

func (*CommonCfgWithOption) UnmarshalYAML added in v0.11.0

func (cc *CommonCfgWithOption) UnmarshalYAML(data []byte) error

type CommonOption added in v0.11.0

type CommonOption struct {
	BufSize             int            `json:"bufSize" yaml:"bufSize"`
	QueueSize           int            `json:"queueSize" yaml:"queueSize"`
	Deadline            int            `json:"deadline" yaml:"deadline"`
	Async               bool           `json:"async" yaml:"async"`
	ProgressLogInterval int            `json:"progressLogInterval" yaml:"progressLogInterval"`
	Hash                string         `json:"hash" yaml:"hash"`
	Checksum            string         `json:"checksum" yaml:"checksum"`
	RateLimitCfg        *ratelimit.Cfg `json:"rateLimitCfg" yaml:"rateLimitCfg"`
	Count               bool           `json:"count" yaml:"count"`
}

type CompressCfg added in v0.11.0

type CompressCfg struct {
	Type        CompressType  `json:"type"        validate:"required" yaml:"type"`
	Level       CompressLevel `json:"level"       validate:"required" yaml:"level"`
	Concurrency int           `json:"concurrency" yaml:"concurrency"`
}

func NewCompressCfg added in v0.11.0

func NewCompressCfg() *CompressCfg

type CompressLevel

type CompressLevel string

type CompressReader added in v0.11.0

type CompressReader struct {
	Reader
	*CompressCfg
	// contains filtered or unexported fields
}

func NewCompressReader added in v0.11.0

func NewCompressReader() *CompressReader

func (*CompressReader) Init added in v0.11.0

func (c *CompressReader) Init() error

func (*CompressReader) SetCfg added in v0.11.0

func (c *CompressReader) SetCfg(cfg any)

func (*CompressReader) WrapReader added in v0.11.0

func (c *CompressReader) WrapReader(r io.Reader)

type CompressType

type CompressType string

type CompressWriter added in v0.11.0

type CompressWriter struct {
	Writer
	*CompressCfg
	// contains filtered or unexported fields
}

func NewCompressWriter added in v0.11.0

func NewCompressWriter() *CompressWriter

func (*CompressWriter) Init added in v0.11.0

func (c *CompressWriter) Init() error

func (*CompressWriter) SetCfg added in v0.11.0

func (c *CompressWriter) SetCfg(cfg any)

func (*CompressWriter) WrapWriter added in v0.11.0

func (c *CompressWriter) WrapWriter(w io.Writer)

type Copy added in v0.11.0

type Copy struct {
	Worker
	// contains filtered or unexported fields
}

func NewCopy added in v0.11.0

func NewCopy() *Copy

func (*Copy) Init added in v0.11.0

func (c *Copy) Init() error

func (*Copy) SetCfg added in v0.11.0

func (c *Copy) SetCfg(cfg any)

func (*Copy) Start added in v0.11.0

func (c *Copy) Start() error

func (*Copy) Stop added in v0.11.0

func (c *Copy) Stop() error

type CopyCfg added in v0.11.0

type CopyCfg struct {
	BufSize int `json:"bufSize" yaml:"bufSize"`
}

func NewCopyCfg added in v0.11.0

func NewCopyCfg() *CopyCfg

type File added in v0.11.0

type File struct {
	*os.File
	*FileCfg
	// contains filtered or unexported fields
}

func (*File) Size added in v0.11.0

func (f *File) Size() int64

type FileCfg added in v0.11.0

type FileCfg struct {
	Path string `json:"path" yaml:"path"`
	Perm uint32 `json:"perm" yaml:"perm"`
}

func NewFileCfg added in v0.11.0

func NewFileCfg() *FileCfg

type FileReader added in v0.11.0

type FileReader struct {
	Reader
	// contains filtered or unexported fields
}

func NewFileReader added in v0.11.0

func NewFileReader() *FileReader

func (*FileReader) Init added in v0.11.0

func (f *FileReader) Init() error

func (*FileReader) SetCfg added in v0.11.0

func (f *FileReader) SetCfg(cfg any)

func (*FileReader) Size added in v0.11.0

func (f *FileReader) Size() int64

func (*FileReader) WrapReader added in v0.11.0

func (f *FileReader) WrapReader(io.Reader)

type FileWriter added in v0.11.0

type FileWriter struct {
	Writer
	// contains filtered or unexported fields
}

func NewFileWriter added in v0.11.0

func NewFileWriter() *FileWriter

func (*FileWriter) Init added in v0.11.0

func (f *FileWriter) Init() error

func (*FileWriter) SetCfg added in v0.11.0

func (f *FileWriter) SetCfg(cfg any)

func (*FileWriter) WrapWriter added in v0.11.0

func (f *FileWriter) WrapWriter(io.Writer)

type FtpCfg

type FtpCfg struct {
	*ftp.Cfg
	Path string `json:"path" yaml:"path" validate:"required"`
}

func NewFtpCfg added in v0.11.0

func NewFtpCfg() *FtpCfg

type FtpReader added in v0.11.0

type FtpReader struct {
	Reader
	*FtpCfg
}

func NewFtpReader added in v0.11.0

func NewFtpReader() *FtpReader

func (*FtpReader) Init added in v0.11.0

func (f *FtpReader) Init() error

func (*FtpReader) SetCfg added in v0.11.0

func (f *FtpReader) SetCfg(c any)

func (*FtpReader) WrapReader added in v0.11.0

func (f *FtpReader) WrapReader(io.Reader)

type FtpWriter added in v0.11.0

type FtpWriter struct {
	Writer
	*FtpCfg
}

func NewFtpWriter added in v0.11.0

func NewFtpWriter() *FtpWriter

func (*FtpWriter) Init added in v0.11.0

func (f *FtpWriter) Init() error

func (*FtpWriter) SetCfg added in v0.11.0

func (f *FtpWriter) SetCfg(c any)

func (*FtpWriter) WrapWriter added in v0.11.0

func (f *FtpWriter) WrapWriter(io.Writer)

type NopCfg added in v0.11.2

type NopCfg struct {
}

func NewNopCfg added in v0.11.2

func NewNopCfg() *NopCfg

type NopReader added in v0.11.2

type NopReader struct {
	Reader
	*NopCfg
}

func NewNopReader added in v0.11.2

func NewNopReader() *NopReader

type NopWriter added in v0.11.2

type NopWriter struct {
	Writer
	*NopCfg
}

func NewNopWriter added in v0.11.2

func NewNopWriter() *NopWriter

type OSSCfg added in v0.11.0

type OSSCfg struct {
	*oss.Cfg
	Append bool `json:"append" yaml:"append"`
}

func NewOSSCfg added in v0.11.0

func NewOSSCfg() *OSSCfg

type OSSReader added in v0.11.0

type OSSReader struct {
	Reader
	*OSSCfg
	// contains filtered or unexported fields
}

func NewOSSReader added in v0.11.0

func NewOSSReader() *OSSReader

func (*OSSReader) Init added in v0.11.0

func (o *OSSReader) Init() error

func (*OSSReader) SetCfg added in v0.11.0

func (o *OSSReader) SetCfg(c any)

func (*OSSReader) Size added in v0.11.0

func (o *OSSReader) Size() int64

func (*OSSReader) WrapReader added in v0.11.0

func (o *OSSReader) WrapReader(io.Reader)

type OSSWriter added in v0.11.0

type OSSWriter struct {
	Writer
	*OSSCfg
}

func NewOSSWriter added in v0.11.0

func NewOSSWriter() *OSSWriter

func (*OSSWriter) Init added in v0.11.0

func (o *OSSWriter) Init() error

func (*OSSWriter) SetCfg added in v0.11.0

func (o *OSSWriter) SetCfg(c any)

func (*OSSWriter) WrapWriter added in v0.11.0

func (o *OSSWriter) WrapWriter(io.Writer)

type Option added in v0.11.0

type Option interface {
	// contains filtered or unexported methods
}

func Checksum added in v0.11.0

func Checksum(checksum string, h hash.Hash) Option

func CountRead added in v0.11.2

func CountRead() Option

func CountWrite added in v0.11.2

func CountWrite() Option

func EnableAsyncRead added in v0.11.0

func EnableAsyncRead(bufSize int, queueSize int) Option

func EnableAsyncWrite added in v0.11.0

func EnableAsyncWrite(bufSize int, queueSize int, deadline time.Duration) Option

func EnableBufRead added in v0.11.0

func EnableBufRead(bufSize int) Option

func EnableBufWrite added in v0.11.0

func EnableBufWrite(bufSize int) Option

func HashRead added in v0.11.13

func HashRead(h hash.Hash) Option

func HashWrite added in v0.11.13

func HashWrite(h hash.Hash) Option

func LogRead added in v0.11.0

func LogRead(getFields func() []any) Option

func LogWrite added in v0.11.0

func LogWrite(getFields func() []any) Option

func MultiWrite added in v0.11.0

func MultiWrite(w ...io.Writer) Option

MultiWrite applies only to a Writer.

func OnClose added in v0.11.0

func OnClose(c ...closeFunc) Option

func ProgressLogRead added in v0.11.2

func ProgressLogRead(interval time.Duration) Option

func ProgressLogWrite added in v0.11.2

func ProgressLogWrite(interval time.Duration) Option

func RateLimitRead added in v0.11.13

func RateLimitRead(cfg *ratelimit.Cfg) Option

func RateLimitWrite added in v0.11.13

func RateLimitWrite(cfg *ratelimit.Cfg) Option

func Tee added in v0.11.0

func Tee(w ...io.Writer) Option

Tee applies only to a Reader.

func WrapReader added in v0.11.0

func WrapReader(f ReaderWrapFunc) Option

func WrapWriter added in v0.11.0

func WrapWriter(f WriterWrapFunc) Option

type Pipeline

type Pipeline struct {
	runner.Runner
	// contains filtered or unexported fields
}

func New

func New() *Pipeline

func (*Pipeline) AddWorker added in v0.11.0

func (p *Pipeline) AddWorker(w Worker)

AddWorker is for the scenario where no cfg is used.

func (*Pipeline) Init

func (p *Pipeline) Init() error

func (*Pipeline) Result added in v0.7.3

func (p *Pipeline) Result() *Result

func (*Pipeline) SetCfg added in v0.11.0

func (p *Pipeline) SetCfg(cfg any)

func (*Pipeline) Start

func (p *Pipeline) Start() error

func (*Pipeline) Stop

func (p *Pipeline) Stop() error

func (*Pipeline) Workers added in v0.11.0

func (p *Pipeline) Workers() []Worker

type Reader added in v0.11.0

type Reader interface {
	Common
	io.Reader
	io.WriterTo

	DirectReader() io.Reader // for zero copy
	// contains filtered or unexported methods
}

type ReaderCfg added in v0.11.0

type ReaderCfg struct {
	*CommonCfgWithOption
}

func (*ReaderCfg) UnmarshalJSON added in v0.11.20

func (rc *ReaderCfg) UnmarshalJSON(data []byte) error

func (*ReaderCfg) UnmarshalYAML added in v0.11.20

func (rc *ReaderCfg) UnmarshalYAML(data []byte) error

type ReaderWrapFunc added in v0.11.0

type ReaderWrapFunc func(io.Reader) io.Reader

type Result added in v0.7.3

type Result struct {
	Cfg           *Cfg            `json:"cfg" yaml:"cfg"`
	Data          map[string]any  `json:"data" yaml:"data"`
	WorkersResult []*WorkerResult `json:"workersResult" yaml:"workersResult"`
}

type SSH added in v0.11.0

type SSH struct {
	Worker
	// contains filtered or unexported fields
}

func NewSSH added in v0.11.0

func NewSSH() *SSH

func (*SSH) Close added in v0.11.0

func (s *SSH) Close() error

func (*SSH) Init added in v0.11.0

func (s *SSH) Init() error

func (*SSH) SetCfg added in v0.11.0

func (s *SSH) SetCfg(c any)

func (*SSH) Start added in v0.11.0

func (s *SSH) Start() error

func (*SSH) Stop added in v0.11.0

func (s *SSH) Stop() error

type SSHCfg

type SSHCfg struct {
	Addr       string `json:"addr"       yaml:"addr" validate:"required"`
	User       string `json:"user"       yaml:"user" validate:"required"`
	Pwd        string `json:"pwd"        yaml:"pwd"`
	PrivateKey string `json:"privateKey" yaml:"privateKey"`
	Path       string `json:"path"       yaml:"path" validate:"required"`
	Timeout    int    `json:"timeout"    yaml:"timeout"`
}

func NewSSHCfg added in v0.11.0

func NewSSHCfg() *SSHCfg

type Tail added in v0.11.0

type Tail struct {
	Reader
	// contains filtered or unexported fields
}

func NewTail added in v0.11.0

func NewTail() *Tail

func (*Tail) Init added in v0.11.0

func (t *Tail) Init() error

func (*Tail) Offset added in v0.11.2

func (t *Tail) Offset() int64

func (*Tail) SetCfg added in v0.11.0

func (t *Tail) SetCfg(c any)

func (*Tail) WrapReader added in v0.11.0

func (t *Tail) WrapReader(io.Reader)

type TailCfg added in v0.11.0

type TailCfg struct {
	Path   string `json:"path" yaml:"path"`
	Offset int64  `json:"offset" yaml:"offset"`
}

func NewTailCfg added in v0.11.0

func NewTailCfg() *TailCfg

type Type added in v0.11.0

type Type string
const (
	ReaderFile Type = "rfile"
	WriterFile Type = "wfile"
)
const (
	ReaderFtp Type = "rftp"
	WriterFtp Type = "wftp"
)
const (
	ReaderNop Type = "rnop"
	WriterNop Type = "wnop"
)
const (
	ReaderOSS Type = "ross"
	WriterOSS Type = "woss"
)
const ReaderTail Type = "tail"
const WorkerCmd Type = "cmd"
const WorkerCopy Type = "copy"
const WorkerSSH Type = "ssh"

type Worker added in v0.11.0

type Worker interface {
	Common

	WriteTo(...io.Writer)
	ReadFrom(...io.Reader)

	Writers() []io.Writer
	Readers() []io.Reader
	LastWriter() io.Writer
	LastReader() io.Reader

	Reader() io.Reader
	Writer() io.Writer

	Result() *WorkerResult
}

type WorkerCfg added in v0.11.0

type WorkerCfg struct {
	*CommonCfgWithOption
	Readers []*ReaderCfg `json:"readers" yaml:"readers"`
	Writers []*WriterCfg `json:"writers" yaml:"writers"`
}

func (*WorkerCfg) ReadFrom added in v0.11.2

func (wc *WorkerCfg) ReadFrom(typ Type, cfg any, opt *CommonOption) *WorkerCfg

func (*WorkerCfg) ReadFromReader added in v0.11.2

func (wc *WorkerCfg) ReadFromReader(c *CommonCfgWithOption) *WorkerCfg

func (*WorkerCfg) UnmarshalJSON added in v0.11.0

func (wc *WorkerCfg) UnmarshalJSON(data []byte) error

func (*WorkerCfg) UnmarshalYAML added in v0.11.0

func (wc *WorkerCfg) UnmarshalYAML(data []byte) error

func (*WorkerCfg) WriteTo added in v0.11.2

func (wc *WorkerCfg) WriteTo(typ Type, cfg any, opt *CommonOption) *WorkerCfg

func (*WorkerCfg) WriteToWriter added in v0.11.2

func (wc *WorkerCfg) WriteToWriter(c *CommonCfgWithOption) *WorkerCfg

type WorkerResult added in v0.11.0

type WorkerResult struct {
	Data        map[string]any   `json:"data" yaml:"data"`
	ReadersData []map[string]any `json:"readersData" yaml:"readersData"`
	WritersData []map[string]any `json:"writersData" yaml:"writersData"`
}

type Writer added in v0.11.0

type Writer interface {
	Common
	io.Writer
	io.ReaderFrom

	DirectWriter() io.Writer // for zero copy
	// contains filtered or unexported methods
}

type WriterCfg added in v0.11.0

type WriterCfg struct {
	*CommonCfgWithOption
}

func (*WriterCfg) UnmarshalJSON added in v0.11.20

func (wc *WriterCfg) UnmarshalJSON(data []byte) error

func (*WriterCfg) UnmarshalYAML added in v0.11.20

func (wc *WriterCfg) UnmarshalYAML(data []byte) error

type WriterWrapFunc added in v0.11.0

type WriterWrapFunc func(io.Writer) io.Writer

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL