Documentation
¶
Index ¶
- Constants
- Variables
- func ConvertSchemaToToolInput(schemaInfo *SchemaInfo) (*mcp.ToolInputSchema, error)
- func GetPulsarTypeSchema(schemaInfo *SchemaInfo) (pulsar.Schema, error)
- func IsClusterUnhealthy(err error) bool
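- type CircuitBreaker
- func NewCircuitBreaker(threshold int, resetTimeout time.Duration) *CircuitBreaker
- func (cb *CircuitBreaker) AllowRequest() bool
- func (cb *CircuitBreaker) ForceClose()
- func (cb *CircuitBreaker) ForceOpen()
- func (cb *CircuitBreaker) GetState() CircuitState
- func (cb *CircuitBreaker) GetStateString() string
- func (cb *CircuitBreaker) RecordFailure()
- func (cb *CircuitBreaker) RecordSuccess()
- func (cb *CircuitBreaker) Reset()
- func (cb *CircuitBreaker) String() string
- type CircuitState
- type ClusterErrorHandler
- type FunctionInvoker
- func NewFunctionInvoker(manager *PulsarFunctionManager) *FunctionInvoker
- func (fi *FunctionInvoker) InvokeFunctionAndWait(ctx context.Context, fnTool *FunctionTool, params map[string]interface{}) (*mcp.CallToolResult, error)
- type FunctionResult
- type FunctionTool
- type ManagerOptions
- func DefaultManagerOptions() *ManagerOptions
- type PulsarFunctionManager
- func NewPulsarFunctionManager(snServer *Server, readOnly bool, options *ManagerOptions, sessionID string) (*PulsarFunctionManager, error)
- func (m *PulsarFunctionManager) GetProducer(topic string) (pulsarclient.Producer, error)
- func (m *PulsarFunctionManager) Start()
- func (m *PulsarFunctionManager) Stop()
- type SchemaInfo
- func GetSchemaFromTopic(admin cmdutils.Client, topic string) (*SchemaInfo, error)
- type Server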
Constants ¶
const (
	CustomRuntimeOptionsEnvMcpToolNameKey        = "MCP_TOOL_NAME"
	CustomRuntimeOptionsEnvMcpToolDescriptionKey = "MCP_TOOL_DESCRIPTION"
)
Variables ¶
var (
	ErrFunctionNotFound = errors.New("function not found")
	ErrNotOurMessage    = errors.New("not our message")
)
var DefaultStringSchema = &mcp.ToolInputSchema{
	Type: "object",
	Properties: map[string]interface{}{
		"payload": map[string]interface{}{
			"type":        "string",
			"description": "The payload of the message, in plain text format",
		},
	},
}
var DefaultStringSchemaInfo = &SchemaInfo{
	Type: "STRING",
	Definition: map[string]interface{}{
		"type": "string",
	},
	PulsarSchemaInfo: &utils.SchemaInfo{
		Type: "STRING",
	},
}
Functions ¶
func ConvertSchemaToToolInput ¶
func ConvertSchemaToToolInput(schemaInfo *SchemaInfo) (*mcp.ToolInputSchema, error)
ConvertSchemaToToolInput converts a schema to MCP tool input schema
func GetPulsarTypeSchema ¶
func GetPulsarTypeSchema(schemaInfo *SchemaInfo) (pulsar.Schema, error)
func IsClusterUnhealthy ¶ added in v0.1.13
func IsClusterUnhealthy(err error) bool
IsClusterUnhealthy checks if an error indicates cluster health issues
Types ¶
type CircuitBreaker ¶
type CircuitBreaker struct {
// contains filtered or unexported fields
}
func NewCircuitBreaker ¶
func NewCircuitBreaker(threshold int, resetTimeout time.Duration) *CircuitBreaker
NewCircuitBreaker creates a new circuit breaker
func (*CircuitBreaker) AllowRequest ¶
func (cb *CircuitBreaker) AllowRequest() bool
AllowRequest determines if a request should be allowed
func (*CircuitBreaker) ForceClose ¶
func (cb *CircuitBreaker) ForceClose()
ForceClose forces the circuit breaker to close
func (*CircuitBreaker) ForceOpen ¶
func (cb *CircuitBreaker) ForceOpen()
ForceOpen forces the circuit breaker to open
func (*CircuitBreaker) GetState ¶
func (cb *CircuitBreaker) GetState() CircuitState
GetState returns the current state of the circuit breaker
func (*CircuitBreaker) GetStateString ¶
func (cb *CircuitBreaker) GetStateString() string
GetStateString returns a string representation of the circuit breaker state
func (*CircuitBreaker) RecordFailure ¶
func (cb *CircuitBreaker) RecordFailure()
RecordFailure records a failed operation
func (*CircuitBreaker) RecordSuccess ¶
func (cb *CircuitBreaker) RecordSuccess()
RecordSuccess records a successful operation
func (*CircuitBreaker) Reset ¶
func (cb *CircuitBreaker) Reset()
Reset resets the circuit breaker to closed state
func (*CircuitBreaker) String ¶
func (cb *CircuitBreaker) String() string
String returns a string representation of the circuit breaker
type CircuitState ¶
type CircuitState int
const (
	StateOpen CircuitState = iota
	StateHalfOpen
	StateClosed
)
type ClusterErrorHandler ¶ added in v0.1.13
type ClusterErrorHandler func(*PulsarFunctionManager, error)
type FunctionInvoker ¶
type FunctionInvoker struct {
// contains filtered or unexported fields
}
FunctionInvoker handles function invocation and result tracking
func NewFunctionInvoker ¶
func NewFunctionInvoker(manager *PulsarFunctionManager) *FunctionInvoker
NewFunctionInvoker creates a new FunctionInvoker
func (*FunctionInvoker) InvokeFunctionAndWait ¶
func (fi *FunctionInvoker) InvokeFunctionAndWait(ctx context.Context, fnTool *FunctionTool, params map[string]interface{}) (*mcp.CallToolResult, error)
InvokeFunctionAndWait sends a message to the function and waits for the result
type FunctionResult ¶
FunctionResult represents the result of a function invocation
type FunctionTool ¶
type FunctionTool struct {
Name string
Function *utils.FunctionConfig
InputSchema *SchemaInfo
OutputSchema *SchemaInfo
InputTopic string
OutputTopic string
Tool mcp.Tool
SchemaFetchSuccess bool
}
type ManagerOptions ¶
type ManagerOptions struct {
PollInterval time.Duration
DefaultTimeout time.Duration
FailureThreshold int
ResetTimeout time.Duration
TenantNamespaces []string
StrictExport bool
ClusterErrorHandler ClusterErrorHandler
}
func DefaultManagerOptions ¶
func DefaultManagerOptions() *ManagerOptions
type PulsarFunctionManager ¶
type PulsarFunctionManager struct {
// contains filtered or unexported fields
}
PulsarFunctionManager manages the lifecycle of Pulsar Functions as MCP tools
func NewPulsarFunctionManager ¶
func NewPulsarFunctionManager(snServer *Server, readOnly bool, options *ManagerOptions, sessionID string) (*PulsarFunctionManager, error)
NewPulsarFunctionManager creates a new PulsarFunctionManager
func (*PulsarFunctionManager) GetProducer ¶
func (m *PulsarFunctionManager) GetProducer(topic string) (pulsarclient.Producer, error)
GetProducer retrieves a producer from the cache or creates a new one if not found.
func (*PulsarFunctionManager) Start ¶
func (m *PulsarFunctionManager) Start()
Start starts polling for functions
func (*PulsarFunctionManager) Stop ¶
func (m *PulsarFunctionManager) Stop()
Stop stops polling for functions
type SchemaInfo ¶
type SchemaInfo struct {
Type string
Definition map[string]interface{}
PulsarSchemaInfo *utils.SchemaInfo
}
func GetSchemaFromTopic ¶
func GetSchemaFromTopic(admin cmdutils.Client, topic string) (*SchemaInfo, error)
GetSchemaFromTopic retrieves schema information from a topic