stream

package
v0.0.0-...-d0dabfa Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 30, 2020 License: MIT Imports: 32 Imported by: 3

Documentation

Index

Constants

View Source
const (
	TAG_STREAMSTART             = "STREAM_START"
	TAG_STREAMEND               = "STREAM_END"
	TAG_STREAMSUBSCRIBE         = "STREAM_SUBSCRIBE"
	TAG_STREAMSEARCH            = "STREAM_SEARCH"
	TAG_STREAM_COMPLETE         = "STREAM_COMPLETE"
	TAG_WORKERSTART             = "WORKER_START"
	TAG_WORKEREND               = "WORKER_END"
	TAG_BLOCKSEND               = "STREAM_BLOCK_SEND"
	TAG_BLOCKSEND_FAILED        = "STREAM_BLOCK_SEND_FAILED"
	TAG_BLOCKSEND_COMPLETE      = "STREAM_BLOCK_SEND_COMPLETE"
	TAG_BLOCKRECEIVE            = "STREAM_BLOCK_RECV"
	TAG_STREAMREQUEST           = "STREAM_REQUEST"
	TAG_STREAMRESPONSE          = "STREAM_RESPONSE"
	TAG_STREAMREQUESTACCEPT     = "STREAM_REQUEST_ACCEPT"
	TAG_STREAM_REQUEST_ACCEPTED = "STREAM_REQUEST_ACCEPTED"
	TAG_STREAMREQUESTREJECT     = "STREAM_REQUEST_REJECT"
	TAG_WORKERSTORE_REMOVE      = "WORKER_REMOVE"
	TAG_STREAM_SEARCHTIMEOUT    = "STREAM_SEARCH_TIMEOUT"
	TAG_PROVIDER_ADD            = "PROVIDER_ADD"
	TAG_PROVIDER_REMOVE         = "PROVIDER_REMOVE"
	TAG_RETRY_SUBSCRIBE         = "RETRY_SUBSCRIBE"
	TAG_SEARCH_BUSY             = "SEARCH_BUSY"
	TAG_SEARCH_NOMETA           = "SEARCH_NOMETA"
	TAG_SEARCH_NOPROVIDER       = "SEARCH_NOPROVIDER"
	TAG_SEARCH_GETPROVIDER      = "SEARCH_GETPROVIDER"
	TAG_WORKERSTORE_ADD         = "WORKER_ADD"
	TAG_SERVICE_NEWMESSAGE      = "SERVICE_NEW_MESSAGE"
	TAG_SERVICE_DONEMESSAGE     = "SERVICE_DONE_MESSAGE"
	TAG_STREAM_STARTHANDLE      = "STREAM_START_HANDLE"
	TAG_STREAM_DONEHANDLE       = "STREAM_DONE_HANDLE"
	TAG_STATUS                  = "STREAM_STATUS"
)
View Source
const InfoObsoleteTime = time.Hour * 5
View Source
const InformTimeOut = time.Second * 20
View Source
const RecvTimeout = time.Minute
View Source
const WorkerTimeout = time.Minute

Variables

View Source
var ErrRedundantReq = fmt.Errorf("Request is redundant")
View Source
var ErrUnknowkStream = fmt.Errorf("Unknown stream")

Functions

This section is empty.

Types

type Conn

type Conn struct {
	// contains filtered or unexported fields
}

type ConnEle

type ConnEle interface {
	Close() error
}

type ConnPool

type ConnPool struct {
	// contains filtered or unexported fields
}

func NewConnPool

func NewConnPool(factory Factory, capacity int, connTimeOut time.Duration) (*ConnPool, error)

func (*ConnPool) Close

func (cp *ConnPool) Close()

func (*ConnPool) Get

func (cp *ConnPool) Get() (ConnEle, error)

func (*ConnPool) Put

func (cp *ConnPool) Put(conn ConnEle) error

type ErrStreamAlreadyActive

type ErrStreamAlreadyActive struct {
	// contains filtered or unexported fields
}

func (*ErrStreamAlreadyActive) Error

func (e *ErrStreamAlreadyActive) Error() string

type ErrStreamNotActive

type ErrStreamNotActive struct {
	// contains filtered or unexported fields
}

func (*ErrStreamNotActive) Error

func (e *ErrStreamNotActive) Error() string

type Factory

type Factory func() (ConnEle, error)

type ProvidedStreams

type ProvidedStreams struct {
	// contains filtered or unexported fields
}

Access through ProvidedStreams only.

type StreamInfo

type StreamInfo struct {
	// contains filtered or unexported fields
}

type StreamInfos

type StreamInfos struct {
	// contains filtered or unexported fields
}

func NewStreamInfos

func NewStreamInfos() *StreamInfos

type StreamMode

type StreamMode int
const (
	StreamMode_PUSH StreamMode = 0
	StreamMode_PULL StreamMode = 1
)

type StreamNotifee

type StreamNotifee StreamService

func (*StreamNotifee) ClosedStream

func (*StreamNotifee) ClosedStream(net network.Network, str network.Stream)

func (*StreamNotifee) Connected

func (sn *StreamNotifee) Connected(net network.Network, conn network.Conn)

func (*StreamNotifee) Disconnected

func (sn *StreamNotifee) Disconnected(net network.Network, conn network.Conn)

func (*StreamNotifee) Listen

func (*StreamNotifee) Listen(net network.Network, addr ma.Multiaddr)

func (*StreamNotifee) ListenClose

func (*StreamNotifee) ListenClose(net network.Network, addr ma.Multiaddr)

func (*StreamNotifee) OpenedStream

func (*StreamNotifee) OpenedStream(net network.Network, str network.Stream)

type StreamService

type StreamService struct {
	ReceivedFile <-chan ipld.Node
	// contains filtered or unexported fields
}

func NewStreamService

func NewStreamService(
	account *keypair.Full,
	node func() *core.IpfsNode,
	datastore repo.Datastore,
	sendNotification func(*pb.Notification) error,
	subscribe func(string) error,
	getShadow func() string,
	getShadowIp func() string,
	ctx context.Context,
) *StreamService

NewStreamService returns a new stream service

func (*StreamService) CloseStream

func (h *StreamService) CloseStream(sid string)

func (*StreamService) CreateTCPConnPool

func (h *StreamService) CreateTCPConnPool()

func (*StreamService) FetchBlocks

func (h *StreamService) FetchBlocks(streamId string, startIndex uint64, maxNum int) ([]*pb.StreamBlock, error)

FetchBlocks fetches a list of blocks of a specific stream from database

func (*StreamService) FileAsStream

func (h *StreamService) FileAsStream(sf *pb.StreamFile, fileType pb.StreamMeta_Type) (*pb.StreamMeta, error)

FileAsStream starts a new stream whose stream id is exactly the file cid.

func (*StreamService) GetDuration

func (h *StreamService) GetDuration(streamId string) int64

func (*StreamService) GetMaxWorkers

func (h *StreamService) GetMaxWorkers() int

func (*StreamService) GetParent

func (h *StreamService) GetParent(sid string) string

func (*StreamService) GetProvidedHopcnt

func (h *StreamService) GetProvidedHopcnt(config *pb.StreamRequest) (int, bool)

func (*StreamService) GetProvider

func (h *StreamService) GetProvider(sid string) peer.ID

TODO: use a different function to check activestreams.

func (*StreamService) GetStatus

func (h *StreamService) GetStatus(sid string) (pb.StreamStatus, bool)

func (*StreamService) GetStreamMode

func (h *StreamService) GetStreamMode() StreamMode

func (*StreamService) Handle

func (h *StreamService) Handle(env *pb.Envelope, pid peer.ID) (*pb.Envelope, error)

Handle is called by the underlying service handler method

func (*StreamService) HandleStream

func (h *StreamService) HandleStream(_ *pb.Envelope, _ peer.ID) (chan *pb.Envelope, chan error, chan interface{})

HandleStream is called by the underlying service handler method

func (*StreamService) InformPush

func (h *StreamService) InformPush(peerId string, streamMeta *pb.StreamMeta, tree map[string][]string) error

====================== For Push ======================

func (*StreamService) IsBusy

func (h *StreamService) IsBusy() bool

func (*StreamService) Loggable

func (h *StreamService) Loggable() map[string]interface{}

===================== OTHERS =========================

func (*StreamService) OnStreamMeta

func (h *StreamService) OnStreamMeta(meta *pb.StreamMeta, treePrevious []string)

ThreadGetStream is called when a new stream meta arrives through the application (in most cases, via a thread). In that case, a timer is set. If no inform is received before the timer ends, the status is set to timeout.

func (*StreamService) PeerDisconnected

func (h *StreamService) PeerDisconnected(pid peer.ID)

============== FOR PEER MANAGEMENT ===================

func (*StreamService) Ping

func (h *StreamService) Ping(pid peer.ID) (service.PeerStatus, error)

Ping pings another peer

func (*StreamService) Protocol

func (h *StreamService) Protocol() protocol.ID

Protocol returns the handler protocol

func (*StreamService) RequestAccepted

func (h *StreamService) RequestAccepted(peerId string, config *pb.StreamRequest)

RequestAccepted is called when a stream request is accepted by some peer.

func (*StreamService) SendMessage

func (h *StreamService) SendMessage(ctx context.Context, peerId string, env *pb.Envelope) error

SendMessage sends a message to a peer.

func (*StreamService) SendStreamBlcoksToShadow_TCP

func (h *StreamService) SendStreamBlcoksToShadow_TCP(peerId peer.ID, blks []*pb.StreamBlock) error

func (*StreamService) SendStreamBlocks

func (h *StreamService) SendStreamBlocks(peerId peer.ID, blks []*pb.StreamBlock) error

SendStreamBlocks sends a list of blocks to a peer.

func (*StreamService) SendStreamRequest

func (h *StreamService) SendStreamRequest(peerId string, config *pb.StreamRequest) (*pb.Envelope, error)

func (*StreamService) SendUnsubscribeRequest

func (h *StreamService) SendUnsubscribeRequest(peerId string, sid string) (*pb.Envelope, error)

func (*StreamService) SetInterval

func (h *StreamService) SetInterval(intv int64)

func (*StreamService) SetMaxWorkers

func (h *StreamService) SetMaxWorkers(n int)

func (*StreamService) SetStreamMode

func (h *StreamService) SetStreamMode(n int)

func (*StreamService) Start

func (h *StreamService) Start()

Start begins online services

func (*StreamService) StartStream

func (h *StreamService) StartStream(config *pb.StreamMeta)

======================= FOR STREAM MANAGEMENT =============================

func (*StreamService) Started

func (h *StreamService) Started(sid string) bool

Started returns true if the stream with id "sid" is working.

func (*StreamService) StreamAddFile

func (h *StreamService) StreamAddFile(id string, sf *pb.StreamFile) error

func (*StreamService) UnsubscribeStream

func (h *StreamService) UnsubscribeStream(sid string) error

UnsubscribeStream unsubscribes from a stream and sends a request to the provider.

func (*StreamService) WorkerStat

func (h *StreamService) WorkerStat()

func (*StreamService) Workload

func (h *StreamService) Workload() int

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL