etl

package
v1.3.30
Warning

This package is not in the latest version of its module.

Published: Jul 21, 2025 License: MIT Imports: 40 Imported by: 1

README

ETL package

The etl package is compiled into the aisnode executable to facilitate running custom ETL containers and communicating with those containers at runtime.

AIStore supports both on-the-fly (aka online) and offline user-defined dataset transformations. All the respective I/O-intensive (and expensive) operations are confined to the storage cluster, so computing clients retain all their resources for computation over the transformed, filtered, and sorted data.

Popular use cases include, but are not limited to, dataset augmentation (of any kind) and filtering of AI datasets.

Please refer to the ETL readme for the prerequisites, the four supported AIS <=> container communication mechanisms, and usage examples.

The ETL readme also contains an overview of the architecture, important technical details, and further guidance.

Architecture

The AIS-ETL extension is designed to maximize the effectiveness of the transformation process. In particular, AIS-ETL optimizes out the entire networking operation that would otherwise be required to move pre-transformed data between storage and compute nodes.

Based on the specification provided by a user, each target starts its own ETL container (worker): one ETL container per storage target in the cluster. From then on, this "local" ETL container is responsible for transforming the objects stored on "its" AIS target. This approach allows us to run custom transformations close to the data. It also makes transformation workloads scale with the cluster: since each target transforms only the objects it stores, adding storage targets adds transformation capacity.

The following figure illustrates a cluster of 3 AIS proxies (gateways) and 4 storage targets, with each target running user-defined ETL in parallel:

ETL architecture

Management and Benchmarking

  • AIS CLI includes commands to start, stop, and monitor ETL at runtime.
  • AIS Loader has been extended to benchmark and stress test AIS clusters by running a number of pre-defined transformations that we include with the source code.

For more information and details, please refer to the ETL readme.

Documentation

Overview

Package etl provides utilities to initialize and use transformation pods.

  • Copyright (c) 2018-2025, NVIDIA CORPORATION. All rights reserved.

Constants

const (
	// init message types
	SpecType    = "spec"
	CodeType    = "code"
	ETLSpecType = "etl-spec"

	// common fields
	Name              = "name"
	CommunicationType = "communication_type"
	ArgType           = "arg_type"
	DirectPut         = "direct_put"

	// `InitSpecMsg` fields
	Spec = "spec"

	// `ETLSpecMsg` fields
	Runtime = "runtime"
	Image   = "image"
	Command = "command"
	Env     = "env"

	// consts for unmarshalling ETL details
	InitMsgType = "init_msg"
	ObjErrsType = "obj_errors"
)
const (
	CommTypeAnnotation         = "communication_type" // communication type to use if not explicitly set in the init message
	SupportDirectPutAnnotation = "support_direct_put" // indicates whether the ETL supports direct PUT; affects how the target interacts with it
	WaitTimeoutAnnotation      = "wait_timeout"       // timeout duration to wait for the ETL pod to become ready
)
const (
	DefaultInitTimeout   = 45 * time.Second
	DefaultObjTimeout    = 10 * time.Second
	DefaultAbortTimeout  = 2 * time.Second
	DefaultContainerPort = 8000
)
const (
	// The ETL container receives a POST request with the data from the target.
	// It must read the data and return a response to the target, which then
	// transfers it to the client.
	Hpush = "hpush://"
	// The target redirects the GET request to the ETL container. The ETL container
	// then contacts the target via the `AIS_TARGET_URL` env variable to get the data.
	// The data is then transformed and returned to the client.
	Hpull = "hpull://"
	// Similar to redirection strategy but with usage of reverse proxy.
	HpushStdin = "io://"
	// WebSocket communication.
	WebSocket = "ws://"
)

enum communication types (`commTypes`)

const (
	ArgTypeDefault = ""
	ArgTypeURL     = "url"
	ArgTypeFQN     = "fqn"
)

enum arg types (`argTypes`)

const CommTypeSeparator = "://"

CommTypeSeparator is consistent with RFC 2396, "Uniform Resource Identifiers (URI): Generic Syntax".
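Because every communication type ends with `CommTypeSeparator`, a caller can normalize user input such as `hpush` into `hpush://` before comparing it with the supported values. The sketch below re-declares the documented constants locally; `normalizeCommType` is a hypothetical helper, not part of the etl package. An empty input is passed through as "not set", matching the documented role of the `communication_type` annotation as a fallback when the init message does not specify a type.

```go
package main

import (
	"fmt"
	"strings"
)

// Values copied from the etl package constants.
const (
	Hpush             = "hpush://"
	Hpull             = "hpull://"
	HpushStdin        = "io://"
	WebSocket         = "ws://"
	CommTypeSeparator = "://"
)

// normalizeCommType is hypothetical: it appends CommTypeSeparator when the
// caller omits it and accepts only the four documented communication types.
// An empty string means "not set" and is returned unchanged, so the caller
// can fall back to another source (such as a pod annotation).
func normalizeCommType(s string) (string, error) {
	if s == "" {
		return "", nil
	}
	if !strings.HasSuffix(s, CommTypeSeparator) {
		s += CommTypeSeparator
	}
	switch s {
	case Hpush, Hpull, HpushStdin, WebSocket:
		return s, nil
	default:
		return "", fmt.Errorf("unsupported communication type %q", s)
	}
}

func main() {
	for _, in := range []string{"hpush", "ws://", "io", "ftp"} {
		out, err := normalizeCommType(in)
		fmt.Printf("%q -> %q, err=%v\n", in, out, err)
	}
}
```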

const MaxObjErr = 128
const PrefixXactID = "etl-"

Variables

This section is empty.

Functions

func Delete added in v1.3.28

func Delete(etlName string) error

func GetOfflineTransform added in v1.3.28

func GetOfflineTransform(etlName string, xctn core.Xact) (getROC core.GetROC, xetl *XactETL, session Session, err error)

func PodHealth

func PodHealth(etlName string) (string, error)

func Stop

func Stop(etlName string, errCause error) (err error)

Stop is called in three cases:

  1. the user's DELETE request
  2. a failed initialization
  3. a transaction/xaction abort (StopByXid)

func StopAll

func StopAll()

StopAll terminates all running ETLs.

func StopByXid added in v1.3.28

func StopByXid(xid string, errCause error) error

func Tinit added in v1.3.30

func Tinit()

func ValidateSecret added in v1.3.28

func ValidateSecret(etlName, secret string) error

Types

type CPUMemByTarget

type CPUMemByTarget []*CPUMemUsed

type CPUMemUsed

type CPUMemUsed struct {
	TargetID string  `json:"target_id"`
	CPU      float64 `json:"cpu"`
	Mem      int64   `json:"mem"`
}

func PodMetrics

func PodMetrics(etlName string) (*CPUMemUsed, error)

type CommStats

type CommStats interface {
	ObjCount() int64
	InBytes() int64
	OutBytes() int64
}

type Communicator

type Communicator interface {
	ETLName() string

	String() string

	GetSecret() string
	Xact() *XactETL // underlying `apc.ActETLInline` xaction (see xact/xs/etl.go)
	CommStats       // only stats for `apc.ActETLInline` inline transform

	// InlineTransform uses one of the two ETL container endpoints:
	//  - Method "PUT", Path "/"
	//  - Method "GET", Path "/bucket/object"
	// It returns:
	//  - size: the size of the transformed object
	//  - ecode: error code
	//  - err: error encountered during transformation
	InlineTransform(w http.ResponseWriter, r *http.Request, lom *core.LOM, latestVer bool, targs string) (size int64, ecode int, err error)
	// contains filtered or unexported methods
}

Communicator is responsible for managing communications with local ETL pod.

func GetCommunicator

func GetCommunicator(etlName string) (Communicator, error)

GetCommunicator retrieves the Communicator from the registry by ETL name. It returns an error if the ETL is not found or is not in the `Running` stage.

type Details added in v1.3.30

type Details struct {
	InitMsg InitMsg  `json:"init_msg"`
	ObjErrs []ObjErr `json:"obj_errors,omitempty"`
}

type ETLEntity added in v1.3.30

type ETLEntity struct {
	InitMsg InitMsg `json:"init_msg"`
	Stage   Stage   `json:"stage"`
	PodMap  PodMap  `json:"pod_map"`
}

ETLEntity represents an ETL instance managed by an individual target:

  • It is created and added to the manager before entering the `Initializing` stage.
  • It is removed only after the user explicitly deletes it from the `Aborted` stage.

Expected state transitions:

  • `Initializing`: set up resources in the following order:
      1. Create (or reuse) the communicator and pod watcher.
      2. Start (or renew) the xaction.
      3. Create Kubernetes resources (pod/service).
  • `Running`: all resources are active, handling inline and offline transform requests via the communicator.
  • `Aborted`: Kubernetes resources (pod/service) are cleaned up.
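The lifecycle above can be modeled as a small transition table over the documented `Stage` values. In this sketch, `allowed`, `canTransition`, and `canDelete` are hypothetical names. `Initializing -> Aborted` reflects a failed initialization and `Running -> Aborted` reflects Stop; the `Unknown -> Initializing` start edge and the `Aborted -> Initializing` restart edge (suggested by "reuse" and "renew" in the steps above) are assumptions. The `String` method is illustrative and may not match the real `Stage.String` output.

```go
package main

import "fmt"

type Stage int

const (
	Unknown Stage = iota
	Initializing
	Running
	Aborted
)

// String is illustrative; the real Stage.String output may differ.
func (s Stage) String() string {
	switch s {
	case Initializing:
		return "Initializing"
	case Running:
		return "Running"
	case Aborted:
		return "Aborted"
	default:
		return "Unknown"
	}
}

// allowed lists, for each stage, the stages it may move to.
// The Unknown -> Initializing and Aborted -> Initializing edges are assumptions.
var allowed = map[Stage][]Stage{
	Unknown:      {Initializing},
	Initializing: {Running, Aborted},
	Running:      {Aborted},
	Aborted:      {Initializing},
}

func canTransition(from, to Stage) bool {
	for _, s := range allowed[from] {
		if s == to {
			return true
		}
	}
	return false
}

// canDelete reflects the documented rule: an entity is removed only after
// the user deletes it from the Aborted stage.
func canDelete(s Stage) bool { return s == Aborted }

func main() {
	fmt.Println(canTransition(Initializing, Running)) // true
	fmt.Println(canTransition(Running, Initializing)) // false
	fmt.Println(canDelete(Running), Aborted)          // false Aborted
}
```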

type ETLSpecMsg added in v1.3.30

type ETLSpecMsg struct {
	InitMsgBase `yaml:",inline"` // includes all optional fields from InitMsgBase
	Runtime     RuntimeSpec      `json:"runtime" yaml:"runtime"`
}

ETLSpecMsg is a YAML representation of the ETL pod spec.

func (*ETLSpecMsg) FormatEnv added in v1.3.30

func (e *ETLSpecMsg) FormatEnv() string

func (*ETLSpecMsg) MsgType added in v1.3.30

func (*ETLSpecMsg) MsgType() string

func (*ETLSpecMsg) ParsePodSpec added in v1.3.30

func (e *ETLSpecMsg) ParsePodSpec() (*corev1.Pod, error)

func (*ETLSpecMsg) String added in v1.3.30

func (e *ETLSpecMsg) String() string

func (*ETLSpecMsg) Validate added in v1.3.30

func (e *ETLSpecMsg) Validate() error

type ETLs

type ETLs map[string]ETLEntity

type HealthByTarget

type HealthByTarget []*HealthStatus

type HealthStatus

type HealthStatus struct {
	TargetID string `json:"target_id"`
	Status   string `json:"health_status"` // enum { HealthStatusRunning, ... } above
}

type Info

type Info struct {
	Name     string `json:"id"`
	Stage    string `json:"stage"`
	XactID   string `json:"xaction_id"`
	ObjCount int64  `json:"obj_count"`
	InBytes  int64  `json:"in_bytes"`
	OutBytes int64  `json:"out_bytes"`
}

func List

func List() []Info

type InfoList

type InfoList []Info

func (*InfoList) Append added in v1.3.28

func (il *InfoList) Append(i Info)

func (InfoList) Len

func (il InfoList) Len() int

func (InfoList) Less

func (il InfoList) Less(i, j int) bool

func (InfoList) Swap

func (il InfoList) Swap(i, j int)

type InitMsg

type InitMsg interface {
	Name() string
	Cname() string
	MsgType() string // Code or Spec
	CommType() string
	ArgType() string
	Validate() error
	IsDirectPut() bool
	ParsePodSpec() (*corev1.Pod, error)
	Timeouts() (initTimeout, objTimeout cos.Duration)
	GetEnv() []corev1.EnvVar
	String() string
}

func GetInitMsg added in v1.3.28

func GetInitMsg(etlName string) (InitMsg, error)

func UnmarshalInitMsg

func UnmarshalInitMsg(b []byte) (InitMsg, error)

type InitMsgBase

type InitMsgBase struct {
	EtlName          string          `json:"name" yaml:"name"`
	CommTypeX        string          `json:"communication" yaml:"communication"`
	ArgTypeX         string          `json:"argument" yaml:"argument"`
	InitTimeout      cos.Duration    `json:"init_timeout,omitempty" yaml:"init_timeout,omitempty"`
	ObjTimeout       cos.Duration    `json:"obj_timeout,omitempty" yaml:"obj_timeout,omitempty"`
	SupportDirectPut bool            `json:"support_direct_put,omitempty" yaml:"support_direct_put,omitempty"`
	Env              []corev1.EnvVar `json:"env,omitempty" yaml:"env,omitempty"`
}

InitMsgBase holds the fields common to the InitMsg implementations; both InitSpecMsg and ETLSpecMsg embed it.

func (*InitMsgBase) ArgType added in v1.3.19

func (m *InitMsgBase) ArgType() string

func (*InitMsgBase) Cname added in v1.3.30

func (m *InitMsgBase) Cname() string

func (*InitMsgBase) CommType

func (m *InitMsgBase) CommType() string

func (*InitMsgBase) GetEnv added in v1.3.30

func (m *InitMsgBase) GetEnv() []corev1.EnvVar

func (*InitMsgBase) IsDirectPut added in v1.3.28

func (m *InitMsgBase) IsDirectPut() bool

func (*InitMsgBase) Name

func (m *InitMsgBase) Name() string

func (*InitMsgBase) Timeouts added in v1.3.30

func (m *InitMsgBase) Timeouts() (initTimeout, objTimeout cos.Duration)

func (*InitMsgBase) Validate added in v1.3.30

func (m *InitMsgBase) Validate(detail string) error

type InitSpecMsg

type InitSpecMsg struct {
	Spec        []byte `json:"spec"`
	InitMsgBase `yaml:",inline"`
}

func (*InitSpecMsg) MsgType added in v1.3.19

func (*InitSpecMsg) MsgType() string

func (*InitSpecMsg) ParsePodSpec added in v1.3.29

func (m *InitSpecMsg) ParsePodSpec() (*corev1.Pod, error)

ParsePodSpec parses `m.Spec` into a Kubernetes Pod object.

func (*InitSpecMsg) String

func (m *InitSpecMsg) String() string

func (*InitSpecMsg) Validate

func (m *InitSpecMsg) Validate() error

type Logs

type Logs struct {
	TargetID string `json:"target_id"`
	Logs     []byte `json:"logs"`
}

func PodLogs

func PodLogs(etlName string) (logs Logs, err error)

type LogsByTarget

type LogsByTarget []Logs

type MD

type MD struct {
	Ext     any
	ETLs    ETLs
	Version int64
}

MD holds the ETL metadata.

func (*MD) Add

func (e *MD) Add(msg InitMsg, stage Stage, podMap PodMap) error

func (*MD) Get

func (e *MD) Get(id string) (en ETLEntity, present bool)

func (*MD) Init

func (e *MD) Init(l int)

func (*MD) JspOpts

func (*MD) JspOpts() jsp.Options

func (*MD) MarshalJSON

func (e *MD) MarshalJSON() ([]byte, error)

func (*MD) String

func (e *MD) String() string

func (*MD) UnmarshalJSON

func (e *MD) UnmarshalJSON(data []byte) (err error)

type ObjErr added in v1.3.30

type ObjErr struct {
	ObjName string `json:"obj_name"` // object name
	Message string `json:"msg"`      // error message
	Ecode   int    `json:"ecode"`    // error code
}

func (ObjErr) Error added in v1.3.30

func (eo ObjErr) Error() string

type ObjErrs added in v1.3.30

type ObjErrs []ObjErr
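Because `ObjErr` has a value-receiver `Error` method, an `ObjErr` can be used anywhere an `error` is expected. The sketch re-declares `ObjErr`, `ObjErrs`, and `MaxObjErr` locally. The message format in `Error` is an assumption, and so is treating `MaxObjErr` (128) as a cap on how many per-object errors are retained; the hypothetical `add` method drops errors beyond that cap and reports whether the error was kept.

```go
package main

import "fmt"

// Value copied from the etl package constants.
const MaxObjErr = 128

type ObjErr struct {
	ObjName string `json:"obj_name"` // object name
	Message string `json:"msg"`      // error message
	Ecode   int    `json:"ecode"`    // error code
}

// Error makes ObjErr satisfy the error interface (format is assumed).
func (eo ObjErr) Error() string {
	return fmt.Sprintf("%s: %s (ecode %d)", eo.ObjName, eo.Message, eo.Ecode)
}

type ObjErrs []ObjErr

// add is hypothetical: it keeps at most MaxObjErr entries and reports
// whether e was stored.
func (errs *ObjErrs) add(e ObjErr) bool {
	if len(*errs) >= MaxObjErr {
		return false
	}
	*errs = append(*errs, e)
	return true
}

func main() {
	var errs ObjErrs
	for i := 0; i < MaxObjErr+5; i++ {
		errs.add(ObjErr{ObjName: fmt.Sprintf("bck/obj-%d", i), Message: "transform failed", Ecode: 500})
	}
	var err error = errs[0]
	fmt.Println(len(errs), err) // 128 bck/obj-0: transform failed (ecode 500)
}
```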

type PodInfo added in v1.3.30

type PodInfo struct {
	URI     string `json:"uri"`      // ETL pod URI
	PodName string `json:"pod_name"` // ETL pod name
	SvcName string `json:"svc_name"` // ETL service name
}

func Init added in v1.3.30

func Init(msg InitMsg, xid, secret string) (core.Xact, PodInfo, error)

Init is common to both the `InitSpec` and `ETLSpec` flows.

type PodMap added in v1.3.30

type PodMap map[string]PodInfo // target ID to ETL pod info

PodMap is used by the two-phase commit (2PC) initialization.
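Since each target runs exactly one ETL pod, a `PodMap` keyed by target ID should contain one `PodInfo` per target once initialization succeeds. The sketch shows a hypothetical completeness check, `missingTargets`, that a coordinator could run before committing; how the real 2PC flow uses `PodMap` is not documented here, so the check itself is an assumption. The types are re-declared locally, and the URIs and pod names are made up (port 8000 matches `DefaultContainerPort`).

```go
package main

import (
	"fmt"
	"sort"
)

// Local mirrors of the documented types.
type PodInfo struct {
	URI     string `json:"uri"`      // ETL pod URI
	PodName string `json:"pod_name"` // ETL pod name
	SvcName string `json:"svc_name"` // ETL service name
}

type PodMap map[string]PodInfo // target ID to ETL pod info

// missingTargets is hypothetical: it returns, sorted, the target IDs that
// have not reported a pod, i.e., the targets that would block a commit.
func missingTargets(pm PodMap, targets []string) []string {
	var missing []string
	for _, t := range targets {
		if _, ok := pm[t]; !ok {
			missing = append(missing, t)
		}
	}
	sort.Strings(missing)
	return missing
}

func main() {
	pm := PodMap{
		"t1": {URI: "http://10.0.0.1:8000", PodName: "md5-t1", SvcName: "md5-t1"},
		"t3": {URI: "http://10.0.0.3:8000", PodName: "md5-t3", SvcName: "md5-t3"},
	}
	fmt.Println(missingTargets(pm, []string{"t1", "t2", "t3", "t4"})) // [t2 t4]
}
```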

type RuntimeSpec added in v1.3.29

type RuntimeSpec struct {
	Image   string          `json:"image" yaml:"image"`
	Command []string        `json:"command,omitempty" yaml:"command,omitempty"`
	Env     []corev1.EnvVar `json:"env,omitempty" yaml:"env,omitempty"`
}

type Session added in v1.3.28

type Session interface {
	// Finish cleans up the job's communication channel and aborts the ongoing xaction (`TCB`/`TCO`) if errCause is provided
	Finish(errCause error) error
	OfflineWrite(lom *core.LOM, latestVer, sync bool, writer io.WriteCloser, gargs *core.GetROCArgs) (written int64, ecode int, err error)
	// contains filtered or unexported methods
}

Session represents a per-xaction communication context created by the statefulCommunicator.

type Stage added in v1.3.28

type Stage int

enum ETL lifecycle status (see docs/etl.md#etl-pod-lifecycle for details)

const (
	Unknown Stage = iota
	Initializing
	Running
	Aborted
)

func (Stage) String added in v1.3.28

func (s Stage) String() string

type WebsocketCtrlMsg added in v1.3.28

type WebsocketCtrlMsg struct {
	Daddr string `json:"dst_addr,omitempty"`
	Targs string `json:"etl_args,omitempty"`
	FQN   string `json:"fqn,omitempty"`
	Path  string `json:"path,omitempty"`
}

type XactETL added in v1.3.30

type XactETL struct {
	InlineObjErrs cos.Errs
	Vlabs         map[string]string

	xact.Base
	// contains filtered or unexported fields
}

XactETL represents the `apc.ActETLInline` kind of xaction (the `apc.ActETLBck` and `apc.ActETLObject` kinds are managed by tcb/tcobjs). It is responsible for triggering a global abort on error, ensuring that all related ETL resources are cleaned up across all targets.

func (*XactETL) Abort added in v1.3.30

func (r *XactETL) Abort(err error) (aborted bool)

func (*XactETL) AddObjErr added in v1.3.30

func (r *XactETL) AddObjErr(xid string, err *ObjErr)

func (*XactETL) GetObjErrs added in v1.3.30

func (r *XactETL) GetObjErrs(xid string) []error

func (*XactETL) Run added in v1.3.30

func (*XactETL) Run(*sync.WaitGroup)

func (*XactETL) Snap added in v1.3.30

func (r *XactETL) Snap() (snap *core.Snap)

Directories

Path Synopsis
Package webserver provides a framework to implement ETL transformation webservers in Go.
