Totango Engineering

Chronicles of a Distributed Data Pipeline (part 2)

Quick recap

So... in part 1 we had daily data pipelines running on a bunch of servers. Jenkins scheduled the daily runs and Luigi managed the logical flow of each pipeline. The system worked, but it was starting to show strain as our data grew. Then a new requirement arrived: allow triggering these pipelines ad-hoc. Instead of a quick fix, we decided to re-imagine everything.


First, we needed a reliable way to buffer pipeline requests while they wait for free Luigi workers.

Here at Totango we're deeply involved in many aspects of the AWS ecosystem. We leverage EC2, S3 and other Amazon services to do awesome stuff, so SQS was kind of a natural choice for us as a queueing mechanism.
What we discovered is that beyond simply buffering pending pipelines, SQS can be applied rather elegantly to manage several other critical aspects of the system.


SQS is designed to ensure that each queued message is available to only a single client at any one time (no simple feat for a distributed queue!). Once a client reads a message, that message becomes invisible to the world for a short (and configurable) time window. Within this window the client decides whether to delete the message or set it free again. This makes SQS a perfect mechanism for distributing pipelines among our cluster of Luigi workers! All the workers poll the queue, and we can be fairly sure that each pipeline will be taken by only one of them (standard SQS queues guarantee at-least-once delivery, so an occasional duplicate is still possible). Worker nodes can also be added or removed easily, without much configuration.
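To make the visibility-window behaviour concrete, here is a toy, single-process model of it in Python (chosen because Luigi pipelines are Python). Everything here is hypothetical: `FakeVisibilityQueue` and its methods are illustrative names, not the SQS API, and time is passed in explicitly rather than read from a clock. In real SQS the rough equivalents are ReceiveMessage, DeleteMessage, and ChangeMessageVisibility with a timeout of 0 to release a message early.

```python
from __future__ import annotations

import itertools
from dataclasses import dataclass


@dataclass
class _Message:
    handle: str
    body: str
    visible_at: float = 0.0  # time at which the message can be received again


class FakeVisibilityQueue:
    """Toy in-memory model of SQS visibility-timeout semantics.

    Hypothetical names throughout; this is not the SQS API. Time is passed in
    explicitly so the behaviour can be stepped through deterministically.
    """

    def __init__(self, visibility_timeout: float = 30.0) -> None:
        self.visibility_timeout = visibility_timeout
        self._messages: dict[str, _Message] = {}
        self._ids = itertools.count(1)

    def send(self, body: str) -> str:
        handle = f"msg-{next(self._ids)}"
        self._messages[handle] = _Message(handle, body)
        return handle

    def receive(self, now: float) -> tuple[str, str] | None:
        """Hand out one visible message and hide it for the visibility timeout."""
        for msg in self._messages.values():
            if msg.visible_at <= now:
                msg.visible_at = now + self.visibility_timeout
                return msg.handle, msg.body
        return None  # every message is in flight, or the queue is empty

    def delete(self, handle: str) -> None:
        """The worker finished the pipeline: remove the message for good."""
        self._messages.pop(handle, None)

    def release(self, handle: str, now: float) -> None:
        """The worker gives the message back: make it visible again right away."""
        self._messages[handle].visible_at = now


q = FakeVisibilityQueue(visibility_timeout=30)
q.send("rebuild:client-42")
print(q.receive(now=0))   # worker A takes the message
print(q.receive(now=1))   # worker B finds nothing: the message is hidden
print(q.receive(now=31))  # A never deleted it, so it reappears for someone else
```

The key property is that a message a worker neither deletes nor releases is not lost: once the window expires it simply becomes available to the next poller.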

One concern we had was how to ensure that tasks are distributed evenly. In theory, the allocation should be random; nonetheless, we threw in a neat trick to improve on this. The busier a worker is, the less often it polls. For example, a worker with one task currently running will poll SQS every second for new tasks, while a worker with 3 tasks running will poll SQS only once every 10 seconds. This works nicely: it spreads the tasks out evenly without any need for the workers to coordinate with each other. Here's an analytics view of our workers doing their stuff:
Each colored line here is a separate worker node; the maximum number of pipelines per worker at any one time is set to 5. Notice how the workers are generally at capacity, but they receive new jobs evenly.
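The polling trick can be expressed as a lookup from the number of running pipelines to the wait before the next poll. This is a minimal sketch: the 1-second and 10-second intervals and the cap of 5 come from the post, while the intervals for 0, 2 and 4 running pipelines are hypothetical fill-ins, as are the names `POLL_INTERVAL_SECONDS` and `poll_interval`.

```python
from __future__ import annotations

MAX_PIPELINES_PER_WORKER = 5  # cap from the post

# Seconds to wait before the next SQS poll, keyed by pipelines currently running.
# 1 -> 1s and 3 -> 10s are from the post; 0, 2 and 4 are hypothetical fill-ins.
POLL_INTERVAL_SECONDS = {0: 1.0, 1: 1.0, 2: 5.0, 3: 10.0, 4: 15.0}


def poll_interval(running: int) -> float | None:
    """Return how long to wait before polling again, or None when the worker is full."""
    if running < 0:
        raise ValueError("running must be non-negative")
    if running >= MAX_PIPELINES_PER_WORKER:
        return None  # at capacity: stop polling until a pipeline finishes
    return POLL_INTERVAL_SECONDS[running]


for n in range(MAX_PIPELINES_PER_WORKER + 1):
    print(n, poll_interval(n))
```

Because a busy worker checks the queue less often, a message that becomes visible is most likely to be picked up by one of the idler workers, with no coordination between them. A lookup table keeps the tuning explicit; a formula would work just as well.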


Hard fact of life: pipelines sometimes fail.
Instead of rerunning a failed pipeline in application logic, we simply reintroduce the pipeline's trigger into SQS. This has two big advantages:
1. If the failure was caused by a problem with a specific worker node, retrying on the same node will just make the problem worse. Requeuing lets the retry be picked up via the queue by any node, not just the one that failed.
2. SQS lets us delay a newly sent message, which, much like the invisibility window mentioned before, gives a natural way of spacing out retries. We add a few more seconds of delay on each successive retry so that the newly queued message is not immediately available to workers. Just watch out for one gotcha: the maximum delay in SQS is 15 minutes! If you try to increase the delay past this limit, you'll be woken up in the middle of the night by tons of SQS failures which helpfully tell you so.
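A retry schedule along these lines might look like the sketch below. It assumes the requeue is an SQS send with a per-message `DelaySeconds`, whose ceiling of 900 seconds is the 15-minute limit above; the base and step values and the function name are hypothetical, since the post only says a few seconds are added per retry.

```python
MAX_SQS_DELAY_SECONDS = 900  # SQS per-message delay ceiling: 15 minutes

# Hypothetical tuning values; the post only says "a few seconds more" per retry.
BASE_DELAY_SECONDS = 5
STEP_SECONDS = 5


def retry_delay(attempt: int) -> int:
    """Delay (seconds) to request when requeuing retry number `attempt` (1-based)."""
    if attempt < 1:
        raise ValueError("attempt is 1-based")
    delay = BASE_DELAY_SECONDS + STEP_SECONDS * (attempt - 1)
    # Clamp so a long-failing pipeline never asks SQS for more than it allows.
    return min(delay, MAX_SQS_DELAY_SECONDS)


print([retry_delay(n) for n in (1, 2, 3, 200)])  # [5, 10, 15, 900]
```

With boto3 the requeue would then be roughly `sqs.send_message(QueueUrl=queue_url, MessageBody=body, DelaySeconds=retry_delay(attempt))`. Because a requeued trigger is a brand-new message, the attempt counter has to travel with it, for example inside the message body.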

API Trigger

Most pipeline tasks need to run on a daily or hourly basis; however, sometimes we need to fire one off ad-hoc. For example, we use this to rebuild client data after a configuration change, so we don't have to wait an entire day to see the changes take effect.
This means we need some sort of API endpoint to accept these ad-hoc requests. Such an endpoint would need to run somewhere, ideally in several places so as not to introduce a single point of failure. So this is quickly turning into a bit of a headache. Unless... you're using something like SQS, which has its own distributed messaging API anyway!
It's just a simple HTTP call with the message to be queued sent as JSON: life as it should be.
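For illustration, an ad-hoc trigger could be as small as the JSON document built below. The field names (`pipeline`, `client_id`, `params`) and both helper functions are hypothetical; the post only says the queued message is JSON.

```python
import json
from typing import Any

# Hypothetical schema: the post only says the queued message is JSON.
REQUIRED_FIELDS = ("pipeline", "client_id")


def make_trigger(pipeline: str, client_id: str, **params: Any) -> str:
    """Serialize an ad-hoc pipeline request into an SQS message body."""
    payload = {"pipeline": pipeline, "client_id": client_id, "params": params}
    return json.dumps(payload, sort_keys=True)


def parse_trigger(body: str) -> dict:
    """Decode a message body on the worker side and check the required fields.

    Invalid JSON raises json.JSONDecodeError, which is a ValueError subclass.
    """
    msg = json.loads(body)
    if not isinstance(msg, dict):
        raise ValueError("trigger must be a JSON object")
    missing = [f for f in REQUIRED_FIELDS if f not in msg]
    if missing:
        raise ValueError(f"trigger is missing fields: {missing}")
    msg.setdefault("params", {})
    return msg


body = make_trigger("rebuild_client_data", "client-42", reason="config change")
print(body)
# {"client_id": "client-42", "params": {"reason": "config change"}, "pipeline": "rebuild_client_data"}
```

Sending it is a single SQS call, for example `boto3.client("sqs").send_message(QueueUrl=queue_url, MessageBody=body)`, and the worker runs `parse_trigger` on whatever it receives. By default `json.dumps` escapes non-ASCII characters, so the body stays plain ASCII.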

Priority Queue

Another issue that came up is how to allow some pipelines to push their way to the front of the queue. For example, hourly tasks are more urgent than daily tasks, and if we trigger a pipeline to fix something urgent, we don't want it waiting in the queue for hours.
In a traditional system this would be a job for a priority queue; unfortunately, in the world of distributed processing life isn't so simple. SQS doesn't guarantee the order in which queued messages are retrieved; it's more or less random. This is the price paid for the high reliability of SQS's distributed design. While some alternatives like Kinesis at least offer FIFO behaviour, we couldn't find any suitable solution with priorities.
In the end we came up with a simple solution: more queues.
Our system actually runs three SQS queues in parallel: one for high-priority messages, one for medium priority, and finally a low-priority queue for offline batch tasks.
Our worker instances poll all three queues in that order. The high-priority queue is polled first; only if nothing is waiting there is the medium-priority queue polled, and so forth. On the financial side, Amazon charges per request, not per queue, so the extra queues themselves add no expense; the only extra cost is the receive calls that come back empty from the higher-priority queues.
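The strict-priority polling order can be sketched with three in-memory deques standing in for the three SQS queues. The queue names and `poll_by_priority` are hypothetical; against real SQS, each step of the loop would be one ReceiveMessage call on that priority's queue.

```python
from __future__ import annotations

from collections import deque

PRIORITIES = ("high", "medium", "low")  # poll order described in the post


def poll_by_priority(queues: dict[str, deque[str]],
                     order: tuple[str, ...] = PRIORITIES) -> tuple[str, str] | None:
    """Take one message from the highest-priority non-empty queue.

    `queues` stands in for the three SQS queues (hypothetical names).
    """
    for priority in order:
        queue = queues[priority]
        if queue:
            return priority, queue.popleft()
    return None  # all three queues are empty


queues = {"high": deque(), "medium": deque(["hourly-rollup"]), "low": deque(["nightly-batch"])}
print(poll_by_priority(queues))  # ('medium', 'hourly-rollup')
queues["high"].append("urgent-fix")
print(poll_by_priority(queues))  # ('high', 'urgent-fix')
print(poll_by_priority(queues))  # ('low', 'nightly-batch')
```

Strict ordering means low-priority batch work only runs when both other queues are empty, so it can be starved if high-priority traffic never lets up; that suits offline batch tasks that can afford to wait.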

On to Worker Nodes...

Speaking of polling the queue, this also required something new on the Luigi worker side. As mentioned previously, Luigi isn't an active server. Rather, Luigi Python scripts run workflows on a one-off basis when triggered. So we needed some logic to sit on the Luigi instances and connect Luigi to SQS. This logic would be responsible for executing pipelines with retries and any needed synchronization. We wanted a lightweight server technology that would be easy to work with. Enter Dropwizard...

Aharon Levine