jsonrpc

package module
v0.16.2 Latest
Published: Nov 10, 2025 License: Apache-2.0 Imports: 5 Imported by: 22

README

JSON-RPC

This package implements the JSON-RPC 2.0 protocol in Go, providing a lightweight and efficient way to create JSON-RPC clients and servers.

Features

  • Full implementation of the JSON-RPC 2.0 specification
  • Support for notifications, requests, responses, and batch processing
  • Customizable error handling
  • Efficient JSON marshaling and unmarshaling
  • Thread-safe implementation
  • Comprehensive logging capabilities

Installation

go get github.com/viant/jsonrpc

HTTP Streamable (NDJSON) Transport

The streaming transport enables JSON-RPC over HTTP with newline-delimited JSON for server push and resumability.

Key points:

  • Single endpoint (configurable, e.g., /rpc).
  • Handshake: POST <URI> → returns a session id header (name is configurable, e.g., X-Session-Id).
  • Exchange: POST <URI> (with session header) carries JSON-RPC messages; synchronous JSON response returned.
  • Streaming: GET <URI> with headers Accept: application/x-ndjson and the session header opens a newline-delimited JSON stream.
  • Each streamed line is an envelope {"id":<seq>,"data":<jsonrpc>}. Clients can resume after disconnect using Last-Event-ID.

Packages:

// Server
import streamsrv "github.com/viant/jsonrpc/transport/server/http/streamable"

// Client
import streamcli "github.com/viant/jsonrpc/transport/client/http/streamable"

Minimal server example:

package main

import (
    "context"
    "net/http"
    "github.com/viant/jsonrpc"
    "github.com/viant/jsonrpc/transport"
    streamsrv "github.com/viant/jsonrpc/transport/server/http/streamable"
    ssnsession "github.com/viant/jsonrpc/transport/server/http/session"
)

type handler struct{}

func (h *handler) Serve(ctx context.Context, req *jsonrpc.Request, resp *jsonrpc.Response) {
    resp.Result = []byte(`"pong"`)
}

func (h *handler) OnNotification(ctx context.Context, n *jsonrpc.Notification) {}

func main() {
    newH := func(ctx context.Context) transport.Handler { return &handler{} }
    // configure a custom header name for the session id
    http.Handle("/rpc", streamsrv.New(newH,
        streamsrv.WithSessionLocation(ssnsession.NewHeaderLocation("X-Session-Id")),
    ))
    _ = http.ListenAndServe(":8080", nil)
}

Minimal client example:

package main

import (
    "context"
    "fmt"
    "github.com/viant/jsonrpc"
    streamcli "github.com/viant/jsonrpc/transport/client/http/streamable"
)

func main() {
    ctx := context.Background()
    // Use the same custom header name as the server
    client, err := streamcli.New(ctx, "http://localhost:8080/rpc",
        streamcli.WithSessionHeaderName("X-Session-Id"),
    )
    if err != nil {
        panic(err)
    }

    // Id identifies the request so the response can be matched to it
    req := &jsonrpc.Request{Jsonrpc: "2.0", Method: "ping", Id: 1}
    resp, err := client.Send(ctx, req)
    if err != nil {
        panic(err)
    }
    fmt.Println(string(resp.Result)) // "pong" (raw JSON, quotes included)
}

Both SSE and Streamable transports share a common flush helper located at transport/server/http/common.

Session Lifecycle and Reconnect

Both Streamable and SSE transports support graceful reconnects and eventual session cleanup.

  • States: Active (stream attached), Detached (stream closed; pending reconnect), Closed (removed).
  • Reconnect: When a stream disconnects, the session moves to Detached. If the client reconnects within a grace period, the server reattaches the stream and replays missed events using Last-Event-ID.
  • Cleanup: A background sweeper removes sessions based on configured policies and timeouts.

Config options (server):

  • WithReconnectGrace(duration): keep Detached sessions for quick reconnects (default: 30s).
  • WithIdleTTL(duration): remove sessions idle for longer than TTL (default: 5m).
  • WithMaxLifetime(duration): hard cap on any session’s lifetime (default: 1h).
  • WithCleanupInterval(duration): sweeper cadence (default: 30s).
  • WithMaxEventBuffer(int): number of events kept for replay (default: 1024).
  • WithRemovalPolicy(policy): RemovalOnDisconnect | RemovalAfterGrace (default) | RemovalAfterIdle | RemovalManual.
  • WithOverflowPolicy(policy): OverflowDropOldest (default) | OverflowMark.
  • WithOnSessionClose(func): hook invoked before a session is finally removed.
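To illustrate how a bounded replay buffer with a drop-oldest overflow policy might behave, here is a self-contained sketch. It does not reproduce the library's buffer. The eventBuffer type and its methods are hypothetical, and the example only shows the idea behind WithMaxEventBuffer and OverflowDropOldest.

```go
package main

import "fmt"

type event struct {
	ID   uint64
	Data string
}

// eventBuffer keeps at most max events and drops the oldest on overflow.
type eventBuffer struct {
	max    int
	nextID uint64
	events []event
}

func newEventBuffer(max int) *eventBuffer {
	return &eventBuffer{max: max, nextID: 1}
}

// add appends an event under the next sequence id and trims the buffer.
func (b *eventBuffer) add(data string) uint64 {
	id := b.nextID
	b.nextID++
	b.events = append(b.events, event{ID: id, Data: data})
	if len(b.events) > b.max {
		b.events = b.events[len(b.events)-b.max:]
	}
	return id
}

// since returns the buffered events with an id greater than lastEventID,
// which is what a reconnecting client would need replayed.
func (b *eventBuffer) since(lastEventID uint64) []event {
	var out []event
	for _, e := range b.events {
		if e.ID > lastEventID {
			out = append(out, e)
		}
	}
	return out
}

func main() {
	buf := newEventBuffer(3)
	for _, d := range []string{"a", "b", "c", "d", "e"} {
		buf.add(d)
	}
	fmt.Println(buf.since(3)) // [{4 d} {5 e}]
	fmt.Println(buf.since(0)) // [{3 c} {4 d} {5 e}] (events 1 and 2 were dropped)
}
```

With drop-oldest, a client that reconnects with a Last-Event-ID older than the oldest buffered event silently misses the dropped events. Presumably OverflowMark exists to make that gap detectable, but its exact semantics are not described here.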

BFF cookie (optional, dev/prod modes):

  • WithBFFCookieSession(BFFCookie{Name, Secure, HttpOnly, SameSite, Path, Domain, MaxAge})
  • WithCORSAllowedOrigins([]string{"http://localhost:3000", "https://app.example.com"})
  • WithCORSAllowCredentials(true)

Notes: In dev, set Secure: false to allow HTTP; in prod, keep Secure: true and avoid wildcard origins when credentials are used.
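The cookie and CORS guidance can be shown with plain net/http, independent of this package's options. The helpers below (allowedOrigin, withCORS, sessionCookie) are hypothetical names for this sketch, and the origin list and cookie name are example values.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// allowedOrigin reports whether origin exactly matches an allowed origin.
func allowedOrigin(origin string, allowed []string) bool {
	for _, a := range allowed {
		if origin == a {
			return true
		}
	}
	return false
}

// withCORS echoes back the exact request origin (never "*") and allows
// credentials, but only for origins on the allow list.
func withCORS(allowed []string, next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if origin := r.Header.Get("Origin"); allowedOrigin(origin, allowed) {
			w.Header().Set("Access-Control-Allow-Origin", origin)
			w.Header().Set("Access-Control-Allow-Credentials", "true")
			w.Header().Add("Vary", "Origin")
		}
		next.ServeHTTP(w, r)
	})
}

// sessionCookie builds an HttpOnly cookie; pass secure=false only in dev
// when serving over plain HTTP.
func sessionCookie(name, value string, secure bool) *http.Cookie {
	return &http.Cookie{
		Name:     name,
		Value:    value,
		Path:     "/",
		HttpOnly: true,
		Secure:   secure,
		SameSite: http.SameSiteLaxMode,
	}
}

func main() {
	h := withCORS([]string{"https://app.example.com"},
		http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			http.SetCookie(w, sessionCookie("sid", "opaque-id", true))
		}))

	req := httptest.NewRequest(http.MethodPost, "/rpc", nil)
	req.Header.Set("Origin", "https://app.example.com")
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, req)

	fmt.Println(rec.Header().Get("Access-Control-Allow-Origin"))
	fmt.Println(rec.Header().Get("Set-Cookie"))
}
```

Browsers reject credentialed responses that carry a wildcard Access-Control-Allow-Origin, which is why the sketch reflects the exact origin instead.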

Example (Streamable):

http.Handle("/rpc", streamsrv.New(newH,
    streamsrv.WithReconnectGrace(30*time.Second),
    streamsrv.WithIdleTTL(5*time.Minute),
    streamsrv.WithMaxLifetime(1*time.Hour),
    streamsrv.WithCleanupInterval(30*time.Second),
    streamsrv.WithMaxEventBuffer(1024),
    streamsrv.WithRemovalPolicy(base.RemovalAfterGrace),
))

Migration note: To restore legacy behavior that removed sessions immediately on disconnect, set WithReconnectGrace(0) and WithRemovalPolicy(base.RemovalOnDisconnect).

For browser-based flows where the server (BFF) holds authentication, use a single httpOnly cookie to carry an opaque BFF auth session id (default name suggestion: BFF-Auth-Session). This id maps to durable server-side auth state in an AuthStore (e.g., Redis). No access or refresh tokens are exposed to the client.

  • Transport session header remains header-only and short-lived per client/tab.
  • Auth session: BFF-Auth-Session cookie persists longer and can rehydrate a new transport session on handshake when no session id is present.
  • Rehydrate: enable with WithRehydrateOnHandshake(true) and inject an AuthStore and cookie settings.
  • Logout current vs all: DELETE kills only the current transport session; configure a LogoutAllPath to revoke the BFF auth session and clear the cookie.

Server options (Streamable):

  • WithAuthStore(store): pluggable durable auth store (default is in-memory; supply your own e.g. Redis-backed implementation)
  • WithBFFAuthCookie(&BFFAuthCookie{Name: "BFF-Auth-Session", HttpOnly: true, Secure: true, SameSite: http.SameSiteLaxMode})
  • WithBFFAuthCookieUseTopDomain(true): auto Domain=eTLD+1 (prod); omit Domain for localhost/dev
  • WithRehydrateOnHandshake(true): mint new transport session using auth cookie when session is missing
  • WithLogoutAllPath("/logout-all"): optional path to revoke auth and clear cookie

Security notes:

  • Use Access-Control-Allow-Credentials + exact origins (no wildcard) when sending cookies cross-site.
  • Keep cookie Secure: true in production; allow Secure: false only in dev over HTTP.
  • Consider binding the grant to device hints (UA hash, optional IP range) in your AuthStore.

Custom AuthStore (implement your own)

You can provide a durable store by implementing the auth.Store interface and passing it via WithAuthStore(store). Example skeleton:

package mystore

import (
    "context"
    "time"
    "github.com/viant/jsonrpc/transport/server/auth"
)

type Store struct {
    // add fields for DB/Redis client here
}

func New() *Store { return &Store{} }

func (s *Store) Put(ctx context.Context, g *auth.Grant) error {
    // persist g (apply TTLs using g.ExpiresAt, g.MaxExpiresAt)
    return nil
}

func (s *Store) Get(ctx context.Context, id string) (*auth.Grant, error) {
    // load by id; return auth.ErrNotFound if missing/expired
    return nil, auth.ErrNotFound
}

func (s *Store) Touch(ctx context.Context, id string, at time.Time) error {
    // slide idle TTL (update LastUsedAt/ExpiresAt; clamp to MaxExpiresAt)
    return nil
}

func (s *Store) Rotate(ctx context.Context, oldID string, newGrant *auth.Grant) (string, error) {
    // atomically create new id in same FamilyID; keep old valid briefly (grace)
    return "", nil
}

func (s *Store) Revoke(ctx context.Context, id string) error {
    // delete single grant
    return nil
}

func (s *Store) RevokeFamily(ctx context.Context, familyID string) error {
    // delete all grants in the family
    return nil
}

Wiring it into the server:

srv := streamable.New(newHandler,
    streamable.WithAuthStore(mystore.New()),
    streamable.WithBFFAuthCookie(&streamable.BFFAuthCookie{Name: "BFF-Auth-Session", HttpOnly: true, Secure: true}),
    streamable.WithRehydrateOnHandshake(true),
)

Usage

This package provides multiple transport implementations for JSON-RPC 2.0 communication:

Standard I/O (stdio) Transport

The stdio transport allows JSON-RPC communication with external processes through standard input/output.

Client Usage

The stdio client executes a command and communicates with it via stdin/stdout:

package main

import (
    "context"
    "fmt"
    "github.com/viant/jsonrpc"
    "github.com/viant/jsonrpc/transport/client/stdio"
)

func main() {
    // Create a new stdio client that runs the "my_service" command
    client, err := stdio.New("my_service", 
        stdio.WithArguments("--config", "config.json"),
        stdio.WithEnvironment("DEBUG", "true"),
    )
    if err != nil {
        panic(err)
    }

    // Create a JSON-RPC request
    request := &jsonrpc.Request{
        Jsonrpc: "2.0",
        Method:  "add",
        Params:  []byte(`{"x": 10, "y": 20}`),
        Id:      1,
    }

    // Send the request and get the response
    ctx := context.Background()
    response, err := client.Send(ctx, request)
    if err != nil {
        panic(err)
    }

    fmt.Printf("Result: %s\n", response.Result)

    // Send a notification (no response expected)
    notification := &jsonrpc.Notification{
        Jsonrpc: "2.0",
        Method:  "log",
        Params:  []byte(`{"message": "Hello, world!"}`),
    }

    err = client.Notify(ctx, notification)
    if err != nil {
        panic(err)
    }

    // Send a batch request
    batchRequest := jsonrpc.BatchRequest{
        &jsonrpc.Request{
            Jsonrpc: "2.0",
            Method:  "subtract",
            Params:  []byte(`[42, 23]`),
            Id:      1,
        },
        &jsonrpc.Request{
            Jsonrpc: "2.0",
            Method:  "subtract",
            Params:  []byte(`[23, 42]`),
            Id:      2,
        },
        // A notification (no response expected)
        &jsonrpc.Request{
            Jsonrpc: "2.0",
            Method:  "update",
            Params:  []byte(`[1,2,3,4,5]`),
        },
    }

    responses, err := client.SendBatch(ctx, batchRequest)
    if err != nil {
        panic(err)
    }

    // Process batch responses
    for i, response := range responses {
        fmt.Printf("Response %d: %s\n", i+1, response.Result)
    }
}

Server Usage

The stdio server reads JSON-RPC messages from stdin and writes responses to stdout:

package main

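import (
    "context"
    "encoding/json"
    "os"

    "github.com/viant/jsonrpc"
    "github.com/viant/jsonrpc/transport"
    "github.com/viant/jsonrpc/transport/server/stdio"
)

// Handler implements transport.Handler with the same Serve/OnNotification
// method set as the minimal Streamable server example.
type Handler struct{}

// Serve dispatches a request by method name and fills in resp.
func (h *Handler) Serve(ctx context.Context, req *jsonrpc.Request, resp *jsonrpc.Response) {
    switch req.Method {
    case "add":
        // Parse parameters and perform addition
        var args struct {
            X int `json:"x"`
            Y int `json:"y"`
        }
        if err := json.Unmarshal(req.Params, &args); err != nil {
            resp.Error = jsonrpc.NewInvalidParamsError(err.Error(), nil)
            return
        }
        result, err := json.Marshal(map[string]int{"result": args.X + args.Y})
        if err != nil {
            resp.Error = jsonrpc.NewInternalError(err.Error(), nil)
            return
        }
        resp.Result = result
    default:
        resp.Error = jsonrpc.NewMethodNotFound("Method not found", nil)
    }
}

// OnNotification handles notifications, which expect no response.
func (h *Handler) OnNotification(ctx context.Context, n *jsonrpc.Notification) {}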

func main() {
    // Create a new handler factory
    newHandler := func(ctx context.Context) transport.Handler {
        return &Handler{}
    }

    // Create a new stdio server
    ctx := context.Background()
    server := stdio.New(ctx, newHandler,
        stdio.WithErrorWriter(os.Stderr),
    )

    // Start listening for JSON-RPC messages
    if err := server.ListenAndServe(); err != nil {
        panic(err)
    }
}

HTTP Server-Sent Events (SSE) Transport

The HTTP SSE transport allows JSON-RPC communication over HTTP using Server-Sent Events for real-time updates.

Client Usage

The SSE client connects to an SSE endpoint and sends messages to the server:

package main

import (
    "context"
    "fmt"
    "github.com/viant/jsonrpc"
    "github.com/viant/jsonrpc/transport/client/http/sse"
    "time"
)

func main() {
    // Create a new SSE client
    ctx := context.Background()
    client, err := sse.New(ctx, "http://localhost:8080/sse",
        sse.WithHandshakeTimeout(time.Second * 10),
    )
    if err != nil {
        panic(err)
    }

    // Create a JSON-RPC request
    request := &jsonrpc.Request{
        Jsonrpc: "2.0",
        Method:  "getData",
        Params:  []byte(`{"id": 123}`),
        Id:      1,
    }

    // Send the request and get the response
    response, err := client.Send(ctx, request)
    if err != nil {
        panic(err)
    }

    fmt.Printf("Result: %s\n", response.Result)

    // Send a notification
    notification := &jsonrpc.Notification{
        Jsonrpc: "2.0",
        Method:  "logEvent",
        Params:  []byte(`{"event": "user_login"}`),
    }

    err = client.Notify(ctx, notification)
    if err != nil {
        panic(err)
    }
}

Server Usage

The SSE server handles HTTP requests and maintains SSE connections:

package main

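import (
    "context"
    "encoding/json"
    "fmt"
    "net/http"

    "github.com/viant/jsonrpc"
    "github.com/viant/jsonrpc/transport"
    "github.com/viant/jsonrpc/transport/server/http/sse"
)

// Handler implements transport.Handler with the same Serve/OnNotification
// method set as the minimal Streamable server example.
type Handler struct{}

// Serve dispatches a request by method name and fills in resp.
func (h *Handler) Serve(ctx context.Context, req *jsonrpc.Request, resp *jsonrpc.Response) {
    switch req.Method {
    case "getData":
        var args struct {
            ID int `json:"id"`
        }
        if err := json.Unmarshal(req.Params, &args); err != nil {
            resp.Error = jsonrpc.NewInvalidParamsError(err.Error(), nil)
            return
        }
        result, err := json.Marshal(map[string]string{"data": fmt.Sprintf("Data for ID %d", args.ID)})
        if err != nil {
            resp.Error = jsonrpc.NewInternalError(err.Error(), nil)
            return
        }
        resp.Result = result
    default:
        resp.Error = jsonrpc.NewMethodNotFound("Method not found", nil)
    }
}

// OnNotification handles notifications, which expect no response.
func (h *Handler) OnNotification(ctx context.Context, n *jsonrpc.Notification) {}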

func main() {
    // Create a new handler factory
    newHandler := func(ctx context.Context) transport.Handler {
        return &Handler{}
    }

    // Create a new SSE handler
    handler := sse.New(newHandler,
        sse.WithSSEURI("/events"),
        sse.WithMessageURI("/rpc"),
    )

    // Register the handler with an HTTP server
    http.Handle("/events", handler)
    http.Handle("/rpc", handler)

    // Start the HTTP server
    if err := http.ListenAndServe(":8080", nil); err != nil {
        panic(err)
    }
}

Message Types

The package provides the following message types:

  • Request - Represents a JSON-RPC request containing method name, parameters, and an ID
  • Notification - Similar to a request but without an ID, indicating no response is expected
  • Response - Contains the result of a request or an error if the request failed
  • Error - Provides detailed error information with error code, message, and optional data
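The distinction between these message types comes down to which top-level members are present. The sketch below classifies raw JSON using only encoding/json. The classify function is hypothetical and is not how this package's Message type is populated.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// classify inspects which top-level members are present in a message.
func classify(raw []byte) (string, error) {
	var probe struct {
		ID     json.RawMessage `json:"id"`
		Method *string         `json:"method"`
		Result json.RawMessage `json:"result"`
		Error  json.RawMessage `json:"error"`
	}
	if err := json.Unmarshal(raw, &probe); err != nil {
		return "", err
	}
	switch {
	case probe.Method != nil && probe.ID != nil:
		return "request", nil // method plus id: a response is expected
	case probe.Method != nil:
		return "notification", nil // method without id: no response
	case probe.Result != nil || probe.Error != nil:
		return "response", nil // carries a result or an error
	}
	return "", fmt.Errorf("not a JSON-RPC message")
}

func main() {
	samples := []string{
		`{"jsonrpc":"2.0","id":1,"method":"add","params":[1,2]}`,
		`{"jsonrpc":"2.0","method":"log","params":{"message":"hi"}}`,
		`{"jsonrpc":"2.0","id":1,"result":3}`,
	}
	for _, s := range samples {
		kind, err := classify([]byte(s))
		if err != nil {
			panic(err)
		}
		fmt.Println(kind)
	}
}
```

This prints request, notification, and response, one per line.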

Error Codes

As per the JSON-RPC 2.0 specification, the following error codes are defined:

Code     Constant         Description
-32700   ParseError       Invalid JSON received
-32600   InvalidRequest   The JSON sent is not a valid Request object
-32601   MethodNotFound   The method does not exist or is not available
-32602   InvalidParams    Invalid method parameters
-32603   InternalError    Internal JSON-RPC error

License

The source code is made available under the Apache-2.0 license; see the LICENSE file.

Contribution

Feel free to submit issues, fork the repository and send pull requests!

Credits

Author: Adrian Witas

This project is maintained by Viant.

Documentation

Constants

const (
	ParseError     = -32700
	InvalidRequest = -32600
	MethodNotFound = -32601
	InvalidParams  = -32602
	InternalError  = -32603
)
const RequestIdKey = requestId("jsonrpc-request-id")

RequestIdKey is the key used to store the request ID in the context.

const SessionKey = sessionKey("jsonrpc-session")

SessionKey is the key used to store the session ID in the context.

const Version = "2.0"

Version is the JSON-RPC protocol version.

Variables

This section is empty.

Functions

func AsRequestIntId added in v0.6.2

func AsRequestIntId(r RequestId) (int, bool)

func IsUnauthorized added in v0.16.0

func IsUnauthorized(err error) bool

IsUnauthorized returns true if err is or wraps an UnauthorizedError.

Types

type BatchRequest

type BatchRequest []*Request

BatchRequest represents a JSON-RPC 2.0 batch request as per specs

func (*BatchRequest) UnmarshalJSON

func (b *BatchRequest) UnmarshalJSON(data []byte) error

UnmarshalJSON is a custom JSON unmarshaler for the BatchRequest type

type BatchResponse

type BatchResponse []*Response

BatchResponse represents a JSON-RPC 2.0 batch response as per specs

type Error

type Error struct {
	// The error type that occurred.
	Code int `json:"code" yaml:"code" mapstructure:"code"`

	// Additional information about the error. The value of this member is defined by
	// the sender (e.g. detailed error information, nested errors etc.).
	Data json.RawMessage `json:"data,omitempty" yaml:"data,omitempty" mapstructure:"data,omitempty"`

	// A short description of the error. The message SHOULD be limited to a concise
	// single sentence.
	Message string `json:"message" yaml:"message" mapstructure:"message"`
}

Error is used to provide additional information about the error that occurred.

func NewError

func NewError(
	code int,
	message string,
	data interface{},
) *Error

NewError creates a new Error instance to represent the error that occurred.

func NewInternalError

func NewInternalError(message string, data []byte) *Error

NewInternalError creates a new internal error

func NewInvalidParamsError

func NewInvalidParamsError(message string, data []byte) *Error

NewInvalidParamsError creates a new invalid params error

func NewInvalidRequest

func NewInvalidRequest(message string, data []byte) *Error

NewInvalidRequest creates a new invalid request error

func NewMethodNotFound

func NewMethodNotFound(message string, data []byte) *Error

NewMethodNotFound creates a new method not found error

func NewParsingError

func NewParsingError(message string, data []byte) *Error

NewParsingError creates a new parsing error

func (*Error) Error

func (e *Error) Error() string

Error returns the error message

type Listener

type Listener func(message *Message)

type Logger

type Logger interface {
	// Errorf logs an error message with formatting
	Errorf(format string, args ...interface{})
}

Logger defines the interface for logging operations

var DefaultLogger Logger = NewStdLogger(os.Stderr)

DefaultLogger is the default logger instance that writes to os.Stderr

type Message

type Message struct {
	Type                MessageType
	JsonRpcRequest      *Request
	JsonRpcNotification *Notification
	JsonRpcResponse     *Response
}

Message is a wrapper around the different types of JSON-RPC messages (Request, Notification, Response, Error).

func NewNotificationMessage

func NewNotificationMessage(notification *Notification) *Message

NewNotificationMessage creates a new JSON-RPC message of type Notification.

func NewRequestMessage

func NewRequestMessage(request *Request) *Message

NewRequestMessage creates a new JSON-RPC message of type Request.

func NewResponseMessage

func NewResponseMessage(response *Response) *Message

NewResponseMessage creates a new JSON-RPC message of type Response.

func (*Message) MarshalJSON

func (m *Message) MarshalJSON() ([]byte, error)

MarshalJSON is a custom JSON marshaler for the Message type.

func (*Message) Method

func (m *Message) Method() string

type MessageType

type MessageType string

MessageType is an enumeration of the types of messages in the JSON-RPC protocol.

const (
	MessageTypeRequest      MessageType = "request"
	MessageTypeNotification MessageType = "notification"
	MessageTypeResponse     MessageType = "response"
)

type Notification

type Notification struct {
	// Jsonrpc corresponds to the JSON schema field "jsonrpc".
	Jsonrpc string `json:"jsonrpc" yaml:"jsonrpc" mapstructure:"jsonrpc"`

	// Method corresponds to the JSON schema field "method".
	Method string `json:"method" yaml:"method" mapstructure:"method"`

	// Params corresponds to the JSON schema field "params".
	// It is stored as a []byte to enable efficient marshaling and unmarshaling into custom types later on in the protocol
	Params json.RawMessage `json:"params,omitempty" yaml:"params,omitempty" mapstructure:"params,omitempty"`
}

Notification is a type representing a JSON-RPC notification message.

func NewNotification added in v0.5.0

func NewNotification(method string, params interface{}) (*Notification, error)

NewNotification creates a new Notification instance with the specified method and params.

func (*Notification) UnmarshalJSON

func (m *Notification) UnmarshalJSON(data []byte) error

UnmarshalJSON is a custom JSON unmarshaler for the Notification type.

type Request

type Request struct {
	// Id corresponds to the JSON schema field "id".
	Id RequestId `json:"id,omitempty" yaml:"id" mapstructure:"id"`

	// Jsonrpc corresponds to the JSON schema field "jsonrpc".
	Jsonrpc string `json:"jsonrpc" yaml:"jsonrpc" mapstructure:"jsonrpc"`

	// Method corresponds to the JSON schema field "method".
	Method string `json:"method" yaml:"method" mapstructure:"method"`

	// Params corresponds to the JSON schema field "params".
	// It is stored as a []byte to enable efficient marshaling and unmarshaling into custom types later on in the protocol
	Params json.RawMessage `json:"params,omitempty" yaml:"params,omitempty" mapstructure:"params,omitempty"`
}

Request represents a JSON-RPC request message.

func NewRequest

func NewRequest(method string, parameters interface{}) (*Request, error)

func (*Request) UnmarshalJSON

func (m *Request) UnmarshalJSON(data []byte) error

UnmarshalJSON is a custom JSON unmarshaler for the Request type.

type RequestId

type RequestId any

RequestId is the type used to represent the id of a JSON-RPC request.
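Since RequestId is declared as any, the dynamic type of an id depends on how it was produced. With plain encoding/json, a numeric id decoded into an any value arrives as float64. The helper below is a hypothetical sketch of normalizing such ids to int. It is not the implementation of AsRequestIntId, and the package's custom UnmarshalJSON methods may already normalize ids differently.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// asIntID is a hypothetical helper that converts common id
// representations to int, reporting whether the conversion was exact.
func asIntID(id any) (int, bool) {
	switch v := id.(type) {
	case int:
		return v, true
	case int64:
		return int(v), true
	case float64:
		if v == float64(int(v)) {
			return int(v), true // whole number, e.g. decoded from JSON 7
		}
	case json.Number:
		n, err := v.Int64()
		return int(n), err == nil
	}
	return 0, false
}

func main() {
	var msg struct {
		ID any `json:"id"`
	}
	if err := json.Unmarshal([]byte(`{"id": 7}`), &msg); err != nil {
		panic(err)
	}
	fmt.Printf("%T\n", msg.ID) // float64

	n, ok := asIntID(msg.ID)
	fmt.Println(n, ok) // 7 true

	_, ok = asIntID("abc")
	fmt.Println(ok) // false
}
```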

type Response

type Response struct {
	// Id corresponds to the JSON schema field "id".
	Id RequestId `json:"id" yaml:"id" mapstructure:"id"`

	// Jsonrpc corresponds to the JSON schema field "jsonrpc".
	Jsonrpc string `json:"jsonrpc" yaml:"jsonrpc" mapstructure:"jsonrpc"`

	//Error
	Error *Error `json:"error,omitempty" yaml:"error,omitempty" mapstructure:"error"`

	// Result corresponds to the JSON schema field "result".
	Result json.RawMessage `json:"result,omitempty" yaml:"result,omitempty" mapstructure:"result"`
}

func NewResponse

func NewResponse(id RequestId, data []byte) *Response

NewResponse creates a new Response instance with the specified id and data.

func (*Response) UnmarshalJSON

func (m *Response) UnmarshalJSON(data []byte) error

UnmarshalJSON is a custom JSON unmarshaler for the Response type.

type StdLogger

type StdLogger struct {
	// contains filtered or unexported fields
}

StdLogger is a simple logger that writes to an io.Writer

func NewStdLogger

func NewStdLogger(writer io.Writer) *StdLogger

NewStdLogger creates a new StdLogger with the specified writer. If writer is nil, os.Stderr is used as the default.

func (*StdLogger) Errorf

func (l *StdLogger) Errorf(format string, args ...interface{})

Errorf implements Logger.Errorf by writing a formatted error message to the writer

type TypedRequest added in v0.7.0

type TypedRequest[T any] struct {
	Id      uint64 `json:"request_id"`
	Method  string `json:"method"`
	Request T
}

TypedRequest represents a typed request.

type UnauthorizedError added in v0.16.0

type UnauthorizedError struct {
	// StatusCode is the HTTP status code associated with the error (typically 401).
	StatusCode int
	// Body contains the raw response body, if available.
	Body []byte
}

UnauthorizedError represents an HTTP 401 Unauthorized error returned by a transport.

func NewUnauthorizedError added in v0.16.0

func NewUnauthorizedError(statusCode int, body []byte) *UnauthorizedError

NewUnauthorizedError constructs a new UnauthorizedError.

func (*UnauthorizedError) Error added in v0.16.0

func (e *UnauthorizedError) Error() string

Error implements the error interface.
