Documentation ¶
Index ¶
- Constants
- type Service
- func (s *Service) CreateThread(ctx context.Context, req *connect.Request[threadv1.CreateThreadRequest]) (*connect.Response[threadv1.Thread], error)
- func (s *Service) ForkThread(ctx context.Context, req *connect.Request[threadv1.ForkThreadRequest]) (*connect.Response[threadv1.ForkThreadResponse], error)
- func (s *Service) GetLlmMessage(ctx context.Context, req *connect.Request[threadv1.GetLlmMessageRequest]) (*connect.Response[threadv1.LlmMessage], error)
- func (s *Service) GetStreamManager() *StreamManager
- func (s *Service) GetThread(ctx context.Context, req *connect.Request[threadv1.GetThreadRequest]) (*connect.Response[threadv1.Thread], error)
- func (s *Service) GetThreadEventListener() *natslistener.ThreadEventListener
- func (s *Service) HandleThreadEvent(nc *nats.Conn, pool *pgxpool.Pool) func(ctx context.Context, envelope events.ThreadEventEnvelope)
- func (s *Service) ListLlmMessages(ctx context.Context, req *connect.Request[threadv1.ListLlmMessagesRequest]) (*connect.Response[threadv1.ListLlmMessagesResponse], error)
- func (s *Service) ListThreads(ctx context.Context, req *connect.Request[threadv1.ListThreadsRequest]) (*connect.Response[threadv1.ListThreadsResponse], error)
- func (s *Service) NewServiceHandler(opts ...connect.HandlerOption) (string, http.Handler)
- func (s *Service) Shutdown() error
- func (s *Service) StreamThreadEvents(ctx context.Context, req *connect.Request[threadv1.StreamThreadEventsRequest], ...) error
- func (s *Service) SubmitUserMessage(ctx context.Context, req *connect.Request[threadv1.SubmitUserMessageRequest]) (*connect.Response[threadv1.LlmMessage], error)
- func (s *Service) UpdateThread(ctx context.Context, req *connect.Request[threadv1.UpdateThreadRequest]) (*connect.Response[threadv1.Thread], error)
- type StreamManager
- func (sm *StreamManager) BroadcastContentDelta(threadUID uuid.UUID, event *threadv1.ContentDeltaEvent)
- func (sm *StreamManager) BroadcastContentStart(threadUID uuid.UUID, event *threadv1.ContentStartEvent)
- func (sm *StreamManager) BroadcastContentStop(threadUID uuid.UUID, event *threadv1.ContentStopEvent)
- func (sm *StreamManager) BroadcastStreamError(threadUID uuid.UUID, event *threadv1.StreamErrorEvent)
- func (sm *StreamManager) BroadcastThreadStateChange(threadUID uuid.UUID, event *threadv1.ThreadStateChangeEvent)
- func (sm *StreamManager) BroadcastToolCall(threadUID uuid.UUID, toolCall *toolv1.ToolCall)
- func (sm *StreamManager) Subscribe(ctx context.Context, threadUID uuid.UUID, stream StreamSender) func()
- func (sm *StreamManager) Unsubscribe(threadUID uuid.UUID, sub *streamSubscription)
- type StreamSender
Constants ¶
const (
MessageResourceType = "thread-message"
)
const ResourceType = "thread"
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Service ¶
type Service struct {
threadv1alpha1connect.UnimplementedThreadServiceHandler
// contains filtered or unexported fields
}
Service implements the ThreadService handler interface.
func New ¶
func New(log *logger.Logger, db *database.Pool, jobQueue *job_queue.Client, nc *nats.Conn, m *metrics.Metrics, analyticsClient *analytics.Client) (*Service, error)
New creates a new thread service
func (*Service) CreateThread ¶
func (s *Service) CreateThread( ctx context.Context, req *connect.Request[threadv1.CreateThreadRequest], ) (*connect.Response[threadv1.Thread], error)
CreateThread creates a new thread
func (*Service) ForkThread ¶
func (s *Service) ForkThread( ctx context.Context, req *connect.Request[threadv1.ForkThreadRequest], ) (*connect.Response[threadv1.ForkThreadResponse], error)
ForkThread creates a new thread from an existing thread
func (*Service) GetLlmMessage ¶
func (s *Service) GetLlmMessage( ctx context.Context, req *connect.Request[threadv1.GetLlmMessageRequest], ) (*connect.Response[threadv1.LlmMessage], error)
GetLlmMessage retrieves an LLM message of a thread
func (*Service) GetStreamManager ¶
func (s *Service) GetStreamManager() *StreamManager
GetStreamManager returns the stream manager for external registration
func (*Service) GetThread ¶
func (s *Service) GetThread( ctx context.Context, req *connect.Request[threadv1.GetThreadRequest], ) (*connect.Response[threadv1.Thread], error)
GetThread retrieves a thread by ID
func (*Service) GetThreadEventListener ¶
func (s *Service) GetThreadEventListener() *natslistener.ThreadEventListener
GetThreadEventListener returns the thread event listener for external handler registration
func (*Service) HandleThreadEvent ¶
func (s *Service) HandleThreadEvent(nc *nats.Conn, pool *pgxpool.Pool) func(ctx context.Context, envelope events.ThreadEventEnvelope)
HandleThreadEvent processes unified thread event notifications and broadcasts to streams
func (*Service) ListLlmMessages ¶
func (s *Service) ListLlmMessages( ctx context.Context, req *connect.Request[threadv1.ListLlmMessagesRequest], ) (*connect.Response[threadv1.ListLlmMessagesResponse], error)
ListLlmMessages lists all messages of a thread
func (*Service) ListThreads ¶
func (s *Service) ListThreads( ctx context.Context, req *connect.Request[threadv1.ListThreadsRequest], ) (*connect.Response[threadv1.ListThreadsResponse], error)
ListThreads lists threads for a user
func (*Service) NewServiceHandler ¶
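func (s *Service) NewServiceHandler(opts ...connect.HandlerOption) (string, http.Handler)
NewServiceHandler returns the Connect service path and the HTTP handler that serves it.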
func (*Service) StreamThreadEvents ¶
func (s *Service) StreamThreadEvents( ctx context.Context, req *connect.Request[threadv1.StreamThreadEventsRequest], stream *connect.ServerStream[threadv1.StreamThreadEventsResponse], ) error
StreamThreadEvents streams thread events (content, tool calls, state changes) for a thread
func (*Service) SubmitUserMessage ¶
func (s *Service) SubmitUserMessage( ctx context.Context, req *connect.Request[threadv1.SubmitUserMessageRequest], ) (*connect.Response[threadv1.LlmMessage], error)
SubmitUserMessage submits a new user message to a thread
type StreamManager ¶
type StreamManager struct {
// contains filtered or unexported fields
}
func NewStreamManager ¶
func NewStreamManager(m *metrics.Metrics, log *logger.Logger) *StreamManager
NewStreamManager creates a new StreamManager.
func (*StreamManager) BroadcastContentDelta ¶
func (sm *StreamManager) BroadcastContentDelta(threadUID uuid.UUID, event *threadv1.ContentDeltaEvent)
BroadcastContentDelta sends a content delta event to all streams subscribed to a thread.
func (*StreamManager) BroadcastContentStart ¶
func (sm *StreamManager) BroadcastContentStart(threadUID uuid.UUID, event *threadv1.ContentStartEvent)
BroadcastContentStart sends a content start event to all streams subscribed to a thread. If a stream fails to send, it is automatically unsubscribed.
func (*StreamManager) BroadcastContentStop ¶
func (sm *StreamManager) BroadcastContentStop(threadUID uuid.UUID, event *threadv1.ContentStopEvent)
BroadcastContentStop sends a content stop event to all streams subscribed to a thread.
func (*StreamManager) BroadcastStreamError ¶
func (sm *StreamManager) BroadcastStreamError(threadUID uuid.UUID, event *threadv1.StreamErrorEvent)
BroadcastStreamError sends a stream error event to all streams subscribed to a thread.
func (*StreamManager) BroadcastThreadStateChange ¶
func (sm *StreamManager) BroadcastThreadStateChange(threadUID uuid.UUID, event *threadv1.ThreadStateChangeEvent)
BroadcastThreadStateChange sends a thread state change event to all streams subscribed to a thread.
func (*StreamManager) BroadcastToolCall ¶
func (sm *StreamManager) BroadcastToolCall(threadUID uuid.UUID, toolCall *toolv1.ToolCall)
BroadcastToolCall sends a tool call to all streams subscribed to a thread.
func (*StreamManager) Subscribe ¶
func (sm *StreamManager) Subscribe(ctx context.Context, threadUID uuid.UUID, stream StreamSender) func()
Subscribe registers a stream to receive updates for a specific thread. Returns an unsubscribe function that should be called when the stream closes.
func (*StreamManager) Unsubscribe ¶
func (sm *StreamManager) Unsubscribe(threadUID uuid.UUID, sub *streamSubscription)
Unsubscribe removes a stream subscription.
type StreamSender ¶
type StreamSender = streaming.Sender[threadv1.StreamThreadEventsResponse]
StreamSender is an alias for the generic streaming.Sender interface specialized for thread events.