Documentation
¶
Overview ¶
Package jd provides a comprehensive framework for interacting with Job Distributor (JD) services in the Chainlink Deployments Framework ecosystem.
Overview ¶
The JD package enables seamless integration with Job Distributor services through gRPC communication. It supports multiple authentication mechanisms and provides a unified interface for job management operations.
Architecture ¶
The package consists of three main components:
1. JD Client (client.go) - Core gRPC client implementation 2. Client Provider (provider/client_provider.go) - Connects to existing JD services 3. CTF Provider (provider/ctf_provider.go) - Creates and manages JD Docker containers for testing
Basic Usage ¶
Simple Connection ¶
For basic connectivity without authentication:
import (
"context"
"google.golang.org/grpc/credentials/insecure"
"github.com/smartcontractkit/chainlink-deployments-framework/offchain/jd"
)
config := jd.JDConfig{
GRPC: "localhost:9090",
WSRPC: "ws://localhost:9091"
}
client, err := jd.NewJDClient(config)
if err != nil {
log.Fatal(err)
}
// Use client for operations
import csav1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/csa"
keypairs, err := client.ListKeypairs(ctx, &csav1.ListKeypairsRequest{})
Provider Interface ¶
import (
"github.com/smartcontractkit/chainlink-deployments-framework/offchain/jd/provider"
)
providerConfig := provider.ClientOffchainProviderConfig{
GRPC: "localhost:9090",
WSRPC: "ws://localhost:9091",
Creds: insecure.NewCredentials(),
}
prov, err := provider.NewClientOffchainProvider(providerConfig)
if err != nil {
log.Fatal(err)
}
// Initialize with environment
err = prov.Initialize(env)
if err != nil {
log.Fatal(err)
}
// Get client
client, err := prov.OffchainClient()
// client is compatible with the Offchain field in the Environment struct
CTF Provider (Testing) ¶
For testing scenarios where you need to spin up JD Docker containers:
import (
"testing"
"github.com/smartcontractkit/chainlink-deployments-framework/offchain/jd/provider"
)
func TestWithJD(t *testing.T) {
config := provider.CTFOffchainProviderConfig{
Image: "job-distributor:latest",
// OR use environment variable: CTF_JD_IMAGE
// Optional PostgreSQL configuration
PostgresPort: 5432,
PostgresHost: "localhost",
PostgresUser: "chainlink",
PostgresPassword: "chainlink",
PostgresDBName: "chainlink",
// Optional JD configuration
GRPCPort: 14231,
WebSocketRPCPort: 8080,
CSAEncryptionKey: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef",
JDSQLDumpPath: "./migrations.sql", // Optional
}
// Create CTF provider
jdProvider := provider.NewCTFOffchainProvider(t, config)
// Initialize (starts Docker containers with health check)
ctx := context.Background()
client, err := jdProvider.Initialize(ctx)
if err != nil {
t.Fatalf("Failed to initialize JD: %v", err)
}
// Use client for testing
keypairs, err := client.ListKeypairs(ctx, &csav1.ListKeypairsRequest{})
if err != nil {
t.Fatalf("Failed to list keypairs: %v", err)
}
t.Logf("JD service ready with %d keypairs", len(keypairs.Keypairs))
}
The CTF provider automatically: - Starts PostgreSQL container with proper schema - Starts JD container with correct configuration - Performs health checks using retry logic - Cleans up containers when tests complete
Environment Variable Configuration ¶
You can specify the JD Docker image via environment variable:
export CTF_JD_IMAGE=localhost:5001/job-distributor:latest
config := provider.CTFOffchainProviderConfig{
// Image field can be omitted when CTF_JD_IMAGE is set
}
Health Check ¶
The CTF provider includes built-in health checking that retries `GetKeypair` calls: - 10 retry attempts with 2-second delays - Ensures JD service is fully ready before returning
Authentication ¶
The package supports three authentication mechanisms:
1. No Authentication ¶
For development or internal networks:
config := jd.JDConfig{
GRPC: "localhost:9090",
Creds: insecure.NewCredentials(),
}
2. OAuth2 Authentication ¶
For services requiring OAuth2 Bearer tokens:
import "golang.org/x/oauth2"
tokenSource := oauth2.StaticTokenSource(&oauth2.Token{
AccessToken: "your-access-token",
TokenType: "Bearer",
})
config := jd.JDConfig{
GRPC: "secure.jobdistributor.com:443",
WSRPC: "wss://secure.jobdistributor.com:443/ws",
OAuth2: tokenSource,
Creds: credentials.NewTLS(&tls.Config{}),
}
Client Operations ¶
The JD client supports various operations through gRPC service interfaces:
CSA Key Management ¶
import csav1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/csa"
// List CSA keypairs
keypairs, err := client.ListKeypairs(ctx, &csav1.ListKeypairsRequest{})
if err != nil {
log.Printf("Failed to list CSA keypairs: %v", err)
}
for _, keypair := range keypairs.Keypairs {
log.Printf("CSA Public Key: %s", keypair.PublicKey)
}
Job Management ¶
import jobv1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/job"
// List existing jobs
jobs, err := client.ListJobs(ctx, &jobv1.ListJobsRequest{})
if err != nil {
log.Printf("Failed to list jobs: %v", err)
}
for _, job := range jobs.Jobs {
log.Printf("Job ID: %s", job.Id)
}
// Propose a new job
jobSpec := &jobv1.ProposeJobRequest{
NodeIds: []string{"node-1", "node-2"},
Spec: "job specification here",
}
response, err := client.ProposeJob(ctx, jobSpec)
if err != nil {
log.Printf("Failed to propose job: %v", err)
}
Node Management ¶
import nodev1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/node"
// List registered nodes
nodes, err := client.ListNodes(ctx, &nodev1.ListNodesRequest{})
if err != nil {
log.Printf("Failed to list nodes: %v", err)
}
for _, node := range nodes.Nodes {
log.Printf("Node ID: %s", node.Id)
}
Configuration Validation ¶
The provider automatically validates configurations:
config := provider.ClientOffchainProviderConfig{
GRPC: "", // Invalid - will cause validation error
}
prov, err := provider.NewClientOffchainProvider(config)
if err != nil {
// Handle validation error
log.Printf("Invalid configuration: %v", err)
}
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type CognitoAuth ¶ added in v0.36.0
type CognitoAuth struct {
AWSRegion string
AppClientID string
AppClientSecret string
Username string
Password string
}
CognitoAuth contains the Cognito authentication information required to generate a token from Cognito.
type CognitoClient ¶ added in v0.36.0
type CognitoClient interface {
InitiateAuth(
ctx context.Context,
params *cognitoidentityprovider.InitiateAuthInput,
optFns ...func(*cognitoidentityprovider.Options),
) (*cognitoidentityprovider.InitiateAuthOutput, error)
}
CognitoClient defines the interface for Cognito Identity Provider operations. This interface allows for mocking the AWS Cognito client in unit tests.
type CognitoTokenSource ¶ added in v0.36.0
type CognitoTokenSource struct {
// contains filtered or unexported fields
}
CognitoTokenSource provides a oath2 token that is used to authenticate with the Job Distributor.
func NewCognitoTokenSource ¶ added in v0.36.0
func NewCognitoTokenSource(auth CognitoAuth) *CognitoTokenSource
NewCognitoTokenSource creates a new CognitoTokenSource with the given CognitoAuth configuration. If client is nil, a real AWS Cognito client will be created when Authenticate is called.
func (*CognitoTokenSource) Authenticate ¶ added in v0.36.0
func (c *CognitoTokenSource) Authenticate(ctx context.Context) error
Authenticate performs user authentication against AWS Cognito using the USER_PASSWORD_AUTH flow.
Authentication results are cached in the authResult field and used by the Token() method to provide OAuth2 access tokens without re-authenticating.
func (*CognitoTokenSource) Token ¶ added in v0.36.0
func (c *CognitoTokenSource) Token() (*oauth2.Token, error)
Token retrieves an OAuth2 access token for authenticating with the Job Distributor service.
This method implements a lazy loading pattern:
- If an authentication result is already cached, it returns the cached access token
- If no cached result exists, it automatically authenticates with AWS Cognito first
- Returns the access token wrapped in an oauth2.Token struct
The method implements the oauth2.TokenSource interface, making it compatible with standard OAuth2 client libraries and HTTP clients that support token sources.
Note: This method uses context.Background() for authentication if no cached token exists. For more control over authentication context and timeout behavior, consider calling Authenticate() explicitly before calling Token().
Returns an OAuth2 token containing the Cognito access token
type JDConfig ¶
type JDConfig struct {
GRPC string
WSRPC string
Creds credentials.TransportCredentials
Auth oauth2.TokenSource
}
JDConfig is the configuration for the Job Distributor client.
type JobDistributor ¶
type JobDistributor struct {
nodev1.NodeServiceClient
jobv1.JobServiceClient
csav1.CSAServiceClient
WSRPC string
}
JobDistributor is the client for the Job Distributor service.
func NewJDClient ¶
func NewJDClient(cfg JDConfig) (*JobDistributor, error)
NewJDClient creates a new Job Distributor client
func (*JobDistributor) GetCSAPublicKey ¶
func (jd *JobDistributor) GetCSAPublicKey(ctx context.Context) (string, error)
GetCSAPublicKey returns the public key for the CSA service
func (*JobDistributor) ProposeJob ¶
func (jd *JobDistributor) ProposeJob(ctx context.Context, in *jobv1.ProposeJobRequest, opts ...grpc.CallOption) (*jobv1.ProposeJobResponse, error)
ProposeJob proposes jobs through the jobService and accepts the proposed job on selected node based on ProposeJobRequest.NodeId