Seeking guidance on data processing using Apache Spark


#1

I have a table in Apache Kudu with millions of rows. Each row captures a value for a measurement type for a device id. Assume the same device can output multiple measurement types. Each set of measurements is captured every minute.

For each value of a measurement type for a given device id, I would like to compute the mean and standard deviation of the corresponding values over the last 15, 30, and 60 minutes.

If I were to accomplish this in a traditional procedural way, I would select a batch of rows where the computation has not yet been performed and loop over those records. For each record I would retrieve the previous 15 minutes' worth of data for the corresponding device id and measurement type, and update that row.

However, this is terribly inefficient.

Not being well versed in Apache Spark, I am looking for guidance and tips on how I might accomplish this using set-based operations.

One “algorithm” I came up with: find the earliest row (via min) that has a null for the statistic value, retrieve the rows from 15 minutes before to 15 minutes after it (a 30-minute window), and for each row with a null stat compute the stat as a moving window. This does not seem clean.

Any recommendations or clues would be highly appreciated.


#2

Spark’s strength is parallelizing computations that don’t easily fit into memory on a single computer. I’m not sure that Spark is necessary for this task. Can you load all of the points for a given device and measurement type into memory and then operate on them? Finding all of the means and std devs for even millions of data points is pretty fast. This code takes less than a second for me.

import numpy as np

N = 30  # number of points in the window
data = np.random.randint(100, size=1000000)  # generate some random data to work with
idx = np.arange(N) + np.arange(len(data)-N+1)[:,None]  # row i holds the indices of points i through i+N-1 (a sliding window)
means = np.mean(data[idx], axis=1)  # calculate _all_ of the means using a sliding window based on idx
stddevs = np.std(data[idx], axis=1)  # calculate _all_ of the std devs using a sliding window based on idx

# check:
assert np.mean(data[100:100+N]) == means[100]  # the mean of the data from 100-100+N is the entry in means
assert np.std(data[100:100+N]) == stddevs[100]  # same check for the std devs
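
If the stats really do need to be computed per device id and measurement type, the same “load it and compute in bulk” idea carries over. Here is a pandas sketch along those lines; the column names and the generated data are just placeholders, not anything from your actual table:

import numpy as np
import pandas as pd

# fake data standing in for rows pulled out of the table: one reading per minute per device
ts = pd.date_range("2024-01-01", periods=100_000, freq="min")
df = pd.DataFrame({
    "ts": np.tile(ts, 2),
    "device_id": np.repeat(["dev-a", "dev-b"], len(ts)),
    "measurement_type": "temperature",
    "value": np.random.randint(100, size=2 * len(ts)),
})

# trailing 15-minute statistics per device and measurement type
roll = (df.set_index("ts")
          .groupby(["device_id", "measurement_type"])["value"]
          .rolling("15min"))
means_15m = roll.mean()
stddevs_15m = roll.std()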

#3

@aneel Thanks for the response and pointers.

This would mean I would be pulling the data from the Kudu tables (on disk) into memory to process it. I thought that was not the recommended way on “big data” platforms.


#4

Let me ask a context question: do you want to use Spark for this in order to get familiar with Spark and “big data” tools in general? Or because you think that Spark is the right tool for this particular task? Doing this to learn it is a totally valid thing, but your comments about efficiency make me think that you’re looking for the right tool.


Assuming you’re looking for the right tool:

If you think about it conceptually: You can’t do math on numbers on disk. You’ll need to get that data off of disk into memory to process it, no matter how you do it. If you can make a working set that fits in memory on a single machine, the most efficient thing is to do it all at once because you don’t spend any extra time loading and unloading. With modern computers, a few million numbers is not “big”, so it seems very reasonable to load it all.

Aside: It might seem like you can make your working set smaller by only loading data around the Null data. But how do you find those Nulls? You probably load each row of the data and check if it’s Null. So really, you’re still loading all of the data, even if you discard the part you don’t need. Note that there’s an important case where you can avoid loading a bunch of data. After you’ve filled in all of the Nulls for the past, you can definitely say “only look at the data after the last date we ran this process”.
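
In code, the difference between those two ways of finding work to do looks something like this. This is just a sketch: the column names, the tiny in-memory table standing in for the Kudu table, and the bookmark value are all made up for illustration:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.master("local[*]").appName("null-vs-cutoff").getOrCreate()

# tiny stand-in for the Kudu table
measurements = spark.createDataFrame(
    [("dev-a", "temp", "2024-01-01 00:00:00", 42.0, None),
     ("dev-a", "temp", "2024-01-01 00:01:00", 43.0, 42.5)],
    ["device_id", "measurement_type", "ts", "value", "mean_15m"],
)

# "scan for the Nulls": every row has to be read just to decide whether it still needs stats
todo_by_null = measurements.filter(F.col("mean_15m").isNull())

# "timestamp cutoff": once the backlog is filled in, only rows newer than the last run matter
last_run_ts = "2024-01-01 00:00:30"  # hypothetical bookmark kept between runs
todo_by_cutoff = measurements.filter(F.col("ts") > F.lit(last_run_ts))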

Aside: What’s the working set? A minimal working set for this is enough data to calculate a window over 60 minutes for one value. Does it make sense to limit yourself to that small a working set? Not if you can avoid it. Doing one big load is probably faster than doing a lot of little loads.

If you can’t fit your working set into memory, you’ll have to decide how to divide it up and load it. Spark is good at helping you do that. With Spark you build a model of the computation you want to do, and you let it figure out how to divide the loads and computations across multiple machines. But, in any case, the data’s going to have to be loaded.
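
To make that concrete, here is roughly what the trailing-15-minute computation could look like as a Spark windowed aggregation. It’s only a sketch: the little in-memory DataFrame stands in for whatever you read out of Kudu, and all of the column names are assumptions:

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.master("local[*]").appName("trailing-stats").getOrCreate()

# stand-in for the rows read from the Kudu table
measurements = spark.createDataFrame(
    [("dev-a", "temp", "2024-01-01 00:00:00", 40.0),
     ("dev-a", "temp", "2024-01-01 00:01:00", 41.0),
     ("dev-a", "temp", "2024-01-01 00:02:00", 39.0)],
    ["device_id", "measurement_type", "ts", "value"],
).withColumn("ts", F.col("ts").cast("timestamp"))

# trailing 15-minute window per device and measurement type,
# ordered by the timestamp in seconds so rangeBetween can use it
w15 = (Window.partitionBy("device_id", "measurement_type")
             .orderBy(F.col("ts").cast("long"))
             .rangeBetween(-15 * 60, 0))

stats = measurements.select(
    "*",
    F.avg("value").over(w15).alias("mean_15m"),
    F.stddev("value").over(w15).alias("stddev_15m"),
)
stats.show()

The 30- and 60-minute statistics are just two more window specs with different range bounds; Spark works out how to partition that work across the cluster.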


#5

@aneel Thanks again for taking the time to explain the mechanics so thoroughly. You are right, the actual computation has to be performed in memory, and the memory available determines the “working set”.

Since the data I need to work with is on a Hadoop cluster (and is expected to grow to billions of data points over the years it will be tracked), I assumed Spark was the right tool for cluster computing.

The reason I was limiting myself to a 30-minute window when computing stats over 15 minutes is that I am working with streaming data. My plan is to have a cron job kick off the process every minute (since new data arrives every minute), which computes stats over the window. At this point I have about 2 million rows without computed stats. If I let the program run for a few days, it would eventually catch up to the latest row, and from that point onward the stats would be computed for the latest row over the past 15 minutes.

Just wondering if I am overcomplicating this and whether there’s a simpler way.


#6

Aha. Yes, that context does help make the case for using Spark. Have a look at this:
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html (especially the parts about aggregations and windowed operations)

I would argue against a cron job running every minute, because each separate job will duplicate a lot of the work done by the previous job. Better to have a single long-running job that doesn’t incur the startup overhead repeatedly. After each batch is finished, it can sleep until the next minute rolls around.
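
Putting those two pieces together, a streaming version might be sketched like this. The built-in rate source is used purely as a stand-in for however the measurements actually arrive, the derived columns are made up, and the console sink takes the place of writing the results back to Kudu:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.master("local[*]").appName("streaming-stats").getOrCreate()

# the rate source just generates (timestamp, value) rows; it stands in for the real feed
events = (spark.readStream.format("rate").option("rowsPerSecond", 10).load()
          .select(
              F.col("timestamp").alias("ts"),
              (F.col("value") % 5).cast("string").alias("device_id"),   # made-up device ids
              F.lit("temperature").alias("measurement_type"),
              F.rand().alias("value"),
          ))

# 15-minute windows sliding every minute, per device and measurement type
stats = (events
         .withWatermark("ts", "30 minutes")
         .groupBy(F.window("ts", "15 minutes", "1 minute"),
                  "device_id", "measurement_type")
         .agg(F.avg("value").alias("mean_15m"),
              F.stddev("value").alias("stddev_15m")))

# one long-running query, processing a micro-batch roughly every minute
query = (stats.writeStream
              .outputMode("update")
              .format("console")
              .trigger(processingTime="1 minute")
              .start())
query.awaitTermination()

In a real pipeline the console sink would be replaced with whatever writes the results back out, but the shape is the point: one long-running query, a windowed aggregation, and micro-batches on a fixed trigger instead of a fresh cron job every minute.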


#7

@aneel this is exactly what I was looking for. THANK YOU! I realized my initial attempt is more complicated than necessary.