Documentation
¶
Overview ¶
Package framework implements all the grunt work involved in running a simple controller.
Example ¶
package main
import (
"fmt"
"sync"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
"github.com/GoogleCloudPlatform/kubernetes/pkg/controller/framework"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
func main() {
// source simulates an apiserver object endpoint.
source := framework.NewFakeControllerSource()
// This will hold the downstream state, as we know it.
downstream := cache.NewStore(cache.MetaNamespaceKeyFunc)
// This will hold incoming changes. Note how we pass downstream in as a
// KeyLister, that way resync operations will result in the correct set
// of update/delete deltas.
fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, downstream)
// Let's do threadsafe output to get predictable test results.
outputSetLock := sync.Mutex{}
outputSet := util.StringSet{}
cfg := &framework.Config{
Queue: fifo,
ListerWatcher: source,
ObjectType: &api.Pod{},
FullResyncPeriod: time.Millisecond * 100,
RetryOnError: false,
// Let's implement a simple controller that just deletes
// everything that comes in.
Process: func(obj interface{}) error {
// Obj is from the Pop method of the Queue we make above.
newest := obj.(cache.Deltas).Newest()
if newest.Type != cache.Deleted {
// Update our downstream store.
err := downstream.Add(newest.Object)
if err != nil {
return err
}
source.Delete(newest.Object.(runtime.Object))
} else {
// Update our downstream store.
err := downstream.Delete(newest.Object)
if err != nil {
return err
}
// fifo's KeyOf is easiest, because it handles
// DeletedFinalStateUnknown markers.
key, err := fifo.KeyOf(newest.Object)
if err != nil {
return err
}
// Record some output.
outputSetLock.Lock()
defer outputSetLock.Unlock()
outputSet.Insert(key)
}
return nil
},
}
// Create the controller and run it until we close stop.
stop := make(chan struct{})
framework.New(cfg).Run(stop)
// Let's add a few objects to the source.
for _, name := range []string{"a-hello", "b-controller", "c-framework"} {
// Note that these pods are not valid-- the fake source doesn't
// call validation or anything.
source.Add(&api.Pod{ObjectMeta: api.ObjectMeta{Name: name}})
}
// Let's wait for the controller to process the things we just added.
time.Sleep(500 * time.Millisecond)
close(stop)
outputSetLock.Lock()
for _, key := range outputSet.List() {
fmt.Println(key)
}
}
Output: a-hello b-controller c-framework
Index ¶
- type Config
- type Controller
- type FakeControllerSource
- func (f *FakeControllerSource) Add(obj runtime.Object)
- func (f *FakeControllerSource) Delete(lastValue runtime.Object)
- func (f *FakeControllerSource) List() (runtime.Object, error)
- func (f *FakeControllerSource) Modify(obj runtime.Object)
- func (f *FakeControllerSource) Watch(resourceVersion string) (watch.Interface, error)
- type ProcessFunc
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
// The queue for your objects; either a cache.FIFO or
// a cache.DeltaFIFO. Your Process() function should accept
// the output of this Oueue's Pop() method.
cache.Queue
// Something that can list and watch your objects.
cache.ListerWatcher
// Something that can process your objects.
Process ProcessFunc
// The type of your objects.
ObjectType runtime.Object
// Reprocess everything at least this often.
// Note that if it takes longer for you to clear the queue than this
// period, you will end up processing items in the order determined
// by cache.FIFO.Replace(). Currently, this is random. If this is a
// problem, we can change that replacement policy to append new
// things to the end of the queue instead of replacing the entire
// queue.
FullResyncPeriod time.Duration
// If true, when Process() returns an error, re-enqueue the object.
// TODO: add interface to let you inject a delay/backoff or drop
// the object completely if desired. Pass the object in
// question to this interface as a parameter.
RetryOnError bool
}
Config contains all the settings for a Controller.
type Controller ¶
type Controller struct {
// contains filtered or unexported fields
}
Controller is a generic controller framework.
func (*Controller) Run ¶
func (c *Controller) Run(stopCh <-chan struct{})
Run begins processing items, and will continue until a value is sent down stopCh. It's an error to call Run more than once. Run does not block.
type FakeControllerSource ¶
type FakeControllerSource struct {
// contains filtered or unexported fields
}
FakeControllerSource implements listing/watching for testing.
func NewFakeControllerSource ¶
func NewFakeControllerSource() *FakeControllerSource
func (*FakeControllerSource) Add ¶
func (f *FakeControllerSource) Add(obj runtime.Object)
Add adds an object to the set and sends an add event to watchers. obj's ResourceVersion is set.
func (*FakeControllerSource) Delete ¶
func (f *FakeControllerSource) Delete(lastValue runtime.Object)
Delete deletes an object from the set and sends a delete event to watchers. obj's ResourceVersion is set.
func (*FakeControllerSource) List ¶
func (f *FakeControllerSource) List() (runtime.Object, error)
List returns a list object, with its resource version set.
func (*FakeControllerSource) Modify ¶
func (f *FakeControllerSource) Modify(obj runtime.Object)
Modify updates an object in the set and sends a modified event to watchers. obj's ResourceVersion is set.
type ProcessFunc ¶
type ProcessFunc func(obj interface{}) error
ProcessFunc processes a single object.