Documentation
¶
Overview ¶
Package processor defines abstractions for serverless concurrent processing.
Index ¶
- Constants
- func NewDataCorruption(msg string) error
- func NewPartialRetry(msg string, data interface{}) error
- type BaseReporter
- type Config
- func (c Config) AdjustScannerBuffer(scanner *bufio.Scanner)
- func (c Config) Deadline(ctx context.Context) time.Time
- func (c *Config) ExpandDestination(startTime time.Time) *config.Stream
- func (c Config) ExpandDestinationRotationURL(startTime time.Time) string
- func (c Config) ExpandDestinationURL(startTime time.Time) string
- func (c *Config) Init(ctx context.Context, fs afs.Service) error
- func (c *Config) InitWithNoLimit()
- func (c Config) LoaderDeadline(ctx context.Context) time.Time
- func (c *Config) Validate() error
- type DataCorruption
- type Field
- type Fields
- type Handler
- type HandlerReporter
- type PartialRetry
- type PostProcessor
- type PreProcessor
- type Processor
- type Reporter
- type Request
- type Response
- type Service
- type Sort
- type Sortables
- type Spec
- type StatusSet
- type Writer
Examples ¶
Constants ¶
const ( //OnDoneDelete delete action OnDoneDelete = "delete" //OnDoneMove move action OnDoneMove = "move" )
const ( //Source types Parquet = "parquet" JSON = "json" CSV = "csv" )
const ( StatusOk = "ok" StatusError = "error" StatusSetOk = StatusSet(1) StatusSetError = StatusSet(2) StatusSetRetriable = StatusSet(4) StatusSetCorrupted = StatusSet(8) )
const (
RetryFragment = "-retry"
)
Variables ¶
This section is empty.
Functions ¶
func NewDataCorruption ¶
NewDataCorruption returns data corruption error
func NewPartialRetry ¶
NewPartialRetry returns a partial retry error carrying the data to be written back to the retry stream
Types ¶
type BaseReporter ¶
type BaseReporter struct {
*Response
// contains filtered or unexported fields
}
func (*BaseReporter) BaseResponse ¶
func (r *BaseReporter) BaseResponse() *Response
Response returns base response info
Example ¶
package main
import (
"context"
"encoding/json"
"fmt"
"github.com/viant/afs"
"github.com/viant/cloudless/data/processor"
"net/http"
"strings"
"sync"
)
// URLReporter represents URL reporter
type URLReporter struct {
processor.BaseReporter
ByResponseCode map[int]int
mutex sync.Mutex
}
// NewURLReporter represents URL reporeter
func NewURLReporter() processor.Reporter {
return &URLReporter{
ByResponseCode: make(map[int]int),
BaseReporter: processor.BaseReporter{
Response: &processor.Response{Status: processor.StatusOk},
},
}
}
type HTTPProcessor struct {
BaseURL string
}
func (p HTTPProcessor) Process(ctx context.Context, data interface{}, reporter processor.Reporter) error {
urlReporter := reporter.(*URLReporter)
URL := p.BaseURL + string(data.([]byte))
request, err := http.NewRequestWithContext(ctx, http.MethodGet, URL, nil)
if err != nil {
return processor.NewDataCorruption(fmt.Sprintf("invalid request: %v", URL))
}
response, err := http.DefaultClient.Do(request)
if err != nil {
return err
}
urlReporter.mutex.Lock()
defer urlReporter.mutex.Unlock()
urlReporter.ByResponseCode[response.StatusCode]++
return nil
}
func main() {
ctx := context.Background()
service := processor.New(&processor.Config{
CorruptionURL: "mem://localhost/corrupted",
RetryURL: "mem://localhost/retry",
FailedURL: "mem://localhost/failed",
}, afs.New(), &HTTPProcessor{BaseURL: "http://mydataexporter/enrich/?data="}, NewURLReporter)
reporter := service.Do(ctx, processor.NewRequest(strings.NewReader("dGVzdCBpcyB0ZXN0\nYW5vdGhlciBvbmU="),
nil,
"mem://localhost/trigger/data.txt"))
response, _ := json.Marshal(reporter)
fmt.Printf("%s\n", response)
}
type Config ¶
type Config struct {
DeadlineReductionMs int // Deadline typically comes from Lambda ctx. Max exec time == Deadline - DeadlineReductionMs
LoaderDeadlineLagMs int // Loader will finish earlier than workers to let the latter complete
MaxRetries int // max number of retries before data goes to FailedURL
Concurrency int // number of concurrent processing workers
DestinationURL string // Service processing data destination URL. This is a template, e.g. $gs://$mybucket/$prefix/$a.dat
DestinationCodec string
Destination *config.Stream
RetryURL string // destination for the data to be retried
FailedURL string // destination for the data that has failed max retries
CorruptionURL string // destination for the corrupted data
MaxExecTimeMs int // default execution timeMs used when context does not come with deadline
OnDone string //"move" or "delete" ("move" moves processed data, presumably to OnDoneURL; "delete" deletes it)
OnDoneURL string // destination for the "move" on-done action
ReaderBufferSize int //if set above zero uses afs Stream option
BatchSize int //number of data lines passed to processor (1 by default)
Sort Sort //optional sorting config
ScannerBufferMB int //use in case you see bufio.Scanner: token too long
MetricPort int //if specified HTTP endpoint port to expose metrics
RowTypeName string // parquet/json row type
OnMirrorURL string //mirror (copy) URL of the resource
QuorumExt string //quorum file extension - semantics not shown here, verify against usage
}
Config represents processor configuration
func (Config) AdjustScannerBuffer ¶
func (*Config) ExpandDestination ¶
func (Config) ExpandDestinationRotationURL ¶
func (Config) ExpandDestinationURL ¶
func (*Config) InitWithNoLimit ¶
func (c *Config) InitWithNoLimit()
InitWithNoLimit initialises the config with no execution limit
func (Config) LoaderDeadline ¶
LoaderDeadline returns max execution time for the loader, which ends earlier than the worker deadline (see LoaderDeadlineLagMs)
type DataCorruption ¶
type DataCorruption struct {
// contains filtered or unexported fields
}
DataCorruption represents corruption error
type Fields ¶
type Fields struct {
Sort
// contains filtered or unexported fields
}
Fields represents sort fields (see Sort)
type Handler ¶
type Handler struct {
// contains filtered or unexported fields
}
Handler represents custom processor handler, that allows creating a process per request URL
func NewHandler ¶
NewHandler creates a custom handler processor (a dedicated processor can be created based on processor.Request)
type HandlerReporter ¶
HandlerReporter creates a handler reporter
type PartialRetry ¶
type PartialRetry struct {
// contains filtered or unexported fields
}
PartialRetry partial retry error allows to write only partial data back to retry stream
type PostProcessor ¶
PostProcessor is an optional post-processor interface
type PreProcessor ¶
type PreProcessor interface {
Pre(ctx context.Context, reporter Reporter) (context.Context, error)
}
PreProcessor is an optional preprocessor interface
type Processor ¶
type Processor interface {
Process(ctx context.Context, data interface{}, reporter Reporter) error
}
Processor represents data processor
type Reporter ¶
type Reporter interface {
//BaseResponse returns base response
BaseResponse() *Response
}
Reporter represents an interface providing the processor response
func NewHandlerReporter ¶
NewHandlerReporter creates a handler reporter
type Request ¶
type Request struct {
io.ReadCloser // source data reader
SourceType string // source data format: parquet, json or csv (see source type constants)
io.ReaderAt // optional random-access reader - presumably used for parquet, confirm against implementation
RowType reflect.Type // parquet/json row type (see Config.RowTypeName)
Attrs map[string]interface{} // optional request attributes
StartTime time.Time // processing start time
SourceURL string //incoming original filename url
}
Request represents a processing request
func NewRequest ¶
NewRequest creates a processing request
func (*Request) Retry ¶
Retry extracts the retry number from the URL. It looks for two consecutive digits, e.g. s3://bucket/prefix/filename-retry05.csv would extract the number 5
func (*Request) TransformSourceURL ¶
TransformSourceURL returns baseURL + sourceURL path
type Response ¶
type Response struct {
Status string
Errors []string `json:",omitempty"`
StartTime time.Time
RuntimeMs int
SourceURL string `json:",omitempty"`
Destination *config.Stream
RetryURL string `json:"-"` // destination for the data to be replayed
CorruptionURL string `json:"-"`
Processed int32 `json:",omitempty"`
RetryErrors int32 `json:",omitempty"`
CorruptionErrors int32 `json:",omitempty"`
RetriableErrors int32 `json:",omitempty"`
Loaded int32 `json:",omitempty"`
LoadTimeouts int32 `json:",omitempty"`
Batched int32 `json:",omitempty"`
Skipped int32 `json:",omitempty"`
// contains filtered or unexported fields
}
Response represents base processing response
type Service ¶
type Service struct {
Config *Config
Metrics *gmetric.Service
Processor
// contains filtered or unexported fields
}
Service represents processing service
func New ¶
func New(config *Config, fs afs.Service, processor Processor, reporterProvider func() Reporter) *Service
New creates data processing service
func NewWithMetrics ¶
func NewWithMetrics(config *Config, fs afs.Service, processor Processor, reporterProvider func() Reporter, metrics *gmetric.Service) *Service
NewWithMetrics creates data processing service
func (*Service) Do ¶
Do starts service processing
Example ¶
package main
import (
"context"
"encoding/json"
"fmt"
"github.com/viant/afs"
"github.com/viant/cloudless/data/processor"
"github.com/viant/toolbox"
"strings"
"sync/atomic"
)
type sumKeyType string
const sumKey = sumKeyType("sum")
// SumProcessor represents sum processor
type SumProcessor struct{}
// Process sums comma separated numbers
func (p SumProcessor) Process(ctx context.Context, data interface{}, reporter processor.Reporter) error {
tmpData := data.([]byte)
if len(tmpData) == 0 {
return nil
}
value := ctx.Value(sumKey)
sum := value.(*int64)
aNumber, err := toolbox.ToInt(string(tmpData))
if err != nil {
return processor.NewDataCorruption(fmt.Sprintf("invalid number: %s, %v", tmpData, err))
}
atomic.AddInt64(sum, int64(aNumber))
return nil
}
func main() {
service := processor.New(&processor.Config{
CorruptionURL: "mem://localhost/corrupted",
RetryURL: "mem://localhost/retry",
FailedURL: "mem://localhost/failed",
}, afs.New(), &SumProcessor{}, processor.NewReporter)
sum := int64(0)
ctx := context.WithValue(context.Background(), sumKey, &sum)
reporter := service.Do(ctx, processor.NewRequest(strings.NewReader("1\n2\n3\n5\nasd\n373\n23"),
nil,
"mem://localhost/response/numbers.txt"))
fmt.Printf("Sum: %v\n", sum)
//Prints sum 407
response, _ := json.Marshal(reporter)
fmt.Printf("%s\n", response)
/* Prints
{
"CorruptionErrors": 1,
"CorruptionURL": "mem://localhost/corrupted/response/numbers.txt",
"Errors": [
"invalid number: asd, strconv.ParseInt: parsing \"asd\": invalid syntax"
],
"Loaded": 7,
"Processed": 6,
"RetriableErrors": 0,
"RetryErrors": 0,
"RetryURL": "mem://localhost/retry/response/numbers-retry01.txt",
"RuntimeMs": 1,
"Status": "ok"
}
*/
}
func (*Service) StartMetricsEndpoint ¶
func (s *Service) StartMetricsEndpoint()
type Sortables ¶
Sortables represents sortable items
