Engineering Blog

Flume in Analytics Realtime Pipeline

by Ooyalan, 05-30-2017

Written by Swapnesh Gandhi 5/30/17




In early 2016, we were building a JSON API to send data to IQ. We have a realtime pipeline that powers the IQ dashboard page: we receive Thrift and JSON pings on our logger machines, and a process called Livelogsd pushes the Thrift data to Kafka. Livelogsd tails the player log file and sends everything in that file to Kafka.

To support the JSON API in the IQ dashboard, we had two choices:

1. Make changes to Livelogsd to support the JSON API.

2. Revisit the Livelogsd model and use an open source project for this purpose instead of Livelogsd.

Using an open source project like Flume has its advantages: we do not have to maintain our own code for a use case that can be handled by an open source project that is better maintained.



Why Flume?

If we take a step back and look at the use case again, we needed a service that would store data in a datastore (a queue) from which another service could process it later. This use case fits Flume perfectly.

Flume's model is built around the concept of source -> channel -> sink.

Flume reads data from the source, stores it in the channel, and the sink is the final destination of the data. As mentioned before, this fits our use case perfectly, since we have a clearly defined source (Nginx/files) and sink (Kafka).
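Concretely, a Flume agent is wired together in a properties file. A minimal sketch of the wiring looks like this (the agent and component names here are purely illustrative):

```properties
# Declare the components of this agent (names are illustrative)
agent.sources = ping-source
agent.channels = ping-channel
agent.sinks = ping-sink

# Wire source -> channel -> sink
agent.sources.ping-source.channels = ping-channel
agent.sinks.ping-sink.channel = ping-channel
```

Each component then gets its own `type` and settings under the same prefix, as shown in the sections below.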




We do not need any heavy processing from Flume; however, we do need to process each ping and add a timestamp of when it was received. We do not need any aggregation of the data at this time, since we want raw data pushed to Kafka, so something like Spark would not be a good fit for our use case.
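The per-ping timestamp can be handled by Flume's built-in timestamp interceptor, which stamps each event with the time it was received. A sketch (agent and source names are illustrative):

```properties
# Attach Flume's built-in timestamp interceptor to the source;
# it adds a "timestamp" header with the receive time in milliseconds
agent.sources.ping-source.interceptors = ts
agent.sources.ping-source.interceptors.ts.type = timestamp

# Keep a timestamp header if the event already carries one
agent.sources.ping-source.interceptors.ts.preserveExisting = true
```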





Source: For our use case, we could use two kinds of Flume sources: Taildir and HTTP.

We chose HTTP for a few reasons:

  • Taildir source: Flume reads files from a given directory, which is a nice feature, but we found a bug where it holds a file handle until the file is completely drained and will not consume another file with the same name. This makes processing sequential, and since we want data processed in real time, that is a problem for us.


  • HTTP source: Since we are using HTTP to pass messages, we do not have to worry about filesystem operations. This matters because working with the filesystem can cause odd bugs like the one above, and the network-based solution is much cleaner.
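For reference, Flume's HTTP source (with its default JSONHandler) accepts a POST body containing a JSON array of events, each with optional headers and a string body. A ping sent to it might look like this (the field names and values inside the body are made up for illustration):

```json
[
  {
    "headers": { "content-type": "application/json" },
    "body": "{\"event\":\"playback-start\",\"video_id\":\"abc123\"}"
  }
]
```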


Channel: We had two options: a memory channel or a filesystem-backed channel (more reliable). We chose the memory channel for performance reasons, which also means we could lose data if the Flume process restarts. For a realtime pipeline we do not need a highly reliable system, since our batch pipeline will process any data that Flume misses.


Sink: We used the Kafka sink, since the data ultimately needs to be pushed to Kafka. We use the 'async' Kafka producer, which performs well under high load because it batches messages before sending them to the Kafka brokers.
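Putting the choices above together, a sketch of the full agent configuration might look like the following. Component names, the port, topic, broker list, and sizes are illustrative rather than our production values, and the exact Kafka sink property names vary between Flume versions (these are the Flume 1.6-era names):

```properties
agent.sources = http-in
agent.channels = mem-ch
agent.sinks = kafka-out

# HTTP source: receives JSON pings over the network
agent.sources.http-in.type = http
agent.sources.http-in.port = 5140
agent.sources.http-in.channels = mem-ch

# Memory channel: fast, but events are lost if the process restarts
agent.channels.mem-ch.type = memory
agent.channels.mem-ch.capacity = 100000
agent.channels.mem-ch.transactionCapacity = 1000

# Kafka sink: batches events before sending them to the brokers
agent.sinks.kafka-out.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafka-out.brokerList = kafka1:9092,kafka2:9092
agent.sinks.kafka-out.topic = pings
agent.sinks.kafka-out.batchSize = 100
agent.sinks.kafka-out.channel = mem-ch
```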



Performance Issues

We compared the performance of Flume with Livelogsd and saw a few issues with Flume. We used CPU profiling to work out how to solve them.

  1. We did not guard debug logging with "if (logger.isDebugEnabled)", so debug messages were being constructed even when debug logging was disabled, which hurt performance.
  2. Most of the Flume framework expects Java collections, but we used Scala collections in our implementation, so collections were being converted back and forth between Java and Scala, which also hurt performance.
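The first fix is worth a small illustration. With a tiny stand-in logger (the class below is hypothetical; our real code logged through Flume's SLF4J-style logger), you can see that without the guard the message string is built on every ping even though debug logging is off:

```java
// Minimal stand-in for an SLF4J-style logger, to show why the guard matters.
// (Hypothetical class; the real code used Flume's logger.)
public class GuardedLoggingDemo {
    static class Logger {
        private final boolean debugEnabled;
        Logger(boolean debugEnabled) { this.debugEnabled = debugEnabled; }
        boolean isDebugEnabled() { return debugEnabled; }
        void debug(String msg) { /* a disabled appender discards the message */ }
    }

    static int renders = 0;

    // Stands in for expensive message formatting (serializing a ping, etc.)
    static String renderPing() {
        renders++;
        return "ping-details";
    }

    public static void main(String[] args) {
        Logger logger = new Logger(false); // debug logging is off in production

        // Unguarded: the message string is still built, wasting CPU per ping
        logger.debug("Processed " + renderPing());

        // Guarded: the expensive formatting is skipped entirely
        if (logger.isDebugEnabled()) {
            logger.debug("Processed " + renderPing());
        }

        System.out.println("renders = " + renders);
    }
}
```

At high ping rates this per-message formatting cost adds up, which is why the guard showed up so clearly in CPU profiles.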

After fixing these issues, we were able to get a processing throughput of over 3k pings/sec on a single logger box.