Documentation ¶
Overview ¶
Package etl provides utilities to initialize and use transformation pods.
- Copyright (c) 2018-2025, NVIDIA CORPORATION. All rights reserved.
Index ¶
- Constants
- func Delete(etlName string) error
- func InitCode(msg *InitCodeMsg, xid, secret string) (core.Xact, error)
- func InitSpec(msg *InitSpecMsg, xid, secret string, opts StartOpts) (core.Xact, error)
- func ParsePodSpec(errCtx *cmn.ETLErrCtx, spec []byte) (*corev1.Pod, error)
- func PodHealth(etlName string) (string, error)
- func Stop(etlName string, errCause error) (err error)
- func StopAll()
- func StopByXid(xid string, errCause error) error
- func ValidateSecret(etlName, secret string) error
- type Aborter
- type CPUMemByTarget
- type CPUMemUsed
- type CommStats
- type Communicator
- type ETLs
- type HealthByTarget
- type HealthStatus
- type Info
- type InfoList
- type InitCodeMsg
- type InitMsg
- type InitMsgBase
- type InitSpecMsg
- type Logs
- type LogsByTarget
- type MD
- func (e *MD) Add(msg InitMsg)
- func (e *MD) Del(id string) (deleted bool)
- func (e *MD) Get(id string) (msg InitMsg, present bool)
- func (e *MD) Init(l int)
- func (*MD) JspOpts() jsp.Options
- func (e *MD) MarshalJSON() ([]byte, error)
- func (e *MD) String() string
- func (e *MD) UnmarshalJSON(data []byte) (err error)
- type Session
- type Stage
- type StartOpts
- type WebsocketCtrlMsg
Constants ¶
const (
	Spec = "spec"
	Code = "code"

	// additional environment variables to set in ETL container
	ArgType   = "ARG_TYPE"
	DirectPut = "DIRECT_PUT"
)
const (
	CommTypeAnnotation         = "communication_type" // communication type to use if not explicitly set in the init message
	SupportDirectPutAnnotation = "support_direct_put" // indicates whether the ETL supports direct PUT; affects how the target interacts with it
	WaitTimeoutAnnotation      = "wait_timeout"       // timeout duration to wait for the ETL pod to become ready
)
const (
	DefaultTimeout    = 45 * time.Second
	DefaultReqTimeout = 10 * time.Second
)
const (
	// ETL container receives POST request from target with the data. It
	// must read the data and return response to the target which then will be
	// transferred to the client.
	Hpush = "hpush://"
	// Target redirects the GET request to the ETL container. Then ETL container
	// contacts the target via `AIS_TARGET_URL` env variable to get the data.
	// The data is then transformed and returned to the client.
	Hpull = "hpull://"
	// Similar to redirection strategy but with usage of reverse proxy.
	HpushStdin = "io://"
	// WebSocket communication.
	WebSocket = "ws://"
)
enum communication types (`commTypes`)
const ( ArgTypeDefault = "" ArgTypeURL = "url" ArgTypeFQN = "fqn" )
enum arg types (`argTypes`)
const CommTypeSeparator = "://"
consistent with rfc2396.txt "Uniform Resource Identifiers (URI): Generic Syntax"
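As an illustration of how the communication-type constants and `CommTypeSeparator` fit together, here is a minimal sketch. The helper `normalizeCommType` is hypothetical and not part of package etl; the constants are local copies of the values listed above so that the example compiles on its own. Whether the package itself accepts a type written without the separator is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"strings"
)

// Local copies of the constants listed above.
const (
	Hpush             = "hpush://"
	Hpull             = "hpull://"
	HpushStdin        = "io://"
	WebSocket         = "ws://"
	CommTypeSeparator = "://"
)

// normalizeCommType is a hypothetical helper (not part of package etl).
// It appends the separator when it is missing and reports whether the
// result is one of the four communication types listed above.
func normalizeCommType(s string) (string, bool) {
	if !strings.HasSuffix(s, CommTypeSeparator) {
		s += CommTypeSeparator
	}
	switch s {
	case Hpush, Hpull, HpushStdin, WebSocket:
		return s, true
	}
	return s, false
}

func main() {
	for _, in := range []string{"hpush", "io://", "ftp"} {
		ct, ok := normalizeCommType(in)
		fmt.Printf("%q -> %q known=%v\n", in, ct, ok)
	}
}
```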
const PrefixXactID = "etl-"
Variables ¶
This section is empty.
Functions ¶
func InitCode ¶
func InitCode(msg *InitCodeMsg, xid, secret string) (core.Xact, error)
Given user message `InitCodeMsg`:
- make the corresponding assorted substitutions in the etl/runtime/podspec.yaml spec, and
- execute `InitSpec` with the modified podspec

See also: etl/runtime/podspec.yaml
func ValidateSecret ¶ added in v1.3.28
Types ¶
type Aborter ¶
type Aborter struct {
// contains filtered or unexported fields
}
Aborter listens to smap changes and aborts the ETL on the target whenever the target membership changes. Aborter should be registered on ETL init; it is unregistered by the Stop function. There is no synchronization between aborters on different targets. It is assumed that if one target receives an smap with changed target membership, each of the other targets will eventually receive it as well. Hence, all ETL containers will be stopped.
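The membership check behind this behavior can be sketched as below. The types `smap` and `aborter` are simplified, hypothetical stand-ins written for this example; they are not the package's actual cluster-map or Aborter types.

```go
package main

import "fmt"

// smap is a simplified, hypothetical stand-in for the cluster map:
// a version plus the set of target IDs.
type smap struct {
	version int
	targets map[string]struct{}
}

func newSmap(version int, ids ...string) *smap {
	m := &smap{version: version, targets: make(map[string]struct{}, len(ids))}
	for _, id := range ids {
		m.targets[id] = struct{}{}
	}
	return m
}

// sameTargets reports whether two cluster maps list the same target IDs.
func sameTargets(a, b *smap) bool {
	if len(a.targets) != len(b.targets) {
		return false
	}
	for id := range a.targets {
		if _, ok := b.targets[id]; !ok {
			return false
		}
	}
	return true
}

// aborter remembers the membership seen at ETL init and calls stop
// the first time a newer map lists different targets.
type aborter struct {
	current *smap
	stop    func(reason string)
	stopped bool
}

func (a *aborter) onSmapChanged(next *smap) {
	if a.stopped || next.version <= a.current.version {
		return // already stopped, or a stale map
	}
	if !sameTargets(a.current, next) {
		a.stopped = true
		a.stop(fmt.Sprintf("targets changed: v%d -> v%d", a.current.version, next.version))
		return
	}
	a.current = next
}

func main() {
	a := &aborter{
		current: newSmap(1, "t1", "t2"),
		stop:    func(reason string) { fmt.Println("stopping ETL:", reason) },
	}
	a.onSmapChanged(newSmap(2, "t1", "t2"))       // same targets: keep running
	a.onSmapChanged(newSmap(3, "t1", "t2", "t3")) // target joined: stop
}
```

Each target runs its own listener independently, which matches the note above that aborters are not synchronized across targets.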
func (*Aborter) ListenSmapChanged ¶
func (e *Aborter) ListenSmapChanged()
type CPUMemByTarget ¶
type CPUMemByTarget []*CPUMemUsed
type CPUMemUsed ¶
type CPUMemUsed struct {
TargetID string `json:"target_id"`
CPU float64 `json:"cpu"`
Mem int64 `json:"mem"`
}
func PodMetrics ¶
func PodMetrics(etlName string) (*CPUMemUsed, error)
type Communicator ¶
type Communicator interface {
meta.Slistener
ETLName() string
PodName() string
SvcName() string
String() string
SetupConnection() error
Stop()
Restart(boot *etlBootstrapper)
GetPodWatcher() *podWatcher
GetSecret() string
Xact() core.Xact // underlying `apc.ActETLInline` xaction (see xact/xs/etl.go)
CommStats // only stats for `apc.ActETLInline` inline transform
// InlineTransform uses one of the two ETL container endpoints:
// - Method "PUT", Path "/"
// - Method "GET", Path "/bucket/object"
InlineTransform(w http.ResponseWriter, r *http.Request, lom *core.LOM, latestVer bool, targs string) (int, error)
// contains filtered or unexported methods
}
Communicator is responsible for managing communications with the local ETL pod. It listens to cluster membership changes and terminates the ETL pod if need be.
func GetCommunicator ¶
func GetCommunicator(etlName string) (Communicator, error)
GetCommunicator retrieves the Communicator from the registry by ETL name. It returns an error if the ETL is not found or is not in the Running stage.
type HealthByTarget ¶
type HealthByTarget []*HealthStatus
type HealthStatus ¶
type Info ¶
type InitCodeMsg ¶
type InitCodeMsg struct {
Runtime string `json:"runtime"`
Funcs struct {
Transform string `json:"transform"`
}
Code []byte `json:"code"` // cannot be omitted
Deps []byte `json:"dependencies"`
InitMsgBase
ChunkSize int64 `json:"chunk_size"`
Flags int64 `json:"flags"`
}
InitCodeMsg carries the name of the transforming function; the `Transform` function is mandatory and cannot be "" (empty) - it _will_ be called by the `Runtime` container (see etl/runtime/all.go for all supported pre-built runtimes).

ChunkSize:
- 0 (zero) - read the entire payload in memory and then transform it in one shot;
- > 0 - use chunk-size buffering and transform incrementally, one chunk at a time

Flags:
- bitwise flags: (streaming | debug | strict | ...) future enhancements
func (*InitCodeMsg) MsgType ¶ added in v1.3.19
func (*InitCodeMsg) MsgType() string
func (*InitCodeMsg) String ¶
func (m *InitCodeMsg) String() string
func (*InitCodeMsg) Validate ¶
func (m *InitCodeMsg) Validate() error
type InitMsg ¶
type InitMsg interface {
Name() string
MsgType() string // Code or Spec
CommType() string
ArgType() string
Validate() error
IsDirectPut() bool
String() string
}
func GetInitMsg ¶ added in v1.3.28
func UnmarshalInitMsg ¶
type InitMsgBase ¶
type InitMsgBase struct {
EtlName string `json:"id"`
CommTypeX string `json:"communication"` // enum commTypes
ArgTypeX string `json:"argument"` // enum argTypes
Timeout cos.Duration `json:"timeout"`
SupportDirectPut bool `json:"support_direct_put"`
}
and implementations
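To show how the JSON tags of `InitMsgBase` map onto a request body, here is a minimal sketch using a local mirror struct. The struct `initMsgBase` and the helper `decodeBase` are written for this example only. The `Timeout` field is deliberately left out, because the wire format of `cos.Duration` is not documented on this page.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// initMsgBase mirrors the JSON tags of InitMsgBase shown above.
// Timeout is omitted: the encoding of cos.Duration is not shown here.
type initMsgBase struct {
	EtlName          string `json:"id"`
	CommTypeX        string `json:"communication"`
	ArgTypeX         string `json:"argument"`
	SupportDirectPut bool   `json:"support_direct_put"`
}

// decodeBase is a hypothetical helper that parses the common fields.
func decodeBase(raw []byte) (initMsgBase, error) {
	var m initMsgBase
	err := json.Unmarshal(raw, &m)
	return m, err
}

func main() {
	raw := []byte(`{"id":"my-etl","communication":"hpush://","argument":"fqn","support_direct_put":true}`)
	m, err := decodeBase(raw)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", m)
}
```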
func (InitMsgBase) ArgType ¶ added in v1.3.19
func (m InitMsgBase) ArgType() string
func (InitMsgBase) CommType ¶
func (m InitMsgBase) CommType() string
func (InitMsgBase) IsDirectPut ¶ added in v1.3.28
func (m InitMsgBase) IsDirectPut() bool
func (InitMsgBase) Name ¶
func (m InitMsgBase) Name() string
type InitSpecMsg ¶
type InitSpecMsg struct {
Spec []byte `json:"spec"`
InitMsgBase
}
func (*InitSpecMsg) MsgType ¶ added in v1.3.19
func (*InitSpecMsg) MsgType() string
func (*InitSpecMsg) String ¶
func (m *InitSpecMsg) String() string
func (*InitSpecMsg) Validate ¶
func (m *InitSpecMsg) Validate() error
type LogsByTarget ¶
type LogsByTarget []Logs
type Session ¶ added in v1.3.28
type Session interface {
// OfflineTransform is an instance of `core.GetROC` function, which is driven by `TCB` and `TCO` to provide offline transformation
OfflineTransform(lom *core.LOM, latestVer, sync bool, gargs *core.GetROCArgs) core.ReadResp
// Finish cleans up the job's communication channel, and aborts the undergoing xaction (`TCB`/`TCO`) if errCause is provided
Finish(errCause error) error
}
Session represents a per-xaction communication context created by the statefulCommunicator.
Directories ¶
| Path | Synopsis |
|---|---|
| | Package runtime provides skeletons and static specifications for building ETL from scratch. |
| | Package webserver provides a framework to implement etl transformation webserver in golang. |