
Streaming analytics better than classic batch — when and why?

While a lot of problems can be solved in batch, the stream processing approach can give you even more benefits. Today, we'll discuss a real-world example of user session analytics to give you insight into a use case, with a comprehensive overview of the business and technical problems that modern stream processing technologies like Apache Flink can solve. This post was jointly written by Adam Kawa (CEO at GetInData) and Dawid Wysakowicz (a Flink contributor, previously at GetInData and currently at Ververica) and is based on the talk "Streaming analytics better than batch — when and why?" given at Big Data Tech Warsaw Summit 2017 in February 2017. Because the talk was very well received by the audience, we decided to convert it into a blog post. We originally published it on our website in March 2017, and we are now republishing it on our new blog on Medium.com.

User Sessionization

Our sessionization use case is inspired by Spotify. When analyzing user sessions at Spotify, you can compute many basic statistics, such as the duration of the session, the number of songs played, skips, and interruptions, as well as more advanced things like the current mood of the user, and so on.

User Sessionization — when and why?

You can do at least three types of things with the output of your session analytics jobs.

First, you can visualize KPIs on dashboards. For example, you can show how long users listen to a new edition of the Discover Weekly playlist (a personalized playlist with song suggestions based on listening history and current recommendations), i.e. how many consecutive songs they listen to or skip.

Second, using stream analytics functionality, we can spot current user behaviour across multiple metrics and react accordingly. For example, if Australian users don't listen to Discover Weekly as long as usual on Monday morning, we can quickly trigger an alert at Spotify. Perhaps we will be able to identify the problem before European or American users wake up, so the incident can be properly managed.

Third, we can also use insights from current sessions to recommend even better songs and ads, based on what a user is listening to or feeling right now.


Of course, the above use cases can be achieved with classic batch processing, using hourly or daily jobs. But obviously, we can get more value out of our data if we process it in real time with low latency.

Classic Batch Architecture

Many companies use technologies like Kafka, Hadoop, Spark, and Oozie to analyze user sessions.


How do they do it?

First, the system needs to gather data from users: events that represent user activity are continuously sent to Kafka in real time. Next, a batch tool like Camus copies events from Kafka to HDFS (the Hadoop Distributed File System) periodically, let's say every hour. As a next step, a technology like Spark runs batch jobs to group individual user events into user sessions. A single user session at Spotify can last many hours, e.g. when a user is listening to music at work. This makes the process of building complete and correct user sessions challenging: events from the same user session can be located in many hourly buckets, and you don't know which ones upfront.

Sessions after midnight

To mitigate this problem, our Spark job can run daily (e.g. at midnight) and process the last 24 hourly buckets to generate a dataset with user sessions for a particular day. This approach allows you to build many complete and correct user sessions. However, there will be a problem with sessions that start before and end after midnight. Because Spotify is a global product, there will be many such sessions, since users listen to music all the time and everywhere.

With an alternative approach, our Spark job can run hourly, but each time it combines intermediate data about active sessions from the past hour to generate the sessions that ended in a particular hour. Keeping such intermediate data (state) about in-flight sessions is a non-trivial task, though.

Regardless of the approach we follow, the logic of our Spark job must group events by user, sort them by timestamp, remove duplicates and, most importantly, assign the same session ID to events that occur close enough to each other.
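To make that logic concrete, below is a minimal sketch in Scala of the gap-based session assignment described above, applied to a single user's events. The Event class, its fields and the 15-minute gap are our assumptions for illustration; a real job would run this per user over the hourly buckets with Spark.

// A sketch of gap-based sessionization for one user's events (hypothetical types).
case class Event(userId: String, timestamp: Long) // timestamp in milliseconds

val sessionGapMs = 15 * 60 * 1000L // a gap longer than 15 minutes starts a new session

def assignSessionIds(events: Seq[Event]): Seq[(Event, Int)] = {
  // deduplicate and sort by timestamp, then fold over the events,
  // bumping the session ID whenever the gap to the previous event is too large
  val sorted = events.distinct.sortBy(_.timestamp)
  sorted.foldLeft((List.empty[(Event, Int)], 0)) {
    case ((acc, sessionId), event) =>
      val id = acc.headOption match {
        case Some((prev, _)) if event.timestamp - prev.timestamp > sessionGapMs =>
          sessionId + 1
        case _ => sessionId
      }
      ((event, id) :: acc, id)
  }._1.reverse
}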

Drawbacks of Classic Batch

First of all, it has many moving parts. You need to decide how to partition events hourly or daily in HDFS, and learn tools like Camus for ingestion and Oozie for scheduling. You need to write a lot of glue code to integrate these technologies into a single pipeline. Then you need to monitor this pipeline and make sure that each component is up and running.

Even if your pipeline doesn't fail and finishes successfully, it will take hours to complete and process the data. Such a time gap lowers the value of the data, and you can use it only for reactive and historical analyses. If you want to make actionable or predictive decisions (e.g. triggering instant alerts or updating music recommendations), then you need to generate session datasets with much lower latency.


Micro-Batch Architecture

The easiest way to decrease latency and shorten the feedback loop is to use Spark Streaming.


With a continuously running Spark Streaming job, the infrastructure is much simpler, because you no longer need Camus, Oozie, or hourly data partitioning in HDFS. You can generate results faster by configuring your Spark Streaming job to process all new events in small batches created every hour, every 10 minutes, or even every minute.

Sessionization is actually not natively supported in Spark Streaming (SPARK-10816) and is far from easy in the general case. However, it's feasible with a few tricks and some custom code. Custom code is needed because Spark Streaming internally divides your continuous stream of events into separate micro-batches. Because a single user session can span multiple micro-batches, you need to implement your own logic to build user sessions from events that belong to many micro-batches.


This can be achieved with the mapWithState method, which maintains internal session state for each user across batches. There are a few blog posts that explain how to implement it and describe its pros and cons; you can find them here. Please note that these blog posts focus only on how to build a user session; they don't describe the other problems that we discuss below.
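As a rough illustration (not the exact code from those posts), per-user session state in Spark Streaming might be maintained along these lines; Event, SessionState and the 15-minute timeout are hypothetical names for this sketch:

import org.apache.spark.streaming.{Durations, State, StateSpec}

case class Event(userId: String, timestamp: Long)
case class SessionState(eventCount: Int, lastSeen: Long)

// Called for every (userId, event) pair; the State object survives across micro-batches.
def trackSession(userId: String, event: Option[Event],
                 state: State[SessionState]): Option[(String, SessionState)] = {
  if (state.isTimingOut()) {
    // no events arrived within the timeout: consider the session closed and emit it
    Some((userId, state.get()))
  } else {
    val prev = if (state.exists()) state.get() else SessionState(0, 0L)
    val updated = SessionState(prev.eventCount + 1,
      event.map(_.timestamp).getOrElse(prev.lastSeen))
    state.update(updated)
    None // emit nothing while the session is still open
  }
}

val spec = StateSpec.function(trackSession _)
  .timeout(Durations.minutes(15)) // close a session after 15 minutes of inactivity

// given eventsByUser: DStream[(String, Event)]
// val closedSessions = eventsByUser.mapWithState(spec)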

The Reality of Event Streams

Problems can occur when events arrive late, for example due to network connectivity issues. With Spotify, you can listen to music in offline mode, playing songs stored locally on your device. When you are offline, e.g. because you take a flight to Warsaw, the events of your music session are cached locally on your phone by the Spotify client. They are not sent to Kafka yet, because that requires an internet connection. All previously buffered events are routed to Kafka once the user is back online, so they end up in the same micro-batch as new events generated in online mode afterwards. If your processing logic doesn't somehow differentiate the original event time (when the event was generated offline) from the current time (when it arrives online), then you will get incorrect results, because old and new events are treated equally when they are processed in the same micro-batch.

Data Out of Order

A variation of this problem can happen when a user changes devices. Assume that you fly to Warsaw again. You listen to music in offline mode on your laptop because you want to save your phone battery for later. These events are again buffered locally and can't be sent to Kafka. When the flight ends, you shut down your laptop, leave the airport and order a taxi. While driving home, you listen to music on your mobile phone in online mode, and these mobile events are sent to Kafka immediately. However, when you arrive home and relax on your sofa, you turn on your laptop and start listening to music from it. This time the laptop connects to the Internet successfully, so all previously buffered desktop events are sent to Kafka now. This means that events for a particular user arrive in Kafka out of order: first the events from the mobile, then the events from the laptop, although the reality was obviously different. Again, if we don't handle this scenario in our processing logic, we will get incorrect results.

As you can see, there are a few serious problems to solve. Some of them are caused by classic batch technologies, which force us to treat data not as a continuous stream of events, but as files in HDFS or micro-batches that are processed periodically. What's more, due to the lack of functionality to handle late or out-of-order events, the results of our jobs are incorrect.

Solving a Streaming Problem

At GetInData, we believe that modern stream processing engines allow us to process data in a simple and correct way.

This can be achieved with Apache Flink. Just like with Spark Streaming, we can consume Kafka events directly, but this time, with Flink, we process them in their native representation: as a stream instead of a batch abstraction. Of course, this does not mean that further processing will be more complicated. Quite the opposite: no more data lakes, just a flowing river.


Example Implementation

We've implemented user sessionization in Flink to show you how easily all of those problems can be resolved with a few lines of code. The use case we are solving with this example is to count how long a user listens to music in a single session (case A) or how many consecutive songs a user plays from a particular playlist (case B).


The first step of our pipeline is reading events from Kafka. Flink provides a Kafka consumer that, with the help of Flink's internal checkpointing mechanism, gives us exactly-once processing guarantees. All we need to do is provide simple connection parameters like the topic or the Kafka broker address.

sEnv.addSource(new FlinkKafkaConsumer09[Event](conf.topic(),
  getSerializationSchema,
  kafkaProperties(conf.kafkaBroker()))
)

Next, we have to create user sessions: first by keying the incoming events by userId, and then by assigning them to session windows. It is as simple as specifying the inactivity gap that separates two sessions, and all the event-time magic happens automatically underneath.

.keyBy(_.userId)
.window(EventTimeSessionWindows.withGap(Time.minutes(15)))
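One detail worth making explicit: the event-time magic relies on timestamps and watermarks being assigned to the stream before keying it. A minimal sketch of how that could look, assuming our Event class exposes a millisecond timestamp field and using Flink's built-in extractor for bounded out-of-order streams (the 10-minute bound is our assumption):

.assignTimestampsAndWatermarks(
  new BoundedOutOfOrdernessTimestampExtractor[Event](Time.minutes(10)) {
    // tell Flink where to find the event time in our records
    override def extractTimestamp(event: Event): Long = event.timestamp
  })

This call goes right after the Kafka source, and the stream environment must run with event-time semantics (sEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)).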

We can complete the most basic example with a computation function that is applied to each user session window. As a result, we have a nice processing pipeline that handles out-of-order events and fits into five lines of code. Isn't it neat?

val sessionStream: DataStream[SessionStats] = sEnv
  .addSource(new FlinkKafkaConsumer09[Event](…))
  .keyBy(_.userId)
  .window(EventTimeSessionWindows.withGap(Time.minutes(15)))
  .apply(new CountSessionStats())

Advanced time handling

We do not stop at the basic example, because there is still some room for improvement. For example, we want to be able to handle late events. Flink comes with a solution for that real-world issue too. Let's assume we know that events may be late by at most a certain amount of time, e.g. 60 minutes. We just need to set that parameter in our pipeline. Flink will keep the state of each window for that extra period, so that when late events arrive, we can react to them by updating our aggregates.

.allowedLateness(Time.minutes(60))

What else can we ask for? A good idea is to shorten the feedback loop. We can do that with an early firing trigger that emits intermediate results for each window every few minutes.

.trigger(EarlyTriggeringTrigger.every(Time.minutes(10)))
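EarlyTriggeringTrigger is a custom trigger from our example, not a built-in Flink class (Flink's own ContinuousEventTimeTrigger offers similar periodic firing out of the box). Below is a simplified sketch, under our own assumptions, of how such a trigger might be built on Flink's Trigger API; a production version would also persist its firing timers in trigger state:

import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

// Fires periodically in event time with intermediate results,
// and once more when the window finally closes.
class EarlyTriggeringTrigger(intervalMs: Long) extends Trigger[Any, TimeWindow] {

  override def onElement(element: Any, timestamp: Long, window: TimeWindow,
                         ctx: Trigger.TriggerContext): TriggerResult = {
    // schedule the next early firing, aligned to the interval
    val nextFire = timestamp - (timestamp % intervalMs) + intervalMs
    if (nextFire < window.maxTimestamp()) ctx.registerEventTimeTimer(nextFire)
    ctx.registerEventTimeTimer(window.maxTimestamp()) // the final firing
    TriggerResult.CONTINUE
  }

  override def onEventTime(time: Long, window: TimeWindow,
                           ctx: Trigger.TriggerContext): TriggerResult = {
    if (time < window.maxTimestamp()) {
      // early firing: emit current aggregates but keep the window contents
      val nextFire = time + intervalMs
      if (nextFire < window.maxTimestamp()) ctx.registerEventTimeTimer(nextFire)
    }
    TriggerResult.FIRE
  }

  override def onProcessingTime(time: Long, window: TimeWindow,
                                ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  // session windows merge, so the trigger must support merging
  override def canMerge: Boolean = true
  override def onMerge(window: TimeWindow, ctx: Trigger.OnMergeContext): Unit =
    ctx.registerEventTimeTimer(window.maxTimestamp())

  override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit =
    ctx.deleteEventTimeTimer(window.maxTimestamp())
}

object EarlyTriggeringTrigger {
  def every(interval: Time) = new EarlyTriggeringTrigger(interval.toMilliseconds)
}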

We believe that the above examples show pretty well that all time-specific issues, even hard ones, become simple and robust to handle with Flink.

Beyond the code

Though the code examples are illustrative, not every aspect of stream processing can be expressed in them. What is not visible from the provided case study is that Flink:

  • is still a very efficient engine with both low latency and high throughput
  • provides exactly-once stateful processing
  • makes operational tasks easier

All of those features are highly rated by users, as confirmed by the results of a recent user survey conducted by our fellow colleagues at Data Artisans here and here.

The most important takeaway

Stream processing is not only about triggering alerts or getting results with low latency.

A stream is often the natural representation of data for many real-world problems. It can be successfully used for implementing ETL pipelines, calculating KPI metrics, powering business reports and so on: use cases where many companies have traditionally used batch processing technologies.

With modern processing frameworks like Flink, you can process your data in an easy, accurate and continuous manner.

