Queue Service
The Queue Service implements the QueueService gRPC interface using buf connect. It manages action execution by creating and managing Kubernetes TaskAction Custom Resources.
Architecture
Kubernetes-Native Design:
- No database required
- Creates TaskAction CRs in Kubernetes for each action
- Uses OwnerReferences for automatic cascading deletion
- Leverages Kubernetes garbage collection for cleanup
Features
- Enqueue actions - Creates TaskAction CRs in Kubernetes
- Abort queued runs - Deletes root TaskAction (children cascade automatically)
- Abort individual actions - Deletes specific TaskAction (children cascade automatically)
- OwnerReference hierarchy - Parent-child relationships via Kubernetes OwnerReferences
- Automatic cleanup - Kubernetes handles cascading deletion of dependent resources
- Health and readiness checks
Running the Service
Prerequisites
- Kubernetes cluster (k3d, kind, minikube, or any K8s cluster)
- Go 1.24 or later
- TaskAction CRD installed in the cluster
- Kubeconfig configured (or running in-cluster)
Quick Start with k3d
# Create a k3d cluster
k3d cluster create flyte-dev
# Verify cluster is running
kubectl cluster-info
# The service will automatically use your ~/.kube/config
Run the service
# From the queue directory
go build -o bin/queue-service ./cmd
./bin/queue-service --config config.yaml
The service will:
- Auto-detect Kubernetes config:
  - Try in-cluster config (when running in K8s)
  - Fall back to ~/.kube/config (for local development)
  - Or use the explicit kubeconfig path from config, which takes precedence when set
- Initialize Kubernetes client
- Start HTTP/2 server on port 8089 (configurable)
Configuration
Edit config.yaml:
queue:
  server:
    host: "0.0.0.0"
    port: 8089
  kubernetes:
    namespace: "flyte"
    # Optional: specify custom kubeconfig path
    # kubeconfig: "/path/to/kubeconfig"
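As a rough illustration of how these settings could map onto Go types, here is a minimal sketch. The struct names, `exampleConfig`, `Validate`, and `ListenAddr` are all hypothetical; the real structs live in config/config.go and may look different. YAML decoding is left out because it needs a third-party package.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"strconv"
)

// Hypothetical mirrors of the config.yaml layout shown above.
type ServerConfig struct {
	Host string
	Port int
}

type KubernetesConfig struct {
	Namespace  string
	Kubeconfig string // optional; empty means auto-detect
}

type QueueConfig struct {
	Server     ServerConfig
	Kubernetes KubernetesConfig
}

// exampleConfig returns the values from the sample config.yaml.
func exampleConfig() QueueConfig {
	return QueueConfig{
		Server:     ServerConfig{Host: "0.0.0.0", Port: 8089},
		Kubernetes: KubernetesConfig{Namespace: "flyte"},
	}
}

// Validate rejects values the server could not start with.
func (c QueueConfig) Validate() error {
	if c.Server.Port < 1 || c.Server.Port > 65535 {
		return fmt.Errorf("server.port %d is out of range 1-65535", c.Server.Port)
	}
	if c.Kubernetes.Namespace == "" {
		return errors.New("kubernetes.namespace must not be empty")
	}
	return nil
}

// ListenAddr formats host and port for http.Server.Addr or net.Listen.
func (c QueueConfig) ListenAddr() string {
	return net.JoinHostPort(c.Server.Host, strconv.Itoa(c.Server.Port))
}

func main() {
	cfg := exampleConfig()
	if err := cfg.Validate(); err != nil {
		panic(err)
	}
	fmt.Println(cfg.ListenAddr())
}
```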
Kubeconfig Resolution:
- If kubeconfig is set → uses that file
- Else tries in-cluster config
- Falls back to default kubeconfig (~/.kube/config, $KUBECONFIG)
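The resolution order above can be sketched with the standard library alone. This is not the service's actual code: `kubeSource` and `resolveKubeSource` are hypothetical names. The in-cluster check is simplified to the two environment variables Kubernetes injects into pods, and checking $KUBECONFIG before ~/.kube/config is an assumption that mirrors client-go's default loading rules.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// kubeSource describes where the Kubernetes client configuration comes from.
type kubeSource struct {
	Kind string // "explicit", "in-cluster", or "default"
	Path string // kubeconfig file path; empty for in-cluster
}

// resolveKubeSource applies the resolution order from the list above.
// getenv is injected (normally os.Getenv) so the logic is easy to test.
func resolveKubeSource(explicit, home string, getenv func(string) string) kubeSource {
	// 1. An explicit kubeconfig path from config.yaml wins.
	if explicit != "" {
		return kubeSource{Kind: "explicit", Path: explicit}
	}
	// 2. In-cluster: Kubernetes sets these variables inside every pod.
	if getenv("KUBERNETES_SERVICE_HOST") != "" && getenv("KUBERNETES_SERVICE_PORT") != "" {
		return kubeSource{Kind: "in-cluster"}
	}
	// 3. Default kubeconfig: $KUBECONFIG if set, otherwise ~/.kube/config.
	if p := getenv("KUBECONFIG"); p != "" {
		return kubeSource{Kind: "default", Path: p}
	}
	return kubeSource{Kind: "default", Path: filepath.Join(home, ".kube", "config")}
}

func main() {
	home, err := os.UserHomeDir()
	if err != nil {
		home = ""
	}
	fmt.Printf("%+v\n", resolveKubeSource("", home, os.Getenv))
}
```

Note that $KUBECONFIG may hold several paths separated by the OS path-list separator; the sketch treats it as a single path.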
How It Works
Enqueue Action
When EnqueueAction is called:
- Creates a TaskAction CR in Kubernetes
  - Root action: No OwnerReference, labeled flyte.org/is-root: "true"
  - Child action: Sets OwnerReference to parent TaskAction
- Executor watches TaskAction CRs and executes them
Example TaskAction CR:
apiVersion: flyte.org/v1
kind: TaskAction
metadata:
  name: my-org-my-project-dev-run-001-task-001
  namespace: flyte
  labels:
    flyte.org/org: my-org
    flyte.org/project: my-project
    flyte.org/domain: dev
    flyte.org/run: run-001
    flyte.org/action: task-001
    flyte.org/is-root: "true"
spec:
  taskActionBytes: <protobuf-encoded ActionSpec>
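The name and labels in this example appear to be derived from the action's identifiers. A minimal sketch of that derivation follows. The naming scheme is inferred from this one example, and `actionID`, `resourceName`, and `actionLabels` are hypothetical names, not the service's API.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// actionID holds the identifiers that show up in the example labels.
type actionID struct {
	Org, Project, Domain, Run, Action string
}

// resourceName joins the identifiers with "-", matching the example name.
func resourceName(id actionID) string {
	return strings.Join([]string{id.Org, id.Project, id.Domain, id.Run, id.Action}, "-")
}

// actionLabels builds the flyte.org/* label set shown in the example CR.
func actionLabels(id actionID, isRoot bool) map[string]string {
	return map[string]string{
		"flyte.org/org":     id.Org,
		"flyte.org/project": id.Project,
		"flyte.org/domain":  id.Domain,
		"flyte.org/run":     id.Run,
		"flyte.org/action":  id.Action,
		"flyte.org/is-root": strconv.FormatBool(isRoot),
	}
}

func main() {
	id := actionID{Org: "my-org", Project: "my-project", Domain: "dev", Run: "run-001", Action: "task-001"}
	fmt.Println(resourceName(id))
	fmt.Println(actionLabels(id, true))
}
```

Kubernetes limits label values to 63 characters and most object names to 253 characters. The sketch does not truncate or hash long identifiers, which a real implementation would likely need to handle.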
Child action with OwnerReference:
apiVersion: flyte.org/v1
kind: TaskAction
metadata:
  name: my-org-my-project-dev-run-001-task-002
  namespace: flyte
  labels:
    flyte.org/is-root: "false"
  ownerReferences:
    - apiVersion: flyte.org/v1
      kind: TaskAction
      name: my-org-my-project-dev-run-001-task-001
      uid: <parent-uid>
      blockOwnerDeletion: true
spec:
  taskActionBytes: <protobuf-encoded ActionSpec>
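To show the shape of the ownerReferences entry in code, here is a small stdlib-only sketch. The `ownerReference` struct is a hand-written stand-in carrying just the fields from the example; real code would presumably use `metav1.OwnerReference` from k8s.io/apimachinery. `childOwnerRef` is a hypothetical helper, and the UID in `main` is a placeholder.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ownerReference mirrors the fields used in the example above.
type ownerReference struct {
	APIVersion         string `json:"apiVersion"`
	Kind               string `json:"kind"`
	Name               string `json:"name"`
	UID                string `json:"uid"`
	BlockOwnerDeletion *bool  `json:"blockOwnerDeletion,omitempty"`
}

// childOwnerRef builds the reference a child TaskAction carries to its parent.
func childOwnerRef(parentName, parentUID string) ownerReference {
	block := true
	return ownerReference{
		APIVersion:         "flyte.org/v1",
		Kind:               "TaskAction",
		Name:               parentName,
		UID:                parentUID,
		BlockOwnerDeletion: &block,
	}
}

func main() {
	// Placeholder UID; the real value is assigned by the API server.
	ref := childOwnerRef("my-org-my-project-dev-run-001-task-001", "00000000-0000-0000-0000-000000000000")
	out, err := json.MarshalIndent([]ownerReference{ref}, "", "  ")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```

The uid has to be the parent's actual UID as assigned by the API server, so the parent must be created (or fetched) before the child. The owner must also live in the same namespace as the child.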
Abort Queued Run
When AbortQueuedRun is called:
- Finds root TaskAction (labeled flyte.org/is-root: "true")
- Deletes the root TaskAction
- Kubernetes automatically cascades the deletion to all child TaskActions
Hierarchy example:
root-action (deleted manually)
├─ child-1 (deleted by K8s)
│ ├─ grandchild-1 (deleted by K8s)
│ └─ grandchild-2 (deleted by K8s)
└─ child-2 (deleted by K8s)
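The cascade can be modeled in memory to make the behavior concrete. The sketch below is a toy simulation, not the Kubernetes garbage collector: `store`, `exampleStore`, and `deleteCascade` are hypothetical names, and each TaskAction is reduced to a name plus its owner's name.

```go
package main

import (
	"fmt"
	"sort"
)

// store maps a TaskAction name to its owner's name ("" for a root).
type store map[string]string

// exampleStore reproduces the hierarchy example above.
func exampleStore() store {
	return store{
		"root-action":  "",
		"child-1":      "root-action",
		"grandchild-1": "child-1",
		"grandchild-2": "child-1",
		"child-2":      "root-action",
	}
}

// deleteCascade removes name and every transitive dependent and returns
// the removed names in sorted order. Unknown names return nil.
func (s store) deleteCascade(name string) []string {
	if _, ok := s[name]; !ok {
		return nil
	}
	removed := []string{}
	queue := []string{name}
	for len(queue) > 0 {
		cur := queue[0]
		queue = queue[1:]
		delete(s, cur)
		removed = append(removed, cur)
		// Enqueue everything owned by the object just removed.
		for child, owner := range s {
			if owner == cur {
				queue = append(queue, child)
			}
		}
	}
	sort.Strings(removed)
	return removed
}

func main() {
	s := exampleStore()
	fmt.Println(s.deleteCascade("child-1"), "remaining:", len(s))
	fmt.Println(s.deleteCascade("root-action"), "remaining:", len(s))
}
```

Deleting child-1 removes only its own subtree, while deleting root-action removes everything that is left. In a real cluster the dependents are typically removed asynchronously by the garbage collector, so they may still be visible for a short time after the delete call returns.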
Abort Queued Action
When AbortQueuedAction is called:
- Deletes the specific TaskAction
- Kubernetes automatically cascades the deletion to any child TaskActions
- Parent and sibling actions remain unaffected
Testing
Check service health
# Health check
curl http://localhost:8089/healthz
# Readiness check
curl http://localhost:8089/readyz
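For readers who want to see how liveness and readiness endpoints usually differ, here is a minimal sketch using net/http. It is an assumption, not the service's code: this README does not say what either endpoint checks. `newHealthMux`, `probe`, and the `isReady` callback are hypothetical.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// newHealthMux registers /healthz (liveness) and /readyz (readiness).
// isReady is a hypothetical hook, e.g. "the Kubernetes client is initialized".
func newHealthMux(isReady func() bool) *http.ServeMux {
	mux := http.NewServeMux()
	mux.HandleFunc("/healthz", func(w http.ResponseWriter, _ *http.Request) {
		// Liveness: the process is up and serving HTTP.
		w.WriteHeader(http.StatusOK)
	})
	mux.HandleFunc("/readyz", func(w http.ResponseWriter, _ *http.Request) {
		// Readiness: only report OK once dependencies are usable.
		if !isReady() {
			http.Error(w, "not ready", http.StatusServiceUnavailable)
			return
		}
		w.WriteHeader(http.StatusOK)
	})
	return mux
}

// probe calls a handler in-process and returns the HTTP status code.
func probe(h http.Handler, path string) int {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Code
}

func main() {
	ready := false
	mux := newHealthMux(func() bool { return ready })
	fmt.Println("healthz:", probe(mux, "/healthz"), "readyz:", probe(mux, "/readyz"))
	ready = true
	fmt.Println("healthz:", probe(mux, "/healthz"), "readyz:", probe(mux, "/readyz"))
}
```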
View TaskActions in Kubernetes
# List all TaskActions
kubectl get taskactions -n flyte
# Watch TaskActions in real-time
kubectl get taskactions -n flyte -w
# Get details of a specific TaskAction
kubectl describe taskaction <name> -n flyte
# View TaskAction hierarchy (via OwnerReferences)
kubectl get taskaction <name> -n flyte -o yaml | grep -A 5 ownerReferences
Test with queue client
# Run the test client
go run testclient/main.go
Expected flow:
- Client calls EnqueueAction → TaskAction CR created in K8s
- Client calls AbortQueuedRun → Root TaskAction deleted, children cascade deleted
- Verify in K8s: kubectl get taskactions -n flyte (should show deletions)
API Endpoints
The service exposes the following buf connect endpoints:
- POST /flyteidl2.workflow.QueueService/EnqueueAction - Create TaskAction CR
- POST /flyteidl2.workflow.QueueService/AbortQueuedRun - Delete root TaskAction (cascades)
- POST /flyteidl2.workflow.QueueService/AbortQueuedAction - Delete specific TaskAction (cascades)
Plus health endpoints:
- GET /healthz - Health check
- GET /readyz - Readiness check
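Because these are Connect endpoints, a unary call is an HTTP POST to /<service>/<method>. The sketch below builds such a request with the standard library, assuming the server accepts Connect's JSON encoding (Content-Type: application/json). `newConnectRequest` is a hypothetical helper, and the `{}` body is a placeholder: the real request fields come from the service's protobuf definitions, which this README does not list.

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
	"strings"
)

// Fully qualified service name, taken from the endpoint list above.
const queueService = "flyteidl2.workflow.QueueService"

// newConnectRequest builds a unary Connect call with a JSON body:
// POST <base>/<service>/<method>, Content-Type: application/json.
func newConnectRequest(baseURL, method string, jsonBody []byte) (*http.Request, error) {
	url := strings.TrimRight(baseURL, "/") + "/" + queueService + "/" + method
	req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(jsonBody))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json")
	return req, nil
}

func main() {
	// "{}" is a placeholder body, not a valid EnqueueAction request.
	req, err := newConnectRequest("http://localhost:8089", "EnqueueAction", []byte(`{}`))
	if err != nil {
		panic(err)
	}
	fmt.Println(req.Method, req.URL.String())
	// To send it: resp, err := http.DefaultClient.Do(req)
}
```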
Kubernetes Resources
TaskAction CR Structure
The TaskAction Custom Resource stores:
- Spec: Protobuf-encoded ActionSpec (includes task definition, inputs, etc.)
- Labels: For organization, filtering, and identifying root actions
- OwnerReferences: For automatic cascading deletion
Required RBAC
The Queue Service needs the following permissions on TaskAction resources in its namespace:
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: queue-service
  namespace: flyte
rules:
  - apiGroups: ["flyte.org"]
    resources: ["taskactions"]
    verbs: ["get", "list", "watch", "create", "delete"]
Project Structure
queue/
├── cmd/
│ └── main.go # Entry point with K8s client setup
├── config/
│ └── config.go # Configuration structs
├── k8s/
│ └── client.go # Kubernetes operations (create/delete TaskActions)
├── service/
│ └── queue_service.go # gRPC service handlers
├── client/
│ └── main.go # Test client
├── config.yaml # Configuration file
├── Makefile # Build and test commands
└── README.md # This file
Development
Build
# Build the service
make build
# Or manually
go build -o bin/queue-service ./cmd
Run locally
# Ensure you have a kubeconfig configured
export KUBECONFIG=~/.kube/config
# Run the service
./bin/queue-service --config config.yaml
Run in Kubernetes
# Build Docker image
docker build -t queue-service:latest .
# Deploy to K8s (requires deployment manifests)
kubectl apply -f k8s-manifests/
Advantages of Kubernetes-Native Design
✅ No Database - Kubernetes is the single source of truth
✅ Built-in Durability - K8s etcd provides persistence
✅ Automatic Cleanup - Cascading deletion via OwnerReferences
✅ Native Watching - Controllers can watch CR changes
✅ Scalability - Kubernetes handles distribution and scheduling
✅ Simpler Architecture - One less component to manage
✅ Idiomatic K8s - Leverages native Kubernetes patterns
Troubleshooting
Connection Issues
Error: "failed to get Kubernetes config"
# Verify kubeconfig is valid
kubectl cluster-info
# Or set explicit kubeconfig in config.yaml
queue:
  kubernetes:
    kubeconfig: "/path/to/kubeconfig"
Permission Issues
Error: "failed to create TaskAction CR: forbidden"
Ensure the service has proper RBAC permissions:
kubectl create role queue-service \
--verb=get,list,watch,create,delete \
--resource=taskactions \
-n flyte
kubectl create rolebinding queue-service \
--role=queue-service \
--serviceaccount=flyte:queue-service \
-n flyte
TaskAction CRD Not Found
Error: "no matches for kind TaskAction"
Install the TaskAction CRD:
kubectl apply -f executor/config/crd/