Agglo: A Process-Anywhere Framework for Event Stream Processing

Overview
Agglo is an experimental event stream processing framework that enables
lightweight, reliable and scalable stream processing alongside persistent
object storage, key-value storage or stream processing platforms.
Binge (BINary-at-the-edGE) is the main artifact of the framework. As the name
implies, it is a single, compiled binary and can run as a stateless daemon,
persistent daemon or as a stand-alone command. This allows the same binary to
be deployed in edge gateways, as Kubernetes deployments, in load balancers,
cloud (lambda) functions, or anywhere else you need to perform stream
processing. The deployed artifact is simply a binary and a single JSON config
that defines the event processing pipelines and configuration for connecting to
external object stores, key-value stores, HTTP/RPC endpoints and stream
processing systems.
With the exception of persistent daemon mode, binge is completely stateless and
can be seamlessly scaled, killed and restarted. When run as a persistent
daemon, binge requires a volume (local or cloud managed, such as EBS) to
maintain a durable queue. If a cloud-managed block storage volume is used to
back the durable queue, a persistent daemon may also be treated as if it were
stateless. In this way, we like to think of binge as
nginx-for-stream-processing.
Rationale and Approach
We already have plenty of stable, reliable stream processing frameworks, so
why do we need this one?
Most stream processing is done explicitly using a framework, such as Spark,
Flink, Kafka Streams, etc., or via a custom microservice architecture that
leverages a reliable queue or event bus. Many stream processing tasks are
simple transformations, filters, aggregations, annotations, etc. that can be
done at the point of ingestion. This becomes increasingly important for IoT
use cases, where the ingestion points (the edge) are far away from your operations
VPC(s). In short, we believe that many stream processing tasks can be
accomplished without the use of stream processing systems and/or custom
microservice architectures.
Our approach is to provide a single multi-use binary that can handle most
stream processing tasks, where any stateful interactions are handled by
external key-value stores, object stores or file systems, and more complex
stream processing tasks can be forwarded to the appropriate stream processing
system. This provides the flexibility to deploy the binary to the most
appropriate "edge" (e.g., load balancer, IoT gateway, Lambda function, etc.)
depending on the use case. In addition, it provides the flexibility to rely on
external systems for stateful interactions, which can also exist at the edge
(e.g., Agglo provides in-memory key-value stores), in an existing on-prem
deployment or in a cloud-managed deployment.
Agglo/binge is not intended to replace existing stream processing systems.
While there are many cases where it could be the main technology used for
stream processing, it is also a perfect fit for complementing existing stream
processing workloads, since it can preprocess and route events from the edge
to a centrally managed stream processing system or event bus.
The main insight is that not all stream processing needs to happen in a
traditional stream processing engine (SPE). The value is in the ability to trade off
cost, latency, throughput and consistency by using binge alongside other systems. Our
hypothesis is that binge can be used as the basis of any event-driven
architecture, where the binge processes are distributed to different edges (IoT
gateways, LB, Kubernetes, Lambda) and utilize managed systems for persistence
and more involved processing.
While we believe that Agglo/binge is best used in conjunction with existing
stream processing systems, we also believe that it can be the main building
block for IoT event processing, for complex iPaaS
(integration-platform-as-a-service) architectures, and for any future use case that
requires highly distributed event processing from varied sources.
Architecture Overview
The four main components that influence the use of Agglo are shown in the figure below: event generation, event
processing, persistence and additional processing. Events are generated by webhooks (e.g., integrations such as
Slack, Jira, Facebook, etc.), explicit API calls, IoT devices, or really anything that can call an API endpoint. The core
of Agglo is the binge process, which can run as a Lambda/Cloud function, a pod in a Kubernetes ReplicaSet, a process
on compute instances, a process in an IoT gateway, or anywhere that can run a Linux process with a network
connection. In addition to basic stream processing tasks, binge also provides the ability to call external systems
for persistence and additional processing.
The binge component is the icon with a pipe, gear and trifurcated arrows. As shown in the figure, binge runs as
the main event processor and can also be used for additional processing after the initial pipelines run.

Getting Started
To make life easier, it is best to make sure the tests pass before doing anything else. This requires that the
following are installed:
- aws-cli: Local AWS services are used for testing
- go-mock: Mocks are generated at test time (I'll probably just check the mocks in eventually)
- minio-client: Used for testing local object stores
First, make sure everything builds and the unit tests pass:
make test
Once the tests pass, you can try out some of the Examples to get an idea of how
agglo and binge work.
Configuration Generator
The raw JSON pipeline configs can become very unwieldy. Please use the Agglo Config Generator
to generate all of your configs. A few caveats:
- The maintainer (@kmgreen2) is not a front-end developer. I tried to do things the "React-way", but I am sure things can be way better.
- It does not explicitly save your progress, so you should periodically "Download" from the "Editor" screen to ensure you do not lose your edits.
This will be enhanced as I find time or others to help. This was my first React project, so I am sure someone else can do way better.
Main Components
There are 5 main components to Agglo:
- Process: A stage in the pipeline that will consume an input map, perform an operation (annotation, aggregation, completion, filter, spawner, transformation, tee, continuation, entwine) and output a map.
- Pipeline: An ordered collection of processes applied to an event.
- External systems: A connector to an external system that can be used by a process. Today, we support S3-like Object Stores, POSIX filesystems, Key-value stores, messaging/pubsub systems and REST endpoints.
- Binge: An event processing binary that can run as a stateless daemon, a persistent daemon or a stand-alone command.
- Pipeline configuration: A binge instance is instantiated using the pipeline configuration that contains one or more pipelines and their dependent processes and external systems.
Processes
All processes must implement the following interface:
type PipelineProcess interface {
    Process(ctx context.Context, in map[string]interface{}) (map[string]interface{}, error)
}
In general, a process will perform one or more actions based on the input map
in. While the input map can technically be anything serialized to/from
map[string]interface{}, we currently support JSON. Each process will also
output a map, which is the input to the next process in the pipeline, or gets
dropped on the floor if it is the last process in a pipeline. The input to the
first process is the raw event posted to or read by binge.
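As an illustration, a sketch of a custom process that annotates every event with a timestamp is shown below. The package name, type name and receivedAt field are hypothetical and not part of Agglo; the only requirement is satisfying the PipelineProcess interface above.

package process

import (
    "context"
    "time"
)

// TimestampAnnotator is a hypothetical process that copies the input map and
// adds a receivedAt field, leaving the rest of the event untouched.
type TimestampAnnotator struct{}

func (t *TimestampAnnotator) Process(ctx context.Context, in map[string]interface{}) (map[string]interface{}, error) {
    out := make(map[string]interface{}, len(in)+1)
    for k, v := range in {
        out[k] = v
    }
    out["receivedAt"] = time.Now().UTC().Format(time.RFC3339)
    return out, nil
}

The returned map is then handed to the next process in the pipeline, exactly as with the built-in processes.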
See the process README for detailed information on processes.
Pipelines
Please refer to the examples for sample pipeline definitions.
External Systems
Adding a new external system is as "simple" as implementing the proper interface.
- KVStores
Current Support: In-memory local key-value store, local DynamoDB and managed DynamoDB
See KVStores for the current
implementations.
Implementing a new key-value store is a matter of implementing the following interface:
type KVStore interface {
    AtomicPut(ctx context.Context, key string, prev, value []byte) error
    AtomicDelete(ctx context.Context, key string, prev []byte) error
    Put(ctx context.Context, key string, value []byte) error
    Get(ctx context.Context, key string) ([]byte, error)
    Head(ctx context.Context, key string) error
    Delete(ctx context.Context, key string) error
    List(ctx context.Context, prefix string) ([]string, error)
    ConnectionString() string
    Close() error
}
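As a concrete illustration, the sketch below shows what a minimal implementation could look like: an in-memory store that guards a map with a mutex. It is illustrative only (it is not the in-memory store that ships with Agglo), and the conflict semantics shown for AtomicPut/AtomicDelete are an assumption.

package kvstore

import (
    "bytes"
    "context"
    "fmt"
    "strings"
    "sync"
)

// MemKVStore is a hypothetical, minimal in-memory key-value store used only
// to illustrate the KVStore interface.
type MemKVStore struct {
    mu   sync.Mutex
    data map[string][]byte
}

func NewMemKVStore() *MemKVStore {
    return &MemKVStore{data: make(map[string][]byte)}
}

// AtomicPut writes value only if the currently stored value equals prev
// (a nil prev means the key is expected to be absent).
func (s *MemKVStore) AtomicPut(ctx context.Context, key string, prev, value []byte) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    if !bytes.Equal(s.data[key], prev) {
        return fmt.Errorf("atomic put conflict for key %s", key)
    }
    s.data[key] = value
    return nil
}

// AtomicDelete removes key only if the currently stored value equals prev.
func (s *MemKVStore) AtomicDelete(ctx context.Context, key string, prev []byte) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    if !bytes.Equal(s.data[key], prev) {
        return fmt.Errorf("atomic delete conflict for key %s", key)
    }
    delete(s.data, key)
    return nil
}

func (s *MemKVStore) Put(ctx context.Context, key string, value []byte) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.data[key] = value
    return nil
}

func (s *MemKVStore) Get(ctx context.Context, key string) ([]byte, error) {
    s.mu.Lock()
    defer s.mu.Unlock()
    if v, ok := s.data[key]; ok {
        return v, nil
    }
    return nil, fmt.Errorf("key not found: %s", key)
}

// Head checks for existence without returning the value.
func (s *MemKVStore) Head(ctx context.Context, key string) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    if _, ok := s.data[key]; !ok {
        return fmt.Errorf("key not found: %s", key)
    }
    return nil
}

func (s *MemKVStore) Delete(ctx context.Context, key string) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    delete(s.data, key)
    return nil
}

// List returns all keys that begin with prefix.
func (s *MemKVStore) List(ctx context.Context, prefix string) ([]string, error) {
    s.mu.Lock()
    defer s.mu.Unlock()
    var keys []string
    for k := range s.data {
        if strings.HasPrefix(k, prefix) {
            keys = append(keys, k)
        }
    }
    return keys, nil
}

func (s *MemKVStore) ConnectionString() string { return "mem://local" }

func (s *MemKVStore) Close() error { return nil }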
- Object Stores
Current Support: In-memory local object store and Minio (supports most managed object stores).
See ObjectStores for the current
implementations.
Implementing a new object store is a matter of implementing the following interface:
type ObjectStore interface {
    Put(ctx context.Context, key string, reader io.Reader) error
    Get(ctx context.Context, key string) (io.Reader, error)
    Head(ctx context.Context, key string) error
    Delete(ctx context.Context, key string) error
    List(ctx context.Context, prefix string) ([]string, error)
    ConnectionString() string
    ObjectStoreBackendParams() ObjectStoreBackendParams
}
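As a usage illustration, the hypothetical helper below (not part of Agglo) serializes an event, writes it to any ObjectStore implementation and verifies the write with Head; it assumes the ObjectStore interface above is in scope in the same package.

package objectstore

import (
    "bytes"
    "context"
    "encoding/json"
)

// putEvent is a hypothetical helper that persists an event to any ObjectStore
// implementation and then confirms the object exists.
func putEvent(ctx context.Context, store ObjectStore, key string, event map[string]interface{}) error {
    b, err := json.Marshal(event)
    if err != nil {
        return err
    }
    // Put accepts an io.Reader, so the serialized event is wrapped in a reader.
    if err := store.Put(ctx, key, bytes.NewReader(b)); err != nil {
        return err
    }
    // Head verifies the object is visible without fetching its contents.
    return store.Head(ctx, key)
}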
- Pubsub
Current Support: In-memory local pub/sub (Kafka coming soon)
See Streaming for the current
implementations.
Implementing a new publisher is a matter of implementing the following interface:
type Publisher interface {
    Publish(ctx context.Context, b []byte) error
    Flush(ctx context.Context, timeout time.Duration) error
    Close() error
    ConnectionString() string
}
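For example, a tee-style step that forwards events to a pub/sub system could drive any Publisher roughly as follows. The helper and the five-second flush timeout are hypothetical, and the sketch assumes the Publisher interface above is in scope in the same package.

package streaming

import (
    "context"
    "encoding/json"
    "time"
)

// publishEvent is a hypothetical helper that serializes an event, publishes
// it and flushes the publisher before returning.
func publishEvent(ctx context.Context, pub Publisher, event map[string]interface{}) error {
    b, err := json.Marshal(event)
    if err != nil {
        return err
    }
    if err := pub.Publish(ctx, b); err != nil {
        return err
    }
    return pub.Flush(ctx, 5*time.Second)
}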
- REST Endpoints
All HTTP/S requests are supported.
Entwine: Immutable, Partial Ordering of Events
Entwine is a special process that provides the ability to "entwine" event
timelines. It is similar to existing blockchain technologies in that it relies
on hash chains for immutability and zero-knowledge proofs for secure
verification of individual event timelines. There is no consensus mechanism.
The assumption is that each event timeline is generated by an individual, a
single organization, a binge binary or a collection of binge binaries.
See the Entwine README for more information.