Last year, I worked on a project at my job that involved building a data processing pipeline. It integrated tasks such as extracting information from documents using Natural Language Processing (NLP) and recommending content using Machine Learning (ML).
Many tasks, including the above examples, are often heterogeneous and need to be interoperable. For example, in the domain of Artificial Intelligence (AI)/ML/NLP/data science, libraries written in Python, Java, and R are commonly used. Furthermore, tasks are often reactive, i.e., one computation is triggered from another, and larger tasks can be split into smaller subtasks. Tasks can frequently also be performed in parallel.
So-called workflow managers and job/task queuing and scheduling systems are popular components of data processing pipelines.
Inconveniently, most of these systems are tailored to only one programming language, or a small set of them. They also often assume that tasks are processed by ephemeral programs with a short startup time. However, some programs in a data processing pipeline may need to be long-lived because they have long startup times, for instance when a large ML model needs to be loaded into memory. Most systems also assume that flows are directed acyclic graphs (DAGs) of tasks and are of a batch nature (not continuously running).
I initially looked at popular workflow managers, such as Airflow by Airbnb, Luigi by Spotify, and Pinball by Pinterest, as well as popular job/task queuing systems like Celery and Sidekiq.
Unfortunately, none of these existing systems fit the requirements I mentioned above. The workflow managers are heavily tied to Python and only support batch jobs. The job/task queuing systems are each tied to a single language (Python and Ruby, respectively), making it difficult to integrate code written in other languages. All of them are complex to configure and operate because they implement their own queuing system, program scheduling, and other features.
A simple, yet powerful and flexible solution can be implemented using off-the-shelf components.
Splitting the system into components
Let’s look at how to architect a system which fits the requirements by breaking it into smaller components. Basically, a data processing pipeline can be defined with two concepts:
Workers: Programs which accept incoming messages, perform a single computation, and optionally generate output messages.
Messages: Information flowing between workers, allowing them to communicate flow/task details.
Therefore, processing pipelines are constructed by linking workers through messages, though it is not a data streaming solution: Workers do not move data from one to the other – messages passed between them only contain metadata. More specifically, let’s define messages to consist of an event name and a payload. Pipelines are coupled through event names alone. Each worker defines which events it listens to and which events it emits. The payload could be formalized, but can also be more loosely defined.
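To make the message shape concrete, here is a minimal sketch in Go. It assumes JSON as the encoding; the `Message` type, its field names, and the example payload are illustrative assumptions rather than a fixed format.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Message is a hypothetical representation of a pipeline message:
// an event name that couples workers, plus a loosely defined payload.
type Message struct {
	Event   string          `json:"event"`
	Payload json.RawMessage `json:"payload"`
}

// encode renders an event name and an arbitrary payload as one line of JSON.
func encode(event string, payload interface{}) (string, error) {
	raw, err := json.Marshal(payload)
	if err != nil {
		return "", err
	}
	line, err := json.Marshal(Message{Event: event, Payload: raw})
	return string(line), err
}

// decodeEvent extracts only the event name, leaving the payload untouched.
func decodeEvent(line string) (string, error) {
	var m Message
	err := json.Unmarshal([]byte(line), &m)
	return m.Event, err
}

func main() {
	line, err := encode("document-added", map[string]string{"id": "doc-42"})
	if err != nil {
		panic(err)
	}
	fmt.Println(line)
}
```

Keeping the payload as raw JSON means routing code only ever needs to look at the event name, which matches the idea that pipelines are coupled through event names alone.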
Two types of events allow the construction of almost any pipeline:
Most events should only be delivered to one worker of a certain type. This allows the parallelization of time-consuming tasks, e.g. processing of documents or generation of recommendations, by distributing them among multiple workers.
Some events should be delivered to every worker of a certain type. This allows informing all workers of environment changes, e.g. updating of state or invalidation of caches.
The task to extract entities from a document would be performed by a worker called entity-extractor. Documents can be processed in parallel so that there are multiple workers of this type.
Here’s what this would look like:
Each entity extraction worker would listen to the event document-added, indicating that a document was added to a data source, and emit the event entities-extracted once it finished processing a document. The payload would be the ID of the document.
Only one of the workers of this type would receive such a message, as every document should only be processed once. Another worker could listen to the emitted event to perform further operations on the document.
The entity extraction workers would also listen to the event entity-added, which indicates that a new entity was added to the data source of the worker, like a database storing information about entities. Messages with this event would be delivered to every worker of this type, as every worker needs to update its state. The payload would be the ID of the entity.
Building a system based on workers and messages requires two systems:
Scheduling system: responsible for scheduling workers
Message Queueing system: Also known as a message broker, responsible for routing and delivery of messages between workers
Instead of reinventing the wheel and building these systems from scratch, we use established off-the-shelf software:
Kubernetes: a platform for automatic deployment, scaling, scheduling, and management of containerized applications
RabbitMQ: a message broker implementing the AMQP 0-9-1 protocol, with support for message queueing, reliable delivery, and flexible routing. It also includes many useful extensions, such as delayed message delivery
Both Kubernetes and RabbitMQ are production-ready software: they are reliable, scalable, and can be run in a highly available configuration. They have great support for operations and monitoring (metrics, logging), and have great tooling (CLI, UI).
Containerized, supervised workers
Kubernetes allows us to run any program as a worker, as long as it is containerized. This has the advantage that workers can be written in any programming language.
To avoid having to reimplement message queuing code in every language we want to support and to decouple the worker code from the message queuing environment, we extract that logic and implement it once in a single binary. We will call this process the supervisor. Instead of the worker process communicating directly with RabbitMQ, the supervisor performs the configuration of the RabbitMQ exchange and queues and is responsible for receiving and sending messages to and from RabbitMQ.
In contrast, writing a library for every supported language would mean that one would have to reimplement all the details of message consumption and production, which has the potential of creating inconsistencies and introducing more bugs.
We chose to implement the supervisor in Go, because it lends itself well to generating small static binaries, and since well-written and robust RabbitMQ libraries exist for it.
The single binary idea is inspired by and similar to a combination of amqp-consume and amqp-publish for long-running processes.
The worker process only communicates with the supervisor via named pipes (FIFO), an extension of pipes which can be used for IPC, through a simple line-based protocol.
Using named pipes and file-based IPC has the benefit of supporting as many programming languages as possible, since receiving and sending messages is as simple as reading from and writing to files, which is even supported in shell scripts. In addition, both the worker process and the supervisor can be tested independently from the queuing system using a simple file-based protocol.
You might wonder why named pipes are used instead of using standard input and standard output pipes already provided to every process: We don’t want to interfere with these since they are likely already used in existing code bases and a migration might be expensive. For instance, standard output is frequently already used for logging output.
As you can see above, the supervisor listens to messages from RabbitMQ and delivers them through a named pipe to the supervised worker process. Once a message has been processed by the worker process, it reports the status back to the supervisor through another named pipe (output). It may also send messages, e.g. to trigger other workers.
IPC using named pipes
Let’s look at how the worker process and supervisor process interact in more depth.
When the container starts, the supervisor is started in the following manner:
The supervisor creates the two named pipes using
The supervisor starts the worker process and waits until it exits.
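The startup steps above could be sketched in Go roughly as follows. This is a sketch under assumptions: it uses `syscall.Mkfifo` to create the pipes, and the pipe file names and the `WORKER_INPUT`/`WORKER_OUTPUT` variable names are hypothetical placeholders.

```go
package main

import (
	"fmt"
	"os"
	"os/exec"
	"path/filepath"
	"syscall"
)

// createPipes creates the input and output FIFOs inside dir and returns their paths.
func createPipes(dir string) (in, out string, err error) {
	in = filepath.Join(dir, "input")
	out = filepath.Join(dir, "output")
	for _, p := range []string{in, out} {
		if err = syscall.Mkfifo(p, 0o600); err != nil {
			return "", "", err
		}
	}
	return in, out, nil
}

// isFIFO reports whether path exists and is a named pipe.
func isFIFO(path string) bool {
	info, err := os.Stat(path)
	return err == nil && info.Mode()&os.ModeNamedPipe != 0
}

// runWorker starts the worker program, passes it the pipe paths through
// environment variables (hypothetical names), waits until it exits, and
// returns its exit code.
func runWorker(program string, args []string, in, out string) (int, error) {
	cmd := exec.Command(program, args...)
	cmd.Env = append(os.Environ(), "WORKER_INPUT="+in, "WORKER_OUTPUT="+out)
	cmd.Stdout, cmd.Stderr = os.Stdout, os.Stderr // leave the worker's logging untouched
	err := cmd.Run()
	if exitErr, ok := err.(*exec.ExitError); ok {
		return exitErr.ExitCode(), nil
	}
	if err != nil {
		return -1, err
	}
	return 0, nil
}

// demo creates the pipes in a temporary directory and runs program as the worker.
func demo(program string) (bool, int, error) {
	dir, err := os.MkdirTemp("", "supervisor")
	if err != nil {
		return false, -1, err
	}
	defer os.RemoveAll(dir)
	in, out, err := createPipes(dir)
	if err != nil {
		return false, -1, err
	}
	code, err := runWorker(program, nil, in, out)
	return isFIFO(in) && isFIFO(out), code, err
}

func main() {
	// "true" stands in for a real worker program here.
	ok, code, err := demo("true")
	fmt.Println(ok, code, err)
}
```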
The configuration for the supervisor is provided in environment variables:
SUPERVISOR_WORKER_PROGRAM: The worker program the supervisor should start
SUPERVISOR_WORKER_ARGUMENTS: A comma-separated list of arguments which the supervisor should pass to the program it launches (given in
SUPERVISOR_WORKER_NAME: Name of the worker
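Reading this configuration could look like the following sketch. It assumes a plain comma split for the arguments (no quoting or escaping) and takes a lookup function so that it can be tried without touching the real environment; the `Config` type is illustrative.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// Config holds the supervisor settings described above.
type Config struct {
	Program   string
	Arguments []string
	Name      string
}

// parseConfig builds a Config from a lookup function such as os.Getenv.
func parseConfig(getenv func(string) string) (Config, error) {
	cfg := Config{
		Program: getenv("SUPERVISOR_WORKER_PROGRAM"),
		Name:    getenv("SUPERVISOR_WORKER_NAME"),
	}
	if cfg.Program == "" {
		return Config{}, errors.New("SUPERVISOR_WORKER_PROGRAM is not set")
	}
	// Assumption: arguments are separated by plain commas without escaping.
	if raw := getenv("SUPERVISOR_WORKER_ARGUMENTS"); raw != "" {
		cfg.Arguments = strings.Split(raw, ",")
	}
	return cfg, nil
}

func main() {
	env := map[string]string{
		"SUPERVISOR_WORKER_PROGRAM":   "python",
		"SUPERVISOR_WORKER_ARGUMENTS": "-u,worker.py",
		"SUPERVISOR_WORKER_NAME":      "entity-extractor",
	}
	cfg, err := parseConfig(func(key string) string { return env[key] })
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", cfg)
}
```

In the supervisor itself, `parseConfig(os.Getenv)` would read the real environment.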
The configuration for the worker process is also provided by the supervisor through environment variables, i.e., the paths of the named pipes (e.g
Now that both processes are running, the worker process may initialize itself – it won’t receive any messages until it has opened the files.
The opening of the files needs to be performed in a specific order so as to synchronize and avoid a deadlock, as opening a pipe read-only or write-only is blocking (see the part about
O_NONBLOCK in the man page for
The worker process opens the input file for reading (mode os.O_RDONLY), which blocks the process, as the supervisor has not opened it yet
The supervisor opens the input file for writing (mode os.O_WRONLY), which will not block, as the worker process has opened it for reading
The worker process opens the output file for writing, which again blocks the worker process, as the supervisor has not opened it yet
The supervisor opens the output file for reading, which will not block, as the worker process has opened it for writing
NB: Since the files are named pipes, they need to be opened with the mode
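The opening order can be tried out in a single Go program, with a goroutine standing in for the worker process. This is only a sketch: the pipe names are hypothetical, and passing `os.ModeNamedPipe` as the third argument of `os.OpenFile` is an assumption about the mode meant above (that argument is only consulted when a file is created).

```go
package main

import (
	"bufio"
	"fmt"
	"os"
	"path/filepath"
	"syscall"
)

// worker plays the child's role: open input for reading, then output for
// writing, read one line, and report back.
func worker(in, out string) error {
	input, err := os.OpenFile(in, os.O_RDONLY, os.ModeNamedPipe) // step 1: blocks until a writer opens
	if err != nil {
		return err
	}
	defer input.Close()
	output, err := os.OpenFile(out, os.O_WRONLY, os.ModeNamedPipe) // step 3: blocks until a reader opens
	if err != nil {
		return err
	}
	defer output.Close()
	line, err := bufio.NewReader(input).ReadString('\n')
	if err != nil {
		return err
	}
	_, err = fmt.Fprintf(output, "processed %s", line)
	return err
}

// handshake plays the supervisor's role and returns the worker's reply line.
func handshake(msg string) (string, error) {
	dir, err := os.MkdirTemp("", "fifo")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(dir)
	in, out := filepath.Join(dir, "input"), filepath.Join(dir, "output")
	for _, p := range []string{in, out} {
		if err := syscall.Mkfifo(p, 0o600); err != nil {
			return "", err
		}
	}
	errc := make(chan error, 1)
	go func() { errc <- worker(in, out) }()

	input, err := os.OpenFile(in, os.O_WRONLY, os.ModeNamedPipe) // step 2
	if err != nil {
		return "", err
	}
	defer input.Close()
	output, err := os.OpenFile(out, os.O_RDONLY, os.ModeNamedPipe) // step 4
	if err != nil {
		return "", err
	}
	defer output.Close()

	if _, err := fmt.Fprintln(input, msg); err != nil {
		return "", err
	}
	reply, err := bufio.NewReader(output).ReadString('\n')
	if err != nil {
		return "", err
	}
	if err := <-errc; err != nil {
		return "", err
	}
	return reply, nil
}

func main() {
	reply, err := handshake("hello")
	if err != nil {
		panic(err)
	}
	fmt.Print(reply)
}
```

Because each blocking open only returns once the other side has opened the same pipe, both sides leave the four steps in lockstep regardless of which one starts first.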
Once the files have been opened, the supervisor will start sending messages to the worker by writing them to the input file, one message/line at a time. It will wait until the worker has processed a message before writing a new message.
This can be accomplished by starting two goroutines: one which reads and decodes lines from the output pipe of the worker process (e.g. using bufio.Scanner) and a second which consumes messages from RabbitMQ and writes them to the input pipe.
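A sketch of this lockstep delivery follows. To keep it runnable without RabbitMQ or real FIFOs, it makes two substitutions that are purely for illustration: a Go channel stands in for the RabbitMQ consumer, and `io.Pipe` stands in for the named pipes. The calling goroutine plays the part of the second goroutine.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
)

// supervise writes each message to the worker's input, one per line, and
// waits for the worker's completion line before writing the next message.
func supervise(messages <-chan string, input io.WriteCloser, output io.Reader) ([]string, error) {
	completions := make(chan string)
	// First goroutine: read lines from the worker's output pipe.
	go func() {
		defer close(completions)
		scanner := bufio.NewScanner(output)
		for scanner.Scan() {
			completions <- scanner.Text()
		}
	}()
	// Second role (the caller): consume messages and write them to the input pipe.
	defer input.Close()
	var results []string
	for msg := range messages {
		if _, err := fmt.Fprintln(input, msg); err != nil {
			return results, err
		}
		reply, ok := <-completions
		if !ok {
			return results, io.ErrUnexpectedEOF // worker closed its output early
		}
		results = append(results, reply)
	}
	return results, nil
}

// echoWorker is a stand-in worker that acknowledges every input line.
func echoWorker(input io.Reader, output io.WriteCloser) {
	defer output.Close()
	scanner := bufio.NewScanner(input)
	for scanner.Scan() {
		fmt.Fprintf(output, "done:%s\n", scanner.Text())
	}
}

// run wires a supervisor and a stand-in worker together with in-memory pipes.
func run(msgs []string) ([]string, error) {
	inR, inW := io.Pipe()
	outR, outW := io.Pipe()
	go echoWorker(inR, outW)
	queue := make(chan string, len(msgs))
	for _, m := range msgs {
		queue <- m
	}
	close(queue)
	return supervise(queue, inW, outR)
}

func main() {
	results, err := run([]string{"a", "b"})
	if err != nil {
		panic(err)
	}
	fmt.Println(results)
}
```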
Once the worker has finished processing a message, it needs to write a line to the output file indicating to the supervisor whether or not the message was successfully processed. In addition, the worker can emit multiple messages in a single completion line. JSON could be a suitable encoding for this purpose.
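One possible shape for such a completion line, assuming JSON, is sketched here. The `Completion` and `Emitted` types and their field names are hypothetical; the text above does not prescribe a schema.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Emitted is a hypothetical message the worker asks the supervisor to publish.
type Emitted struct {
	Event   string `json:"event"`
	Payload string `json:"payload"`
}

// Completion is a hypothetical completion line: whether the message was
// processed successfully, plus any messages to emit.
type Completion struct {
	Success  bool      `json:"success"`
	Messages []Emitted `json:"messages,omitempty"`
}

// encodeCompletion renders a completion as exactly one newline-terminated line.
// json.Marshal escapes newlines inside strings, so the line protocol stays intact.
func encodeCompletion(c Completion) (string, error) {
	b, err := json.Marshal(c)
	if err != nil {
		return "", err
	}
	return string(b) + "\n", nil
}

// decodeCompletion parses one line written by the worker.
func decodeCompletion(line string) (Completion, error) {
	var c Completion
	err := json.Unmarshal([]byte(line), &c)
	return c, err
}

func main() {
	line, err := encodeCompletion(Completion{
		Success:  true,
		Messages: []Emitted{{Event: "entities-extracted", Payload: "doc-42"}},
	})
	if err != nil {
		panic(err)
	}
	fmt.Print(line)
}
```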
When the worker process exits, the supervisor uses the exit code to determine whether the last message, if any, was processed properly.
The supervisor also needs to forward the signals it receives, e.g. SIGTERM. Most importantly, if it receives SIGTERM, it should stop consuming messages from RabbitMQ and try to stop the worker process by sending it the termination signal and waiting a configurable grace period to allow the worker to clean up. If it doesn’t terminate in the allowed time, it is finally sent the kill signal.
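The terminate-then-kill part could be sketched like this. The helper names are hypothetical, the worker is a shell one-liner chosen only for demonstration, and stopping the RabbitMQ consumer is left out.

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"os/exec"
	"syscall"
	"time"
)

// stopWorker sends SIGTERM to a started worker, waits up to grace for it to
// exit, and sends SIGKILL if it is still running. It reports whether the
// worker exited within the grace period.
func stopWorker(cmd *exec.Cmd, grace time.Duration) (bool, error) {
	exited := make(chan struct{})
	go func() {
		_ = cmd.Wait() // the exit status is not inspected in this sketch
		close(exited)
	}()
	err := cmd.Process.Signal(syscall.SIGTERM)
	if err != nil && !errors.Is(err, os.ErrProcessDone) {
		return false, err
	}
	select {
	case <-exited:
		return true, nil
	case <-time.After(grace):
		if err := cmd.Process.Kill(); err != nil && !errors.Is(err, os.ErrProcessDone) {
			return false, err
		}
		<-exited
		return false, nil
	}
}

// startAndStop starts a shell script as the worker and stops it again.
func startAndStop(script string, graceMillis int) (bool, error) {
	cmd := exec.Command("sh", "-c", script)
	if err := cmd.Start(); err != nil {
		return false, err
	}
	time.Sleep(200 * time.Millisecond) // give the script time to start up
	return stopWorker(cmd, time.Duration(graceMillis)*time.Millisecond)
}

func main() {
	// A worker that exits on SIGTERM.
	graceful, err := startAndStop("exec sleep 30", 2000)
	fmt.Println(graceful, err)
	// A worker that ignores SIGTERM and has to be killed after the grace period.
	graceful, err = startAndStop("trap '' TERM; exec sleep 30", 500)
	fmt.Println(graceful, err)
}
```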
There is one edge case: If the worker terminates abnormally before both pipes have been opened, the supervisor needs to perform the worker's side of the opening itself. Otherwise, the supervisor's own open calls will block forever, because there are no matching open calls from the child process side.
The process for the supervisor to perform the opening is as follows:
Open the input pipe read-only
Open the output pipe write-only
Close the input pipe
Close the output pipe
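The four steps above can be sketched as follows, with a goroutine standing in for the blocked supervisor code. Function and file names are hypothetical, and `os.ModeNamedPipe` as the open mode is an assumption (it has no effect without file creation).

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"syscall"
	"time"
)

// openSupervisorEnds opens the pipes the way the supervisor normally does.
// Without a counterpart on the other side, both opens would block forever.
func openSupervisorEnds(in, out string) error {
	input, err := os.OpenFile(in, os.O_WRONLY, os.ModeNamedPipe)
	if err != nil {
		return err
	}
	defer input.Close()
	output, err := os.OpenFile(out, os.O_RDONLY, os.ModeNamedPipe)
	if err != nil {
		return err
	}
	return output.Close()
}

// unblock performs the dead worker's side of the opening, in the order listed above.
func unblock(in, out string) error {
	input, err := os.OpenFile(in, os.O_RDONLY, os.ModeNamedPipe) // open the input pipe read-only
	if err != nil {
		return err
	}
	output, err := os.OpenFile(out, os.O_WRONLY, os.ModeNamedPipe) // open the output pipe write-only
	if err != nil {
		input.Close()
		return err
	}
	if err := input.Close(); err != nil { // close the input pipe
		output.Close()
		return err
	}
	return output.Close() // close the output pipe
}

// demo simulates a worker that died before opening anything and reports
// whether the blocked supervisor-side opens were released.
func demo() (bool, error) {
	dir, err := os.MkdirTemp("", "fifo")
	if err != nil {
		return false, err
	}
	defer os.RemoveAll(dir)
	in, out := filepath.Join(dir, "input"), filepath.Join(dir, "output")
	for _, p := range []string{in, out} {
		if err := syscall.Mkfifo(p, 0o600); err != nil {
			return false, err
		}
	}
	done := make(chan error, 1)
	go func() { done <- openSupervisorEnds(in, out) }()
	if err := unblock(in, out); err != nil {
		return false, err
	}
	select {
	case err := <-done:
		return err == nil, err
	case <-time.After(2 * time.Second):
		return false, nil
	}
}

func main() {
	released, err := demo()
	fmt.Println(released, err)
}
```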
The blog post Introduction to Interprocess Communication Using Named Pipes helped me understand the details and edge-cases when using named pipes for bi-directional IPC.
Scheduling workers using Kubernetes
Workers are scheduled using Kubernetes. As it manages containers, we need to containerize the worker by creating an image which consists of the supervisor and the worker binaries. Once bundled in an image, Kubernetes’ deployment concept can be used to describe the desired state of the worker instances (image, environment variables, count).
Once applied, Kubernetes takes care of creating and managing the containers. They can now easily be scaled, either manually, by specifying the number of pods, or automatically, for example based on the CPU usage of the containers using the Horizontal Pod Autoscaler. The automatic scaling could also be extended to be based on RabbitMQ queue sizes.
The supervisor doesn’t need to worry about worker process crashes: it simply terminates itself. Kubernetes supervises the whole container and will restart it if needed.
Exchange, queues, and consumers using RabbitMQ
Message queueing proceeds as follows:
First, the supervisor declares a single exchange, where all messages are published. The following options are used when declaring it:
direct: all incoming messages are routed to the queues based on the routing key, which is simply the event name.
false: ensures the exchange is not deleted when the last queue is unbound from it
true: Ensures the exchange survives RabbitMQ restarts/crashes
Recall that we have two types of messages/events: ones that should only be delivered to one worker of a certain type, and ones that should be delivered to every worker of a certain type. To support the two types of messages/events, we will have two types of queues:
Shared work queues: to distribute messages among multiple workers of one type. RabbitMQ’s documentation includes a helpful tutorial for work queues.
Unique worker queues: for messages that need delivery to every worker. RabbitMQ’s documentation includes a helpful tutorial for routing and multiple bindings.
Therefore, the supervisor declares one queue of each type using the following options:
false: Exclusive queues can only be used (consumed from, purged, deleted, etc.) by their declaring connection. However, shared work queues should be usable by multiple workers.
For the shared work queue
false: The queue should never get deleted
For the unique worker queue
true: The queue should get deleted when its single consumer unsubscribes, i.e., its worker stops or restarts
true: Ensures the queue survives RabbitMQ restarts/crashes
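The queue options can be summarized in a small sketch. It assumes that the three options above are the `exclusive`, `auto-delete`, and `durable` flags of the AMQP 0-9-1 `queue.declare` method, in that order. The `QueueOptions` and `QueueKind` types are hypothetical and independent of any client library.

```go
package main

import "fmt"

// QueueOptions models the queue.declare flags discussed above.
type QueueOptions struct {
	Durable    bool
	AutoDelete bool
	Exclusive  bool
}

// QueueKind distinguishes the two kinds of queues a supervisor declares.
type QueueKind int

const (
	SharedWork QueueKind = iota
	UniqueWorker
)

// optionsFor returns the declaration flags for the given kind of queue.
func optionsFor(kind QueueKind) QueueOptions {
	return QueueOptions{
		Durable:    true,                 // survive RabbitMQ restarts/crashes
		AutoDelete: kind == UniqueWorker, // delete once the single consumer is gone
		Exclusive:  false,                // not restricted to the declaring connection
	}
}

func main() {
	fmt.Printf("shared work queue:   %+v\n", optionsFor(SharedWork))
	fmt.Printf("unique worker queue: %+v\n", optionsFor(UniqueWorker))
}
```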
RabbitMQ provides a tutorial for the publish/subscribe pattern, which at first seems like a suitable approach for implementing the unique worker queues: A fanout exchange is declared and queues get auto-generated names by using the special empty name (""). However, the queues won’t survive a RabbitMQ restart.
Instead, we use the single, direct exchange and create unique queue names using UUIDs.
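Generating such a name needs nothing beyond the Go standard library. The sketch below builds a random (version 4) UUID by hand; prefixing it with the worker name is a hypothetical naming scheme, not something prescribed above.

```go
package main

import (
	"crypto/rand"
	"fmt"
)

// newUUID returns a random (version 4) UUID in its canonical text form.
func newUUID() (string, error) {
	var b [16]byte
	if _, err := rand.Read(b[:]); err != nil {
		return "", err
	}
	b[6] = (b[6] & 0x0f) | 0x40 // set the version to 4
	b[8] = (b[8] & 0x3f) | 0x80 // set the RFC 4122 variant
	return fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:16]), nil
}

// uniqueQueueName derives a queue name for one worker instance
// (hypothetical scheme: worker name, a dash, and a fresh UUID).
func uniqueQueueName(worker string) (string, error) {
	id, err := newUUID()
	if err != nil {
		return "", err
	}
	return worker + "-" + id, nil
}

func main() {
	name, err := uniqueQueueName("entity-extractor")
	if err != nil {
		panic(err)
	}
	fmt.Println(name)
}
```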
Now that the exchange and the queues are declared, the queues need to be bound to the exchange. The routing keys are simply the event names the worker/supervisor should listen to.
The event names are given to the supervisor again through environment variables:
SUPERVISOR_SHARED_EVENTS: A comma-separated list of event names to listen to (one worker instance will receive the message)
SUPERVISOR_UNIQUE_EVENTS: A comma-separated list of unique events to listen to (all worker instances will receive the message)
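Turning these two lists into bindings could look like the following sketch. The `Binding` type and the queue names in the example are hypothetical; the actual bind calls against RabbitMQ are omitted.

```go
package main

import (
	"fmt"
	"strings"
)

// Binding ties a queue to the exchange under one routing key (an event name).
type Binding struct {
	Queue      string
	RoutingKey string
}

// splitEvents parses a comma-separated list of event names, skipping empty entries.
func splitEvents(raw string) []string {
	var events []string
	for _, e := range strings.Split(raw, ",") {
		if e = strings.TrimSpace(e); e != "" {
			events = append(events, e)
		}
	}
	return events
}

// bindings computes the bindings for the shared work queue and the unique
// worker queue from the values of the two environment variables.
func bindings(sharedQueue, uniqueQueue, sharedEvents, uniqueEvents string) []Binding {
	var out []Binding
	for _, e := range splitEvents(sharedEvents) {
		out = append(out, Binding{Queue: sharedQueue, RoutingKey: e})
	}
	for _, e := range splitEvents(uniqueEvents) {
		out = append(out, Binding{Queue: uniqueQueue, RoutingKey: e})
	}
	return out
}

func main() {
	// The last two arguments stand in for the values of
	// SUPERVISOR_SHARED_EVENTS and SUPERVISOR_UNIQUE_EVENTS.
	for _, b := range bindings("entity-extractor", "entity-extractor-1234", "document-added", "entity-added") {
		fmt.Printf("%s <- %s\n", b.Queue, b.RoutingKey)
	}
}
```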
Finally, the supervisor can start consuming from the two queues.
Messages in the shared worker queue should be fairly distributed to all workers consuming from it, i.e., new messages should be dispatched to the next non-busy worker and no new messages should be dispatched to a worker until it has processed and acknowledged the previous one.
This behavior is achieved in RabbitMQ by setting the channel prefetch setting (QoS). In particular, the prefetch-count option needs to be set to 1, so that a worker only receives a new message once it has acknowledged the previous one. RabbitMQ’s documentation has detailed explanations of consumer acknowledgments and publisher confirms, as well as consumer prefetch.
Also, when consuming from the queues, the exclusive option has to be set accordingly:
For the shared work queue
false: Other workers of the same type also need to be able to consume from the queue
For the unique worker queue
true: Other workers should never be able to consume from this queue
Once the message has been consumed and successfully processed by the worker process, the delivered message can be acknowledged.
The final piece is the messages themselves. Messages should never get lost, so we also need them to persist. Because the durability of a queue does not also make messages durable, the
delivery_mode publish option has to be set to
Queues and workers using RabbitMQ and Kubernetes
Now that we covered the implementation of both the worker scheduling and the message queuing part, let’s have a look at how the pieces fit and work together in a larger example:
Here, we have two types of workers, A and B. There are two workers of type A, and two workers of type B.
Workers of type A listen for messages of event types
E2, which are both shared work events. For this reason, the exchange routes them to the shared work queue Queue A. The messages are then fairly distributed among both workers: Worker 1 and Worker 2. These workers also emit messages of event type
Workers of type B listen for messages of shared work events E3. Again, the exchange routes them to the shared work queue Queue B and they are fairly distributed among both workers: Worker 3 and Worker 4. However, workers of type B also listen to messages of unique worker event E4. Incoming messages of this type are routed by the exchange to both unique worker queues, Queue W3 and Queue W4, and both workers will receive the messages.
We can easily scale this configuration and speed up the processing of messages of event types
E3 by adding more workers of both types.
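The routing in this example can be modeled with a toy, in-memory stand-in for a direct exchange. It is not RabbitMQ code, and it only binds the event names that are spelled out above (E2, E3, and E4).

```go
package main

import "fmt"

// Exchange is a toy in-memory model of a direct exchange: a message is
// copied to every queue bound under its routing key.
type Exchange struct {
	bindings map[string][]string // routing key -> queue names
	queues   map[string][]string // queue name -> pending messages
}

// NewExchange returns an exchange without any bindings.
func NewExchange() *Exchange {
	return &Exchange{bindings: map[string][]string{}, queues: map[string][]string{}}
}

// Bind attaches a queue to the exchange under one routing key.
func (e *Exchange) Bind(queue, key string) {
	e.bindings[key] = append(e.bindings[key], queue)
}

// Publish routes a message to all queues bound under key.
func (e *Exchange) Publish(key, body string) {
	for _, q := range e.bindings[key] {
		e.queues[q] = append(e.queues[q], body)
	}
}

// Len returns the number of messages waiting in a queue.
func (e *Exchange) Len(queue string) int {
	return len(e.queues[queue])
}

// example builds the bindings from the scenario above.
func example() *Exchange {
	e := NewExchange()
	e.Bind("Queue A", "E2")  // shared work queue of worker type A
	e.Bind("Queue B", "E3")  // shared work queue of worker type B
	e.Bind("Queue W3", "E4") // unique worker queue of Worker 3
	e.Bind("Queue W4", "E4") // unique worker queue of Worker 4
	return e
}

func main() {
	e := example()
	e.Publish("E3", "shared work for type B")
	e.Publish("E4", "state change for every worker of type B")
	for _, q := range []string{"Queue A", "Queue B", "Queue W3", "Queue W4"} {
		fmt.Println(q, e.Len(q))
	}
}
```

A shared work event lands in exactly one queue, from which the workers of that type take turns. A unique worker event is copied into one queue per worker instance, so every instance sees it.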
As I mentioned earlier, I implemented the supervisor binary in Go. Two libraries made that easy and straightforward:
amqp: A low-level AMQP 0-9-1 library which supports many RabbitMQ extensions
cony: A high-level wrapper for the low-level library above. It allows the declarative definition of exchanges, queues, and bindings, and handles reconnects with backoff, redeclaring everything properly