Documentation
¶
Overview ¶
Package transport provides long-lived http/tcp connections for intra-cluster communications
- Copyright (c) 2018-2025, NVIDIA CORPORATION. All rights reserved.
Package transport provides long-lived http/tcp connections for intra-cluster communications
- Copyright (c) 2018-2026, NVIDIA CORPORATION. All rights reserved.
Package transport provides long-lived http/tcp connections for intra-cluster communications
- Copyright (c) 2018-2025, NVIDIA CORPORATION. All rights reserved.
Package transport provides long-lived http/tcp connections for intra-cluster communications
- Copyright (c) 2018-2025, NVIDIA CORPORATION. All rights reserved.
Package transport provides long-lived http/tcp connections for intra-cluster communications
- Copyright (c) 2018-2026, NVIDIA CORPORATION. All rights reserved.
Package transport provides long-lived http/tcp connections for intra-cluster communications
- Copyright (c) 2018-2025, NVIDIA CORPORATION. All rights reserved.
Package transport provides long-lived http/tcp connections for intra-cluster communications
- Copyright (c) 2018-2025, NVIDIA CORPORATION. All rights reserved.
Package transport provides long-lived http/tcp connections for intra-cluster communications
- Copyright (c) 2018-2025, NVIDIA CORPORATION. All rights reserved.
Package transport provides long-lived http/tcp connections for intra-cluster communications
- Copyright (c) 2018-2025, NVIDIA CORPORATION. All rights reserved.
Package transport provides long-lived http/tcp connections for intra-cluster communications
- Copyright (c) 2018-2025, NVIDIA CORPORATION. All rights reserved.
Package transport provides long-lived http/tcp connections for intra-cluster communications
- Copyright (c) 2018-2025, NVIDIA CORPORATION. All rights reserved.
Package transport provides long-lived http/tcp connections for intra-cluster communications
- Copyright (c) 2018-2025, NVIDIA CORPORATION. All rights reserved.
Example (Headers) ¶
package main
import (
"encoding/binary"
"fmt"
"io"
"net/http"
"net/http/httptest"
"sync"
"github.com/NVIDIA/aistore/api/apc"
"github.com/NVIDIA/aistore/cmn"
"github.com/NVIDIA/aistore/cmn/cos"
"github.com/NVIDIA/aistore/memsys"
"github.com/NVIDIA/aistore/transport"
)
// Sample texts used as object payloads by this example.
// main() below sends only lorem and duis; et and temporibus are declared but not used in this example.
const (
lorem = `Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut
labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat.`
duis = `Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur.
Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.`
et = `Et harum quidem rerum facilis est et expedita distinctio. Nam libero tempore, cum soluta nobis est
eligendi optio, cumque nihil impedit, quo minus id, quod maxime placeat, facere possimus, omnis voluptas assumenda est, omnis dolor repellendus.`
temporibus = `Temporibus autem quibusdam et aut officiis debitis aut rerum necessitatibus saepe eveniet,
ut et voluptates repudiandae sint et molestiae non-recusandae.`
)
// main starts a test HTTP server whose handler parses the raw stream body
// header by header, then sends two text objects over an object stream and
// finishes the stream.
func main() {
	// The handler reads the whole request body and walks it sequentially:
	// an 8-byte big-endian header length (followed, per the original comment,
	// by the length checksum - 16 bytes in total), the header itself, and
	// then ObjAttrs.Size bytes of object payload.
	handler := func(_ http.ResponseWriter, req *http.Request) {
		payload, err := cos.ReadAll(req.Body)
		if err != nil {
			panic(err)
		}
		if len(payload) == 0 {
			return
		}

		var hdr transport.ObjHdr
		pos := 0
		for {
			hdrLen := int(binary.BigEndian.Uint64(payload[pos:]))
			pos += 16 // hlen and hlen-checksum
			hdr = transport.ExtObjHeader(payload[pos:], hdrLen)

			// a reserved opcode ends the walk
			// (presumably this is what stream.Fin() produces - confirm)
			if transport.ReservedOpcode(hdr.Opcode) {
				return
			}

			fmt.Printf("Bck:%s ObjName:%s SID:%s Opaque:%v ObjAttrs:{%s} (%d)\n",
				hdr.Bck.String(), hdr.ObjName, hdr.SID, hdr.Opaque, hdr.ObjAttrs.String(), hdrLen)

			// skip over this header and the object data that follows it
			pos += hdrLen + int(hdr.ObjAttrs.Size)
		}
	}

	srv := httptest.NewServer(http.HandlerFunc(handler))
	defer srv.Close()

	stream := transport.NewObjStream(transport.NewIntraDataClient(), srv.URL, cos.GenTie(), nil)
	sendText(stream, lorem, duis)
	stream.Fin()
}
// sendText sends txt1 and then txt2 over the given stream as two separate
// objects, waiting for the sent-callback of each object before proceeding.
func sendText(stream *transport.Stream, txt1, txt2 string) {
	var pending sync.WaitGroup

	// transmit hands one object to the stream and blocks until its
	// sent-callback has been invoked.
	transmit := func(hdr transport.ObjHdr, payload io.ReadCloser) {
		pending.Add(1)
		stream.Send(&transport.Obj{
			Hdr:    hdr,
			Reader: payload,
			SentCB: func(*transport.ObjHdr, io.ReadCloser, any, error) { pending.Done() },
		})
		pending.Wait()
	}

	// first object: bucket with an explicit (UUID, name) namespace, no opaque data
	first := memsys.PageMM().NewSGL(0)
	first.Write([]byte(txt1))
	firstHdr := transport.ObjHdr{
		Bck: cmn.Bck{
			Name:     "abc",
			Provider: apc.AWS,
			Ns:       cmn.Ns{UUID: "uuid", Name: "namespace"},
		},
		ObjName: "X",
		ObjAttrs: cmn.ObjAttrs{
			Size:  first.Size(),
			Atime: 663346294,
			Cksum: cos.NewCksum(cos.ChecksumCesXxh, "h1"),
		},
	}
	firstHdr.ObjAttrs.SetVersion("1")
	transmit(firstHdr, first)

	// second object: global namespace, opaque bytes and custom metadata
	second := memsys.PageMM().NewSGL(0)
	second.Write([]byte(txt2))
	secondHdr := transport.ObjHdr{
		Bck: cmn.Bck{
			Name:     "abracadabra",
			Provider: apc.AIS,
			Ns:       cmn.NsGlobal,
		},
		ObjName: "p/q/s",
		ObjAttrs: cmn.ObjAttrs{
			Size:  second.Size(),
			Atime: 663346294,
			Cksum: cos.NewCksum(cos.ChecksumCesXxh, "h2"),
		},
		Opaque: []byte{'1', '2', '3'},
	}
	secondHdr.ObjAttrs.SetVersion("222222222222222222222222")
	secondHdr.ObjAttrs.SetCustomMD(cos.StrKVs{"xx": "11", "yy": "22"})
	transmit(secondHdr, second)
}
Output:
Bck:s3://@uuid#namespace/abc ObjName:X SID: Opaque:[] ObjAttrs:{231B, v"1", xxhash2[h1], map[]} (72)
Bck:ais://abracadabra ObjName:p/q/s SID: Opaque:[49 50 51] ObjAttrs:{213B, v"222222222222222222222222", xxhash2[h2], map[xx:11 yy:22]} (113)
Example (Obj) ¶
package main
import (
"fmt"
"io"
"net/http/httptest"
"sync"
"time"
"github.com/NVIDIA/aistore/3rdparty/golang/mux"
"github.com/NVIDIA/aistore/api/apc"
"github.com/NVIDIA/aistore/cmn"
"github.com/NVIDIA/aistore/cmn/cos"
"github.com/NVIDIA/aistore/memsys"
"github.com/NVIDIA/aistore/transport"
)
// Sample texts used as object payloads by this example:
// main() below sends (lorem, duis) and then (et, temporibus).
const (
lorem = `Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut
labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat.`
duis = `Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur.
Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.`
et = `Et harum quidem rerum facilis est et expedita distinctio. Nam libero tempore, cum soluta nobis est
eligendi optio, cumque nihil impedit, quo minus id, quod maxime placeat, facere possimus, omnis voluptas assumenda est, omnis dolor repellendus.`
temporibus = `Temporibus autem quibusdam et aut officiis debitis aut rerum necessitatibus saepe eveniet,
ut et voluptates repudiandae sint et molestiae non-recusandae.`
)
// objmux is the HTTP request multiplexer that main() serves via httptest.NewServer.
// NOTE(review): no initialization is visible in this example listing - presumably
// it is assigned elsewhere (e.g., in test setup); confirm before reusing this code.
var objmux *mux.ServeMux
// sendText sends txt1 and then txt2 over the given stream as two separate
// objects, waiting for the sent-callback of each object before proceeding.
func sendText(stream *transport.Stream, txt1, txt2 string) {
	var pending sync.WaitGroup

	// transmit hands one object to the stream and blocks until its
	// sent-callback has been invoked.
	transmit := func(hdr transport.ObjHdr, payload io.ReadCloser) {
		pending.Add(1)
		stream.Send(&transport.Obj{
			Hdr:    hdr,
			Reader: payload,
			SentCB: func(*transport.ObjHdr, io.ReadCloser, any, error) { pending.Done() },
		})
		pending.Wait()
	}

	// first object: bucket with an explicit (UUID, name) namespace, no opaque data
	first := memsys.PageMM().NewSGL(0)
	first.Write([]byte(txt1))
	firstHdr := transport.ObjHdr{
		Bck: cmn.Bck{
			Name:     "abc",
			Provider: apc.AWS,
			Ns:       cmn.Ns{UUID: "uuid", Name: "namespace"},
		},
		ObjName: "X",
		ObjAttrs: cmn.ObjAttrs{
			Size:  first.Size(),
			Atime: 663346294,
			Cksum: cos.NewCksum(cos.ChecksumCesXxh, "h1"),
		},
	}
	firstHdr.ObjAttrs.SetVersion("1")
	transmit(firstHdr, first)

	// second object: global namespace, opaque bytes and custom metadata
	second := memsys.PageMM().NewSGL(0)
	second.Write([]byte(txt2))
	secondHdr := transport.ObjHdr{
		Bck: cmn.Bck{
			Name:     "abracadabra",
			Provider: apc.AIS,
			Ns:       cmn.NsGlobal,
		},
		ObjName: "p/q/s",
		ObjAttrs: cmn.ObjAttrs{
			Size:  second.Size(),
			Atime: 663346294,
			Cksum: cos.NewCksum(cos.ChecksumCesXxh, "h2"),
		},
		Opaque: []byte{'1', '2', '3'},
	}
	secondHdr.ObjAttrs.SetVersion("222222222222222222222222")
	secondHdr.ObjAttrs.SetCustomMD(cos.StrKVs{"xx": "11", "yy": "22"})
	transmit(secondHdr, second)
}
// main registers a receive callback under the "dummy-obj" transport name,
// opens an object stream to the test server, sends four text objects in two
// batches, and finishes the stream. The callback prints the first 16 bytes
// of every received object.
func main() {
	// onRecv reads the received object in full, verifies that its length
	// matches the size announced in the header, and prints its 16-byte prefix.
	onRecv := func(hdr *transport.ObjHdr, objReader io.Reader, err error) error {
		cos.Assert(err == nil)

		data, err := cos.ReadAll(objReader)
		if err != nil {
			panic(err)
		}
		if got := int64(len(data)); got != hdr.ObjAttrs.Size {
			panic(fmt.Sprintf("size %d != %d", len(data), hdr.ObjAttrs.Size))
		}

		prefix := string(data[:16])
		fmt.Printf("%s...\n", prefix)
		return nil
	}

	srv := httptest.NewServer(objmux)
	defer srv.Close()

	const trname = "dummy-obj"
	if err := transport.Handle(trname, onRecv); err != nil {
		fmt.Println(err)
		return
	}

	dstURL := srv.URL + transport.ObjURLPath(trname)
	stream := transport.NewObjStream(transport.NewIntraDataClient(), dstURL, cos.GenTie(), nil)

	sendText(stream, lorem, duis)
	sendText(stream, et, temporibus)
	stream.Fin()
}
Output:
Lorem ipsum dolo...
Duis aute irure ...
Et harum quidem ...
Temporibus autem...
Index ¶
- Constants
- func DrainAndFreeReader(r io.Reader)
- func FreeRecv(object io.Reader)
- func Handle(trname string, rxObj RecvObj) error
- func IsErrDuplicateTrname(e error) bool
- func IsErrSBR(err error) bool
- func ObjURLPath(trname string) string
- func ReservedOpcode(opc int) bool
- func RxAnyStream(w http.ResponseWriter, r *http.Request)
- func Unhandle(trname string) error
- type Client
- type ErrSBR
- type ErrStreamTerm
- type Extra
- type Obj
- type ObjHdr
- type Parent
- type Receiver
- type RecvObj
- type SentCB
- type Stream
- func (s *Stream) Abort()
- func (s *Stream) Fin()
- func (s *Stream) ID() (string, int64)
- func (s *Stream) IsTerminated() bool
- func (s *Stream) Read(b []byte) (n int, err error)
- func (s *Stream) Send(obj *Obj) (err error)
- func (s *Stream) Stop()
- func (s *Stream) String() string
- func (s *Stream) TermInfo() (reason string, err error)
- func (s *Stream) URL() string
- type StreamCollector
- type TermedCB
Examples ¶
Constants ¶
// Application-level opcodes (sentinels).
// Values are consecutive and start at 27182: the first constant is
// iota + 27182 with iota == 0, and each following one implicitly repeats
// the expression with iota incremented by one.
const (
	OpcDone = iota + 27182
	OpcAbort
	OpcRequest
	OpcResponse
)
group 1: application-level opcodes (sentinels)
const (
OpcReconnect = iota + 46351
)
group 2: transport/bundle (data mover's) opcodes
const (
SizeUnknown = cos.ContentLengthUnknown // -1: obj size unknown (not set)
)
Variables ¶
This section is empty.
Functions ¶
func DrainAndFreeReader ¶
DrainAndFreeReader: 1) reads and discards all the data from `r` - the `objReader`; 2) frees this objReader back to the `recvPool`. As such, this function is intended exclusively for use by `transport.RecvObj` implementations.
func IsErrDuplicateTrname ¶
func ObjURLPath ¶
func ReservedOpcode ¶
Types ¶
type Client ¶
func NewIntraDataClient ¶
func NewIntraDataClient() Client
intra-cluster networking: fasthttp client
type ErrStreamTerm ¶ added in v1.4.1
type ErrStreamTerm struct {
// contains filtered or unexported fields
}
Tx
func (*ErrStreamTerm) Error ¶ added in v1.4.1
func (e *ErrStreamTerm) Error() string
func (*ErrStreamTerm) Unwrap ¶ added in v1.4.1
func (e *ErrStreamTerm) Unwrap() error
type Extra ¶
type Extra struct {
Parent *Parent
Config *cmn.Config // (to optimize-out GCO.Get())
Compression string // see CompressAlways, etc. enum
IdleTeardown time.Duration // when exceeded, causes PUT to terminate (and to renew upon the very next send)
ChanBurst int // overrides config.Transport.Burst
SizePDU int32 // NOTE: 0(zero): no PDUs; must be <= `maxSizePDU`; unknown size _requires_ PDUs
MaxHdrSize int32 // overrides config.Transport.MaxHeaderSize
}
func (*Extra) Compressed ¶
type Obj ¶
type Obj struct {
Reader io.ReadCloser // reader (to read the object, and close when done)
CmplArg any // optional context passed to the SentCB callback
SentCB SentCB // called when the last byte is sent _or_ when the stream terminates (see term.reason)
Hdr ObjHdr
// contains filtered or unexported fields
}
object to transmit
func (*Obj) IsHeaderOnly ¶
type ObjHdr ¶
type ObjHdr struct {
Bck cmn.Bck
ObjName string
SID string // sender node ID
Demux string // for shared data mover(s), to demux on the receive side
Opaque []byte // custom control (optional)
ObjAttrs cmn.ObjAttrs // attributes/metadata of the object that's being transmitted
Opcode int // (see reserved range above)
}
_object_ header (not to be confused with objects in buckets)
func ExtObjHeader ¶
func (*ObjHdr) IsHeaderOnly ¶
type Parent ¶ added in v1.4.1
type Parent struct {
Xact core.Xact // sender ID; abort
SentCB SentCB // to free SGLs, close files, etc. cleanup
TermedCB TermedCB // when err-ed
}
usage and scope: - entire stream's lifetime (all Send() calls) - additional stream control - global or optional params (to override defaults)
type Receiver ¶ added in v1.3.30
type Receiver interface {
ID() string
RecvObj(hdr *ObjHdr, objReader io.Reader, err error) error // Rx callback above
}
shared data mover (SDM)
type SentCB ¶ added in v1.4.1
type SentCB func(*ObjHdr, io.ReadCloser, any, error)
Object-sent callback with the signature shown above. It can optionally be defined: a) on a per-stream basis (via the NewObjStream constructor - see the Extra and Parent structs above), or b) for a given object that is being sent (for instance, to support call-per-batch semantics). Naturally, the object callback "overrides" the per-stream one: when the object callback is defined (i.e., non-nil), the stream callback is ignored/skipped. NOTE: if defined, the callback executes asynchronously as far as the sending part is concerned.
type Stream ¶
type Stream struct {
// contains filtered or unexported fields
}
object stream & private types
func NewObjStream ¶
func (*Stream) IsTerminated ¶
func (s *Stream) IsTerminated() bool
func (*Stream) Send ¶
Asynchronously send an object (transport.Obj) defined by its header and its reader.
The sending pipeline is implemented as a pair (SQ, SCQ) where the former is a send queue realized as workCh, and the latter is a send completion queue (cmplCh). Together SQ and SCQ form a FIFO.
- header-only objects are supported; when there's no data to send (that is, when the header's ObjAttrs.Size field is set to zero), the reader is not required and the corresponding Reader field of the transport.Obj passed to Send() can be set to nil.
- object reader is *always* closed, irrespective of whether the Send() succeeds or fails. On success, if the send-completion (SentCB) callback is provided (i.e., non-nil), the closing is done by doCmpl().
- Optional reference counting is also done by (and in) the doCmpl, so that the SentCB gets called if and only when the refcount (if provided i.e., non-nil) reaches zero.
- For every transmission of every object there's always a doCmpl() completion (with its refcounting and reader-closing). This holds true in all cases, including network errors that may cause sudden and instant termination of the underlying stream(s).
type StreamCollector ¶
type StreamCollector struct{}
stream collector
func Init ¶
func Init(tstats cos.StatsUpdater) *StreamCollector
func (*StreamCollector) Name ¶
func (*StreamCollector) Name() string
func (*StreamCollector) Run ¶
func (sc *StreamCollector) Run() error
func (*StreamCollector) Stop ¶
func (sc *StreamCollector) Stop(err error)
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
Package bundle provides multi-streaming transport with the functionality to dynamically (un)register receive endpoints, establish long-lived flows, and more.
|
Package bundle provides multi-streaming transport with the functionality to dynamically (un)register receive endpoints, establish long-lived flows, and more. |