Documentation ¶
Overview ¶
Package core implements Genkit's foundational action system and runtime machinery.
This package is primarily intended for plugin developers and Genkit internals. Application developers should use the genkit package instead, which provides a higher-level, more convenient API.
Actions ¶
Actions are the fundamental building blocks of Genkit. Every operation - flows, model calls, tool invocations, retrieval - is implemented as an action. Actions provide:
- Type-safe input/output with JSON schema validation
- Automatic tracing and observability
- Consistent error handling
- Registration in the action registry
Define a non-streaming action:
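// The action type, metadata, and input schema follow the name.
// api.ActionTypeCustom is assumed here; metadata and inputSchema are nil.
action := core.DefineAction(registry, "myAction", api.ActionTypeCustom, nil, nil,
	func(ctx context.Context, input string) (string, error) {
		return "processed: " + input, nil
	},
)
// Run takes a stream callback; pass nil for a non-streaming call.
result, err := action.Run(context.Background(), "hello", nil)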
Define a streaming action that sends chunks during execution:
// api.ActionTypeCustom is assumed here; metadata and inputSchema are nil.
streamingAction := core.DefineStreamingAction(registry, "countdown", api.ActionTypeCustom, nil, nil,
	func(ctx context.Context, start int, cb core.StreamCallback[string]) (string, error) {
		for i := start; i > 0; i-- {
			if cb != nil {
				if err := cb(ctx, fmt.Sprintf("T-%d", i)); err != nil {
					return "", err
				}
			}
			time.Sleep(time.Second)
		}
		return "Liftoff!", nil
	},
)
Flows ¶
Flows are user-defined actions that orchestrate AI operations. They are the primary way application developers define business logic in Genkit:
flow := core.DefineFlow(registry, "myFlow",
	func(ctx context.Context, input string) (string, error) {
		// Use Run to create traced sub-steps
		result, err := core.Run(ctx, "step1", func() (string, error) {
			return process(input), nil
		})
		if err != nil {
			return "", err
		}
		return result, nil
	},
)
Streaming flows can send intermediate results to callers:
streamingFlow := core.DefineStreamingFlow(registry, "generateReport",
	func(ctx context.Context, input Input, cb core.StreamCallback[Progress]) (Report, error) {
		for i := 0; i < 100; i += 10 {
			if cb != nil {
				// Stop early if the caller can no longer receive chunks.
				if err := cb(ctx, Progress{Percent: i}); err != nil {
					return Report{}, err
				}
			}
			// ... work ...
		}
		return Report{...}, nil
	},
)
Traced Steps with Run ¶
Use Run within flows to create traced sub-operations. Each Run call creates a span in the trace that's visible in the Genkit Developer UI:
result, err := core.Run(ctx, "fetchData", func() (Data, error) {
	return fetchFromAPI()
})
processed, err := core.Run(ctx, "processData", func() (Result, error) {
	return process(result)
})
Middleware ¶
Actions support middleware for cross-cutting concerns like logging, metrics, or authentication:
loggingMiddleware := func(next core.StreamingFunc[string, string, struct{}]) core.StreamingFunc[string, string, struct{}] {
	return func(ctx context.Context, input string, cb core.StreamCallback[struct{}]) (string, error) {
		log.Printf("Input: %s", input)
		output, err := next(ctx, input, cb)
		log.Printf("Output: %s, Error: %v", output, err)
		return output, err
	}
}
Chain multiple middleware together:
combined := core.ChainMiddleware(loggingMiddleware, metricsMiddleware)
wrappedFn := combined(originalFunc)
Schema Management ¶
Register JSON schemas for use in prompts and validation:
// Define a schema from a map
core.DefineSchema(registry, "Person", map[string]any{
	"type": "object",
	"properties": map[string]any{
		"name": map[string]any{"type": "string"},
		"age":  map[string]any{"type": "integer"},
	},
	"required": []any{"name"},
})
// Define a schema from a Go type (recommended)
core.DefineSchemaFor[Person](registry)
Schemas can be referenced in .prompt files by name.
Plugin Development ¶
Plugins extend Genkit's functionality by providing models, tools, retrievers, and other capabilities. Implement the api.Plugin interface:
type MyPlugin struct {
	APIKey string
}
func (p *MyPlugin) Name() string {
	return "myplugin"
}
func (p *MyPlugin) Init(ctx context.Context) []api.Action {
	// Initialize the plugin and return actions to register
	model := ai.DefineModel(...)
	tool := ai.DefineTool(...)
	return []api.Action{model, tool}
}
For plugins that resolve actions dynamically (e.g., listing available models from an API), implement api.DynamicPlugin:
type DynamicModelPlugin struct{}
func (p *DynamicModelPlugin) ListActions(ctx context.Context) []api.ActionDesc {
	// Return descriptors of available actions
	return []api.ActionDesc{
		{Key: "/model/myplugin/model-a", Name: "model-a"},
		{Key: "/model/myplugin/model-b", Name: "model-b"},
	}
}
func (p *DynamicModelPlugin) ResolveAction(atype api.ActionType, name string) api.Action {
	// Create and return the action on demand
	return createModel(name)
}
Background Actions ¶
For long-running operations, use background actions that return immediately with an operation ID that can be polled for completion:
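// api.ActionTypeCustom and the nil metadata are assumed here. The start
// function is assumed to return an operation handle, matching Start.
bgAction := core.DefineBackgroundAction(registry, "longTask", api.ActionTypeCustom, nil,
	func(ctx context.Context, input Input) (*core.Operation[Output], error) {
		// Start the operation
		return startLongOperation(input)
	},
	func(ctx context.Context, op *core.Operation[Output]) (*core.Operation[Output], error) {
		// Check operation status
		return checkOperationStatus(op)
	},
	// No cancel function: this action does not support cancellation.
	nil,
)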
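The polling side of this pattern can be sketched without the package itself. The following is a minimal, standard-library-only illustration: Operation is a local stand-in that copies a few of the documented fields, and pollUntilDone, fakeCheck, and runPollDemo are hypothetical helpers, not part of core.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Operation is a local stand-in with some of the fields documented for
// core.Operation (Action and Metadata are omitted for brevity).
type Operation[Out any] struct {
	ID     string
	Done   bool
	Output Out
	Error  error
}

// checkFunc has the same shape as the documented Check method: it takes an
// operation and returns its refreshed state.
type checkFunc[Out any] func(ctx context.Context, op *Operation[Out]) (*Operation[Out], error)

// pollUntilDone is a hypothetical helper. It calls check once per interval
// until the operation reports Done or ctx is cancelled.
func pollUntilDone[Out any](ctx context.Context, op *Operation[Out], check checkFunc[Out], interval time.Duration) (*Operation[Out], error) {
	for !op.Done {
		select {
		case <-ctx.Done():
			return op, ctx.Err()
		case <-time.After(interval):
		}
		next, err := check(ctx, op)
		if err != nil {
			return op, err
		}
		op = next
	}
	if op.Error != nil {
		return op, op.Error
	}
	return op, nil
}

// fakeCheck simulates a backend that finishes on the n-th status check.
func fakeCheck(n int) checkFunc[string] {
	calls := 0
	return func(ctx context.Context, op *Operation[string]) (*Operation[string], error) {
		calls++
		if calls < n {
			return op, nil
		}
		return &Operation[string]{ID: op.ID, Done: true, Output: "finished"}, nil
	}
}

// runPollDemo polls a simulated operation that completes after n checks.
func runPollDemo(n int) (string, error) {
	op := &Operation[string]{ID: "op-1"}
	done, err := pollUntilDone(context.Background(), op, fakeCheck(n), time.Millisecond)
	if err != nil {
		return "", err
	}
	return done.Output, nil
}

func main() {
	out, err := runPollDemo(3)
	if err != nil {
		fmt.Println("Error:", err)
		return
	}
	fmt.Println(out)
}
```

With a real background action, the check function would presumably be the action's Check method, and the interval would be chosen to suit the backing service.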
Error Handling ¶
Return user-facing errors with appropriate status codes:
if err := validate(input); err != nil {
	return nil, core.NewPublicError(core.INVALID_ARGUMENT, "Invalid input", map[string]any{
		"field": "email",
		"error": err.Error(),
	})
}
For internal errors that should be logged but not exposed to users:
return nil, core.NewError(core.INTERNAL, "database connection failed: %v", err)
Context ¶
Access action context for metadata and configuration:
// Use a separate name so the context.Context is not shadowed.
actionCtx := core.FromContext(ctx)
if actionCtx != nil {
	// Access action-specific context values
}
Set action context for nested operations:
ctx = core.WithActionContext(ctx, core.ActionContext{
	"requestId": requestID,
})
For more information, see https://genkit.dev/docs/plugins
Package core provides base error types and utilities for Genkit.
Package status defines canonical status codes, names, and related types inspired by gRPC status codes.
Index ¶
- Constants
- Variables
- func DefineSchema(r api.Registry, name string, schema map[string]any)
- func DefineSchemaFor[T any](r api.Registry)
- func FlowNameFromContext(ctx context.Context) string
- func HTTPStatusCode(name StatusName) int
- func InferSchemaMap(value any) map[string]any
- func ResolveSchema(r api.Registry, schema map[string]any) (map[string]any, error)
- func Run[Out any](ctx context.Context, name string, fn func() (Out, error)) (Out, error)
- func SchemaRef(name string) map[string]any
- func WithActionContext(ctx context.Context, actionCtx ActionContext) context.Context
- type ActionContext
- type ActionDef
- func DefineAction[In, Out any](r api.Registry, name string, atype api.ActionType, metadata map[string]any, ...) *ActionDef[In, Out, struct{}]
- func DefineStreamingAction[In, Out, Stream any](r api.Registry, name string, atype api.ActionType, metadata map[string]any, ...) *ActionDef[In, Out, Stream]
- func LookupActionFor[In, Out, Stream any](r api.Registry, atype api.ActionType, name string) *ActionDef[In, Out, Stream] (deprecated)
- func NewAction[In, Out any](name string, atype api.ActionType, metadata map[string]any, ...) *ActionDef[In, Out, struct{}]
- func NewStreamingAction[In, Out, Stream any](name string, atype api.ActionType, metadata map[string]any, ...) *ActionDef[In, Out, Stream]
- func ResolveActionFor[In, Out, Stream any](r api.Registry, atype api.ActionType, name string) *ActionDef[In, Out, Stream]
- func (a *ActionDef[In, Out, Stream]) Desc() api.ActionDesc
- func (a *ActionDef[In, Out, Stream]) Name() string
- func (a *ActionDef[In, Out, Stream]) Register(r api.Registry)
- func (a *ActionDef[In, Out, Stream]) Run(ctx context.Context, input In, cb StreamCallback[Stream]) (output Out, err error)
- func (a *ActionDef[In, Out, Stream]) RunJSON(ctx context.Context, input json.RawMessage, cb StreamCallback[json.RawMessage]) (json.RawMessage, error)
- func (a *ActionDef[In, Out, Stream]) RunJSONWithTelemetry(ctx context.Context, input json.RawMessage, cb StreamCallback[json.RawMessage]) (*api.ActionRunResult[json.RawMessage], error)
- type BackgroundActionDef
- func DefineBackgroundAction[In, Out any](r api.Registry, name string, atype api.ActionType, metadata map[string]any, ...) *BackgroundActionDef[In, Out]
- func LookupBackgroundAction[In, Out any](r api.Registry, key string) *BackgroundActionDef[In, Out]
- func NewBackgroundAction[In, Out any](name string, atype api.ActionType, metadata map[string]any, ...) *BackgroundActionDef[In, Out]
- func (b *BackgroundActionDef[In, Out]) Cancel(ctx context.Context, op *Operation[Out]) (*Operation[Out], error)
- func (b *BackgroundActionDef[In, Out]) Check(ctx context.Context, op *Operation[Out]) (*Operation[Out], error)
- func (b *BackgroundActionDef[In, Out]) Register(r api.Registry)
- func (b *BackgroundActionDef[In, Out]) Start(ctx context.Context, input In) (*Operation[Out], error)
- func (b *BackgroundActionDef[In, Out]) SupportsCancel() bool
- type CancelOpFunc
- type CheckOpFunc
- type ContextProvider
- type Flow
- func (f *Flow[In, Out, Stream]) Desc() api.ActionDesc
- func (f *Flow[In, Out, Stream]) Name() string
- func (f *Flow[In, Out, Stream]) Register(r api.Registry)
- func (f *Flow[In, Out, Stream]) Run(ctx context.Context, input In) (Out, error)
- func (f *Flow[In, Out, Stream]) RunJSON(ctx context.Context, input json.RawMessage, cb StreamCallback[json.RawMessage]) (json.RawMessage, error)
- func (f *Flow[In, Out, Stream]) RunJSONWithTelemetry(ctx context.Context, input json.RawMessage, cb StreamCallback[json.RawMessage]) (*api.ActionRunResult[json.RawMessage], error)
- func (f *Flow[In, Out, Stream]) Stream(ctx context.Context, input In) func(func(*StreamingFlowValue[Out, Stream], error) bool)
- type Func
- type GenkitError
- type Middleware
- type Operation
- type ReflectionError
- type ReflectionErrorDetails
- type RequestData
- type StartOpFunc
- type Status
- type StatusName
- type StreamCallback
- type StreamingFlowValue
- type StreamingFunc
- type UserFacingError
Constants ¶
const (
	// CodeOK means not an error; returned on success.
	CodeOK = 0
	// CodeCancelled means the operation was cancelled, typically by the caller.
	CodeCancelled = 1
	// CodeUnknown means an unknown error occurred.
	CodeUnknown = 2
	// CodeInvalidArgument means the client specified an invalid argument.
	CodeInvalidArgument = 3
	// CodeDeadlineExceeded means the deadline expired before the operation could complete.
	CodeDeadlineExceeded = 4
	// CodeNotFound means some requested entity (e.g., file or directory) was not found.
	CodeNotFound = 5
	// CodeAlreadyExists means the entity that a client attempted to create already exists.
	CodeAlreadyExists = 6
	// CodePermissionDenied means the caller does not have permission to execute the operation.
	CodePermissionDenied = 7
	// CodeUnauthenticated means the request does not have valid authentication credentials.
	CodeUnauthenticated = 16
	// CodeResourceExhausted means some resource has been exhausted.
	CodeResourceExhausted = 8
	// CodeFailedPrecondition means the operation was rejected because the system is not in a state required.
	CodeFailedPrecondition = 9
	// CodeAborted means the operation was aborted, typically due to some issue.
	CodeAborted = 10
	// CodeOutOfRange means the operation was attempted past the valid range.
	CodeOutOfRange = 11
	// CodeUnimplemented means the operation is not implemented or not supported/enabled.
	CodeUnimplemented = 12
	// CodeInternal means internal errors. Some invariants expected by the underlying system were broken.
	CodeInternal = 13
	CodeUnavailable = 14
	// CodeDataLoss means unrecoverable data loss or corruption.
	CodeDataLoss = 15
)
Constants for canonical status codes (integer values).
Variables ¶
var StatusNameToCode = map[StatusName]int{
	OK:                  CodeOK,
	CANCELLED:           CodeCancelled,
	UNKNOWN:             CodeUnknown,
	INVALID_ARGUMENT:    CodeInvalidArgument,
	DEADLINE_EXCEEDED:   CodeDeadlineExceeded,
	NOT_FOUND:           CodeNotFound,
	ALREADY_EXISTS:      CodeAlreadyExists,
	PERMISSION_DENIED:   CodePermissionDenied,
	UNAUTHENTICATED:     CodeUnauthenticated,
	RESOURCE_EXHAUSTED:  CodeResourceExhausted,
	FAILED_PRECONDITION: CodeFailedPrecondition,
	ABORTED:             CodeAborted,
	OUT_OF_RANGE:        CodeOutOfRange,
	UNIMPLEMENTED:       CodeUnimplemented,
	INTERNAL:            CodeInternal,
	UNAVAILABLE:         CodeUnavailable,
	DATA_LOSS:           CodeDataLoss,
}
StatusNameToCode maps status names to their integer code values. Exported for potential use elsewhere if needed.
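Because the map is keyed by StatusName values rather than by Go identifier names, a lookup only succeeds with the constant's string value. The sketch below uses a local copy of three entries (names and values taken from this page) to show one consequence: the INTERNAL constant carries the string "INTERNAL_SERVER_ERROR", so a hand-built StatusName("INTERNAL") misses. lookupCode is a hypothetical helper, not part of the package.

```go
package main

import "fmt"

// StatusName and the constants below are local copies of a few documented
// declarations, reproduced so the sketch is self-contained.
type StatusName string

const (
	NOT_FOUND       StatusName = "NOT_FOUND"
	UNAUTHENTICATED StatusName = "UNAUTHENTICATED"
	INTERNAL        StatusName = "INTERNAL_SERVER_ERROR"
)

// statusNameToCode is a local subset of the documented StatusNameToCode map.
var statusNameToCode = map[StatusName]int{
	NOT_FOUND:       5,
	UNAUTHENTICATED: 16,
	INTERNAL:        13,
}

// lookupCode reports the integer code for name and whether it was found.
func lookupCode(name StatusName) (int, bool) {
	code, ok := statusNameToCode[name]
	return code, ok
}

func main() {
	code, ok := lookupCode(INTERNAL)
	fmt.Println(code, ok) // 13 true

	// The identifier is INTERNAL, but the stored key is "INTERNAL_SERVER_ERROR".
	code, ok = lookupCode(StatusName("INTERNAL"))
	fmt.Println(code, ok) // 0 false
}
```

Using the exported constants rather than string literals avoids this mismatch.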
Functions ¶
func DefineSchema ¶ added in v1.3.0
DefineSchema defines a named JSON schema and registers it in the registry. The `schema` argument must be a JSON schema definition represented as a map. It panics if a schema with the same name is already registered.
Example ¶
This example demonstrates defining a schema from a map.
package main

import (
	"fmt"

	"github.com/firebase/genkit/go/core"
	"github.com/firebase/genkit/go/internal/registry"
)

func main() {
	r := registry.New()
	// Define a JSON schema as a map
	core.DefineSchema(r, "Address", map[string]any{
		"type": "object",
		"properties": map[string]any{
			"street": map[string]any{"type": "string"},
			"city":   map[string]any{"type": "string"},
			"zip":    map[string]any{"type": "string"},
		},
		"required": []any{"street", "city"},
	})
	fmt.Println("Schema registered: Address")
}
Output: Schema registered: Address
func DefineSchemaFor ¶ added in v1.3.0
DefineSchemaFor defines a named JSON schema derived from a Go type and registers it in the registry using the type's name.
Example ¶
This example demonstrates defining a schema from a Go type.
package main

import (
	"fmt"

	"github.com/firebase/genkit/go/core"
	"github.com/firebase/genkit/go/internal/registry"
)

func main() {
	r := registry.New()
	// Define a struct type
	type Person struct {
		Name string `json:"name"`
		Age  int    `json:"age"`
	}
	// Register the schema
	core.DefineSchemaFor[Person](r)
	// The schema is now registered and can be referenced in .prompt files
	fmt.Println("Schema registered")
}
Output: Schema registered
func FlowNameFromContext ¶ added in v1.0.0
FlowNameFromContext returns the flow name from context if we're in a flow, empty string otherwise.
func HTTPStatusCode ¶ added in v0.5.3
func HTTPStatusCode(name StatusName) int
HTTPStatusCode gets the corresponding HTTP status code for a given Genkit status name.
func InferSchemaMap ¶ added in v0.7.0
InferSchemaMap infers a JSON schema from a Go value and converts it to a map.
func ResolveSchema ¶ added in v1.3.0
ResolveSchema resolves a schema that may contain a $ref to a registered schema. If the schema contains a $ref with the "genkit:" prefix, it looks up the schema by name. Returns the original schema if no $ref is present, or the resolved schema if found. Returns an error if the schema reference cannot be resolved.
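The resolution rule described above can be sketched with a plain map standing in for the registry. This is an illustration only: resolve is a hypothetical local function, and its treatment of a $ref without the "genkit:" prefix (returned unchanged) is an assumption, since the description does not cover that case.

```go
package main

import (
	"fmt"
	"strings"
)

// refPrefix is the prefix the description above names for registered schemas.
const refPrefix = "genkit:"

// resolve is a local sketch of the documented behavior. The schemas map
// stands in for the registry.
func resolve(schemas map[string]map[string]any, schema map[string]any) (map[string]any, error) {
	ref, ok := schema["$ref"].(string)
	if !ok || !strings.HasPrefix(ref, refPrefix) {
		// No $ref, or a $ref this sketch does not handle (assumption):
		// return the schema as-is.
		return schema, nil
	}
	name := strings.TrimPrefix(ref, refPrefix)
	resolved, found := schemas[name]
	if !found {
		return nil, fmt.Errorf("schema %q not found", name)
	}
	return resolved, nil
}

func main() {
	schemas := map[string]map[string]any{
		"Person": {"type": "object"},
	}

	got, err := resolve(schemas, map[string]any{"$ref": "genkit:Person"})
	fmt.Println(got["type"], err)

	_, err = resolve(schemas, map[string]any{"$ref": "genkit:Missing"})
	fmt.Println(err)
}
```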
func Run ¶ added in v0.3.0
Run runs the function fn in the context of the current flow and returns what fn returns. It returns an error if no flow is active.
Each call to Run results in a new step in the flow. A step has its own span in the trace, and its result is cached so that if the flow is restarted, fn will not be called a second time.
Example ¶
This example demonstrates using Run to create traced sub-steps.
package main

import (
	"context"
	"fmt"
	"strings"

	"github.com/firebase/genkit/go/core"
	"github.com/firebase/genkit/go/internal/registry"
)

func main() {
	r := registry.New()
	// Define a flow that uses Run for traced steps
	flow := core.DefineFlow(r, "pipeline",
		func(ctx context.Context, input string) (string, error) {
			// Each Run creates a traced step visible in the Dev UI
			upper, err := core.Run(ctx, "toUpper", func() (string, error) {
				return strings.ToUpper(input), nil
			})
			if err != nil {
				return "", err
			}
			result, err := core.Run(ctx, "addPrefix", func() (string, error) {
				return "RESULT: " + upper, nil
			})
			return result, err
		},
	)
	result, err := flow.Run(context.Background(), "hello")
	if err != nil {
		fmt.Println("Error:", err)
		return
	}
	fmt.Println(result)
}
Output: RESULT: HELLO
func WithActionContext ¶ added in v0.3.0
func WithActionContext(ctx context.Context, actionCtx ActionContext) context.Context
WithActionContext returns a new Context with Action runtime context (side channel data) value set.
Types ¶
type ActionContext ¶ added in v0.3.0
ActionContext is the runtime context for an Action.
func FromContext ¶ added in v0.3.0
func FromContext(ctx context.Context) ActionContext
FromContext returns the Action runtime context (side channel data) from context.
type ActionDef ¶ added in v0.3.0
type ActionDef[In, Out, Stream any] struct {
	// contains filtered or unexported fields
}
An ActionDef is a named, observable operation that underlies all Genkit primitives. It consists of a function that takes an input of type In and returns an output of type Out, optionally streaming values of type Stream incrementally by invoking a callback. It can also carry other metadata, such as a description and JSON Schemas for its input and output, against which it validates.
Each time an ActionDef is run, it results in a new trace span.
For internal use only.
func DefineAction ¶
func DefineAction[In, Out any]( r api.Registry, name string, atype api.ActionType, metadata map[string]any, inputSchema map[string]any, fn Func[In, Out], ) *ActionDef[In, Out, struct{}]
DefineAction creates a new non-streaming Action and registers it. If inputSchema is nil, it is inferred from the function's input type.
func DefineStreamingAction ¶
func DefineStreamingAction[In, Out, Stream any]( r api.Registry, name string, atype api.ActionType, metadata map[string]any, inputSchema map[string]any, fn StreamingFunc[In, Out, Stream], ) *ActionDef[In, Out, Stream]
DefineStreamingAction creates a new streaming action and registers it. If inputSchema is nil, it is inferred from the function's input type.
func LookupActionFor ¶ deprecated
func LookupActionFor[In, Out, Stream any](r api.Registry, atype api.ActionType, name string) *ActionDef[In, Out, Stream]
LookupActionFor returns the action for the given key in the global registry, or nil if there is none. It panics if the action is of the wrong type.
Deprecated: Use ResolveActionFor.
func NewAction ¶ added in v0.6.0
func NewAction[In, Out any]( name string, atype api.ActionType, metadata map[string]any, inputSchema map[string]any, fn Func[In, Out], ) *ActionDef[In, Out, struct{}]
NewAction creates a new non-streaming action without registering it. If inputSchema is nil, it is inferred from the function's input type.
func NewStreamingAction ¶ added in v0.7.0
func NewStreamingAction[In, Out, Stream any]( name string, atype api.ActionType, metadata map[string]any, inputSchema map[string]any, fn StreamingFunc[In, Out, Stream], ) *ActionDef[In, Out, Stream]
NewStreamingAction creates a new streaming action without registering it. If inputSchema is nil, it is inferred from the function's input type.
func ResolveActionFor ¶ added in v0.6.2
func ResolveActionFor[In, Out, Stream any](r api.Registry, atype api.ActionType, name string) *ActionDef[In, Out, Stream]
ResolveActionFor returns the action for the given key in the global registry, or nil if there is none. It panics if the action is of the wrong type.
func (*ActionDef[In, Out, Stream]) Desc ¶ added in v0.3.0
func (a *ActionDef[In, Out, Stream]) Desc() api.ActionDesc
Desc returns a descriptor of the action with resolved schema references.
func (*ActionDef[In, Out, Stream]) Register ¶ added in v0.7.0
Register registers the action with the given registry.
func (*ActionDef[In, Out, Stream]) Run ¶ added in v0.3.0
func (a *ActionDef[In, Out, Stream]) Run(ctx context.Context, input In, cb StreamCallback[Stream]) (output Out, err error)
Run executes the Action's function in a new trace span.
func (*ActionDef[In, Out, Stream]) RunJSON ¶ added in v0.3.0
func (a *ActionDef[In, Out, Stream]) RunJSON(ctx context.Context, input json.RawMessage, cb StreamCallback[json.RawMessage]) (json.RawMessage, error)
RunJSON runs the action with a JSON input, and returns a JSON result.
func (*ActionDef[In, Out, Stream]) RunJSONWithTelemetry ¶ added in v1.0.3
func (a *ActionDef[In, Out, Stream]) RunJSONWithTelemetry(ctx context.Context, input json.RawMessage, cb StreamCallback[json.RawMessage]) (*api.ActionRunResult[json.RawMessage], error)
RunJSONWithTelemetry runs the action with a JSON input, and returns a JSON result along with telemetry info.
type BackgroundActionDef ¶ added in v1.3.0
type BackgroundActionDef[In, Out any] struct {
	*ActionDef[In, *Operation[Out], struct{}]
	// contains filtered or unexported fields
}
BackgroundActionDef is a background action that can be used to start, check, and cancel background operations.
For internal use only.
func DefineBackgroundAction ¶ added in v1.3.0
func DefineBackgroundAction[In, Out any]( r api.Registry, name string, atype api.ActionType, metadata map[string]any, startFn StartOpFunc[In, Out], checkFn CheckOpFunc[Out], cancelFn CancelOpFunc[Out], ) *BackgroundActionDef[In, Out]
DefineBackgroundAction creates and registers a background action with three component actions: start, check, and cancel.
func LookupBackgroundAction ¶ added in v1.3.0
func LookupBackgroundAction[In, Out any](r api.Registry, key string) *BackgroundActionDef[In, Out]
LookupBackgroundAction looks up a background action by key (which includes the action type, provider, and name).
func NewBackgroundAction ¶ added in v1.3.0
func NewBackgroundAction[In, Out any]( name string, atype api.ActionType, metadata map[string]any, startFn StartOpFunc[In, Out], checkFn CheckOpFunc[Out], cancelFn CancelOpFunc[Out], ) *BackgroundActionDef[In, Out]
NewBackgroundAction creates a new background action without registering it.
func (*BackgroundActionDef[In, Out]) Cancel ¶ added in v1.3.0
func (b *BackgroundActionDef[In, Out]) Cancel(ctx context.Context, op *Operation[Out]) (*Operation[Out], error)
Cancel attempts to cancel a background operation. It returns an error if the background action does not support cancellation.
func (*BackgroundActionDef[In, Out]) Check ¶ added in v1.3.0
func (b *BackgroundActionDef[In, Out]) Check(ctx context.Context, op *Operation[Out]) (*Operation[Out], error)
Check checks the status of a background operation.
func (*BackgroundActionDef[In, Out]) Register ¶ added in v1.3.0
func (b *BackgroundActionDef[In, Out]) Register(r api.Registry)
Register registers the background action with the given registry.
func (*BackgroundActionDef[In, Out]) Start ¶ added in v1.3.0
func (b *BackgroundActionDef[In, Out]) Start(ctx context.Context, input In) (*Operation[Out], error)
Start starts a background operation.
func (*BackgroundActionDef[In, Out]) SupportsCancel ¶ added in v1.3.0
func (b *BackgroundActionDef[In, Out]) SupportsCancel() bool
SupportsCancel returns whether the background action supports cancellation.
type CancelOpFunc ¶ added in v1.3.0
CancelOpFunc cancels a background operation.
type CheckOpFunc ¶ added in v1.3.0
CheckOpFunc checks the status of a background operation.
type ContextProvider ¶ added in v0.3.0
type ContextProvider = func(ctx context.Context, req RequestData) (ActionContext, error)
ContextProvider is a function that returns an ActionContext for a given request. It is used to provide additional context to the Action.
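As an illustration of the ContextProvider shape, the sketch below derives an ActionContext from a request header. All types are local stand-ins so the example needs only the standard library: RequestData copies the documented struct, ActionContext is assumed to be a map of arbitrary values (consistent with the composite literal in the overview), and bearerTokenProvider and tokenFor are hypothetical names.

```go
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"strings"
)

// ActionContext is assumed to be a string-keyed map of arbitrary values.
type ActionContext = map[string]any

// RequestData mirrors the documented struct.
type RequestData struct {
	Method  string
	Headers map[string]string // keys are header names in lowercase
	Input   json.RawMessage
}

// ContextProvider mirrors the documented function type.
type ContextProvider = func(ctx context.Context, req RequestData) (ActionContext, error)

// bearerTokenProvider is a hypothetical provider that extracts a bearer
// token from the Authorization header and exposes it to the action.
func bearerTokenProvider(ctx context.Context, req RequestData) (ActionContext, error) {
	auth := req.Headers["authorization"] // lowercase, per the RequestData docs
	if !strings.HasPrefix(auth, "Bearer ") {
		return nil, errors.New("missing bearer token")
	}
	token := strings.TrimPrefix(auth, "Bearer ")
	if token == "" {
		return nil, errors.New("missing bearer token")
	}
	return ActionContext{"token": token}, nil
}

// tokenFor runs the provider against a request carrying the given header.
func tokenFor(header string) (string, error) {
	var provider ContextProvider = bearerTokenProvider
	req := RequestData{
		Method:  "POST",
		Headers: map[string]string{"authorization": header},
	}
	actionCtx, err := provider(context.Background(), req)
	if err != nil {
		return "", err
	}
	token, _ := actionCtx["token"].(string)
	return token, nil
}

func main() {
	token, err := tokenFor("Bearer abc123")
	fmt.Println(token, err)

	_, err = tokenFor("")
	fmt.Println(err)
}
```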
type Flow ¶
A Flow is a user-defined Action. A Flow[In, Out, Stream] represents a function from In to Out. The Stream parameter is for flows that support streaming: providing their results incrementally.
func DefineFlow ¶ added in v0.3.0
func DefineFlow[In, Out any](r api.Registry, name string, fn Func[In, Out]) *Flow[In, Out, struct{}]
DefineFlow creates a Flow that runs fn, and registers it as an action. fn takes an input of type In and returns an output of type Out.
Example ¶
This example demonstrates defining a simple flow.
package main

import (
	"context"
	"fmt"
	"strings"

	"github.com/firebase/genkit/go/core"
	"github.com/firebase/genkit/go/internal/registry"
)

func main() {
	r := registry.New()
	// Define a flow that processes input
	flow := core.DefineFlow(r, "uppercase",
		func(ctx context.Context, input string) (string, error) {
			return strings.ToUpper(input), nil
		},
	)
	// Run the flow
	result, err := flow.Run(context.Background(), "hello")
	if err != nil {
		fmt.Println("Error:", err)
		return
	}
	fmt.Println(result)
}
Output: HELLO
func DefineStreamingFlow ¶ added in v0.3.0
func DefineStreamingFlow[In, Out, Stream any](r api.Registry, name string, fn StreamingFunc[In, Out, Stream]) *Flow[In, Out, Stream]
DefineStreamingFlow creates a streaming Flow that runs fn, and registers it as an action.
fn takes an input of type In and returns an output of type Out, optionally streaming values of type Stream incrementally by invoking a callback.
If the function supports streaming and the callback is non-nil, it should stream the results by invoking the callback periodically, ultimately returning with a final return value that includes all the streamed data. Otherwise, it should ignore the callback and just return a result.
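The contract in the paragraph above (stream when a callback is supplied, otherwise just return, with the final value covering everything that was streamed) can be sketched with a local callback type. streamCallback, words, and runWords are hypothetical names; the callback shape is assumed from how callbacks are invoked elsewhere on this page.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// streamCallback is a local stand-in whose shape is assumed from the
// cb(ctx, chunk) calls shown in this documentation.
type streamCallback func(ctx context.Context, chunk string) error

// words streams each word of input when cb is non-nil. Either way, the
// returned value contains all of the data that was (or would be) streamed.
func words(ctx context.Context, input string, cb streamCallback) (string, error) {
	parts := strings.Fields(input)
	if cb != nil {
		for _, p := range parts {
			if err := cb(ctx, p); err != nil {
				return "", err
			}
		}
	}
	return strings.Join(parts, " "), nil
}

// runWords calls words with or without a callback and counts the chunks.
func runWords(input string, stream bool) (int, string, error) {
	chunks := 0
	var cb streamCallback
	if stream {
		cb = func(ctx context.Context, chunk string) error {
			chunks++
			return nil
		}
	}
	result, err := words(context.Background(), input, cb)
	return chunks, result, err
}

func main() {
	fmt.Println(runWords("alpha  beta gamma", true))
	fmt.Println(runWords("alpha  beta gamma", false))
}
```

The same function body serves streaming and non-streaming callers; only the nil check on the callback differs.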
Example ¶
This example demonstrates defining a streaming flow.
package main

import (
	"context"
	"fmt"

	"github.com/firebase/genkit/go/core"
	"github.com/firebase/genkit/go/internal/registry"
)

func main() {
	r := registry.New()
	// Define a streaming flow that counts down
	flow := core.DefineStreamingFlow(r, "countdown",
		func(ctx context.Context, start int, cb core.StreamCallback[int]) (string, error) {
			for i := start; i > 0; i-- {
				if cb != nil {
					if err := cb(ctx, i); err != nil {
						return "", err
					}
				}
			}
			return "Done!", nil
		},
	)
	// Use Stream() iterator to receive chunks
	iter := flow.Stream(context.Background(), 3)
	iter(func(val *core.StreamingFlowValue[string, int], err error) bool {
		if err != nil {
			fmt.Println("Error:", err)
			return false
		}
		if val.Done {
			fmt.Println("Result:", val.Output)
		} else {
			fmt.Println("Count:", val.Stream)
		}
		return true
	})
}
Output:
Count: 3
Count: 2
Count: 1
Result: Done!
func (*Flow[In, Out, Stream]) Desc ¶ added in v0.6.0
func (f *Flow[In, Out, Stream]) Desc() api.ActionDesc
Desc returns the descriptor of the flow.
func (*Flow[In, Out, Stream]) Register ¶ added in v0.7.0
Register registers the flow with the given registry.
func (*Flow[In, Out, Stream]) RunJSON ¶ added in v0.3.0
func (f *Flow[In, Out, Stream]) RunJSON(ctx context.Context, input json.RawMessage, cb StreamCallback[json.RawMessage]) (json.RawMessage, error)
RunJSON runs the flow with JSON input and streaming callback and returns the output as JSON.
func (*Flow[In, Out, Stream]) RunJSONWithTelemetry ¶ added in v1.0.3
func (f *Flow[In, Out, Stream]) RunJSONWithTelemetry(ctx context.Context, input json.RawMessage, cb StreamCallback[json.RawMessage]) (*api.ActionRunResult[json.RawMessage], error)
RunJSONWithTelemetry runs the flow with JSON input and streaming callback and returns the output as JSON along with telemetry info.
func (*Flow[In, Out, Stream]) Stream ¶
func (f *Flow[In, Out, Stream]) Stream(ctx context.Context, input In) func(func(*StreamingFlowValue[Out, Stream], error) bool)
Stream runs the flow in the context of another flow and streams the output. It returns a function whose argument function (the "yield function") will be repeatedly called with the results.
If the yield function is passed a non-nil error, the flow has failed with that error; the yield function will not be called again.
If the yield function's StreamingFlowValue argument has Done == true, the value's Output field contains the final output; the yield function will not be called again.
Otherwise the Stream field of the passed StreamingFlowValue holds a streamed result.
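To make the yield-function protocol concrete, here is a standard-library-only sketch of a producer with the same function shape as the value Stream returns. StreamingFlowValue is a local copy of the documented struct; countdown and collect are hypothetical. Stopping when the yield function returns false follows the usual Go iterator convention and is an assumption about Stream's behavior, not something stated above.

```go
package main

import "fmt"

// StreamingFlowValue is a local copy of the documented struct.
type StreamingFlowValue[Out, Stream any] struct {
	Done   bool
	Output Out    // valid if Done is true
	Stream Stream // valid if Done is false
}

// countdown is a hypothetical producer. It yields start, start-1, ..., 1 as
// streamed values, then one final value with Done set.
func countdown(start int) func(func(*StreamingFlowValue[string, int], error) bool) {
	return func(yield func(*StreamingFlowValue[string, int], error) bool) {
		for i := start; i > 0; i-- {
			if !yield(&StreamingFlowValue[string, int]{Stream: i}, nil) {
				return // the consumer asked to stop (assumed convention)
			}
		}
		yield(&StreamingFlowValue[string, int]{Done: true, Output: "Done!"}, nil)
	}
}

// collect drains seq. If limit > 0, it stops after limit streamed chunks.
func collect(seq func(func(*StreamingFlowValue[string, int], error) bool), limit int) (chunks []int, output string) {
	seq(func(v *StreamingFlowValue[string, int], err error) bool {
		if err != nil {
			return false
		}
		if v.Done {
			output = v.Output
			return true
		}
		chunks = append(chunks, v.Stream)
		return limit <= 0 || len(chunks) < limit
	})
	return chunks, output
}

func main() {
	chunks, out := collect(countdown(3), 0)
	fmt.Println(chunks, out) // [3 2 1] Done!

	// Stopping early means the final output is never delivered.
	chunks, out = collect(countdown(3), 2)
	fmt.Println(chunks, out == "") // [3 2] true
}
```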
type Func ¶
Func is an alias for non-streaming functions with input of type In and output of type Out.
type GenkitError ¶ added in v0.5.3
type GenkitError struct {
Message string `json:"message"` // Exclude from default JSON if embedded elsewhere
Status StatusName `json:"status"`
HTTPCode int `json:"-"` // Exclude from default JSON
Details map[string]any `json:"details"` // Use map for arbitrary details
Source *string `json:"source,omitempty"` // Pointer for optional
}
GenkitError is the base error type for Genkit errors.
func NewError ¶ added in v0.5.3
func NewError(status StatusName, message string, args ...any) *GenkitError
NewError creates a new GenkitError with a stack trace.
func (*GenkitError) Error ¶ added in v0.5.3
func (e *GenkitError) Error() string
Error implements the standard error interface.
func (*GenkitError) ToReflectionError ¶ added in v0.5.3
func (e *GenkitError) ToReflectionError() ReflectionError
ToReflectionError returns a JSON-serializable representation for reflection API responses.
type Middleware ¶ added in v0.3.0
type Middleware[In, Out, Stream any] = func(StreamingFunc[In, Out, Stream]) StreamingFunc[In, Out, Stream]
Middleware is a function that wraps an action execution, similar to HTTP middleware. It can modify the input, output, and context, or perform side effects.
func ChainMiddleware ¶ added in v0.3.0
func ChainMiddleware[In, Out, Stream any](middlewares ...Middleware[In, Out, Stream]) Middleware[In, Out, Stream]
ChainMiddleware creates a new Middleware that applies a sequence of Middlewares, so that they execute in the given order when handling an action request. In other words, ChainMiddleware(m1, m2)(handler) = m1(m2(handler)).
Example ¶
This example demonstrates using ChainMiddleware to combine middleware.
package main

import (
	"context"
	"fmt"
	"strings"

	"github.com/firebase/genkit/go/core"
)

func main() {
	// Define a middleware that wraps function calls
	logMiddleware := func(next core.StreamingFunc[string, string, struct{}]) core.StreamingFunc[string, string, struct{}] {
		return func(ctx context.Context, input string, cb core.StreamCallback[struct{}]) (string, error) {
			fmt.Println("Before:", input)
			result, err := next(ctx, input, cb)
			fmt.Println("After:", result)
			return result, err
		}
	}
	// The original function
	originalFn := func(ctx context.Context, input string, cb core.StreamCallback[struct{}]) (string, error) {
		return strings.ToUpper(input), nil
	}
	// Chain and apply middleware
	wrapped := core.ChainMiddleware(logMiddleware)(originalFn)
	result, _ := wrapped(context.Background(), "hello", nil)
	fmt.Println("Final:", result)
}
Output:
Before: hello
After: HELLO
Final: HELLO
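The identity ChainMiddleware(m1, m2)(handler) = m1(m2(handler)) determines the order in which two middlewares run. The sketch below re-creates that composition with simplified local types (non-generic, no stream callback) so it runs without the package. chain, tag, and runChain are hypothetical names, and chain is one possible way to satisfy the identity, not the package's implementation.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// handler is a simplified, non-streaming stand-in for core.StreamingFunc.
type handler func(ctx context.Context, input string) (string, error)

// middleware wraps a handler, like core.Middleware.
type middleware func(handler) handler

// chain composes middlewares so that chain(m1, m2)(h) == m1(m2(h)).
// It wraps from the last middleware inward, leaving the first outermost.
func chain(ms ...middleware) middleware {
	return func(h handler) handler {
		for i := len(ms) - 1; i >= 0; i-- {
			h = ms[i](h)
		}
		return h
	}
}

// tag returns a middleware that records when it is entered and exited.
func tag(name string, trace *[]string) middleware {
	return func(next handler) handler {
		return func(ctx context.Context, in string) (string, error) {
			*trace = append(*trace, name+":before")
			out, err := next(ctx, in)
			*trace = append(*trace, name+":after")
			return out, err
		}
	}
}

// runChain runs a two-middleware chain and returns the output and the trace.
func runChain(input string) (string, string) {
	var trace []string
	h := chain(tag("m1", &trace), tag("m2", &trace))(
		func(ctx context.Context, in string) (string, error) {
			trace = append(trace, "handler")
			return strings.ToUpper(in), nil
		},
	)
	out, _ := h(context.Background(), input)
	return out, strings.Join(trace, " ")
}

func main() {
	out, trace := runChain("hello")
	fmt.Println(out)   // HELLO
	fmt.Println(trace) // m1:before m2:before handler m2:after m1:after
}
```

The first middleware in the argument list is outermost: it sees the input first and the output last.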
func Middlewares ¶ added in v0.3.0
func Middlewares[In, Out, Stream any](ms ...Middleware[In, Out, Stream]) []Middleware[In, Out, Stream]
Middlewares returns a slice of the middlewares passed in as arguments. core.Middlewares(apple, banana) is identical to []core.Middleware[In, Out, Stream]{apple, banana}.
type Operation ¶ added in v1.3.0
type Operation[Out any] struct {
	Action   string         // Key of the action that created this operation.
	ID       string         // ID of the operation.
	Done     bool           // Whether the operation is complete.
	Output   Out            // Result when done.
	Error    error          // Error if the operation failed.
	Metadata map[string]any // Additional metadata.
}
Operation represents a long-running operation started by a background action.
type ReflectionError ¶ added in v0.5.3
type ReflectionError struct {
Details *ReflectionErrorDetails `json:"details,omitempty"`
Message string `json:"message"`
Code int `json:"code"`
}
ReflectionError is the wire format for HTTP errors for Reflection API responses.
func ToReflectionError ¶ added in v0.5.3
func ToReflectionError(err error) ReflectionError
ToReflectionError gets the JSON representation for reflection API Error responses.
type ReflectionErrorDetails ¶ added in v0.5.3
type RequestData ¶ added in v0.3.0
type RequestData struct {
Method string // Method is the HTTP method of the request (e.g. "GET", "POST", etc.)
Headers map[string]string // Headers is the headers of the request. The keys are the header names in lowercase.
Input json.RawMessage // Input is the body of the request.
}
RequestData is the data associated with a request. It is used to provide additional context to the Action.
type StartOpFunc ¶ added in v1.3.0
StartOpFunc starts a background operation.
type Status ¶ added in v0.5.3
type Status struct {
Name StatusName `json:"name"`
Message string `json:"message,omitempty"`
}
Status represents a status condition, typically used in responses or errors.
func NewStatus ¶ added in v0.5.3
func NewStatus(name StatusName, message string) *Status
NewStatus creates a new Status object.
type StatusName ¶ added in v0.5.3
type StatusName string
StatusName defines the set of canonical status names.
const (
	OK                  StatusName = "OK"
	CANCELLED           StatusName = "CANCELLED"
	UNKNOWN             StatusName = "UNKNOWN"
	INVALID_ARGUMENT    StatusName = "INVALID_ARGUMENT"
	DEADLINE_EXCEEDED   StatusName = "DEADLINE_EXCEEDED"
	NOT_FOUND           StatusName = "NOT_FOUND"
	ALREADY_EXISTS      StatusName = "ALREADY_EXISTS"
	PERMISSION_DENIED   StatusName = "PERMISSION_DENIED"
	UNAUTHENTICATED     StatusName = "UNAUTHENTICATED"
	RESOURCE_EXHAUSTED  StatusName = "RESOURCE_EXHAUSTED"
	FAILED_PRECONDITION StatusName = "FAILED_PRECONDITION"
	ABORTED             StatusName = "ABORTED"
	OUT_OF_RANGE        StatusName = "OUT_OF_RANGE"
	UNIMPLEMENTED       StatusName = "UNIMPLEMENTED"
	INTERNAL            StatusName = "INTERNAL_SERVER_ERROR"
	UNAVAILABLE         StatusName = "UNAVAILABLE"
	DATA_LOSS           StatusName = "DATA_LOSS"
)
Constants for canonical status names.
type StreamCallback ¶ added in v0.3.0
StreamCallback is a function that is called during streaming to return the next chunk of the stream.
type StreamingFlowValue ¶ added in v0.5.0
type StreamingFlowValue[Out, Stream any] struct {
	Done   bool
	Output Out    // valid if Done is true
	Stream Stream // valid if Done is false
}
StreamingFlowValue is either a streamed value or a final output of a flow.
type StreamingFunc ¶ added in v0.3.0
type StreamingFunc[In, Out, Stream any] = func(context.Context, In, StreamCallback[Stream]) (Out, error)
StreamingFunc is an alias for streaming functions with input of type In, output of type Out, and streaming chunk of type Stream.
type UserFacingError ¶ added in v0.5.3
type UserFacingError struct {
Message string `json:"message"` // Exclude from default JSON if embedded elsewhere
Status StatusName `json:"status"`
Details map[string]any `json:"details"` // Use map for arbitrary details
}
UserFacingError is the base error type for user facing errors.
func NewPublicError ¶ added in v0.5.3
func NewPublicError(status StatusName, message string, details map[string]any) *UserFacingError
NewPublicError allows a web framework handler to know it is safe to return the message in a request. Other kinds of errors will result in a generic 500 message to avoid the possibility of internal exceptions being leaked to attackers.
Example ¶
This example demonstrates creating user-facing errors.
package main

import (
	"fmt"

	"github.com/firebase/genkit/go/core"
)

func main() {
	// Create a user-facing error with details
	err := core.NewPublicError(core.INVALID_ARGUMENT, "Invalid email format", map[string]any{
		"field": "email",
		"value": "not-an-email",
	})
	fmt.Println("Status:", err.Status)
	fmt.Println("Message:", err.Message)
}
Output:
Status: INVALID_ARGUMENT
Message: Invalid email format
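The distinction drawn in the NewPublicError description (public errors may be shown to the caller, everything else becomes a generic 500-style message) can be sketched from the handler's side. The code below uses a local stand-in error type and the standard errors.As; userFacingError, clientMessage, and demoMessages are hypothetical names, and this is not how any particular Genkit handler is implemented.

```go
package main

import (
	"errors"
	"fmt"
)

// userFacingError is a local stand-in for core.UserFacingError.
type userFacingError struct {
	Status  string
	Message string
}

func (e *userFacingError) Error() string { return e.Status + ": " + e.Message }

// clientMessage returns the text that is safe to send to the caller. Only
// errors that are (or wrap) a userFacingError expose their message.
func clientMessage(err error) string {
	var ufe *userFacingError
	if errors.As(err, &ufe) {
		return ufe.Message
	}
	return "internal server error"
}

// demoMessages returns the client-visible text for one public error and one
// internal error.
func demoMessages() (string, string) {
	public := fmt.Errorf("validate: %w", &userFacingError{
		Status:  "INVALID_ARGUMENT",
		Message: "Invalid email format",
	})
	internal := errors.New("database connection failed: dial tcp 10.0.0.5:5432")
	return clientMessage(public), clientMessage(internal)
}

func main() {
	pub, internal := demoMessages()
	fmt.Println(pub)      // Invalid email format
	fmt.Println(internal) // internal server error
}
```

The internal error's details (here a made-up address) never reach the caller, which is the leak the description warns about.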
func (*UserFacingError) Error ¶ added in v0.5.3
func (e *UserFacingError) Error() string
Error implements the standard error interface for UserFacingError.
Directories ¶
| Path | Synopsis |
|---|---|
| logger | Package logger provides context-scoped structured logging for Genkit. |
| tracing | Package tracing provides execution trace support for Genkit operations. |
| x | |
| x/session | Package session provides experimental session management APIs for Genkit. |
| x/streaming | Package streaming provides experimental durable streaming APIs for Genkit. |