datasource

package
v0.1.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 27, 2026 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Overview

Package datasource implements Fetch over a declarative protocol/datasource DataSource. It composes four pluggable concerns:

  1. Backend — how rows are obtained (mcp_tool | mcp_resource | feed_ref | inline).
  2. Projection — forge selectors project the backend result into rows.
  3. Cache — per-user/conversation/global memoisation with TTL.
  4. Identity — carried in ctx; never a method arg.

All public entry points (HTTP handler, internal MCP tool, scheduler) go through Fetch. There are no per-datasource or per-MCP-server code paths.

Index

Constants

View Source
const (
	CtxUserKey         identityKey = "agently.identity.user"
	CtxConversationKey identityKey = "agently.identity.conversation"
)

Variables

This section is empty.

Functions

func WithIdentity

func WithIdentity(ctx context.Context, user, conversation string) context.Context

WithIdentity returns ctx with explicit user + conversation identifiers. Useful from HTTP handlers and tests that don't use the full runtime auth stack.

Types

type FeedRefResolver

type FeedRefResolver interface {
	ResolveFeed(ctx context.Context, feedID string) (interface{}, error)
}

FeedRefResolver resolves a feed_ref backend to its already-emitted payload. Implementation lives in sdk layer (feeds); service/datasource keeps the interface here so the core has no import cycle.

type FetchOptions

type FetchOptions struct {
	// BypassCache forces a fresh backend call and writes the result back
	// into cache.
	BypassCache bool

	// WriteThrough (with BypassCache=false) means: on hit, still kick off
	// a background refresh if stale-while-revalidate is set. When used as
	// a prewarm, set BypassCache=true.
	WriteThrough bool
}

FetchOptions are per-call overrides carried on the wire.

type Identity

type Identity struct {
	User         string
	Conversation string
}

Identity is the cache-key identity extracted from ctx. It is never used for auth decisions — auth is already attached to ctx by the time Fetch runs.

type IdentityFunc

type IdentityFunc func(ctx context.Context) Identity

IdentityFunc extracts a cache-key identity (user, conversation) from ctx. Tests and non-HTTP callers can supply their own; the default reads well-known keys from ctx.

type MemoryStore

type MemoryStore struct {
	// contains filtered or unexported fields
}

MemoryStore is a minimal thread-safe Store implementation suitable for tests and for callers that manage datasources in-process. Production wiring loads from extension/forge/datasources/*.yaml via the workspace repository; that repository's List() output feeds MemoryStore.Replace.

func NewMemoryStore

func NewMemoryStore() *MemoryStore

func (*MemoryStore) Get

func (s *MemoryStore) Get(id string) (*dsproto.DataSource, bool)

Get fetches a datasource by id.

func (*MemoryStore) List

func (s *MemoryStore) List() []string

List returns a snapshot of known datasource ids.

func (*MemoryStore) Put

func (s *MemoryStore) Put(ds *dsproto.DataSource)

Put registers a datasource.

func (*MemoryStore) Replace

func (s *MemoryStore) Replace(items []*dsproto.DataSource)

Replace swaps the full set of datasources atomically.

type Options

type Options struct {
	Store    Store
	Executor ToolExecutor
	Identity IdentityFunc // if nil, uses defaultIdentity (reads generic ctx keys)
	FeedRef  FeedRefResolver
	Now      func() time.Time // for tests; defaults to time.Now
}

Options are the construction options for New.

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service is the public entry point. Construct with New, then call Fetch.

func New

func New(opts Options) *Service

New constructs a Service. Store + Executor are required; the rest are optional.

func (*Service) Fetch

func (s *Service) Fetch(ctx context.Context, id string, inputs map[string]interface{}, opts FetchOptions) (*dsproto.FetchResult, error)

Fetch resolves the datasource by id and returns a projected result. The caller's identity is read from ctx by the configured IdentityFunc; auth to the upstream MCP server is already attached to ctx by the tool registry.

func (*Service) InvalidateCache

func (s *Service) InvalidateCache(ctx context.Context, id, inputsHash string) error

InvalidateCache drops all entries for a datasource in the caller's scope. When inputsHash is non-empty, only the entry matching that hash is dropped.

type Store

type Store interface {
	Get(id string) (*dsproto.DataSource, bool)
}

Store is the workspace-scoped registry of loaded datasources.

type ToolExecutor

type ToolExecutor interface {
	// Execute calls an MCP tool by fully qualified name "service:method"
	// with args and returns the raw JSON string result (the registry's
	// documented return type). ctx carries identity (auth token is
	// attached by the registry via WithAuthTokenContext).
	Execute(ctx context.Context, name string, args map[string]interface{}) (string, error)
}

ToolExecutor is the seam to invoke an MCP tool. In production this is wired to internal/tool/registry.Registry.Execute which dispatches under the caller's ctx identity. Tests can supply an in-memory stub.

Directories

Path Synopsis
Package adapter wires the datasource service to the production tool dispatch path in internal/tool/registry.
Package adapter wires the datasource service to the production tool dispatch path in internal/tool/registry.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL