tdbg

package
v1.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 17, 2026 License: MIT Imports: 72 Imported by: 0

Documentation

Index

Examples

Constants

View Source
const (
	DefaultFrontendAddress = "127.0.0.1:7233"
)

Variables

View Source
var (
	FlagAddress                    = "address"
	FlagHistoryAddress             = "history-address"
	FlagNamespaceID                = "namespace-id"
	FlagNamespace                  = "namespace"
	FlagNamespaceAlias             = []string{"n"}
	FlagShardID                    = "shard-id"
	FlagWorkflowID                 = "workflow-id"
	FlagWorkflowIDAlias            = []string{"wid"}
	FlagRunID                      = "run-id"
	FlagRunIDAlias                 = []string{"rid"}
	FlagBusinessID                 = "business-id"
	FlagBusinessIDAlias            = []string{"bid", FlagWorkflowID, FlagWorkflowIDAlias[0]}
	FlagArchetype                  = "archetype"
	FlagNumberOfShards             = "number-of-shards"
	FlagMinEventID                 = "min-event-id"
	FlagMaxEventID                 = "max-event-id"
	FlagTaskQueue                  = "task-queue"
	FlagTaskQueueType              = "task-queue-type"
	FlagContextTimeout             = "context-timeout"
	FlagContextTimeoutAlias        = []string{"ct"}
	FlagCluster                    = "cluster"
	FlagTargetCluster              = "target-cluster"
	FlagPageSize                   = "pagesize"
	FlagFrom                       = "from"
	FlagPrintFullyDetail           = "print-full"
	FlagPrintJSON                  = "print-json"
	FlagHeartbeatedWithin          = "heartbeated-within"
	FlagInputFilename              = "input-filename"
	FlagOutputFilename             = "output-filename"
	FlagClusterMembershipRole      = "role"
	FlagSkipErrorMode              = "skip-errors"
	FlagTaskID                     = "task-id"
	FlagTaskCategory               = "task-category"
	FlagTaskVisibilityTimestamp    = "task-timestamp"
	FlagMinVisibilityTimestamp     = "min-visibility-ts"
	FlagMaxVisibilityTimestamp     = "max-visibility-ts"
	FlagEnableTLS                  = "tls"
	FlagTLSCertPath                = "tls-cert-path"
	FlagTLSKeyPath                 = "tls-key-path"
	FlagTLSCaPath                  = "tls-ca-path"
	FlagTLSDisableHostVerification = "tls-disable-host-verification"
	FlagTLSServerName              = "tls-server-name"
	FlagLastMessageID              = "last-message-id"
	FlagJobToken                   = "job-token"
	FlagReason                     = "reason"
	FlagYes                        = "yes"
	FlagMore                       = "more"
	FlagMinEventVersion            = "min-event-version"
	FlagMaxEventVersion            = "max-event-version"
	FlagMinTaskID                  = "min-task-id"
	FlagMaxTaskID                  = "max-task-id"
	FlagSubqueue                   = "subqueue"
	FlagDLQType                    = "dlq-type"
	FlagQueueType                  = "queue-type"
	FlagDLQVersion                 = "dlq-version"
	FlagMaxMessageCount            = "max-message-count"
	FlagProtoType                  = "type"
	FlagHexData                    = "hex-data"
	FlagHexFile                    = "hex-file"
	FlagBinaryFile                 = "binary-file"
	FlagBase64Data                 = "base64-data"
	FlagBase64File                 = "base64-file"
	FlagTaskCategoryID             = "task-category-id"
	FlagEncoding                   = "encoding"
	FlagPartitionID                = "partition-id"
	FlagStickyName                 = "sticky-name"
	FlagBuildIDs                   = "select-build-id"
	FlagUnversioned                = "select-unversioned"
	FlagAllActive                  = "select-all-active"
	FlagFair                       = "fair"
	FlagMinPass                    = "min-pass"
	FlagVisibilityQuery            = "query"
	FlagJobID                      = "job-id"
	FlagDecode                     = "decode"
	FlagScheduleID                 = "schedule-id"
	FlagScheduleIDAlias            = []string{"sid"}
	FlagTarget                     = "target"
)

Flags used to specify cli command line arguments

Functions

func AdminBatchRefreshWorkflowTasks

func AdminBatchRefreshWorkflowTasks(c *cli.Context, clientFactory ClientFactory, prompter *Prompter) error

AdminBatchRefreshWorkflowTasks starts a batch job to refresh workflow tasks for multiple workflows

func AdminDecodeBase64

func AdminDecodeBase64(c *cli.Context) error

func AdminDecodeProto

func AdminDecodeProto(c *cli.Context) error

func AdminDeleteWorkflow

func AdminDeleteWorkflow(c *cli.Context, clientFactory ClientFactory, prompter *Prompter) error

AdminDeleteWorkflow force deletes a workflow's mutable state (both concrete and current), history, and visibility records as long as it's possible. It should only be used as a troubleshooting tool since no additional check will be done before the deletion. (e.g. if a child workflow has recorded its result in the parent workflow) Please use normal workflow delete command to gracefully delete a workflow execution.

func AdminDescribeExecution

func AdminDescribeExecution(c *cli.Context, clientFactory ClientFactory) error

AdminDescribeExecution describes a Temporal execution (CHASM tree or workflow).

func AdminDescribeHistoryHost

func AdminDescribeHistoryHost(c *cli.Context, clientFactory ClientFactory) error

AdminDescribeHistoryHost describes history host

func AdminDescribeShard

func AdminDescribeShard(c *cli.Context, clientFactory ClientFactory) error

AdminDescribeShard describes shard by shard id

func AdminDescribeTaskQueuePartition

func AdminDescribeTaskQueuePartition(c *cli.Context, clientFactory ClientFactory) error

AdminDescribeTaskQueuePartition displays task queue partition information

func AdminForceUnloadTaskQueuePartition

func AdminForceUnloadTaskQueuePartition(c *cli.Context, clientFactory ClientFactory) error

AdminForceUnloadTaskQueuePartition forcefully unloads a task queue partition

func AdminGetShardID

func AdminGetShardID(c *cli.Context) error

AdminGetShardID get shardID

func AdminImportWorkflow

func AdminImportWorkflow(c *cli.Context, clientFactory ClientFactory) error

AdminImportWorkflow imports history

func AdminListClusterMembers

func AdminListClusterMembers(c *cli.Context, clientFactory ClientFactory) error

AdminListClusterMembers outputs a list of cluster members

func AdminListGossipMembers

func AdminListGossipMembers(c *cli.Context, clientFactory ClientFactory) error

AdminListGossipMembers outputs a list of gossip members

func AdminListShardTasks

func AdminListShardTasks(c *cli.Context, clientFactory ClientFactory, registry tasks.TaskCategoryRegistry) error

AdminListShardTasks outputs a list of a tasks for given Shard and Task Category

func AdminListTaskQueueTasks

func AdminListTaskQueueTasks(c *cli.Context, clientFactory ClientFactory) error

AdminListTaskQueueTasks displays task information

func AdminMigrateSchedule

func AdminMigrateSchedule(c *cli.Context, clientFactory ClientFactory) error

AdminMigrateSchedule migrates a schedule between V1 (workflow-backed) and V2 (CHASM).

func AdminRebuildMutableState

func AdminRebuildMutableState(c *cli.Context, clientFactory ClientFactory) error

AdminRebuildMutableState rebuild a workflow mutable state using persisted history events

func AdminRefreshWorkflowTasks

func AdminRefreshWorkflowTasks(c *cli.Context, clientFactory ClientFactory) error

AdminRefreshWorkflowTasks refreshes all the tasks of a workflow

func AdminRemoveTask

func AdminRemoveTask(
	c *cli.Context,
	clientFactory ClientFactory,
	taskCategoryRegistry tasks.TaskCategoryRegistry,
) error

AdminRemoveTask describes history host

func AdminReplicateWorkflow

func AdminReplicateWorkflow(
	c *cli.Context,
	clientFactory ClientFactory,
) error

AdminReplicateWorkflow force replicates a workflow by generating replication tasks

func AdminShardManagement

func AdminShardManagement(c *cli.Context, clientFactory ClientFactory) error

AdminShardManagement describes history host

func AdminShowWorkflow

func AdminShowWorkflow(c *cli.Context, clientFactory ClientFactory) error

AdminShowWorkflow shows history

func NewCliApp

func NewCliApp(opts ...Option) *cli.App

NewCliApp instantiates a new instance of the CLI application.

func StringToEnum

func StringToEnum(search string, candidates map[string]int32) (int32, error)

Types

type BoolFlagLookup

type BoolFlagLookup interface {
	Bool(name string) bool
}

BoolFlagLookup can be satisfied by github.com/urfave/cli/v2.Context.

type ClientFactory

type ClientFactory interface {
	AdminClient(c *cli.Context) adminservice.AdminServiceClient
	WorkflowClient(c *cli.Context) workflowservice.WorkflowServiceClient
}

ClientFactory is used to construct rpc clients

func NewClientFactory

func NewClientFactory(opts ...ClientFactoryOption) ClientFactory

NewClientFactory creates a new ClientFactory

type ClientFactoryOption

type ClientFactoryOption func(params *clientFactoryParams)

ClientFactoryOption is used to configure the ClientFactory via NewClientFactory.

func WithFrontendAddress

func WithFrontendAddress(address string) ClientFactoryOption

WithFrontendAddress ensures that admin clients created by the factory will connect to the specified address.

type DLQJobService

type DLQJobService struct {
	// contains filtered or unexported fields
}

func (*DLQJobService) CancelJob

func (ac *DLQJobService) CancelJob(c *cli.Context) error

func (*DLQJobService) DescribeJob

func (ac *DLQJobService) DescribeJob(c *cli.Context) error

type DLQMessage

type DLQMessage struct {
	// MessageID is the ID of the message within the DLQ. You can use this ID as an input to the `--last_message_id`
	// flag for the `purge` and `merge` commands.
	MessageID int64 `json:"message_id"`
	// ShardID is only used for non-namespace replication tasks.
	ShardID int32 `json:"shard_id"`
	// Payload contains the parsed task metadata from the server.
	Payload *TaskPayload `json:"payload"`
}

DLQMessage is used primarily to form the JSON output of the `read` command. It's only used for v2.

type DLQService

type DLQService interface {
	ReadMessages(c *cli.Context) error
	PurgeMessages(c *cli.Context) error
	MergeMessages(c *cli.Context) error
	ListQueues(c *cli.Context) error
}

type DLQServiceProvider

type DLQServiceProvider struct {
	// contains filtered or unexported fields
}

func NewDLQServiceProvider

func NewDLQServiceProvider(
	clientFactory ClientFactory,
	taskBlobEncoder TaskBlobEncoder,
	taskCategoryRegistry tasks.TaskCategoryRegistry,
	writer io.Writer,
	prompterFactory PrompterFactory,
) *DLQServiceProvider

func (*DLQServiceProvider) GetDLQJobService

func (p *DLQServiceProvider) GetDLQJobService() DLQJobService

GetDLQJobService returns a DLQJobService.

func (*DLQServiceProvider) GetDLQService

func (p *DLQServiceProvider) GetDLQService(
	c *cli.Context,
) (DLQService, error)

GetDLQService returns a DLQService based on FlagDLQVersion.

type DLQV1Service

type DLQV1Service struct {
	// contains filtered or unexported fields
}

func NewDLQV1Service

func NewDLQV1Service(clientFactory ClientFactory, prompter *Prompter, writer io.Writer) *DLQV1Service

func (*DLQV1Service) ListQueues

func (ac *DLQV1Service) ListQueues(c *cli.Context) error

func (*DLQV1Service) MergeMessages

func (ac *DLQV1Service) MergeMessages(c *cli.Context) error

func (*DLQV1Service) PurgeMessages

func (ac *DLQV1Service) PurgeMessages(c *cli.Context) error

func (*DLQV1Service) ReadMessages

func (ac *DLQV1Service) ReadMessages(c *cli.Context) (err error)

type DLQV2Service

type DLQV2Service struct {
	// contains filtered or unexported fields
}

DLQV2Service implements DLQService for persistence.QueueV2.

func NewDLQJobService

func NewDLQJobService(
	clientFactory ClientFactory,
	writer io.Writer,
) *DLQV2Service

func NewDLQV2Service

func NewDLQV2Service(
	category tasks.Category,
	sourceCluster string,
	targetCluster string,
	clientFactory ClientFactory,
	writer io.Writer,
	prompter *Prompter,
	taskBlobEncoder TaskBlobEncoder,
) *DLQV2Service

func (*DLQV2Service) ListQueues

func (ac *DLQV2Service) ListQueues(c *cli.Context) (err error)

func (*DLQV2Service) MergeMessages

func (ac *DLQV2Service) MergeMessages(c *cli.Context) error

func (*DLQV2Service) PurgeMessages

func (ac *DLQV2Service) PurgeMessages(c *cli.Context) error

func (*DLQV2Service) ReadMessages

func (ac *DLQV2Service) ReadMessages(c *cli.Context) (err error)

type DefaultFrontendAddressProvider

type DefaultFrontendAddressProvider struct{}

DefaultFrontendAddressProvider uses FlagAddress to determine the frontend address, defaulting to DefaultFrontendAddress if FlagAddress is not set or is empty.

func (DefaultFrontendAddressProvider) GetFrontendAddress

func (d DefaultFrontendAddressProvider) GetFrontendAddress(c *cli.Context) string

type HttpGetter

type HttpGetter interface {
	Get(url string) (resp *http.Response, err error)
}

HttpGetter defines http.Client.Get(...) as an interface so we can mock it

type Option

type Option func(params *Params)

Option modifies the Params for tdbg.

type Params

type Params struct {
	// ClientFactory creates Temporal service clients for tdbg to use.
	ClientFactory ClientFactory
	// TaskCategoryRegistry is used to determine which task categories are available for tdbg to use.
	TaskCategoryRegistry tasks.TaskCategoryRegistry
	// Writer is used to write output from tdbg. The default is os.Stdout.
	Writer io.Writer
	// ErrWriter is used to write errors from tdbg. The default is os.Stderr.
	ErrWriter io.Writer
	// TaskBlobEncoder is needed for custom task serialization. The default uses PredefinedTaskBlobDeserializer.
	TaskBlobEncoder TaskBlobEncoder
}

Params which are customizable for the CLI application.

type PredefinedTaskBlobDeserializer

type PredefinedTaskBlobDeserializer struct{}

PredefinedTaskBlobDeserializer is a TaskBlobProtoDeserializer that deserializes task blobs into the predefined task categories that are used by Temporal. If your server has custom categories, you'll want to build something on top of this.

func NewPredefinedTaskBlobDeserializer

func NewPredefinedTaskBlobDeserializer() PredefinedTaskBlobDeserializer

NewPredefinedTaskBlobDeserializer returns a TaskBlobProtoDeserializer that works for the stock task categories of the server. You need to extend this if you have custom task categories.

func (PredefinedTaskBlobDeserializer) Deserialize

func (d PredefinedTaskBlobDeserializer) Deserialize(categoryID int, blob *commonpb.DataBlob) (proto.Message, error)

Deserialize a task blob from one of the server's predefined task categories into a proto message.

type Prompter

type Prompter struct {
	// contains filtered or unexported fields
}

Prompter is a helper for prompting the user for confirmation.

func NewPrompter

func NewPrompter(c BoolFlagLookup, opts ...PrompterOption) *Prompter

NewPrompter creates a new Prompter. In most cases, the first argument should be github.com/urfave/cli/v2.Context.

func (*Prompter) Prompt

func (p *Prompter) Prompt(msg string)

Prompt the user for confirmation. If the user does not respond with "y" or "yes" (case-insensitive and without leading or trailing space), the process will exit with code 1.

type PrompterFactory

type PrompterFactory func(c BoolFlagLookup) *Prompter

func NewPrompterFactory

func NewPrompterFactory(opts ...PrompterOption) PrompterFactory

type PrompterOption

type PrompterOption func(*PrompterParams)

PrompterOption is used to override default PrompterParams.

type PrompterParams

type PrompterParams struct {
	// Reader defaults to os.Stdin.
	Reader io.Reader
	// Writer defaults to os.Stdout.
	Writer io.Writer
	// Exiter defaults to [os.Exit].
	Exiter func(code int)
}

PrompterParams is used to configure a new Prompter.

type ProtoTaskBlobEncoder

type ProtoTaskBlobEncoder struct {
	// contains filtered or unexported fields
}

ProtoTaskBlobEncoder is a TaskBlobEncoder that uses a TaskBlobProtoDeserializer to deserialize the blob into a proto message, and then uses protojson to marshal the proto message into a human-readable format.

func NewProtoTaskBlobEncoder

func NewProtoTaskBlobEncoder(deserializer TaskBlobProtoDeserializer) *ProtoTaskBlobEncoder

NewProtoTaskBlobEncoder returns a TaskBlobEncoder that uses a TaskBlobProtoDeserializer to deserialize the blob.

func (*ProtoTaskBlobEncoder) Encode

func (e *ProtoTaskBlobEncoder) Encode(writer io.Writer, categoryID int, blob *commonpb.DataBlob) error

Encode a blob for a given task category to a human-readable format by deserializing the blob into a proto message and then pretty-printing it using protojson.

type TaskBlobEncoder

type TaskBlobEncoder interface {
	Encode(writer io.Writer, taskCategoryID int, blob *commonpb.DataBlob) error
}

TaskBlobEncoder takes a blob for a given task category and encodes it to a human-readable format. Here's a breakdown of the relationship between all the related types needed to implement a custom encoder: - NewCliApp accepts a list of Option objects. - Option objects modify Params. - Params contain a TaskBlobEncoder. - TaskBlobEncoder is implemented by ProtoTaskBlobEncoder. - ProtoTaskBlobEncoder uses protojson to marshal proto.Message objects from a TaskBlobProtoDeserializer. - TaskBlobProtoDeserializer is implemented by the stock PredefinedTaskBlobDeserializer. - PredefinedTaskBlobDeserializer deserializes commonpb.DataBlob objects into proto.Message objects.

Example
package main

import (
	"bytes"
	"fmt"
	"io"
	"os"
	"strconv"

	commonpb "go.temporal.io/api/common/v1"

	"github.com/hanzoai/tasks/service/history/tasks"
	"github.com/hanzoai/tasks/tools/tdbg"
	enumspb "go.temporal.io/api/enums/v1"
)

var customCategory = tasks.Category{}

func main() {
	var output bytes.Buffer
	app := tdbg.NewCliApp(func(params *tdbg.Params) {
		params.Writer = &output
		stockEncoder := params.TaskBlobEncoder
		params.TaskBlobEncoder = tdbg.TaskBlobEncoderFn(func(
			writer io.Writer,
			taskCategoryID int,
			blob *commonpb.DataBlob,
		) error {
			if taskCategoryID == customCategory.ID() {
				_, err := writer.Write(append([]byte("hello, "), blob.Data...))
				return err
			}
			return stockEncoder.Encode(writer, taskCategoryID, blob)
		})
	})
	file, err := os.CreateTemp("", "*")
	if err != nil {
		panic(err)
	}
	defer func() {
		if err := os.Remove(file.Name()); err != nil {
			panic(err)
		}
	}()
	_, err = file.Write([]byte("\"world\""))
	if err != nil {
		panic(err)
	}
	err = app.Run([]string{
		"tdbg", "decode", "task",
		"--" + tdbg.FlagEncoding, enumspb.ENCODING_TYPE_JSON.String(),
		"--" + tdbg.FlagTaskCategoryID, strconv.Itoa(customCategory.ID()),
		"--" + tdbg.FlagBinaryFile, file.Name(),
	})
	if err != nil {
		panic(err)
	}

	fmt.Println(output.String())

}
Output:
hello, "world"

type TaskBlobEncoderFn

type TaskBlobEncoderFn func(writer io.Writer, taskCategoryID int, blob *commonpb.DataBlob) error

TaskBlobEncoderFn implements TaskBlobEncoder by calling a function.

func (TaskBlobEncoderFn) Encode

func (e TaskBlobEncoderFn) Encode(writer io.Writer, taskCategoryID int, blob *commonpb.DataBlob) error

Encode the task by calling the function.

type TaskBlobProtoDeserializer

type TaskBlobProtoDeserializer interface {
	Deserialize(taskCategoryID int, blob *commonpb.DataBlob) (proto.Message, error)
}

TaskBlobProtoDeserializer is used to deserialize task blobs into proto messages. This makes it easier to create an encoder if your tasks are all backed by protos. We separate this from the encoder because we don't want the encoder to be tied to protos as the wire format.

type TaskPayload

type TaskPayload struct {
	// contains filtered or unexported fields
}

TaskPayload implements both json.Marshaler and json.Unmarshaler. This allows us to pretty-print tasks using jsonpb when serializing and then store the raw bytes of the task payload for later use when deserializing. We need to store the raw bytes instead of immediately decoding to a concrete type because that logic is dynamic and can't depend solely on the task category ID in case there are additional task categories in use.

func (*TaskPayload) Bytes

func (p *TaskPayload) Bytes() []byte

Bytes returns the raw bytes of the deserialized TaskPayload. This will return nil if the payload has not been deserialized yet.

func (*TaskPayload) MarshalJSON

func (p *TaskPayload) MarshalJSON() ([]byte, error)

func (*TaskPayload) UnmarshalJSON

func (p *TaskPayload) UnmarshalJSON(b []byte) error

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL