Documentation
¶
Overview ¶
Package etl provides utilities to initialize and use transformation pods.
- Copyright (c) 2018-2025, NVIDIA CORPORATION. All rights reserved.
Package etl provides utilities to initialize and use transformation pods.
- Copyright (c) 2018-2025, NVIDIA CORPORATION. All rights reserved.
Package etl provides utilities to initialize and use transformation pods.
- Copyright (c) 2018-2025, NVIDIA CORPORATION. All rights reserved.
Package etl provides utilities to initialize and use transformation pods.
- Copyright (c) 2018-2025, NVIDIA CORPORATION. All rights reserved.
Package etl provides utilities to initialize and use transformation pods.
- Copyright (c) 2021-2025, NVIDIA CORPORATION. All rights reserved.
Package etl provides utilities to initialize and use transformation pods.
- Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
Package etl provides utilities to initialize and use transformation pods.
- Copyright (c) 2018-2025, NVIDIA CORPORATION. All rights reserved.
Package etl provides utilities to initialize and use transformation pods.
- Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
Package etl provides utilities to initialize and use transformation pods.
- Copyright (c) 2018-2025, NVIDIA CORPORATION. All rights reserved.
Index ¶
- Constants
- func CleanupEntities(errCtx *cmn.ETLErrCtx, podName, svcName string) (err error)
- func Delete(etlName string) error
- func GetOfflineTransform(etlName string, xctn core.Xact) (getROC core.GetROC, xetl *XactETL, session Session, err error)
- func GetPipeline(etlNames []string) (apc.ETLPipeline, error)
- func PodHealth(etlName string) (string, error)
- func Stop(etlName string, errCause error) (err error)
- func StopAll()
- func StopByXid(xid string, errCause error) error
- func Tinit()
- func ValidateSecret(etlName, secret string) error
- type CPUMemByTarget
- type CPUMemUsed
- type CommStats
- type Communicator
- type Details
- type ETLEntity
- type ETLObjDownloadCtx
- type ETLSpecMsg
- type ETLs
- type HealthByTarget
- type HealthStatus
- type Info
- type InfoList
- type InitMsg
- type InitMsgBase
- func (m *InitMsgBase) Cname() string
- func (m *InitMsgBase) CommType() string
- func (m *InitMsgBase) GetEnv() []corev1.EnvVar
- func (m *InitMsgBase) IsDirectPut() bool
- func (m *InitMsgBase) Name() string
- func (m *InitMsgBase) PodName(tid string) string
- func (m *InitMsgBase) Timeouts() (initTimeout, objTimeout cos.Duration)
- func (m *InitMsgBase) Validate(detail string) error
- type InitSpecMsg
- type InlineTransArgs
- type Logs
- type LogsByTarget
- type MD
- func (e *MD) Add(msg InitMsg, stage Stage, podMap PodMap) error
- func (e *MD) Get(id string) (en ETLEntity, present bool)
- func (e *MD) Init(l int)
- func (*MD) JspOpts() jsp.Options
- func (e *MD) MarshalJSON() ([]byte, error)
- func (e *MD) String() string
- func (e *MD) UnmarshalJSON(data []byte) (err error)
- type ObjErr
- type ObjErrs
- type PodInfo
- type PodMap
- type RuntimeSpec
- type Session
- type Stage
- type WebsocketCtrlMsg
- type XactETL
Constants ¶
const ( // init message types SpecType = "spec" CodeType = "code" ETLSpecType = "etl-spec" // common fields Name = "name" CommunicationType = "communication_type" DirectPut = "direct_put" // `InitSpecMsg` fields Spec = "spec" // `ETLSpecMsg` fields Runtime = "runtime" Image = "image" Command = "command" Env = "env" // consts for unmarshalling ETL details InitMsgType = "init_msg" ObjErrsType = "obj_errors" )
const ( CommTypeAnnotation = "communication_type" // communication type to use if not explicitly set in the init message SupportDirectPutAnnotation = "support_direct_put" // indicates whether the ETL supports direct PUT; affects how the target interacts with it WaitTimeoutAnnotation = "wait_timeout" // timeout duration to wait for the ETL pod to become ready )
const ( DefaultInitTimeout = 45 * time.Second DefaultObjTimeout = 10 * time.Second DefaultAbortTimeout = 2 * time.Second DefaultContainerPort = 8000 )
const ( // ETL container receives POST request from target with the data. It // must read the data and return response to the target which then will be // transferred to the client. Hpush = "hpush://" // Target redirects the GET request to the ETL container. Then ETL container // contacts the target via `AIS_TARGET_URL` env variable to get the data. // The data is then transformed and returned to the client. Hpull = "hpull://" // Similar to redirection strategy but with usage of reverse proxy. HpushStdin = "io://" // WebSocket communication. WebSocket = "ws://" )
Enumeration of the supported communication types (`commTypes`).
const CommTypeSeparator = "://"
CommTypeSeparator is consistent with RFC 2396, "Uniform Resource Identifiers (URI): Generic Syntax".
const MaxObjErr = 128
const PrefixXactID = "etl-"
Variables ¶
This section is empty.
Functions ¶
func CleanupEntities ¶ added in v1.4.0
CleanupEntities removes the provided entities. It is best-effort: it attempts to remove every entity and does not stop when it encounters an error.
func GetOfflineTransform ¶ added in v1.3.28
func GetPipeline ¶ added in v1.4.0
func GetPipeline(etlNames []string) (apc.ETLPipeline, error)
func Stop ¶
Stop is called in three cases: (1) a user's DELETE request, (2) a failed initialization, (3) a transaction/xaction abort (via StopByXid).
func ValidateSecret ¶ added in v1.3.28
Types ¶
type CPUMemByTarget ¶
// CPUMemByTarget is a list of CPUMemUsed entries; each entry carries its own TargetID.
type CPUMemByTarget []*CPUMemUsed
type CPUMemUsed ¶
// CPUMemUsed holds CPU and memory usage figures together with the ID of the
// target they belong to. It is the value returned by PodMetrics and the
// element type of CPUMemByTarget.
// NOTE(review): the units of CPU and Mem are not visible here — confirm against the producer.
type CPUMemUsed struct {
TargetID string `json:"target_id"`
CPU float64 `json:"cpu"`
Mem int64 `json:"mem"`
}
func PodMetrics ¶
func PodMetrics(etlName string) (*CPUMemUsed, error)
type Communicator ¶
// Communicator manages communications with the local ETL pod.
// In addition to the methods listed here it declares unexported methods
// (not shown in this view).
type Communicator interface {
ETLName() string
String() string
GetSecret() string
Xact() *XactETL // underlying `apc.ActETLInline` xaction (see xact/xs/etl.go)
CommStats // stats for the `apc.ActETLInline` inline transform only
// InlineTransform uses one of the two ETL container endpoints:
// - Method "PUT", Path "/"
// - Method "GET", Path "/bucket/object"
//
// Returns:
// - size: the size of the transformed object
// - ecode: error code
// - err: error encountered during transformation
InlineTransform(w http.ResponseWriter, r *http.Request, lom *core.LOM, args *InlineTransArgs) (size int64, ecode int, err error)
// ProcessDownloadJob extracts objects from the job and routes them to the ETL pod.
// NOTE(review): the unnamed int result is presumably an error/status code, as in InlineTransform — confirm.
ProcessDownloadJob(ctx *ETLObjDownloadCtx) (cos.ReadCloseSizer, int, error)
// contains filtered or unexported methods
}
Communicator is responsible for managing communications with local ETL pod.
func GetCommunicator ¶
func GetCommunicator(etlName string) (Communicator, error)
GetCommunicator retrieves the Communicator from the registry by ETL name. It returns an error if the ETL is not found or is not in the Running stage.
type ETLEntity ¶ added in v1.3.30
// ETLEntity represents an ETL instance managed by an individual target.
// It groups the ETL's init message, its current lifecycle stage, and its pod map,
// and is the value returned by MD.Get.
type ETLEntity struct {
InitMsg InitMsg `json:"init_msg"` // init message of this ETL
Stage Stage `json:"stage"` // current lifecycle stage
PodMap PodMap `json:"pod_map"` // NOTE(review): PodMap's layout is not shown here; presumably per-target pod info — confirm
}
ETLEntity represents an ETL instance managed by an individual target. - Created and added to the manager before entering the `Initializing` stage. - Removed only after the user explicitly deletes it from the `Aborted` stage.
Expected state transitions: - `Initializing`: Set up resources in the following order:
- Create (or reuse) communicator and pod watcher
- Start (or renew) xaction
- Create Kubernetes resources (pod/service)
- `Running`: All resources are active, handling inline and offline transform requests via the communicator. - `Aborted`: Kubernetes resources (pod/service) are cleaned up.
type ETLObjDownloadCtx ¶ added in v1.4.0
// ETLObjDownloadCtx contains the parameters of an ETL download job.
// It is the argument taken by Communicator.ProcessDownloadJob.
type ETLObjDownloadCtx struct {
ObjName string // Target object name
Link string // Source URL to download from
ETLArgs string // Transform arguments
}
ETLObjDownloadCtx contains ETL download job parameters
type ETLSpecMsg ¶ added in v1.3.30
// ETLSpecMsg is a YAML representation of the ETL pod spec.
// It embeds InitMsgBase inline and adds the runtime section and the
// (optional) resource requirements.
type ETLSpecMsg struct {
InitMsgBase `yaml:",inline"` // includes all optional fields from InitMsgBase
Runtime RuntimeSpec `json:"runtime" yaml:"runtime"` // image, command, and env of the ETL
// Resources holds Kubernetes resource requirements; omitted from output when empty.
// Its quantities cannot be unmarshaled from YAML directly, which is why
// ETLSpecMsg defines a custom UnmarshalYAML.
Resources corev1.ResourceRequirements `json:"resources,omitempty" yaml:"resources,omitempty" swaggertype:"object"`
}
ETLSpecMsg is a YAML representation of the ETL pod spec. swagger:model
func (*ETLSpecMsg) FormatEnv ¶ added in v1.3.30
func (e *ETLSpecMsg) FormatEnv() string
func (*ETLSpecMsg) MsgType ¶ added in v1.3.30
func (*ETLSpecMsg) MsgType() string
func (*ETLSpecMsg) ParsePodSpec ¶ added in v1.3.30
func (e *ETLSpecMsg) ParsePodSpec() (*corev1.Pod, error)
func (*ETLSpecMsg) String ¶ added in v1.3.30
func (e *ETLSpecMsg) String() string
func (*ETLSpecMsg) UnmarshalYAML ¶ added in v1.4.0
func (e *ETLSpecMsg) UnmarshalYAML(node *yaml.Node) error
UnmarshalYAML works around the fact that resource.Quantity can't unmarshal from YAML directly. We decode the YAML node into a map, marshal it to JSON, and unmarshal again to parse resource.Quantity correctly.
func (*ETLSpecMsg) Validate ¶ added in v1.3.30
func (e *ETLSpecMsg) Validate() error
type HealthByTarget ¶
// HealthByTarget is a list of HealthStatus entries.
// NOTE(review): presumably one entry per target, as the name suggests — confirm (HealthStatus is not shown here).
type HealthByTarget []*HealthStatus
type HealthStatus ¶
type Info ¶
type InitMsg ¶
// InitMsg is the interface of an ETL init message.
// In this package it is implemented by InitSpecMsg and ETLSpecMsg, which
// inherit most methods from the embedded InitMsgBase and add their own
// MsgType, Validate, ParsePodSpec, and String.
type InitMsg interface {
Name() string
Cname() string
PodName(tid string) string // ETL pod name on the given target
MsgType() string
CommType() string
// Validate takes no arguments; InitMsgBase.Validate(detail string) has a
// different signature, so each concrete message type defines its own Validate.
Validate() error
IsDirectPut() bool
ParsePodSpec() (*corev1.Pod, error) // Kubernetes Pod object built from the message
Timeouts() (initTimeout, objTimeout cos.Duration)
GetEnv() []corev1.EnvVar
String() string
}
func GetInitMsg ¶ added in v1.3.28
func UnmarshalInitMsg ¶
type InitMsgBase ¶
// InitMsgBase holds the fields shared by the init messages that embed it
// (InitSpecMsg and ETLSpecMsg). All fields except the name and the
// communication type are omitted from JSON/YAML output when empty.
type InitMsgBase struct {
EtlName string `json:"name" yaml:"name"`
// NOTE(review): serialized as "communication", whereas the package constants
// CommunicationType and CommTypeAnnotation are both "communication_type" — confirm this is intended.
CommTypeX string `json:"communication" yaml:"communication"`
// NOTE(review): presumably DefaultInitTimeout / DefaultObjTimeout apply when these are unset — confirm in Timeouts().
InitTimeout cos.Duration `json:"init_timeout,omitempty" yaml:"init_timeout,omitempty" swaggertype:"primitive,string"`
ObjTimeout cos.Duration `json:"obj_timeout,omitempty" yaml:"obj_timeout,omitempty" swaggertype:"primitive,string"`
// NOTE(review): serialized as "support_direct_put", whereas the package constant DirectPut is "direct_put" — confirm.
SupportDirectPut bool `json:"support_direct_put,omitempty" yaml:"support_direct_put,omitempty"`
Env []corev1.EnvVar `json:"env,omitempty" yaml:"env,omitempty" swaggertype:"array,object"`
}
InitMsgBase is the common base embedded by the InitMsg implementations (`InitSpecMsg`, `ETLSpecMsg`). swagger:model
func (*InitMsgBase) Cname ¶ added in v1.3.30
func (m *InitMsgBase) Cname() string
func (*InitMsgBase) CommType ¶
func (m *InitMsgBase) CommType() string
func (*InitMsgBase) GetEnv ¶ added in v1.3.30
func (m *InitMsgBase) GetEnv() []corev1.EnvVar
func (*InitMsgBase) IsDirectPut ¶ added in v1.3.28
func (m *InitMsgBase) IsDirectPut() bool
func (*InitMsgBase) Name ¶
func (m *InitMsgBase) Name() string
func (*InitMsgBase) PodName ¶ added in v1.4.0
func (m *InitMsgBase) PodName(tid string) string
func (*InitMsgBase) Timeouts ¶ added in v1.3.30
func (m *InitMsgBase) Timeouts() (initTimeout, objTimeout cos.Duration)
func (*InitMsgBase) Validate ¶ added in v1.3.30
func (m *InitMsgBase) Validate(detail string) error
type InitSpecMsg ¶
// InitSpecMsg is an init message that carries the pod specification as raw
// bytes, together with the common InitMsgBase fields.
type InitSpecMsg struct {
Spec []byte `json:"spec"` // raw spec; ParsePodSpec parses it into a Kubernetes Pod object
InitMsgBase `yaml:",inline"`
}
swagger:model
func (*InitSpecMsg) MsgType ¶ added in v1.3.19
func (*InitSpecMsg) MsgType() string
func (*InitSpecMsg) ParsePodSpec ¶ added in v1.3.29
func (m *InitSpecMsg) ParsePodSpec() (*corev1.Pod, error)
ParsePodSpec parses `m.Spec` into a Kubernetes Pod object.
func (*InitSpecMsg) String ¶
func (m *InitSpecMsg) String() string
func (*InitSpecMsg) Validate ¶
func (m *InitSpecMsg) Validate() error
type InlineTransArgs ¶ added in v1.4.0
// InlineTransArgs groups the arguments passed to Communicator.InlineTransform
// alongside the HTTP request and the object.
type InlineTransArgs struct {
TransformArgs string // NOTE(review): presumably user-supplied arguments forwarded to the transformer — confirm
Pipeline apc.ETLPipeline // see GetPipeline, which builds an apc.ETLPipeline from ETL names
LatestVer bool // NOTE(review): presumably requests the latest object version — confirm against callers
}
type LogsByTarget ¶
// LogsByTarget is a list of Logs values.
// NOTE(review): presumably one entry per target, as the name suggests — confirm (Logs is not shown here).
type LogsByTarget []Logs
type ObjErr ¶ added in v1.3.30
type PodInfo ¶ added in v1.3.30
type RuntimeSpec ¶ added in v1.3.29
// RuntimeSpec is the type of the `runtime` section of ETLSpecMsg: the image
// plus an optional command and optional environment variables.
type RuntimeSpec struct {
Image string `json:"image" yaml:"image"`
Command []string `json:"command,omitempty" yaml:"command,omitempty"` // omitted when empty
Env []corev1.EnvVar `json:"env,omitempty" yaml:"env,omitempty" swaggertype:"array,object"` // omitted when empty
}
swagger:model
type Session ¶ added in v1.3.28
// Session is a per-xaction communication context; GetOfflineTransform returns one.
// In addition to the methods listed here it declares unexported methods
// (not shown in this view).
type Session interface {
// Finish cleans up the job's communication channel, and aborts the ongoing xaction (`TCB`/`TCO`) if errCause is provided
Finish(errCause error) error
// OfflineWrite returns the number of bytes written, an error code, and an error.
// NOTE(review): presumably transforms `lom` and streams the result into `writer` — confirm against the implementation.
OfflineWrite(lom *core.LOM, latestVer, sync bool, writer io.WriteCloser, args *core.ETLArgs) (written int64, ecode int, err error)
// contains filtered or unexported methods
}
Session represents a per-xaction communication context created by the statefulCommunicator.
type Stage ¶ added in v1.3.28
type Stage int
Stage enumerates the ETL lifecycle stages (see docs/etl.md#etl-pod-lifecycle for details).
type WebsocketCtrlMsg ¶ added in v1.3.28
type XactETL ¶ added in v1.3.30
// XactETL is the xaction type returned by Communicator.Xact and by
// GetOfflineTransform. It embeds xact.Base and also has unexported fields
// (not shown in this view).
type XactETL struct {
InlineObjErrs cos.Errs // NOTE(review): presumably per-object errors collected during inline transforms — confirm
Vlabs map[string]string // NOTE(review): presumably variable labels attached to stats/metrics — confirm
xact.Base
// contains filtered or unexported fields
}
XactETL represents the `apc.ActETLInline` kind of xaction (the `apc.ActETLBck`/`apc.ActETLObject` kinds are managed by tcb/tcobjs). It is responsible for triggering a global abort on error, to ensure that all related ETL resources are cleaned up across all targets.