pftools

package
v0.1.14 (Latest)
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 29, 2025 | License: Apache-2.0 | Imports: 19 | Imported by: 0

Documentation

Index

Constants

View Source
// Environment-variable key names for MCP tool metadata.
// NOTE(review): the identifiers suggest these keys are looked up in the "env"
// section of a function's custom runtime options — confirm against the caller.
const (
	// CustomRuntimeOptionsEnvMcpToolNameKey is the key "MCP_TOOL_NAME".
	CustomRuntimeOptionsEnvMcpToolNameKey        = "MCP_TOOL_NAME"
	// CustomRuntimeOptionsEnvMcpToolDescriptionKey is the key "MCP_TOOL_DESCRIPTION".
	CustomRuntimeOptionsEnvMcpToolDescriptionKey = "MCP_TOOL_DESCRIPTION"
)

Variables

View Source
// Sentinel errors exported by this package. They are created with errors.New,
// so callers can match them with errors.Is.
var (
	// ErrFunctionNotFound carries the message "function not found".
	ErrFunctionNotFound = errors.New("function not found")
	// ErrNotOurMessage carries the message "not our message".
	// NOTE(review): presumably returned when a received message does not belong
	// to the pending invocation — confirm against the code that returns it.
	ErrNotOurMessage    = errors.New("not our message")
)
View Source
// DefaultStringSchema is an MCP tool input schema of type "object" with a
// single property, "payload", declared as a plain-text string.
// NOTE(review): presumably the fallback when a topic has no richer schema —
// confirm against ConvertSchemaToToolInput.
var DefaultStringSchema = &mcp.ToolInputSchema{
	Type: "object",
	Properties: map[string]interface{}{
		// The only input field: the message body as plain text.
		"payload": map[string]interface{}{
			"type":        "string",
			"description": "The payload of the message, in plain text format",
		},
	},
}
View Source
// DefaultStringSchemaInfo is a SchemaInfo describing a plain string schema:
// its Type is "STRING", its Definition is {"type": "string"}, and the embedded
// Pulsar schema info is also of type "STRING".
var DefaultStringSchemaInfo = &SchemaInfo{
	Type: "STRING",
	// Schema definition in map form.
	Definition: map[string]interface{}{
		"type": "string",
	},
	// Matching Pulsar-side schema descriptor; only Type is populated.
	PulsarSchemaInfo: &utils.SchemaInfo{
		Type: "STRING",
	},
}

Functions

func ConvertSchemaToToolInput

func ConvertSchemaToToolInput(schemaInfo *SchemaInfo) (*mcp.ToolInputSchema, error)

ConvertSchemaToToolInput converts a schema to MCP tool input schema

func GetPulsarTypeSchema

func GetPulsarTypeSchema(schemaInfo *SchemaInfo) (pulsar.Schema, error)

func IsAuthError added in v0.1.14

func IsAuthError(err error) bool

func IsClusterUnhealthy added in v0.1.13

func IsClusterUnhealthy(err error) bool

IsClusterUnhealthy checks if an error indicates cluster health issues

Types

type CircuitBreaker

// CircuitBreaker is an opaque circuit breaker; all of its fields are
// unexported. Create one with NewCircuitBreaker and drive it through its
// methods (AllowRequest, RecordSuccess, RecordFailure, Reset, ...).
type CircuitBreaker struct {
	// contains filtered or unexported fields
}

func NewCircuitBreaker

func NewCircuitBreaker(threshold int, resetTimeout time.Duration) *CircuitBreaker

NewCircuitBreaker creates a new circuit breaker

func (*CircuitBreaker) AllowRequest

func (cb *CircuitBreaker) AllowRequest() bool

AllowRequest determines if a request should be allowed

func (*CircuitBreaker) ForceClose

func (cb *CircuitBreaker) ForceClose()

ForceClose forces the circuit breaker to close

func (*CircuitBreaker) ForceOpen

func (cb *CircuitBreaker) ForceOpen()

ForceOpen forces the circuit breaker to open

func (*CircuitBreaker) GetState

func (cb *CircuitBreaker) GetState() CircuitState

GetState returns the current state of the circuit breaker

func (*CircuitBreaker) GetStateString

func (cb *CircuitBreaker) GetStateString() string

GetStateString returns a string representation of the circuit breaker state

func (*CircuitBreaker) RecordFailure

func (cb *CircuitBreaker) RecordFailure()

RecordFailure records a failed operation

func (*CircuitBreaker) RecordSuccess

func (cb *CircuitBreaker) RecordSuccess()

RecordSuccess records a successful operation

func (*CircuitBreaker) Reset

func (cb *CircuitBreaker) Reset()

Reset resets the circuit breaker to closed state

func (*CircuitBreaker) String

func (cb *CircuitBreaker) String() string

String returns a string representation of the circuit breaker

type CircuitState

// CircuitState enumerates the states a CircuitBreaker can report via GetState.
type CircuitState int

// Circuit breaker states, numbered from 0 in declaration order.
// NOTE(review): because StateOpen is declared first, the zero value of
// CircuitState is StateOpen (0), not StateClosed — confirm this is intended.
const (
	// StateOpen has the value 0.
	StateOpen CircuitState = iota
	// StateHalfOpen has the value 1.
	StateHalfOpen
	// StateClosed has the value 2.
	StateClosed
)

type ClusterErrorHandler added in v0.1.13

// ClusterErrorHandler is a callback that receives a PulsarFunctionManager and
// an error. It can be supplied through ManagerOptions.ClusterErrorHandler.
// NOTE(review): presumably invoked when the manager detects a cluster-level
// error (see IsClusterUnhealthy) — confirm against the manager implementation.
type ClusterErrorHandler func(*PulsarFunctionManager, error)

type FunctionInvoker

// FunctionInvoker handles function invocation and result tracking. Its fields
// are unexported; create one with NewFunctionInvoker.
type FunctionInvoker struct {
	// contains filtered or unexported fields
}

FunctionInvoker handles function invocation and result tracking

func NewFunctionInvoker

func NewFunctionInvoker(manager *PulsarFunctionManager) *FunctionInvoker

NewFunctionInvoker creates a new FunctionInvoker

func (*FunctionInvoker) InvokeFunctionAndWait

func (fi *FunctionInvoker) InvokeFunctionAndWait(ctx context.Context, fnTool *FunctionTool, params map[string]interface{}) (*mcp.CallToolResult, error)

InvokeFunctionAndWait sends a message to the function and waits for the result

type FunctionResult

// FunctionResult represents the result of a function invocation.
type FunctionResult struct {
	// Data is the result as a string.
	// NOTE(review): presumably the function's output message payload — confirm.
	Data  string
	// Error is the error associated with the invocation.
	// NOTE(review): presumably nil on success — confirm.
	Error error
}

FunctionResult represents the result of a function invocation

type FunctionTool

// FunctionTool bundles a Pulsar function's configuration, topics and schemas
// with the mcp.Tool built for it. It is the unit passed to
// FunctionInvoker.InvokeFunctionAndWait.
type FunctionTool struct {
	// Name identifies the tool.
	// NOTE(review): the exact naming scheme is not visible here — confirm.
	Name               string
	// Function is the configuration of the underlying function.
	Function           *utils.FunctionConfig
	// InputSchema describes the schema on the input side.
	InputSchema        *SchemaInfo
	// OutputSchema describes the schema on the output side.
	OutputSchema       *SchemaInfo
	// InputTopic is the input topic name.
	// NOTE(review): presumably where invocation messages are sent — confirm.
	InputTopic         string
	// OutputTopic is the output topic name.
	// NOTE(review): presumably where results are read from — confirm.
	OutputTopic        string
	// Tool is the MCP tool definition for this function.
	Tool               mcp.Tool
	// SchemaFetchSuccess records whether schema fetching succeeded.
	// NOTE(review): inferred from the field name — confirm where it is set.
	SchemaFetchSuccess bool
}

type ManagerOptions

// ManagerOptions configures a PulsarFunctionManager and is passed to
// NewPulsarFunctionManager. DefaultManagerOptions returns a pre-filled value.
// NOTE(review): the per-field descriptions below are inferred from field names
// and types; default values are not visible here — confirm in the implementation.
type ManagerOptions struct {
	// PollInterval is presumably the interval between function polling rounds
	// (see PulsarFunctionManager.Start).
	PollInterval        time.Duration
	// DefaultTimeout is presumably the default timeout for waiting on an
	// invocation result.
	DefaultTimeout      time.Duration
	// FailureThreshold is presumably the circuit breaker threshold
	// (see NewCircuitBreaker).
	FailureThreshold    int
	// ResetTimeout is presumably the circuit breaker reset timeout
	// (see NewCircuitBreaker).
	ResetTimeout        time.Duration
	// TenantNamespaces is presumably the list of tenant/namespace pairs to scan;
	// the expected string format is not shown here.
	TenantNamespaces    []string
	// StrictExport is a boolean switch; its exact effect is not visible here.
	StrictExport        bool
	// ClusterErrorHandler is an optional callback of type ClusterErrorHandler.
	ClusterErrorHandler ClusterErrorHandler
}

func DefaultManagerOptions

func DefaultManagerOptions() *ManagerOptions

type PulsarFunctionManager

// PulsarFunctionManager manages the lifecycle of Pulsar Functions as MCP
// tools. Its fields are unexported; create one with NewPulsarFunctionManager,
// then call Start to begin polling and Stop to end it.
type PulsarFunctionManager struct {
	// contains filtered or unexported fields
}

PulsarFunctionManager manages the lifecycle of Pulsar Functions as MCP tools

func NewPulsarFunctionManager

func NewPulsarFunctionManager(snServer *Server, readOnly bool, options *ManagerOptions, sessionID string) (*PulsarFunctionManager, error)

NewPulsarFunctionManager creates a new PulsarFunctionManager

func (*PulsarFunctionManager) GetProducer

func (m *PulsarFunctionManager) GetProducer(topic string) (pulsarclient.Producer, error)

GetProducer retrieves a producer from the cache or creates a new one if not found.

func (*PulsarFunctionManager) Start

func (m *PulsarFunctionManager) Start()

Start starts polling for functions

func (*PulsarFunctionManager) Stop

func (m *PulsarFunctionManager) Stop()

Stop stops polling for functions

type SchemaInfo

// SchemaInfo describes a topic schema as used by this package. It is returned
// by GetSchemaFromTopic and accepted by ConvertSchemaToToolInput and
// GetPulsarTypeSchema.
type SchemaInfo struct {
	// Type is the schema type name, e.g. "STRING" (see DefaultStringSchemaInfo).
	Type             string
	// Definition is the schema definition in map form, e.g. {"type": "string"}.
	Definition       map[string]interface{}
	// PulsarSchemaInfo is the corresponding Pulsar-side schema descriptor.
	PulsarSchemaInfo *utils.SchemaInfo
}

func GetSchemaFromTopic

func GetSchemaFromTopic(admin cmdutils.Client, topic string) (*SchemaInfo, error)

GetSchemaFromTopic retrieves schema information from a topic

type Server

// Server groups the MCP server with the Kafka and Pulsar sessions and a
// logger. It is declared in this package to avoid a circular dependency.
type Server struct {
	// MCPServer is the underlying MCP server instance.
	MCPServer     *server.MCPServer
	// KafkaSession is the Kafka session.
	KafkaSession  *kafka.Session
	// PulsarSession is the Pulsar session.
	PulsarSession *pulsar.Session
	// Logger is untyped (interface{}).
	// NOTE(review): the concrete logger type expected here is not visible —
	// confirm against the code that uses it.
	Logger        interface{}
}

Server is imported directly to avoid circular dependency

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL