Documentation ¶
Overview ¶
Package processor defines an abstraction for serverless concurrent processing
Index ¶
Examples ¶
Constants ¶
const (
	// StatusOk represents successful processing status
	StatusOk = "ok"
	// StatusError represents error processing status
	StatusError = "error"
)
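A minimal sketch of how these status values are typically consumed. The reporter would normally come from Service.Do (see the examples below); here one is created directly with processor.NewReporter, assuming it initializes the base response the way the custom provider in the first example does:

package main

import (
	"fmt"

	"github.com/viant/cloudless/data/processor"
)

func main() {
	// in practice the reporter is returned by service.Do; NewReporter is used
	// here only to obtain a Reporter for illustration
	reporter := processor.NewReporter()
	response := reporter.BaseResponse()
	if response.Status == processor.StatusOk {
		fmt.Printf("processed %d record(s) in %dms\n", response.Processed, response.RuntimeMs)
	} else {
		fmt.Printf("status %q, errors: %v\n", response.Status, response.Errors)
	}
}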
Variables ¶
This section is empty.
Functions ¶
func NewDataCorruption ¶
NewDataCorruption returns a data corruption error
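A minimal sketch of how a Processor might use it: returning this error marks the record as corrupted, and (as the second Do example below shows) such records are counted under CorruptionErrors and written to the configured CorruptionURL. The comma-separated record layout here is hypothetical:

package example

import (
	"context"
	"fmt"
	"strings"

	"github.com/viant/cloudless/data/processor"
)

// CSVProcessor is a hypothetical processor that expects comma-separated records
type CSVProcessor struct{}

// Process rejects records with fewer than two fields as corrupted
func (p CSVProcessor) Process(ctx context.Context, data []byte, reporter processor.Reporter) error {
	fields := strings.Split(string(data), ",")
	if len(fields) < 2 {
		return processor.NewDataCorruption(fmt.Sprintf("expected at least 2 fields, got %d: %s", len(fields), data))
	}
	// ... process the record ...
	return nil
}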
Types ¶
type BaseReporter ¶
type BaseReporter struct {
	*Response
	// contains filtered or unexported fields
}
func (*BaseReporter) BaseResponse ¶
func (r *BaseReporter) BaseResponse() *Response
BaseResponse returns base response info
Example ¶
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"github.com/viant/afs"
	"github.com/viant/cloudless/data/processor"
	"net/http"
	"strings"
	"sync"
)

// URLReporter represents URL reporter
type URLReporter struct {
	processor.BaseReporter
	ByResponseCode map[int]int
	mutex          sync.Mutex
}

// NewURLReporter creates a URL reporter
func NewURLReporter() processor.Reporter {
	return &URLReporter{
		ByResponseCode: make(map[int]int),
		BaseReporter: processor.BaseReporter{
			Response: &processor.Response{Status: processor.StatusOk},
		},
	}
}

type HTTPProcessor struct {
	BaseURL string
}

func (p HTTPProcessor) Process(ctx context.Context, data []byte, reporter processor.Reporter) error {
	urlReporter := reporter.(*URLReporter)
	URL := p.BaseURL + string(data)
	request, err := http.NewRequestWithContext(ctx, http.MethodGet, URL, nil)
	if err != nil {
		return processor.NewDataCorruption(fmt.Sprintf("invalid request: %v", URL))
	}
	response, err := http.DefaultClient.Do(request)
	if err != nil {
		return err
	}
	defer response.Body.Close()
	urlReporter.mutex.Lock()
	defer urlReporter.mutex.Unlock()
	urlReporter.ByResponseCode[response.StatusCode]++
	return nil
}

func main() {
	ctx := context.Background()
	service := processor.New(&processor.Config{
		CorruptionURL: "mem://localhost/corrupted",
		RetryURL:      "mem://localhost/retry",
		FailedURL:     "mem://localhost/failed",
	}, afs.New(), &HTTPProcessor{BaseURL: "http://mydataexporter/enrich/?data="}, NewURLReporter)
	reporter := service.Do(ctx, processor.NewRequest(strings.NewReader("dGVzdCBpcyB0ZXN0\nYW5vdGhlciBvbmU="),
		nil,
		"mem://localhost/trigger/data.txt"))
	response, _ := json.Marshal(reporter)
	fmt.Printf("%s\n", response)
}
type Config ¶
type Config struct {
	DeadlineReductionMs int // Deadline typically comes from Lambda ctx. Max exec time == Deadline - DeadlineReductionMs
	LoaderDeadlineLagMs int // Loader will finish earlier than workers to let the latter complete
	MaxRetries          int
	Concurrency         int
	DestinationURL      string // Service processing data destination URL. This is a template, e.g. $gs://$mybucket/$prefix/$a.dat
	DestinationCodec    string
	RetryURL            string // destination for the data to be retried
	FailedURL           string // destination for the data that has failed max retries
	CorruptionURL       string // destination for the corrupted data
	MaxExecTimeMs       int    // default execution timeMs used when context does not come with deadline
}
Config represents processor configuration
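A minimal configuration sketch; the numeric values below are placeholders, and only the three destination URLs are taken from the examples in this documentation:

package example

import "github.com/viant/cloudless/data/processor"

// exampleConfig is a hypothetical configuration; values are placeholders
var exampleConfig = &processor.Config{
	Concurrency:   8,                           // concurrency level for worker processing
	MaxRetries:    2,                           // retry budget before data is routed to FailedURL
	MaxExecTimeMs: 60000,                       // used when the context carries no deadline
	RetryURL:      "mem://localhost/retry",     // destination for data to be retried
	FailedURL:     "mem://localhost/failed",    // destination for data that exhausted retries
	CorruptionURL: "mem://localhost/corrupted", // destination for corrupted data
}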
func (Config) ExpandDestinationURL ¶
func (Config) LoaderDeadline ¶
LoaderDeadline returns the max execution time for the loader
type DataCorruption ¶
type DataCorruption struct {
	// contains filtered or unexported fields
}
DataCorruption represents a data corruption error
type PostProcessor ¶
PostProcessor is an optional post-processor interface
type PreProcessor ¶
type PreProcessor interface {
	Pre(ctx context.Context, reporter Reporter) (context.Context, error)
}
PreProcessor is an optional preprocessor interface
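A minimal sketch of an implementation that seeds the context before any records are processed, mirroring the sumKey pattern in the second Do example below; how the service discovers the hook (e.g. by the Processor implementing it as well) is an assumption not confirmed by this documentation:

package example

import (
	"context"

	"github.com/viant/cloudless/data/processor"
)

type counterKeyType string

const counterKey = counterKeyType("counter")

// CountingProcessor is a hypothetical type implementing the optional Pre hook
type CountingProcessor struct{}

// Pre injects a shared counter into the context before processing starts
func (p CountingProcessor) Pre(ctx context.Context, reporter processor.Reporter) (context.Context, error) {
	counter := int64(0)
	return context.WithValue(ctx, counterKey, &counter), nil
}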
type Reporter ¶
type Reporter interface {
	// BaseResponse returns base response
	BaseResponse() *Response
}
Reporter represents an interface providing the processor response
type Request ¶
type Request struct {
	io.ReadCloser
	Attrs     map[string]interface{}
	StartTime time.Time
	SourceURL string // incoming original file URL
}
Request represents a processing request
func NewRequest ¶
NewRequest creates a processing request
func (*Request) Retry ¶
Retry extracts the retry number from the URL. It looks for two consecutive digits, e.g. s3://bucket/prefix/filename-retry05.csv yields 5
func (*Request) TransformSourceURL ¶
TransformSourceURL returns baseURL + sourceURL path
type Response ¶
type Response struct {
	Status           string
	Errors           []string `json:",omitempty"`
	RuntimeMs        int
	DestinationURL   string `json:",omitempty"` // Service processing data destination URL. This is a template, e.g. $gs://$mybucket/$prefix/$a.dat
	DestinationCodec string `json:",omitempty"` // optional compression codec (e.g. gzip)
	RetryURL         string `json:",omitempty"` // destination for the data to be replayed
	CorruptionURL    string `json:",omitempty"`
	Processed        int32  `json:",omitempty"`
	RetryErrors      int32  `json:",omitempty"`
	CorruptionErrors int32  `json:",omitempty"`
	ProcessingErrors int32  `json:",omitempty"`
	Loaded           int32  `json:",omitempty"`
	// contains filtered or unexported fields
}
Response represents base processing response
type Service ¶
type Service struct {
	Processor
	// contains filtered or unexported fields
}
Service represents processing service
func New ¶
func New(config *Config, fs afs.Service, processor Processor, reporterProvider func() Reporter) *Service
New creates a processing service
func (*Service) Do ¶
Do starts service processing
Example ¶
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"github.com/viant/afs"
	"github.com/viant/cloudless/data/processor"
	"github.com/viant/toolbox"
	"strings"
	"sync/atomic"
)

type sumKeyType string

const sumKey = sumKeyType("sum")

// SumProcessor represents sum processor
type SumProcessor struct{}

// Process adds each number (one per input line) to the shared sum
func (p SumProcessor) Process(ctx context.Context, data []byte, reporter processor.Reporter) error {
	if len(data) == 0 {
		return nil
	}
	value := ctx.Value(sumKey)
	sum := value.(*int64)
	aNumber, err := toolbox.ToInt(string(data))
	if err != nil {
		return processor.NewDataCorruption(fmt.Sprintf("invalid number: %s, %v", data, err))
	}
	atomic.AddInt64(sum, int64(aNumber))
	return nil
}

func main() {
	service := processor.New(&processor.Config{
		CorruptionURL: "mem://localhost/corrupted",
		RetryURL:      "mem://localhost/retry",
		FailedURL:     "mem://localhost/failed",
	}, afs.New(), &SumProcessor{}, processor.NewReporter)
	sum := int64(0)
	ctx := context.WithValue(context.Background(), sumKey, &sum)
	reporter := service.Do(ctx, processor.NewRequest(strings.NewReader("1\n2\n3\n5\nasd\n373\n23"),
		nil,
		"mem://localhost/response/numbers.txt"))
	fmt.Printf("Sum: %v\n", sum)
	// Prints Sum: 407
	response, _ := json.Marshal(reporter)
	fmt.Printf("%s\n", response)
	/* Prints
	{
		"CorruptionErrors": 1,
		"CorruptionURL": "mem://localhost/corrupted/response/numbers.txt",
		"Errors": [
			"invalid number: asd, strconv.ParseInt: parsing \"asd\": invalid syntax"
		],
		"Loaded": 7,
		"Processed": 6,
		"ProcessingErrors": 0,
		"RetryErrors": 0,
		"RetryURL": "mem://localhost/retry/response/numbers-retry01.txt",
		"RuntimeMs": 1,
		"Status": "ok"
	}
	*/
}
