jd

package
v0.51.0
Published: Sep 29, 2025 License: MIT Imports: 21 Imported by: 7

Documentation

Overview

Package jd provides a comprehensive framework for interacting with Job Distributor (JD) services in the Chainlink Deployments Framework ecosystem.

The JD package enables seamless integration with Job Distributor services through gRPC communication. It supports multiple authentication mechanisms and provides a unified interface for job management operations.

Architecture

The package consists of three main components:

1. JD Client (client.go) - Core gRPC client implementation
2. Client Provider (provider/client_provider.go) - Connects to existing JD services
3. CTF Provider (provider/ctf_provider.go) - Creates and manages JD Docker containers for testing

Basic Usage

Simple Connection

For basic connectivity without authentication:

import (
	"context"
	"log"

	"google.golang.org/grpc/credentials/insecure"

	"github.com/smartcontractkit/chainlink-deployments-framework/offchain/jd"
	csav1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/csa"
)

ctx := context.Background()

config := jd.JDConfig{
	GRPC:  "localhost:9090",
	WSRPC: "ws://localhost:9091",
	Creds: insecure.NewCredentials(), // plaintext transport, no authentication
}

client, err := jd.NewJDClient(config)
if err != nil {
	log.Fatal(err)
}

// Use the client for operations
keypairs, err := client.ListKeypairs(ctx, &csav1.ListKeypairsRequest{})

Provider Interface

import (
	"log"

	"google.golang.org/grpc/credentials/insecure"

	"github.com/smartcontractkit/chainlink-deployments-framework/offchain/jd/provider"
)

providerConfig := provider.ClientOffchainProviderConfig{
	GRPC:  "localhost:9090",
	WSRPC: "ws://localhost:9091",
	Creds: insecure.NewCredentials(),
}

prov, err := provider.NewClientOffchainProvider(providerConfig)
if err != nil {
	log.Fatal(err)
}

// Initialize with environment
err = prov.Initialize(env)
if err != nil {
	log.Fatal(err)
}

// Get client
client, err := prov.OffchainClient()

// client is compatible with the Offchain field in the Environment struct

CTF Provider (Testing)

For testing scenarios where you need to spin up JD Docker containers:

import (
	"context"
	"testing"

	"github.com/smartcontractkit/chainlink-deployments-framework/offchain/jd/provider"
	csav1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/csa"
)

func TestWithJD(t *testing.T) {
	config := provider.CTFOffchainProviderConfig{
		Image: "job-distributor:latest",
		// OR use environment variable: CTF_JD_IMAGE

		// Optional PostgreSQL configuration
		PostgresPort:      5432,
		PostgresHost:      "localhost",
		PostgresUser:      "chainlink",
		PostgresPassword:  "chainlink",
		PostgresDBName:    "chainlink",

		// Optional JD configuration
		GRPCPort:           14231,
		WebSocketRPCPort:   8080,
		CSAEncryptionKey:   "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef",
		JDSQLDumpPath:      "./migrations.sql", // Optional
	}

	// Create CTF provider
	jdProvider := provider.NewCTFOffchainProvider(t, config)

	// Initialize (starts Docker containers with health check)
	ctx := context.Background()
	client, err := jdProvider.Initialize(ctx)
	if err != nil {
		t.Fatalf("Failed to initialize JD: %v", err)
	}

	// Use client for testing
	keypairs, err := client.ListKeypairs(ctx, &csav1.ListKeypairsRequest{})
	if err != nil {
		t.Fatalf("Failed to list keypairs: %v", err)
	}

	t.Logf("JD service ready with %d keypairs", len(keypairs.Keypairs))
}

The CTF provider automatically:

- Starts PostgreSQL container with proper schema
- Starts JD container with correct configuration
- Performs health checks using retry logic
- Cleans up containers when tests complete

Environment Variable Configuration

You can specify the JD Docker image via environment variable:

export CTF_JD_IMAGE=localhost:5001/job-distributor:latest

config := provider.CTFOffchainProviderConfig{
	// Image field can be omitted when CTF_JD_IMAGE is set
}

Health Check

The CTF provider includes built-in health checking that retries `GetKeypair` calls:

- 10 retry attempts with 2-second delays
- Ensures JD service is fully ready before returning

Authentication

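The package supports three authentication mechanisms: no authentication, a caller-supplied OAuth2 token source, and AWS Cognito via CognitoTokenSource (documented under Types).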

1. No Authentication

For development or internal networks:

config := jd.JDConfig{
	GRPC:  "localhost:9090",
	Creds: insecure.NewCredentials(),
}

2. OAuth2 Authentication

For services requiring OAuth2 Bearer tokens:

import (
	"crypto/tls"

	"golang.org/x/oauth2"
	"google.golang.org/grpc/credentials"
)

tokenSource := oauth2.StaticTokenSource(&oauth2.Token{
	AccessToken: "your-access-token",
	TokenType:   "Bearer",
})

config := jd.JDConfig{
	GRPC:  "secure.jobdistributor.com:443",
	WSRPC: "wss://secure.jobdistributor.com:443/ws",
	Auth:  tokenSource, // JDConfig.Auth accepts any oauth2.TokenSource
	Creds: credentials.NewTLS(&tls.Config{}),
}

Client Operations

The JD client supports various operations through gRPC service interfaces:

CSA Key Management

import csav1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/csa"

// List CSA keypairs
keypairs, err := client.ListKeypairs(ctx, &csav1.ListKeypairsRequest{})
if err != nil {
	// Stop here: keypairs is nil when the call fails
	log.Fatalf("Failed to list CSA keypairs: %v", err)
}

for _, keypair := range keypairs.Keypairs {
	log.Printf("CSA Public Key: %s", keypair.PublicKey)
}

Job Management

import jobv1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/job"

// List existing jobs
jobs, err := client.ListJobs(ctx, &jobv1.ListJobsRequest{})
if err != nil {
	// Stop here: jobs is nil when the call fails
	log.Fatalf("Failed to list jobs: %v", err)
}

for _, job := range jobs.Jobs {
	log.Printf("Job ID: %s", job.Id)
}

// Propose a new job
jobSpec := &jobv1.ProposeJobRequest{
	NodeId: "node-1", // a proposal targets a single node
	Spec:   "job specification here",
}

response, err := client.ProposeJob(ctx, jobSpec)
if err != nil {
	log.Printf("Failed to propose job: %v", err)
}

Node Management

import nodev1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/node"

// List registered nodes
nodes, err := client.ListNodes(ctx, &nodev1.ListNodesRequest{})
if err != nil {
	// Stop here: nodes is nil when the call fails
	log.Fatalf("Failed to list nodes: %v", err)
}

for _, node := range nodes.Nodes {
	log.Printf("Node ID: %s", node.Id)
}

Configuration Validation

The provider automatically validates configurations:

config := provider.ClientOffchainProviderConfig{
	GRPC: "", // Invalid - will cause validation error
}

prov, err := provider.NewClientOffchainProvider(config)
if err != nil {
	// Handle validation error
	log.Printf("Invalid configuration: %v", err)
}
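As a rough illustration of this kind of check, the sketch below validates a stand-in config type. clientConfig and validate are hypothetical; the only rule taken from the text is that an empty GRPC value is invalid, and the provider may enforce other rules.

```go
package main

import (
	"errors"
	"fmt"
)

// clientConfig is a hypothetical stand-in for the provider configuration.
type clientConfig struct {
	GRPC  string
	WSRPC string
}

// validate rejects a configuration without a gRPC address. Treating WSRPC
// as optional is an assumption made for this sketch.
func (c clientConfig) validate() error {
	if c.GRPC == "" {
		return errors.New("gRPC URL is required")
	}
	return nil
}

func main() {
	fmt.Println(clientConfig{}.validate())
	fmt.Println(clientConfig{GRPC: "localhost:9090"}.validate())
}
```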

Dry Run Mode

The package includes a dry run client that provides safe testing capabilities without affecting real Job Distributor operations:

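import (
	"log"

	"google.golang.org/grpc/credentials/insecure"

	"github.com/smartcontractkit/chainlink-common/pkg/logger"
	"github.com/smartcontractkit/chainlink-deployments-framework/offchain/jd"
	jobv1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/job"
)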

// Create a real client for read operations
realClient, err := jd.NewJDClient(jd.JDConfig{
	GRPC: "localhost:9090",
	Creds: insecure.NewCredentials(),
})
if err != nil {
	log.Fatal(err)
}

// Wrap with dry run client
dryRunClient := jd.NewDryRunJobDistributor(realClient, logger.DefaultLogger)

// Read operations work normally (forwarded to real backend)
jobs, err := dryRunClient.ListJobs(ctx, &jobv1.ListJobsRequest{})

// Write operations are simulated (logged but not executed)
response, err := dryRunClient.ProposeJob(ctx, &jobv1.ProposeJobRequest{
	NodeId: "test-node",
	Spec:   "test job spec",
})
// Returns mock response without actually proposing the job

For a cleaner approach, use the provider's functional option:

import (
	"github.com/smartcontractkit/chainlink-deployments-framework/offchain/jd/provider"
	"github.com/smartcontractkit/chainlink-common/pkg/logger"
)

// Create provider with dry run mode enabled
jdProvider := provider.NewClientOffchainProvider(
	provider.ClientOffchainProviderConfig{
		GRPC: "localhost:9090",
		Creds: insecure.NewCredentials(),
	},
	provider.WithDryRun(lggr), // lggr is a logger.Logger supplied by the caller
)

// Initialize - returns a dry run client automatically
client, err := jdProvider.Initialize(ctx)
if err != nil {
	log.Fatal(err)
}

// All operations now use dry run mode
jobs, err := client.ListJobs(ctx, &jobv1.ListJobsRequest{})        // Read: forwarded
response, err := client.ProposeJob(ctx, &jobv1.ProposeJobRequest{  // Write: simulated
	NodeId: "test-node",
	Spec:   "test job spec",
})

Types

type CognitoAuth added in v0.37.0

type CognitoAuth struct {
	AWSRegion       string
	AppClientID     string
	AppClientSecret string
	Username        string
	Password        string
}

CognitoAuth contains the Cognito authentication information required to generate a token from Cognito.

type CognitoClient added in v0.37.0

type CognitoClient interface {
	InitiateAuth(
		ctx context.Context,
		params *cognitoidentityprovider.InitiateAuthInput,
		optFns ...func(*cognitoidentityprovider.Options),
	) (*cognitoidentityprovider.InitiateAuthOutput, error)
}

CognitoClient defines the interface for Cognito Identity Provider operations. This interface allows for mocking the AWS Cognito client in unit tests.

type CognitoTokenSource added in v0.37.0

type CognitoTokenSource struct {
	// contains filtered or unexported fields
}

CognitoTokenSource provides an OAuth2 token that is used to authenticate with the Job Distributor.

func NewCognitoTokenSource added in v0.37.0

func NewCognitoTokenSource(auth CognitoAuth) *CognitoTokenSource

NewCognitoTokenSource creates a new CognitoTokenSource with the given CognitoAuth configuration. If no Cognito client has been set on the token source, a real AWS Cognito client is created when Authenticate is called.

func (*CognitoTokenSource) Authenticate added in v0.37.0

func (c *CognitoTokenSource) Authenticate(ctx context.Context) error

Authenticate performs user authentication against AWS Cognito using the USER_PASSWORD_AUTH flow.

Authentication results are cached in the authResult field and used by the Token() method to provide OAuth2 access tokens without re-authenticating.

func (*CognitoTokenSource) RefreshToken added in v0.38.0

func (c *CognitoTokenSource) RefreshToken(ctx context.Context) error

RefreshToken refreshes the access token using the stored refresh token via the REFRESH_TOKEN_AUTH flow.

This method uses the refresh token from the cached authentication result to obtain new access and ID tokens without reusing the user's credentials. This is more efficient than full re-authentication and follows OAuth2 best practices.

The method:

  1. Uses the cached refresh token to call InitiateAuth with REFRESH_TOKEN_AUTH flow
  2. Updates the cached authentication result with new tokens
  3. Calculates and stores the new token expiry time

Returns an error if the refresh fails (e.g., refresh token expired or invalid).

func (*CognitoTokenSource) Token added in v0.37.0

func (c *CognitoTokenSource) Token() (*oauth2.Token, error)

Token retrieves an OAuth2 access token for authenticating with the Job Distributor service.

This method implements a lazy loading pattern with automatic token refresh:

  1. If no authentication result is cached, it authenticates with AWS Cognito
  2. If the cached token has expired, it refreshes using the refresh token (REFRESH_TOKEN_AUTH flow)
  3. Otherwise, it returns the cached access token
  4. Returns the access token wrapped in an oauth2.Token struct

The method implements the oauth2.TokenSource interface, making it compatible with standard OAuth2 client libraries and HTTP clients that support token sources. Following OAuth2 best practices, it uses the refresh token when available to refresh the access token.

Note: This method uses context.Background() for authentication/refresh if no cached token exists or if the token needs to be refreshed. For more control over authentication context and timeout behavior, consider calling Authenticate() or RefreshToken() explicitly before calling Token().

Returns an OAuth2 token containing the Cognito access token.

func (*CognitoTokenSource) TokenExpiresAt added in v0.38.0

func (c *CognitoTokenSource) TokenExpiresAt() time.Time

TokenExpiresAt returns the time when the cached token expires.

type DryRunJobDistributor added in v0.37.1

type DryRunJobDistributor struct {
	// contains filtered or unexported fields
}

DryRunJobDistributor is a read-only JD client. Read operations are forwarded to the real backend, while write operations are logged but not executed.

func NewDryRunJobDistributor added in v0.37.1

func NewDryRunJobDistributor(realBackend offchain.Client, lggr logger.Logger) *DryRunJobDistributor

NewDryRunJobDistributor creates a new DryRunJobDistributor.

func (*DryRunJobDistributor) BatchProposeJob added in v0.37.1

BatchProposeJob simulates proposing multiple jobs in a batch to the Job Distributor without actually submitting them. In dry run mode, this returns an empty response indicating the batch operation was logged but not executed.

func (*DryRunJobDistributor) DeleteJob added in v0.37.1

DeleteJob simulates deleting a job from the Job Distributor without actually executing the deletion. In dry run mode, this returns an empty response indicating the deletion was logged but not executed.

func (*DryRunJobDistributor) DisableNode added in v0.37.1

DisableNode simulates disabling a node in the Job Distributor without actually executing the operation. In dry run mode, this returns an empty response indicating the node disable operation was logged but not executed.

func (*DryRunJobDistributor) EnableNode added in v0.37.1

EnableNode simulates enabling a node in the Job Distributor without actually executing the operation. In dry run mode, this returns an empty response indicating the node enable operation was logged but not executed.

func (*DryRunJobDistributor) GetJob added in v0.37.1

GetJob retrieves a specific job by its ID from the Job Distributor. This operation is forwarded to the real backend since it's a read-only operation.

func (*DryRunJobDistributor) GetKeypair added in v0.37.1

GetKeypair retrieves a specific CSA keypair from the Job Distributor. This operation is forwarded to the real backend since it's a read-only operation.

func (*DryRunJobDistributor) GetNode added in v0.37.1

GetNode retrieves information about a specific node from the Job Distributor. This operation is forwarded to the real backend since it's a read-only operation.

func (*DryRunJobDistributor) GetProposal added in v0.37.1

GetProposal retrieves a specific job proposal by its ID from the Job Distributor. This operation is forwarded to the real backend since it's a read-only operation.

func (*DryRunJobDistributor) ListJobs added in v0.37.1

ListJobs retrieves a list of all jobs from the Job Distributor. This operation is forwarded to the real backend since it's a read-only operation.

func (*DryRunJobDistributor) ListKeypairs added in v0.37.1

ListKeypairs retrieves a list of all CSA keypairs from the Job Distributor. This operation is forwarded to the real backend since it's a read-only operation.

func (*DryRunJobDistributor) ListNodeChainConfigs added in v0.37.1

ListNodeChainConfigs retrieves chain configuration information for nodes from the Job Distributor. This operation is forwarded to the real backend since it's a read-only operation.

func (*DryRunJobDistributor) ListNodes added in v0.37.1

ListNodes retrieves a list of all nodes registered with the Job Distributor. This operation is forwarded to the real backend since it's a read-only operation.

func (*DryRunJobDistributor) ListProposals added in v0.37.1

ListProposals retrieves a list of all job proposals from the Job Distributor. This operation is forwarded to the real backend since it's a read-only operation.

func (*DryRunJobDistributor) ProposeJob added in v0.37.1

ProposeJob simulates proposing a new job to the Job Distributor without actually submitting it. In dry run mode, this returns a mock proposal response with a dummy job ID indicating the job was not actually proposed to the node.

func (*DryRunJobDistributor) RegisterNode added in v0.37.1

RegisterNode simulates registering a new node with the Job Distributor without actually executing the registration. In dry run mode, this returns an empty response indicating the node registration was logged but not executed.

func (*DryRunJobDistributor) RevokeJob added in v0.37.1

RevokeJob simulates revoking a job from the Job Distributor without actually executing the revocation. In dry run mode, this returns an empty response indicating the revocation was logged but not executed.

func (*DryRunJobDistributor) UpdateJob added in v0.37.1

UpdateJob simulates updating an existing job in the Job Distributor without actually executing the update. In dry run mode, this returns an empty response indicating the update was logged but not executed.

func (*DryRunJobDistributor) UpdateNode added in v0.37.1

UpdateNode simulates updating an existing node in the Job Distributor without actually executing the update. In dry run mode, this returns an empty response indicating the node update was logged but not executed.

type JDConfig

type JDConfig struct {
	GRPC  string
	WSRPC string
	Creds credentials.TransportCredentials
	Auth  oauth2.TokenSource
}

JDConfig is the configuration for the Job Distributor client.

type JobDistributor

JobDistributor is the client for the Job Distributor service.

func NewJDClient

func NewJDClient(cfg JDConfig) (*JobDistributor, error)

NewJDClient creates a new Job Distributor client.

func (*JobDistributor) GetCSAPublicKey

func (jd *JobDistributor) GetCSAPublicKey(ctx context.Context) (string, error)

GetCSAPublicKey returns the public key for the CSA service.

func (*JobDistributor) ProposeJob

ProposeJob proposes a job through the jobService and accepts the proposed job on the node selected by ProposeJobRequest.NodeId.

Directories

Path Synopsis
internal
