Introduction
Designing a data pipeline comes with its own set of problems. Take the Lambda architecture, for example. In the batch layer, if data somewhere in the past is incorrect, you have to rerun the computation function on the whole (possibly terabytes-large) dataset before the serving layer absorbs the result and reflects the corrected output. In the speed layer alone, we face the problems of data latency, data lag, data correctness, data completeness, and out-of-order data. Data might arrive later than expected, or there might be an unusual delay between one stage of the pipeline and the next (lag). Some data might go missing or get lost somewhere in the pipeline (completeness). Data might be corrupted at the source, or at a point in the pipeline where a computation function is applied to it (correctness). Finally, data might not arrive in order, with some packets arriving later than usual (out of order).
The purpose of this post is to illustrate how to deal with data completeness, data lag and out-of-order data using a dictionary whose keys expire after a TTL, called dictttl. The GitHub repo can be found at
https://github.com/srivassid/DictTTL (link)
You can install it by typing
pip install dictttl
The code for this post can be found at
https://github.com/srivassid/DictTTLExample (link)
Problem Statement
The problem at hand is to make sure that data quality in the speed layer is good. This is done by tackling data lag, correctness, latency, completeness and out-of-order data. In this post I will be working with the data completeness, lag and out-of-order problems.
Definition
1) Data lag: the time taken by data to go from one stage of the pipeline to another. Not to be confused with latency, which is the time taken by a packet to go from the source (for example, a sensor) to the server.
2) Data completeness: if we have received all the chunks of data for a particular key (or sensor) in a given time frame, we say the data is complete.
3) Out-of-order data: if the ordering of data is not strict, with packets carrying earlier timestamps arriving after packets with later timestamps, we have an out-of-order problem.
Sometimes data lag is too high, either because of the plumbing of the pipeline, or simply because we have too much data in a particular window for a key, so running the computation function on it takes longer than usual.
At times, computation functions drop some data, or not all packets are picked up at the source, which causes a data completeness problem.
And finally, streaming tools such as Kafka, or the sensors themselves, do not always send data in order. It is important to handle those scenarios as well.
Approach
The consequence of
a) lag is that the data packets arrive late,
b) completeness problem is that all the data packets do not arrive, and
c) out of order problem is that some packets arrive later than the others.
Now, my current, heavily simplified version of the data pipeline looks like this:
Streaming Tool > Big data tool > Storage tool
For example, data would arrive at Kafka from a sensor, go to Spark for aggregation, and from there be stored in MongoDB.
I can put a check at the big-data-tool stage to handle the problems mentioned. How do I put a check in place?
To handle lag, I can wait for, say, n units of time to make sure all packets arrive. If they do not arrive within that time, I can 1) discard them entirely, or 2) log that they arrived late, process them anyway, and update the serving layer with the new results. In this post, I will discard them. Packets that arrive after the deadline are called straggler packets.
To handle the out-of-order problem, I can 1) discard out-of-order packets entirely, or 2) take them into account in whichever later window they arrive in, process them, and update my serving view. In this post, I will discard the ones that come early. Those that come late will end the current window, and the data collected so far will be aggregated. This is not a perfect approach: if a packet with timestamp 11:01 arrives at 10:58, it will end the window, and packets with timestamp 10:59 will not be processed. In that case, we can create a buffer, for example saying that 1% of out-of-order data is allowed. For simplicity, I have not taken this into account in this post.
To handle the data completeness problem, I can use a metric based on the sensor's values. For example, I can require that the sum of all values for a sensor in a particular window be at least m. If it is less than that, I discard the window.
With this logic in mind, let's design the simplified version of the pipeline.
Design
To build the pipeline, I will use Kafka and pandas: Kafka for streaming the data, and pandas for aggregation and quality checks. I have chosen not to store the results in any database or flat file.
To handle the out-of-order and lag situations, I will wait 5 seconds for all the data from the sensor to arrive. If it does not arrive in time, I will discard it and move on.
To handle the completeness problem, I will sum all the values for a sensor and require the sum to be at least 7000. If it is less than that, I will discard that row (and use the batch layer to compute the speed view).
I will send data from a producer to a consumer. The producer will send data regularly, except at two points: at one point it will send a custom value of 1 to demonstrate the completeness problem, and at another it will sleep for 7 seconds to demonstrate the out-of-order and lag problems.
The consumer will consume the data using a tumbling window. I create a 5-second window that tumbles from one interval to the next; within a window, I collect all data points for a sensor, aggregate them (mean), and print the result.
I use a Python library called dictttl to handle the lag and out-of-order data problems. In a nutshell, it is a dictionary whose keys expire after a manually set time to live.
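I am not reproducing dictttl's exact API here (see the repo for that), but the core idea can be sketched in a few lines: a wrapper around a dict that stamps each key with its insertion time and treats keys older than the TTL as absent. Everything below (class and method names included) is my own illustrative sketch, not the library's real interface.

```python
import time

class ExpiringDict:
    """Toy sketch of a TTL dict: keys older than `ttl` seconds are treated
    as gone. Illustrative only; the real dictttl API may differ."""

    def __init__(self, ttl):
        self.ttl = ttl
        self._store = {}  # key -> (insert_time, list_of_values)

    def append(self, key, value):
        # defaultdict(list)-style behaviour: append to a per-key list
        now = time.monotonic()
        if key in self._store and now - self._store[key][0] <= self.ttl:
            self._store[key][1].append(value)
        else:
            self._store[key] = (now, [value])

    def get(self, key):
        entry = self._store.get(key)
        if entry is None or time.monotonic() - entry[0] > self.ttl:
            self._store.pop(key, None)  # lazily expire stale keys
            return None
        return entry[1]

d = ExpiringDict(ttl=0.1)
d.append('sensor_1', 1)
d.append('sensor_1', 2)
print(d.get('sensor_1'))   # [1, 2]
time.sleep(0.2)
print(d.get('sensor_1'))   # None -- the key expired
```

The point of expiry-on-read is that discarding stale windows costs nothing: we never have to search for and delete old entries by hand.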
Producer
To generate data, I have used the following code; it can also be found on GitHub.
It is basically a command-line program that takes an input sensor and a range (value_start, value_end), generates data based on those parameters, and sends it to the topic. The sensor id is always the same, sensor_1, and I am using the defaultdict(list) variant of dictttl, so values are appended to a list for a particular window, like this
{ key : (timestamp, [ data ])}
e.g.
{'sensor_1' : (16000000000, [1,2,4,4,2,4,5,5,6,3,1,2,3,4])}
One part of the producer makes sure that once the counter a reaches 10000, it sends a data value of 1, so that the consumer can check the sum of the elements and discard the window. This will be shown later.
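That check might look something like the sketch below. The function name, the counter a, and the value range are my assumptions for illustration, not the producer's actual code.

```python
import random

def next_value(a, value_start, value_end):
    """Return the sensor reading for counter `a` (a sketch; names assumed).
    At a == 10000 the producer emits 1, an anomalously small reading,
    so that the window's sum falls below the 7000 threshold."""
    if a == 10000:
        return 1
    return random.randint(value_start, value_end)
```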
Another part makes sure that once a reaches 20000, it sends the data but then halts for 7 seconds. This will be useful later.
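The stall can be sketched as below (again, names are assumed). Pausing longer than the consumer's 5-second TTL guarantees that the expiring dict drops its keys, which is exactly what the consumer section exploits.

```python
import time

def maybe_stall(a, pause=7):
    """After sending the record for a == 20000, halt the producer for
    `pause` seconds (a sketch; names assumed, not the post's actual code)."""
    if a == 20000:
        time.sleep(pause)
```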
Consumer
The consumer consumes the data and checks whether the timestamp of each packet is less than the starting point of the window. If it is, the packet is discarded, partially solving the out-of-order and late-data problems (which could arise from packets belonging to a previous window). If it falls inside the window, it is appended to the expiring dict, which has a time to live of 5 seconds.
Another part checks whether the data is out of the window. First, it checks whether the expiring dictionary is empty; if it is, it creates the next window and continues. The dictionary could be empty because it expired, as happens at the point where the producer halts for 7 seconds while the threshold for discarding data is 5 seconds.
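The routing logic for a single packet can be sketched like this. The function and variable names are my assumptions; the sketch only decides what happens to a packet relative to the current tumbling window.

```python
def route_packet(ts, value, window_start, window_len, window_data):
    """Decide what to do with one packet relative to the current tumbling
    window (a sketch; names assumed, not the post's actual code)."""
    if ts < window_start:
        return 'discard'                # straggler from an earlier window
    if ts >= window_start + window_len:
        return 'close'                  # out-of-window packet ends the window
    window_data.setdefault('sensor_1', []).append(value)
    return 'append'
```

With a 5-second window starting at t=0, a packet stamped t=-1 is discarded, t=2 is appended, and t=7 closes the window so the collected values can be aggregated.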
If it is not empty, the consumer creates a DataFrame from the dictionary and checks whether the sum of all the values for the sensor is more than 7000. If not, it prints "drop the row".
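The completeness check amounts to a threshold on the window sum, sketched below with made-up values (the column names and numbers are mine, not the post's):

```python
import pandas as pd

# One window's readings for sensor_1 (made-up values for illustration)
df = pd.DataFrame({'sensor': ['sensor_1'] * 4,
                   'value': [650, 700, 720, 680]})

# Window sum is 2750, well under the 7000 threshold, so the window
# is treated as incomplete and handed back to the batch layer.
if df['value'].sum() < 7000:
    print('drop the row')
```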
Finally, the consumer aggregates the values (mean) and prints the result.
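The aggregation itself is a one-liner with pandas; again the values here are made up for illustration:

```python
import pandas as pd

df = pd.DataFrame({'sensor': ['sensor_1'] * 4,
                   'value': [7100, 7200, 7150, 7050]})

# Mean of the window's readings per sensor: (7100+7200+7150+7050)/4 = 7125.0
print(df.groupby('sensor')['value'].mean())
```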
Result
The result looks like the following
Note that the first three timestamps are sequential, and at the fourth entry the length of the dict is 0 because the dictionary expired; the next timestamp differs by 5 seconds from the previous one. Also, the sum of values is less than 7000 in the fourth entry, so it is dropped. The next three rows are again sequential.
Summary
In this post, I went through the kinds of problems that arise when dealing with streaming data, and how to tackle them. I used a library called dictttl, which has expiring keys; in situations where we choose to discard data, expiring keys are really useful, as we don't have to manually search for and delete stale items. As seen above, we handled the out-of-order and lag problems by creating a buffer that absorbs data within a threshold; any data that arrives after the threshold is discarded. Again, this is a very simple example, and it assumes mostly ordered data, because a packet that arrives ahead of time will simply create the new window. In cases where the out-of-order problem is severe, we can create a buffer for that too, for example discarding values only beyond 1% of out-of-order cases.
We also saw how to handle the data completeness problem, by taking the sum of the values. If the sum is less than the threshold, we simply delete those items.
I hope you enjoyed my post.