Documentation
¶
Index ¶
- Constants
- Variables
- type Conn
- type ConnEle
- type ConnPool
- type ErrStreamAlreadyActive
- type ErrStreamNotActive
- type Factory
- type ProvidedStreams
- type StreamInfo
- type StreamInfos
- type StreamMode
- type StreamNotifee
- func (*StreamNotifee) ClosedStream(net network.Network, str network.Stream)
- func (sn *StreamNotifee) Connected(net network.Network, conn network.Conn)
- func (sn *StreamNotifee) Disconnected(net network.Network, conn network.Conn)
- func (*StreamNotifee) Listen(net network.Network, addr ma.Multiaddr)
- func (*StreamNotifee) ListenClose(net network.Network, addr ma.Multiaddr)
- func (*StreamNotifee) OpenedStream(net network.Network, str network.Stream)
- type StreamService
- func (h *StreamService) CloseStream(sid string)
- func (h *StreamService) CreateTCPConnPool()
- func (h *StreamService) FetchBlocks(streamId string, startIndex uint64, maxNum int) ([]*pb.StreamBlock, error)
- func (h *StreamService) FileAsStream(sf *pb.StreamFile, fileType pb.StreamMeta_Type) (*pb.StreamMeta, error)
- func (h *StreamService) GetDuration(streamId string) int64
- func (h *StreamService) GetMaxWorkers() int
- func (h *StreamService) GetParent(sid string) string
- func (h *StreamService) GetProvidedHopcnt(config *pb.StreamRequest) (int, bool)
- func (h *StreamService) GetProvider(sid string) peer.ID
- func (h *StreamService) GetStatus(sid string) (pb.StreamStatus, bool)
- func (h *StreamService) GetStreamMode() StreamMode
- func (h *StreamService) Handle(env *pb.Envelope, pid peer.ID) (*pb.Envelope, error)
- func (h *StreamService) HandleStream(_ *pb.Envelope, _ peer.ID) (chan *pb.Envelope, chan error, chan interface{})
- func (h *StreamService) InformPush(peerId string, streamMeta *pb.StreamMeta, tree map[string][]string) error
- func (h *StreamService) IsBusy() bool
- func (h *StreamService) Loggable() map[string]interface{}
- func (h *StreamService) OnStreamMeta(meta *pb.StreamMeta, treePrevious []string)
- func (h *StreamService) PeerDisconnected(pid peer.ID)
- func (h *StreamService) Ping(pid peer.ID) (service.PeerStatus, error)
- func (h *StreamService) Protocol() protocol.ID
- func (h *StreamService) RequestAccepted(peerId string, config *pb.StreamRequest)
- func (h *StreamService) SendMessage(ctx context.Context, peerId string, env *pb.Envelope) error
- func (h *StreamService) SendStreamBlcoksToShadow_TCP(peerId peer.ID, blks []*pb.StreamBlock) error
- func (h *StreamService) SendStreamBlocks(peerId peer.ID, blks []*pb.StreamBlock) error
- func (h *StreamService) SendStreamRequest(peerId string, config *pb.StreamRequest) (*pb.Envelope, error)
- func (h *StreamService) SendUnsubscribeRequest(peerId string, sid string) (*pb.Envelope, error)
- func (h *StreamService) SetInterval(intv int64)
- func (h *StreamService) SetMaxWorkers(n int)
- func (h *StreamService) SetStreamMode(n int)
- func (h *StreamService) Start()
- func (h *StreamService) StartStream(config *pb.StreamMeta)
- func (h *StreamService) Started(sid string) bool
- func (h *StreamService) StreamAddFile(id string, sf *pb.StreamFile) error
- func (h *StreamService) UnsubscribeStream(sid string) error
- func (h *StreamService) WorkerStat()
- func (h *StreamService) Workload() int
Constants ¶
const ( TAG_STREAMSTART = "STREAM_START" TAG_STREAMEND = "STREAM_END" TAG_STREAMSUBSCRIBE = "STREAM_SUBSCRIBE" TAG_STREAMSEARCH = "STREAM_SEARCH" TAG_STREAM_COMPLETE = "STREAM_COMPLETE" TAG_WORKERSTART = "WORKER_START" TAG_WORKEREND = "WORKER_END" TAG_BLOCKSEND = "STREAM_BLOCK_SEND" TAG_BLOCKSEND_FAILED = "STREAM_BLOCK_SEND_FAILED" TAG_BLOCKSEND_COMPLETE = "STREAM_BLOCK_SEND_COMPLETE" TAG_BLOCKRECEIVE = "STREAM_BLOCK_RECV" TAG_STREAMREQUEST = "STREAM_REQUEST" TAG_STREAMRESPONSE = "STREAM_RESPONSE" TAG_STREAMREQUESTACCEPT = "STREAM_REQUEST_ACCEPT" TAG_STREAM_REQUEST_ACCEPTED = "STREAM_REQUEST_ACCEPTED" TAG_STREAMREQUESTREJECT = "STREAM_REQUEST_REJECT" TAG_WORKERSTORE_REMOVE = "WORKER_REMOVE" TAG_STREAM_SEARCHTIMEOUT = "STREAM_SEARCH_TIMEOUT" TAG_PROVIDER_ADD = "PROVIDER_ADD" TAG_PROVIDER_REMOVE = "PROVIDER_REMOVE" TAG_RETRY_SUBSCRIBE = "RETRY_SUBSCRIBE" TAG_SEARCH_BUSY = "SEARCH_BUSY" TAG_SEARCH_NOMETA = "SEARCH_NOMETA" TAG_SEARCH_NOPROVIDER = "SEARCH_NOPROVIDER" TAG_SEARCH_GETPROVIDER = "SEARCH_GETPROVIDER" TAG_WORKERSTORE_ADD = "WORKER_ADD" TAG_SERVICE_NEWMESSAGE = "SERVICE_NEW_MESSAGE" TAG_SERVICE_DONEMESSAGE = "SERVICE_DONE_MESSAGE" TAG_STREAM_STARTHANDLE = "STREAM_START_HANDLE" TAG_STREAM_DONEHANDLE = "STREAM_DONE_HANDLE" TAG_STATUS = "STREAM_STATUS" )
const InfoObsoleteTime = time.Hour * 5
const InformTimeOut = time.Second * 20
const RecvTimeout = time.Minute
const WorkerTimeout = time.Minute
Variables ¶
var ErrRedundantReq = fmt.Errorf("Request is redundant")
var ErrUnknowkStream = fmt.Errorf("Unknown stream")
Functions ¶
This section is empty.
Types ¶
type ConnPool ¶
type ConnPool struct {
// contains filtered or unexported fields
}
func NewConnPool ¶
type ErrStreamAlreadyActive ¶
type ErrStreamAlreadyActive struct {
// contains filtered or unexported fields
}
func (*ErrStreamAlreadyActive) Error ¶
func (e *ErrStreamAlreadyActive) Error() string
type ErrStreamNotActive ¶
type ErrStreamNotActive struct {
// contains filtered or unexported fields
}
func (*ErrStreamNotActive) Error ¶
func (e *ErrStreamNotActive) Error() string
type ProvidedStreams ¶
type ProvidedStreams struct {
// contains filtered or unexported fields
}
Access through ProvidedStreams only.
type StreamInfo ¶
type StreamInfo struct {
// contains filtered or unexported fields
}
type StreamInfos ¶
type StreamInfos struct {
// contains filtered or unexported fields
}
func NewStreamInfos ¶
func NewStreamInfos() *StreamInfos
type StreamMode ¶
type StreamMode int
const ( StreamMode_PUSH StreamMode = 0 StreamMode_PULL StreamMode = 1 )
type StreamNotifee ¶
type StreamNotifee StreamService
func (*StreamNotifee) ClosedStream ¶
func (*StreamNotifee) ClosedStream(net network.Network, str network.Stream)
func (*StreamNotifee) Connected ¶
func (sn *StreamNotifee) Connected(net network.Network, conn network.Conn)
func (*StreamNotifee) Disconnected ¶
func (sn *StreamNotifee) Disconnected(net network.Network, conn network.Conn)
func (*StreamNotifee) ListenClose ¶
func (*StreamNotifee) ListenClose(net network.Network, addr ma.Multiaddr)
func (*StreamNotifee) OpenedStream ¶
func (*StreamNotifee) OpenedStream(net network.Network, str network.Stream)
type StreamService ¶
type StreamService struct {
ReceivedFile <-chan ipld.Node
// contains filtered or unexported fields
}
func NewStreamService ¶
func NewStreamService( account *keypair.Full, node func() *core.IpfsNode, datastore repo.Datastore, sendNotification func(*pb.Notification) error, subscribe func(string) error, getShadow func() string, getShadowIp func() string, ctx context.Context, ) *StreamService
NewStreamService returns a new stream service
func (*StreamService) CloseStream ¶
func (h *StreamService) CloseStream(sid string)
func (*StreamService) CreateTCPConnPool ¶
func (h *StreamService) CreateTCPConnPool()
func (*StreamService) FetchBlocks ¶
func (h *StreamService) FetchBlocks(streamId string, startIndex uint64, maxNum int) ([]*pb.StreamBlock, error)
FetchBlocks fetches a list of blocks of a specific stream from database
func (*StreamService) FileAsStream ¶
func (h *StreamService) FileAsStream(sf *pb.StreamFile, fileType pb.StreamMeta_Type) (*pb.StreamMeta, error)
* Start a new stream, where the stream id is exactly the file cid
func (*StreamService) GetDuration ¶
func (h *StreamService) GetDuration(streamId string) int64
func (*StreamService) GetMaxWorkers ¶
func (h *StreamService) GetMaxWorkers() int
func (*StreamService) GetParent ¶
func (h *StreamService) GetParent(sid string) string
func (*StreamService) GetProvidedHopcnt ¶
func (h *StreamService) GetProvidedHopcnt(config *pb.StreamRequest) (int, bool)
func (*StreamService) GetProvider ¶
func (h *StreamService) GetProvider(sid string) peer.ID
TODO:
Use a different function to check activestreams
func (*StreamService) GetStatus ¶
func (h *StreamService) GetStatus(sid string) (pb.StreamStatus, bool)
func (*StreamService) GetStreamMode ¶
func (h *StreamService) GetStreamMode() StreamMode
func (*StreamService) HandleStream ¶
func (h *StreamService) HandleStream(_ *pb.Envelope, _ peer.ID) (chan *pb.Envelope, chan error, chan interface{})
HandleStream is called by the underlying service handler method
func (*StreamService) InformPush ¶
func (h *StreamService) InformPush(peerId string, streamMeta *pb.StreamMeta, tree map[string][]string) error
====================== For Push ======================
func (*StreamService) IsBusy ¶
func (h *StreamService) IsBusy() bool
func (*StreamService) Loggable ¶
func (h *StreamService) Loggable() map[string]interface{}
===================== OTHERS =========================
func (*StreamService) OnStreamMeta ¶
func (h *StreamService) OnStreamMeta(meta *pb.StreamMeta, treePrevious []string)
* ThreadGetStream is called when a new stream meta comes through application (thread most case). * In that case, a timer will be set. * If no inform receive before the timer end, the status would be set to timeout.
func (*StreamService) PeerDisconnected ¶
func (h *StreamService) PeerDisconnected(pid peer.ID)
============== FOR PEER MANAGEMENT ===================
func (*StreamService) Ping ¶
func (h *StreamService) Ping(pid peer.ID) (service.PeerStatus, error)
Ping pings another peer
func (*StreamService) Protocol ¶
func (h *StreamService) Protocol() protocol.ID
Protocol returns the handler protocol
func (*StreamService) RequestAccepted ¶
func (h *StreamService) RequestAccepted(peerId string, config *pb.StreamRequest)
RequestAccepted is called when a stream request is accepted by some peer.
func (*StreamService) SendMessage ¶
SendMessage sends a message to a peer.
func (*StreamService) SendStreamBlcoksToShadow_TCP ¶
func (h *StreamService) SendStreamBlcoksToShadow_TCP(peerId peer.ID, blks []*pb.StreamBlock) error
func (*StreamService) SendStreamBlocks ¶
func (h *StreamService) SendStreamBlocks(peerId peer.ID, blks []*pb.StreamBlock) error
SendStreamBlocks send a list of block to a peer
func (*StreamService) SendStreamRequest ¶
func (h *StreamService) SendStreamRequest(peerId string, config *pb.StreamRequest) (*pb.Envelope, error)
func (*StreamService) SendUnsubscribeRequest ¶
func (*StreamService) SetInterval ¶
func (h *StreamService) SetInterval(intv int64)
func (*StreamService) SetMaxWorkers ¶
func (h *StreamService) SetMaxWorkers(n int)
func (*StreamService) SetStreamMode ¶
func (h *StreamService) SetStreamMode(n int)
func (*StreamService) StartStream ¶
func (h *StreamService) StartStream(config *pb.StreamMeta)
======================= FOR STREAM MANAGEMENT =============================
func (*StreamService) Started ¶
func (h *StreamService) Started(sid string) bool
*
- Started return true if there stream with id "sid" is working.
func (*StreamService) StreamAddFile ¶
func (h *StreamService) StreamAddFile(id string, sf *pb.StreamFile) error
func (*StreamService) UnsubscribeStream ¶
func (h *StreamService) UnsubscribeStream(sid string) error
UnsubscribeStream want to unsubscribe to a stream, and send a request to the provider.
func (*StreamService) WorkerStat ¶
func (h *StreamService) WorkerStat()
func (*StreamService) Workload ¶
func (h *StreamService) Workload() int