jd

package
v0.37.0
Published: Aug 28, 2025 License: MIT Imports: 18 Imported by: 3

Documentation

Package jd provides a comprehensive framework for interacting with Job Distributor (JD) services in the Chainlink Deployments Framework ecosystem.

Overview

The JD package connects to Job Distributor services over gRPC. It supports multiple authentication mechanisms and exposes a single client interface for job management operations.

Architecture

The package consists of three main components:

  1. JD Client (client.go) - Core gRPC client implementation
  2. Client Provider (provider/client_provider.go) - Connects to existing JD services
  3. CTF Provider (provider/ctf_provider.go) - Creates and manages JD Docker containers for testing

Basic Usage

Simple Connection

For basic connectivity without authentication:

import (
	"context"
	"log"

	"google.golang.org/grpc/credentials/insecure"

	"github.com/smartcontractkit/chainlink-deployments-framework/offchain/jd"
	csav1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/csa"
)

config := jd.JDConfig{
	GRPC:  "localhost:9090",
	WSRPC: "ws://localhost:9091",
	Creds: insecure.NewCredentials(), // plaintext transport, no authentication
}

client, err := jd.NewJDClient(config)
if err != nil {
	log.Fatal(err)
}

// Use client for operations
ctx := context.Background()
keypairs, err := client.ListKeypairs(ctx, &csav1.ListKeypairsRequest{})

Provider Interface

import (
	"log"

	"google.golang.org/grpc/credentials/insecure"

	"github.com/smartcontractkit/chainlink-deployments-framework/offchain/jd/provider"
)

providerConfig := provider.ClientOffchainProviderConfig{
	GRPC:  "localhost:9090",
	WSRPC: "ws://localhost:9091",
	Creds: insecure.NewCredentials(),
}

prov, err := provider.NewClientOffchainProvider(providerConfig)
if err != nil {
	log.Fatal(err)
}

// Initialize with environment
err = prov.Initialize(env)
if err != nil {
	log.Fatal(err)
}

// Get client
client, err := prov.OffchainClient()

// client is compatible with the Offchain field in the Environment struct

CTF Provider (Testing)

For testing scenarios where you need to spin up JD Docker containers:

import (
	"context"
	"testing"

	"github.com/smartcontractkit/chainlink-deployments-framework/offchain/jd/provider"
	csav1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/csa"
)

func TestWithJD(t *testing.T) {
	config := provider.CTFOffchainProviderConfig{
		Image: "job-distributor:latest",
		// OR use environment variable: CTF_JD_IMAGE

		// Optional PostgreSQL configuration
		PostgresPort:      5432,
		PostgresHost:      "localhost",
		PostgresUser:      "chainlink",
		PostgresPassword:  "chainlink",
		PostgresDBName:    "chainlink",

		// Optional JD configuration
		GRPCPort:           14231,
		WebSocketRPCPort:   8080,
		CSAEncryptionKey:   "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef",
		JDSQLDumpPath:      "./migrations.sql", // Optional
	}

	// Create CTF provider
	jdProvider := provider.NewCTFOffchainProvider(t, config)

	// Initialize (starts Docker containers with health check)
	ctx := context.Background()
	client, err := jdProvider.Initialize(ctx)
	if err != nil {
		t.Fatalf("Failed to initialize JD: %v", err)
	}

	// Use client for testing
	keypairs, err := client.ListKeypairs(ctx, &csav1.ListKeypairsRequest{})
	if err != nil {
		t.Fatalf("Failed to list keypairs: %v", err)
	}

	t.Logf("JD service ready with %d keypairs", len(keypairs.Keypairs))
}

The CTF provider automatically:

  - Starts PostgreSQL container with proper schema
  - Starts JD container with correct configuration
  - Performs health checks using retry logic
  - Cleans up containers when tests complete

Environment Variable Configuration

You can specify the JD Docker image via environment variable:

export CTF_JD_IMAGE=localhost:5001/job-distributor:latest

config := provider.CTFOffchainProviderConfig{
	// Image field can be omitted when CTF_JD_IMAGE is set
}

Health Check

The CTF provider includes built-in health checking that retries `GetKeypair` calls:

  - 10 retry attempts with 2-second delays
  - Ensures JD service is fully ready before returning

Authentication

The package supports the following authentication mechanisms:

1. No Authentication

For development or internal networks:

config := jd.JDConfig{
	GRPC:  "localhost:9090",
	Creds: insecure.NewCredentials(),
}

2. OAuth2 Authentication

For services requiring OAuth2 Bearer tokens:

import (
	"crypto/tls"

	"golang.org/x/oauth2"
	"google.golang.org/grpc/credentials"
)

tokenSource := oauth2.StaticTokenSource(&oauth2.Token{
	AccessToken: "your-access-token",
	TokenType:   "Bearer",
})

config := jd.JDConfig{
	GRPC:  "secure.jobdistributor.com:443",
	WSRPC: "wss://secure.jobdistributor.com:443/ws",
	Auth:  tokenSource, // JDConfig's token source field is named Auth
	Creds: credentials.NewTLS(&tls.Config{}),
}
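
For context on what a Bearer token source contributes to each request: gRPC's OAuth per-RPC credentials send the token as an `authorization` metadata entry of the form `<type> <access token>`. The sketch below reproduces only that formatting with standard-library types. `bearerToken` and `authMetadata` are hypothetical names, and how this package wires the token source into its connection is not documented here, so treat it as an illustration rather than the package's implementation.

```go
package main

import "fmt"

// bearerToken is a stand-in for the two oauth2.Token fields used above.
type bearerToken struct {
	AccessToken string
	TokenType   string
}

// authMetadata builds the per-request metadata entry for a token.
// An empty type falls back to "Bearer", mirroring oauth2.Token.Type().
func authMetadata(t bearerToken) map[string]string {
	typ := t.TokenType
	if typ == "" {
		typ = "Bearer"
	}
	return map[string]string{"authorization": typ + " " + t.AccessToken}
}

func main() {
	md := authMetadata(bearerToken{AccessToken: "your-access-token", TokenType: "Bearer"})
	fmt.Println(md["authorization"]) // prints "Bearer your-access-token"
}
```

Because the token travels in request metadata, it should only be sent over TLS transport credentials, as in the configuration above.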

Client Operations

The JD client supports various operations through gRPC service interfaces:

CSA Key Management

import csav1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/csa"

// List CSA keypairs
keypairs, err := client.ListKeypairs(ctx, &csav1.ListKeypairsRequest{})
if err != nil {
	// Stop here: keypairs is nil when the call fails
	log.Fatalf("Failed to list CSA keypairs: %v", err)
}

for _, keypair := range keypairs.Keypairs {
	log.Printf("CSA Public Key: %s", keypair.PublicKey)
}

Job Management

import jobv1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/job"

// List existing jobs
jobs, err := client.ListJobs(ctx, &jobv1.ListJobsRequest{})
if err != nil {
	// Stop here: jobs is nil when the call fails
	log.Fatalf("Failed to list jobs: %v", err)
}

for _, job := range jobs.Jobs {
	log.Printf("Job ID: %s", job.Id)
}

// Propose a new job to a single node (ProposeJob selects the node by NodeId)
jobSpec := &jobv1.ProposeJobRequest{
	NodeId: "node-1",
	Spec:   "job specification here",
}

response, err := client.ProposeJob(ctx, jobSpec)
if err != nil {
	log.Fatalf("Failed to propose job: %v", err)
}
log.Printf("Job proposed: %v", response)

Node Management

import nodev1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/node"

// List registered nodes
nodes, err := client.ListNodes(ctx, &nodev1.ListNodesRequest{})
if err != nil {
	// Stop here: nodes is nil when the call fails
	log.Fatalf("Failed to list nodes: %v", err)
}

for _, node := range nodes.Nodes {
	log.Printf("Node ID: %s", node.Id)
}

Configuration Validation

The provider automatically validates configurations:

config := provider.ClientOffchainProviderConfig{
	GRPC: "", // Invalid - will cause validation error
}

prov, err := provider.NewClientOffchainProvider(config)
if err != nil {
	// Handle validation error
	log.Printf("Invalid configuration: %v", err)
}
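
As a rough picture of this kind of check, the sketch below validates a stand-in config type. It is an assumption-laden illustration: `clientConfig` and `validate` are hypothetical, the only rule shown is the empty-GRPC case mentioned above, and the provider's real rules and error messages may differ.

```go
package main

import (
	"errors"
	"fmt"
)

// clientConfig is a hypothetical stand-in that mirrors only the GRPC and
// WSRPC fields of the provider config.
type clientConfig struct {
	GRPC  string
	WSRPC string
}

// validate rejects a config with no gRPC address. Other rules, if any,
// are not documented above and are deliberately left out.
func (c clientConfig) validate() error {
	if c.GRPC == "" {
		return errors.New("gRPC address is required")
	}
	return nil
}

func main() {
	fmt.Println(clientConfig{GRPC: ""}.validate())               // prints "gRPC address is required"
	fmt.Println(clientConfig{GRPC: "localhost:9090"}.validate()) // prints "<nil>"
}
```

Validating in the constructor means a bad address surfaces as an error at setup time rather than as a failed connection later.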

Types

type CognitoAuth added in v0.36.0

type CognitoAuth struct {
	AWSRegion       string
	AppClientID     string
	AppClientSecret string
	Username        string
	Password        string
}

CognitoAuth contains the Cognito authentication information required to generate a token from Cognito.

type CognitoClient added in v0.36.0

type CognitoClient interface {
	InitiateAuth(
		ctx context.Context,
		params *cognitoidentityprovider.InitiateAuthInput,
		optFns ...func(*cognitoidentityprovider.Options),
	) (*cognitoidentityprovider.InitiateAuthOutput, error)
}

CognitoClient defines the interface for Cognito Identity Provider operations. This interface allows for mocking the AWS Cognito client in unit tests.

type CognitoTokenSource added in v0.36.0

type CognitoTokenSource struct {
	// contains filtered or unexported fields
}

CognitoTokenSource provides an OAuth2 token that is used to authenticate with the Job Distributor.

func NewCognitoTokenSource added in v0.36.0

func NewCognitoTokenSource(auth CognitoAuth) *CognitoTokenSource

NewCognitoTokenSource creates a new CognitoTokenSource with the given CognitoAuth configuration. The constructor takes no client argument; if the token source's internal client is nil, a real AWS Cognito client will be created when Authenticate is called.

func (*CognitoTokenSource) Authenticate added in v0.36.0

func (c *CognitoTokenSource) Authenticate(ctx context.Context) error

Authenticate performs user authentication against AWS Cognito using the USER_PASSWORD_AUTH flow.

Authentication results are cached in the authResult field and used by the Token() method to provide OAuth2 access tokens without re-authenticating.

func (*CognitoTokenSource) Token added in v0.36.0

func (c *CognitoTokenSource) Token() (*oauth2.Token, error)

Token retrieves an OAuth2 access token for authenticating with the Job Distributor service.

This method implements a lazy loading pattern:

  1. If an authentication result is already cached, it returns the cached access token
  2. If no cached result exists, it automatically authenticates with AWS Cognito first
  3. Returns the access token wrapped in an oauth2.Token struct

The method implements the oauth2.TokenSource interface, making it compatible with standard OAuth2 client libraries and HTTP clients that support token sources.

Note: This method uses context.Background() for authentication if no cached token exists. For more control over authentication context and timeout behavior, consider calling Authenticate() explicitly before calling Token().

Returns an OAuth2 token containing the Cognito access token.

type JDConfig

type JDConfig struct {
	GRPC  string
	WSRPC string
	Creds credentials.TransportCredentials
	Auth  oauth2.TokenSource
}

JDConfig is the configuration for the Job Distributor client.

type JobDistributor

JobDistributor is the client for the Job Distributor service.

func NewJDClient

func NewJDClient(cfg JDConfig) (*JobDistributor, error)

NewJDClient creates a new Job Distributor client.

func (*JobDistributor) GetCSAPublicKey

func (jd *JobDistributor) GetCSAPublicKey(ctx context.Context) (string, error)

GetCSAPublicKey returns the public key for the CSA service.

func (*JobDistributor) ProposeJob

ProposeJob proposes a job through the job service and accepts the proposed job on the selected node, which is identified by ProposeJobRequest.NodeId.

Directories

Path Synopsis
internal
