Introduction
Almost two decades ago, the first version of Spring framework was released.
During this time, Spring became the bedrock on which the majority of java enterprise applications were built on.
It is fair to say that there are more programmers that know how to write an application using Spring framework than programmers that know how to write Flink jobs using Streaming API. Many of those who would like to write Flink jobs for their enterprise, would like to use the same patterns and frameworks that they are familiar with and recognized as industry standards. In this article I would like to show you how you can use the Spring framework to write Flink Jobs. This is something that was requested many times on Flink’s user list and Slack channels and something that I think is still an uncharted territory.
Business Logic
First let's set up our example use case. We have a business logic expressed in a java library that currently runs as a Spring Boot microservice. The goal is to migrate this logic to Flink using the same application configuration and base code.
The task of our application is to process a stream of events expressed as an “Order” entity. Secondly, to apply anonymization on party information, then create or assign the already created session Id.
The business logic is expressed by the `BusinessLogic’ module.
The key components are:
Order
and SessionizeOrder
classes - represent a model.- An
OrderProcessor
interface that has implementations for: - anonymization -
SideNameAnonymization
class - adding session information -
OrderSessionize
class
The BusinessLogic
module is framework agnostic. However, it is written in a way that all “dependencies” should be provided via constructors. So It follows the dependency injection pattern.
Spring
The Spring boot application running our business logic is implemented in the SpringBootBusinessApp module. The Spring boot configuration is represented by the SpringConfig class presented below:
@Configuration
@EnableScheduling
@EnableAutoConfiguration
public class SpringConfig {
@Bean
public SessionManager createSessionManager() {
return new SimpleSessionManager();
}
@Bean
public OrderSessionize orderSessionize(SessionManager sessionManager) {
return new OrderSessionize(sessionManager);
}
@Bean
public List<OrderProcessor<Order>> orderProcessors() {
return List.of(new SideNameAnonymization());
}
@Bean("businessOrderProcessor")
public OrderProcessor<SessionizeOrder> createOrderProcessor(
List<OrderProcessor<Order>> orderProcessors,
OrderSessionize orderSessionize) {
return new BusinessOrderProcessor(orderProcessors, orderSessionize);
}
}
This is a fairly straightforward configuration. The main logic is represented by the “businessOrderProcessor'' bean that requires a list of order processors and a OrdrSiessionize bean. Both dependencies are created by Spring based on this configuration class. The OrderSessionize
bean delegates session management to SessionManager, who's instance is also injected by Spring. In this case we are using a simple Map based cache. We will later see that this can be changed for Flink Jobs.
Apache Flink with Spring
As we all know, Flink is not a microservice and a Flink job is also nothing like that. So you will not see anything like this:
public static void main(String[] args) {
SpringApplication.run(FlinkApplication.class, args);
}
which is expected in Spring Boot applications. This article demonstrates how you can use Spring as a dependency injection framework to set up your Flink Job, similar to what you would do when writing a standard microservice application.The example Flink job that uses Spring is presented in the FlinkPipeline module.The entry point of our sample job is the DataStreamJob
java class.
public class DataStreamJob {
@Autowired
private EventProducer<Order> eventProducer;
@Autowired
private SinkFunction<SessionizeOrder> sink;
@Autowired
private ProcessFunction<Order, SessionizeOrder> businessLogic;
public static void main(String[] args) throws Exception {
// Add Job argument to System arguments, so we can tell Spring via job arguments which bean
// should be loaded. This is done by @ConditionalOnProperty
ParameterTool parameterTool = ParameterTool.fromArgs(args);
Properties argProperties = parameterTool.getProperties();
System.getProperties().putAll(argProperties);
new ContextRegistry()
.autowiredBean(new DataStreamJob(), "org.example.config")
.run(parameterTool);
}
private void run(ParameterTool parameterTool) throws Exception {
StreamExecutionEnvironment env = createStreamEnv();
env.addSource(new CheckpointCountingSource<>(5, 5, eventProducer))
.setParallelism(1)
.process(businessLogic)
.setParallelism(2)
.addSink(sink)
.setParallelism(2);
env.execute("Flink Job Powered By Spring DI.");
}
}
This is a simple pipeline that contains:
- source - CheckpointCountingSource (kudos to Grzegorz Kołakowski from Getindata for this one)
- process function - implementation based on Spring configuration class.
- sink - implementation based on Spring configuration class.
The CheckpointCountingSource
produces a fixed number of events per checkpoint for the duration of X checkpoints. The exact type of produced event and logic for its creation is extracted to EventProducer
. We can see that this one is marked as @Autowired
, meaning its concrete implementation will be injected by Spring.
Also the concrete implementation of sink will also be injected by Spring:
@Autowired
private SinkFunction<SessionizeOrder> sink;
Now let's examine the FlinkBusinessLogic
java class. This class implements Flink’s ProcessFunction and will be injected and initialized by Spring.
public class FlinkBusinessLogic extends ProcessFunction<Order, SessionizeOrder> {
@Autowired
@Qualifier("businessOrderProcessor")
private transient OrderProcessor<SessionizeOrder> orderProcessor;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
new ContextRegistry().autowiredBean(this, "org.example.config");
}
@Override
public void processElement(Order value, Context ctx, Collector<SessionizeOrder> out) throws Exception {
SessionizeOrder newOrder = orderProcessor.process(value);
out.collect(newOrder);
}
}
This is a very simple implementation of Flink’s ProcessFunction
that delegates business logic execution to OrderProcessor
type from the BusinessLogic module.
The important thing here is that OrderProcessor
is marked as:
@Autowired
@Qualifier("businessOrderProcessor")
private OrderProcessor<SessionizeOrder> orderProcessor;
This means that OrderProcessor
concrete implementation will be injected by Spring by matching the bean name with the “businessOrderProcessor” string.The Spring configuration class for the Flink job is split into two.
@Configuration
public class FlinkPipelineConfig {
@Bean
public EventToStringConverter<SessionizeOrder> converter() {
return event -> String.format("Order Details - %s", event.toString());
}
@Bean
public SinkFunction<SessionizeOrder> sink(EventToStringConverter<SessionizeOrder> converter) {
return new ConsoleSink<>(converter);
}
@Bean
public EventProducer<Order> eventProducer() {
return new OrderProducer();
}
@Bean
@ConditionalOnProperty(
value="business.logic",
havingValue = "standard",
matchIfMissing = true)
public ProcessFunction<Order, SessionizeOrder> businessLogic() {
return new FlinkBusinessLogic();
}
@Bean
@ConditionalOnProperty(
value="business.logic",
havingValue = "sleep")
public ProcessFunction<Order, SessionizeOrder> suspiciousLogic() {
return new SuspiciousFlinkBusinessLogic();
}
}
and:
@Configuration
public class BusinessLogicSpringConfig {
@Bean
public SessionManager createSessionManager() {
return new SimpleSessionManager();
}
@Bean
public OrderSessionize orderSessionize(SessionManager sessionManager) {
return new OrderSessionize(sessionManager);
}
@Bean
public List<OrderProcessor<Order>> orderProcessors() {
return List.of(new SideNameAnonymization());
}
@Bean("businessOrderProcessor")
public OrderProcessor<SessionizeOrder> createOrderProcessor(
List<OrderProcessor<Order>> orderProcessors,
OrderSessionize orderSessionize) {
return new BusinessOrderProcessor(orderProcessors, orderSessionize);
}
}
The first one represents a Spring configuration for the Flink pipeline. It defines the pipeline’s source and sink dependencies. It is worth mentioning that Spring will only use this implementation if the “business.logic”
property (or job argument) is missing or set to “standard”
. If the “business.logic”
was set to “sleep”, then the SuspiciousFlinkBusinessLogic would be used instead. This is possible by using a spring-boot @ConditionalOnProperty
annotation on the Bean factory method in the Spring configuration class.The second class defines core business logic dependencies in exactly the same way as it was done for the SpringBootBusinessApp
module.When we submit our job to the Flink cluster, we can see the following created job graph:
When we examine the “Process” task, we will see that it runs on two Task managers:
This validates the fact that the FlinkBusienssLogic process function was created twice on separate machines where the Spring context was created for each instance.
Finally, if we inspect the logs printed by Spring injected ConsoleSink
, we will see this:
As you can see, we have just deployed a working Flink Job running on multiple Task Manager nodes using the Spring Dependency injection framework.
How does it work?
In the previous chapter we saw that we can inject class dependencies using the Spring framework for Flink jobs. The secret ingredient here is our open source flink-spring library that bundles all Spring dependencies into one jar and provides a simple API for creating and using Spring context in Flink Jobs. What you need to do is to add flink-spring-0.1.0-SNAPSHOT-jar-with-dependencies.jar into Flink’s lib folder and restart the cluster. The details for how to create this jar can be found in the flink-spring library manual.In order to have access to Spring classes from a Flink job, you need to add a new dependency.
<dependency>
<groupId>com.getindata</groupId>
<artifactId>flink-spring</artifactId>
<version>0.1.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
The flink-spring will be expected during runtime and will not be added to your uber jar.With this dependency added, now you have access to Spring libraries and you can start using @Autowired
, @Bean
annotations.The key component for flink-spring
library is ContextRegistry
class.This class provides an API that allows users to initialize Spring context based on Spring Java configuration classes and inject all dependencies marked as @Autowired
from that context. This is done simply by calling:
ContextRegistry registry = new ContextRegistry();
registry.autowireBean(objToInjectDependenciesTo, “configuration.package”)
In the case of our FlinkPipeline
application we have done this in two places.The first time was in our main class DataStreamJob
, where we injected concrete implementation for Flink Sink
and our custom EventProducer
interface, like so:
new ContextRegistry()
.autowiredBean(new DataStreamJob(), "org.example.config")
.run(parameterTool);
The second time was in our custom process function → FlinkBusinessLogic::open()
method where we initialized the Spring context and injected the instances of OrderProcessor
from our BusinessLogic
module.
Those two examples show that we can initialize and use Spring context both in the Job Manager during the job submission phase as well as in the Task Manager during the task initialization phase.
The extra cool thing about ContextRegistry
class is that it keeps a cache of already created context objects. Meaning that if you would like to “initialize” more than one object using the same Spring context, the context will be created only once. Having said that, you must keep in mind that this cache is transient. So the rule only applies to Job Manager and task Operator scope on Task Manager.
Side notes and considerations
The flink-spring library in its current state is a PoC project to show that using the Spring framework for dependency injection is a possible and fairly straightforward task for developing Flink Jobs using Streaming and Table API.
There are two considerations one must take into account:
Currently no Spring dependencies are shaded in the flink-spring uber jar. This means that there might be a potential library version conflict between Flink and flink-spring. Despite this possibility, at the time this article was written, no such conflicts were observed.
The ContextRegistry
class loads Spring context on demand from provided configuration classes. Since the application lifecycle is not managed by Spring and the created context is short lived (only in the scope of ContextRegistry::autowireBean
) this means that you have no global guarantee that the Spring scope of the created bean will be preserved. In other words, if we have a bean with default singleton scope, having two contextRegistry instances like so:
new ContextRegistry().autowireBean(...);
new ContextRegistry().autowireBean(...);
would create two exact same beans even though the bean definition says that the scope should be Singleton
.
Due to the cache used in the ContextRegistry implementation
, the scope of the bean is preserved in the context of individual ContextRegistry
instances.
The second point is important especially when we think about the Spring context initialization cost. You can imagine a job where every operator will initialize its own ContextRegistry() instance
. Multiply this by the parallelism level of the job/task and you can easily see that we can end up with the initialization of many Spring contexts where every operator is using only a handful of created Beans. This can increase the time needed for job initialization. A good practice here would be to initialize only the required beans. This can be achieved in two ways:
- Annotate your configuration classes with
@Lazy
- Split your Spring configuration into multiple files and use only the one you need for Task initialization.
Conclusion
In this blog post we have demonstrated that it is possible and fairly easy to use Spring as a dependency injection framework for building Flink jobs using our small flink-spring
library.
We have also shown that you can use the Spring configuration classes that you already have for your other business applications and use them to set up Flink jobs using the Spring framework. We also proved that this can be done for Job and Task manager nodes and you can inject Flink based implementations (Sinks, sources) as well as your custom business code classes.Taking a step further, you could also swap the SessionManager
implementation to the one that would use Flink’s map state instead of Java map.We hope that this will help some of you folks with introducing Flink to your organizations by displaying that well established industry frameworks and patterns can still be used for writing Flink Jobs.