Use Redis Instead of Spark Streaming to Count Statistics
Posted on 22 Nov 2015, tagged big data
redis
spark
In my work, I need to count basic statistics of streaming data, such as mean, variance, sum and so on. At first, I’m using Spark, then Spark Streaming. But after a while, I reimplemented it with Redis and find it is much better. I’d like to talk about them in this article.
The Problem
Let’s talk about the problem first. I’ve simplified it: assume we have a vector \(\vec{a}\), and there is a data stream, each element of it has the structure like (i, v). When an element arrives, we will update \(a_i\) to \(a_i + v\). We want to get basic statistics of this vector, such as mean, variance and sum.
Analyzing the Problem
Sum and mean is very easy to compute, the tricky one is to compute the variance. We will use this formula:
\(variance(\vec{a}) = {mean(\vec{a})^2} - {\sum_{i=1}^n a_i^2 \over |\vec{a}|}\)
So we need to count the sum of squares: \(\sum_{i=1}^n a_i^2\).
Since \(a_i\) is changing as the data is coming, we need to keep tracking of all the elements in \(\vec{a}\). This is the key to solve the problem.
Next, I’ll show you how to compute sum of squares with Spark Streaming and Redis.
Using Spark Streaming
def updateFunc(newValues: Seq[Double], oldValue: Option[Double]): Option[Double] = {
Some(newValues.sum + oldValue.getOrElse(0))
}
val sumOfSquares = sc.updateStateByKey[Double](updateFunc _).map(a => a * a)
The code is clean and easy to read. But there are two problems, both are about updateStateByKey
:
-
updateStateByKey
has a state in memory, we need to enable checkpointing for it in order to support fault tolerance. And in the case of updating the code, we need to save the state by ourself, as I have specified in an earlier blog. When the state is big, it will be very slow and complex. -
updateStateByKey
is not so fast. It will run against all the elements in it every time. It is not necessary in our situation. The Spark guys seems to realize this problem, too.
I tried this program with billions of elements and save the state data into Cassandra. I ran this on a cluster with 4 machines, each of them have 32GB RAM. The program simply cannot save the state data into Cassandra and crashed.
So what do we need? What we really need is just a place to store and update the vector \(\vec{a}\). Redis is the perfect tool to do this thing.
Using Redis
We will store the whole vector into Redis: for each element in the vector, we will use i
as the key and \(a_i\) as the value. When a new element arrives in the data stream, we will update the key-valule and the sum of sqaures. Here is the code:
val redis = RedisClient()
val resultKey = "sum_of_squares"
stream.foreach { elem =>
val newValue = redis.incrbyfloat(elem.i, elem.v).getOrElse(elem.v)
val oldValue = newValue - elem.v
val incValue = (newValue * newValue - oldValue * oldValue)
redis.incrbyfloat(resultKey, incValue)
}
Comparing to the Spark version, this has been turned into a single thread program (Redis is a single thread program). But this is a realtime data stream, CPU should not be the bottleneck before IO. And this program uses no more memory than the Spark version. If the memory doesn’t fit into a single machine, we can use Codis (or other Redis Cluster solutions).
This program can compute billions of elements in one day. And the memory usage of Redis is about 100GB for billions of keys. (The raw problem I’m solving is more complex than I described here so I’m storing more data than this problem needs.)
Conclusion
At first, I use Spark Streaming to solve this problem becuase it seems to be a “big data” problem. But after analyzing the problem, what I really need is just a place to store state. And CPU is really not the bottleneck. Spark is good for batch processing but not so good at this kind of problem. Choosing the right tool instead of the most awesome tool is very important.