In the beginning
Here at Totango we crunch loads of data. Where it’s possible we try to do this in realtime, however inevitably most of our meaningful analytics are processed in batch pipelines on a daily or hourly basis.
When Totango was in its infancy these pipelines were basically a vast Luigi tree of tasks which were triggered periodically by Jenkins Cron builds.
As our customer base grew, we realized that we needed a far sturdier beast to subdue the legions of data threatening to overwhelm us.
We set out on a quest to find this creature…
Fast forward several months
We still base our individual pipelines on Luigi, however now these pipelines are orchestrated by a cluster of self organising Dropwizard servers. New pipeline requests are received by priority via SQS and processed by whoever's free in the cluster.
We’re very happy with the setup, the system is rock solid and scales easily.
Of course development is an ongoing process but we feel now that our setup is flexible enough to keep ahead.
In the next few posts I’m going to try share this journey - the challenges , solutions and design decisions we made.
Our data pipelines are built on Luigi, a workflow framework made by Spotify.
Luigi is cool since it allows organising a bunch of tasks as dependencies of each other in a treelike fashion. It then manages running these tasks efficiently, deciding which tasks can run in parallel and which ones need to wait for inputs first. One can choose to run all the tasks from any point in the tree onwards if needed or rerun only the uncompleted dependencies in the case of a failure.
Each of our Luigi tasks does different cool things on our data, we have Spark jobs, Elasticsearch indexes and more, Luigi is the glue that holds it all together.
Also, in comparison to other workflow frameworks, Luigi is refreshingly lightweight. Most workflows require the task dependancy tree to be defined separately in a configuration xml file (in standard formats like XPDL). Luigi simply exposes a method called 'requires' on each task object which lists that tasks dependent tasks recursively. So there is no 'overall' definition of the workflow, rather each stage of the way decides what happens next. The workflow can even be changed dynamically at runtime! Anyone who's experienced the relief of discovering REST with JSON after struggling with SOAP knows the feeling...
Another advantage of Luigi is that its written in Python. Actually this surprised me since I'm a big fan of using strongly typed languages for business logic. Maintaining order in a high performance codebase is difficult enough, adding dynamic variables to the mix is, in my opinion, a recipe for disaster. However regarding the workflow, which is sort of like the glue which holds the business logic together, it's quite nice to have plain Python. It doesn't get in the way as much and is easy to play with.
We did have issues integrating our Java business logic into this since the Luigi tasks have to run the Java directly and this gets messy. We're still thinking about the best way to tackle this, hopefully I'll post our endeavors on this soon!
The issue with Luigi is that it’s a static framework which focuses on a single pipeline of commands on one system. Luigi runs all its Tasks as new OS sub-processes, they can run in parallel within the bounds of a single server but theres no support for distributed architecture. Furthermore something needs to schedule these Luigi runs (with retries on failure?).
At the outset we tried to work around these problems as simply as possible. We set up a bunch of servers with Luigi. These servers needed to run a daily pipeline for each of our clients (scattered around different time-zones) at midnight. We had a simple hash on the client id to make sure that different servers ran different clients to sort of share the workload. The pipelines were triggered by Jenkins Cron, this triggering happened every half an hour so that failed pipelines would retry.
This setup worked more or less. Of course the workload wasn’t really evenly spread and a server failure meant that pipelines for several of our clients (with that servers hash) wouldn’t run. However it gave us breathing room to get up on our feet.
Then came a requirement to rerun daily pipelines ad-hoc…
Sometimes clients make configuration changes which affect computations, no one wants to wait 24 hours to see these changes taking effect. Additionally we needed to control concurrency - not to rerun a client’s pipeline while their normal daily pipeline is still running. We could have somehow bolted this new functionality onto the existing system as a web service endpoint which triggers pipelines alongside Cron, however we decided that this was a great time for a makeover!
In general, this is a super design philosophy which I’m going to repeat many times in this story: Don’t overdesign up front… But bravely refactor when the time comes.
Brave new world
After some consideration, we decided that our pipeline system would need the following qualities:
- Pipelines should be distributed evenly over a cluster of systems.
- Scheduling mechanism for daily runs per service
- On demand mechanism for ad-hoc pipelines
- Failed pipelines need to retry
- Resilience: Hardware or networking should never cause pipelines to be lost
- Queueing mechanism for when the cluster is at full capacity
- Mutex to make sure certain conflicting pipelines don’t run concurrently