
Writing Flink jobs using the Spring dependency injection framework

Introduction

Almost two decades ago, the first version of the Spring framework was released.
Since then, Spring has become the bedrock on which the majority of Java enterprise applications are built.

It is fair to say that more programmers know how to write an application using the Spring framework than know how to write Flink jobs using the DataStream API. Many of those who would like to write Flink jobs for their enterprise would like to use the same patterns and frameworks that they are familiar with and that are recognized as industry standards. In this article I would like to show you how you can use the Spring framework to write Flink jobs. This has been requested many times on Flink’s user mailing list and Slack channels, and I think it is still uncharted territory.

Business Logic

First, let's set up our example use case. We have business logic, expressed in a Java library, that currently runs as a Spring Boot microservice. The goal is to migrate this logic to Flink using the same application configuration and code base.

The task of our application is to process a stream of events, each expressed as an “Order” entity. For every order, it first anonymizes the party information and then either creates a new session ID or assigns an already existing one.

The business logic is expressed by the “BusinessLogic” module.
The key components are:

  • Order and SessionizeOrder classes - represent the model.
  • An OrderProcessor interface that has implementations for:
    • anonymization - SideNameAnonymization class
    • adding session information - OrderSessionize class

The BusinessLogic module is framework agnostic. However, it is written so that all “dependencies” are provided via constructors, so it follows the dependency injection pattern.
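To make the constructor-based style concrete, here is a minimal plain-Java sketch. The types below are simplified, hypothetical stand-ins (the real Order, OrderProcessor and SideNameAnonymization classes are not shown in this article, and MaskPartyName and ChainedProcessor are invented names), so treat it as an illustration of the pattern rather than the module's actual code.

```java
import java.util.List;

// Hypothetical, simplified stand-ins for the BusinessLogic module types.
class ConstructorInjectionSketch {

    // Simplified model; the fields of the real Order class are not shown in this article.
    static final class Order {
        final String id;
        final String partyName;

        Order(String id, String partyName) {
            this.id = id;
            this.partyName = partyName;
        }
    }

    // Same idea as OrderProcessor: take an order, return a processed result.
    interface OrderProcessor<T> {
        T process(Order order);
    }

    // Stand-in for SideNameAnonymization: replaces the party name with a mask.
    static final class MaskPartyName implements OrderProcessor<Order> {
        @Override
        public Order process(Order order) {
            return new Order(order.id, "***");
        }
    }

    // Receives its collaborators through the constructor and never creates them itself.
    static final class ChainedProcessor implements OrderProcessor<Order> {
        private final List<OrderProcessor<Order>> steps;

        ChainedProcessor(List<OrderProcessor<Order>> steps) {
            this.steps = List.copyOf(steps);
        }

        @Override
        public Order process(Order order) {
            Order current = order;
            for (OrderProcessor<Order> step : steps) {
                current = step.process(current);
            }
            return current;
        }
    }

    public static void main(String[] args) {
        // Manual wiring; a DI container such as Spring makes the same constructor calls for us.
        OrderProcessor<Order> processor = new ChainedProcessor(List.of(new MaskPartyName()));
        Order result = processor.process(new Order("o-1", "ACME Corp"));
        System.out.println(result.id + " " + result.partyName); // prints "o-1 ***"
    }
}
```

Because nothing in these classes refers to a container, the same objects can be wired by hand, by Spring Boot, or by Spring running inside a Flink job.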

Spring

The Spring Boot application running our business logic is implemented in the SpringBootBusinessApp module. The Spring Boot configuration is represented by the SpringConfig class presented below:

@Configuration
@EnableScheduling
@EnableAutoConfiguration
public class SpringConfig {

    @Bean
    public SessionManager createSessionManager() {
        return new SimpleSessionManager();
    }

    @Bean
    public OrderSessionize orderSessionize(SessionManager sessionManager) {
        return new OrderSessionize(sessionManager);
    }

    @Bean
    public List<OrderProcessor<Order>> orderProcessors() {
        return List.of(new SideNameAnonymization());
    }

    @Bean("businessOrderProcessor")
    public OrderProcessor<SessionizeOrder> createOrderProcessor(
            List<OrderProcessor<Order>> orderProcessors,
            OrderSessionize orderSessionize) {
        return new BusinessOrderProcessor(orderProcessors, orderSessionize);
    }
}

This is a fairly straightforward configuration. The main logic is represented by the “businessOrderProcessor” bean, which requires a list of order processors and an OrderSessionize bean. Both dependencies are created by Spring based on this configuration class. The OrderSessionize bean delegates session management to a SessionManager, whose instance is also injected by Spring. In this case we are using a simple Map-based cache. We will see later that this can be changed for Flink jobs.
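A Map-based session cache of this kind could look roughly like the sketch below. The SessionManager contract used here (a single sessionIdFor method) and the InMemorySessionManager name are assumptions made for illustration; the real interface and the SimpleSessionManager implementation are not shown in this article.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

class MapSessionManagerSketch {

    // Hypothetical contract; the real SessionManager interface may look different.
    interface SessionManager {
        String sessionIdFor(String orderKey);
    }

    // Keeps session ids on the JVM heap, keyed by whatever identifies a session.
    static final class InMemorySessionManager implements SessionManager {
        private final Map<String, String> sessions = new ConcurrentHashMap<>();

        @Override
        public String sessionIdFor(String orderKey) {
            // Reuse the existing session id for this key, or create and remember a new one.
            return sessions.computeIfAbsent(orderKey, key -> UUID.randomUUID().toString());
        }
    }

    public static void main(String[] args) {
        SessionManager manager = new InMemorySessionManager();
        String first = manager.sessionIdFor("customer-1");
        String second = manager.sessionIdFor("customer-1");
        System.out.println(first.equals(second)); // prints "true"
    }
}
```

State kept this way lives only in the heap of a single JVM, which is fine for a single microservice instance but is exactly the part one would want to revisit when the logic runs as parallel Flink tasks.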

Apache Flink with Spring

Flink is not a microservice framework, and a Flink job is not a microservice either, so you will not see anything like this:

public static void main(String[] args) {
    SpringApplication.run(FlinkApplication.class, args);
}

which is expected in Spring Boot applications. This article demonstrates how you can use Spring as a dependency injection framework to set up your Flink job, similar to what you would do when writing a standard microservice application.

The example Flink job that uses Spring is presented in the FlinkPipeline module. The entry point of our sample job is the DataStreamJob Java class.

public class DataStreamJob {

	@Autowired
	private EventProducer<Order> eventProducer;

	@Autowired
	private SinkFunction<SessionizeOrder> sink;

	@Autowired
	private ProcessFunction<Order, SessionizeOrder> businessLogic;

	public static void main(String[] args) throws Exception {
		// Copy job arguments into system properties so that job arguments can tell Spring which bean
		// should be loaded. The selection itself is done by @ConditionalOnProperty.
		ParameterTool parameterTool = ParameterTool.fromArgs(args);
		Properties argProperties = parameterTool.getProperties();
		System.getProperties().putAll(argProperties);
		
		new ContextRegistry()
			.autowiredBean(new DataStreamJob(), "org.example.config")
			.run(parameterTool);
	}

	private void run(ParameterTool parameterTool) throws Exception {
		StreamExecutionEnvironment env = createStreamEnv();
		env.addSource(new CheckpointCountingSource<>(5, 5, eventProducer))
			.setParallelism(1)
			.process(businessLogic)
			.setParallelism(2)
			.addSink(sink)
			.setParallelism(2);

		env.execute("Flink Job Powered By Spring DI.");
	}
}

This is a simple pipeline that contains:

  • source - CheckpointCountingSource (kudos to Grzegorz Kołakowski from GetInData for this one)
  • process function - implementation based on Spring configuration class.
  • sink - implementation based on Spring configuration class.


The CheckpointCountingSource produces a fixed number of events per checkpoint for the duration of X checkpoints. The exact type of produced event and logic for its creation is extracted to EventProducer. We can see that this one is marked as @Autowired, meaning its concrete implementation will be injected by Spring.

The concrete implementation of the sink is also injected by Spring:

@Autowired
private SinkFunction<SessionizeOrder> sink;

Now let's examine the FlinkBusinessLogic Java class. This class implements Flink’s ProcessFunction and will be injected and initialized by Spring.

public class FlinkBusinessLogic extends ProcessFunction<Order, SessionizeOrder> {

    @Autowired
    @Qualifier("businessOrderProcessor")
    private transient OrderProcessor<SessionizeOrder> orderProcessor;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        new ContextRegistry().autowiredBean(this, "org.example.config");
    }

    @Override
    public void processElement(Order value, Context ctx, Collector<SessionizeOrder> out) throws Exception {
        SessionizeOrder newOrder = orderProcessor.process(value);
        out.collect(newOrder);
    }
}

This is a very simple implementation of Flink’s ProcessFunction that delegates business logic execution to the OrderProcessor type from the BusinessLogic module.
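One detail worth spelling out is why the orderProcessor field is transient and why it is populated in open() rather than in the constructor. Flink serializes function instances and ships them to the Task Managers, so a dependency that should not (or cannot) be serialized has to be re-created on the worker. The plain-Java sketch below mimics this with standard Java serialization; ShippedFunction and Dependency are hypothetical stand-ins, not Flink or flink-spring classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class TransientDependencySketch {

    // Stand-in for a non-serializable collaborator such as an injected OrderProcessor.
    static final class Dependency {
        String handle(String value) {
            return "processed:" + value;
        }
    }

    // Stand-in for a Flink function: serializable, with the dependency excluded from serialization.
    static final class ShippedFunction implements Serializable {
        private static final long serialVersionUID = 1L;
        transient Dependency dependency;

        // Plays the role of open(): called on the worker after deserialization.
        void open() {
            dependency = new Dependency();
        }

        String process(String value) {
            return dependency.handle(value);
        }
    }

    // Serializes and deserializes the function, mimicking shipping it to another JVM.
    static ShippedFunction roundTrip(ShippedFunction function) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(function);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (ShippedFunction) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        ShippedFunction shipped = roundTrip(new ShippedFunction());
        System.out.println(shipped.dependency == null); // the transient field did not travel
        shipped.open();
        System.out.println(shipped.process("order"));
    }
}
```

In FlinkBusinessLogic, the call to ContextRegistry inside open() plays the role of the assignment in this sketch's open() method.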

The important thing here is that OrderProcessor is marked as:

@Autowired
@Qualifier("businessOrderProcessor")
private transient OrderProcessor<SessionizeOrder> orderProcessor;

This means that the concrete OrderProcessor implementation will be injected by Spring by matching the bean name with the “businessOrderProcessor” string.

The Spring configuration for the Flink job is split into two classes:

@Configuration
public class FlinkPipelineConfig {

    @Bean
    public EventToStringConverter<SessionizeOrder> converter() {
        return event -> String.format("Order Details - %s", event.toString());
    }

    @Bean
    public SinkFunction<SessionizeOrder> sink(EventToStringConverter<SessionizeOrder> converter) {
        return new ConsoleSink<>(converter);
    }

    @Bean
    public EventProducer<Order> eventProducer() {
        return new OrderProducer();
    }

    @Bean
    @ConditionalOnProperty(
        value="business.logic",
        havingValue = "standard",
        matchIfMissing = true)
    public ProcessFunction<Order, SessionizeOrder> businessLogic() {
        return new FlinkBusinessLogic();
    }

    @Bean
    @ConditionalOnProperty(
        value="business.logic",
        havingValue = "sleep")
    public ProcessFunction<Order, SessionizeOrder> suspiciousLogic() {
        return new SuspiciousFlinkBusinessLogic();
    }
}

and:

@Configuration
public class BusinessLogicSpringConfig {

    @Bean
    public SessionManager createSessionManager() {
        return new SimpleSessionManager();
    }

    @Bean
    public OrderSessionize orderSessionize(SessionManager sessionManager) {
        return new OrderSessionize(sessionManager);
    }

    @Bean
    public List<OrderProcessor<Order>> orderProcessors() {
        return List.of(new SideNameAnonymization());
    }

    @Bean("businessOrderProcessor")
    public OrderProcessor<SessionizeOrder> createOrderProcessor(
        List<OrderProcessor<Order>> orderProcessors,
        OrderSessionize orderSessionize) {
        return new BusinessOrderProcessor(orderProcessors, orderSessionize);
    }
}

The first class represents the Spring configuration for the Flink pipeline. It defines the pipeline’s source, sink and process function dependencies. It is worth mentioning that Spring will only use the FlinkBusinessLogic implementation if the “business.logic” property (or job argument) is missing or set to “standard”. If “business.logic” is set to “sleep”, then SuspiciousFlinkBusinessLogic is used instead. This is possible thanks to the spring-boot @ConditionalOnProperty annotation on the bean factory methods in the Spring configuration class.

The second class defines the core business logic dependencies in exactly the same way as was done for the SpringBootBusinessApp module.

When we submit our job to the Flink cluster, we can see the following job graph:

[Figure: job graph of the job on the Flink cluster]

When we examine the “Process” task, we will see that it runs on two Task Managers:

[Figure: Task Managers running the “Process” task]

This confirms that the FlinkBusinessLogic process function was created twice, on separate machines, and that a Spring context was created for each instance.

Finally, if we inspect the logs printed by Spring injected ConsoleSink, we will see this:

[Figure: log output of the ConsoleSink]

As you can see, we have just deployed a working Flink Job running on multiple Task Manager nodes using the Spring Dependency injection framework.
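Before moving on, the property-based bean selection used in FlinkPipelineConfig can be illustrated without Spring or Flink. The sketch below is a hedged approximation: fromArgs is a tiny stand-in for ParameterTool.fromArgs that only understands "--key value" pairs, and matches reflects my understanding of how @ConditionalOnProperty evaluates havingValue and matchIfMissing. It is not the actual Spring Boot implementation.

```java
import java.util.Optional;
import java.util.Properties;

class ConditionalBeanSelectionSketch {

    // Minimal stand-in for ParameterTool.fromArgs: reads "--key value" pairs only.
    static Properties fromArgs(String... args) {
        Properties properties = new Properties();
        for (int i = 0; i + 1 < args.length; i += 2) {
            if (args[i].startsWith("--")) {
                properties.setProperty(args[i].substring(2), args[i + 1]);
            }
        }
        return properties;
    }

    // A missing property matches only when matchIfMissing is set; otherwise the values must agree.
    static boolean matches(Properties properties, String name, String havingValue,
                           boolean matchIfMissing) {
        String actual = properties.getProperty(name);
        return actual == null ? matchIfMissing : havingValue.equalsIgnoreCase(actual);
    }

    // Returns the name of the process function that would be registered as a bean, if any.
    static Optional<String> selectBusinessLogic(Properties properties) {
        if (matches(properties, "business.logic", "standard", true)) {
            return Optional.of("FlinkBusinessLogic");
        }
        if (matches(properties, "business.logic", "sleep", false)) {
            return Optional.of("SuspiciousFlinkBusinessLogic");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(selectBusinessLogic(fromArgs()));
        System.out.println(selectBusinessLogic(fromArgs("--business.logic", "sleep")));
        System.out.println(selectBusinessLogic(fromArgs("--business.logic", "other")));
    }
}
```

Note the last case: with a value other than “standard” or “sleep”, neither condition matches, so no ProcessFunction bean would be defined and the @Autowired field in DataStreamJob could not be satisfied.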

How does it work? 

In the previous section we saw that we can inject class dependencies using the Spring framework in Flink jobs. The secret ingredient here is our open source flink-spring library, which bundles all Spring dependencies into one jar and provides a simple API for creating and using a Spring context in Flink jobs. All you need to do is add flink-spring-0.1.0-SNAPSHOT-jar-with-dependencies.jar to Flink’s lib folder and restart the cluster. Details on how to build this jar can be found in the flink-spring library manual.

In order to have access to Spring classes from a Flink job, you need to add a new dependency:

    <dependency>
      <groupId>com.getindata</groupId>
      <artifactId>flink-spring</artifactId>
      <version>0.1.0-SNAPSHOT</version>
      <scope>provided</scope>
    </dependency>

The flink-spring library is expected to be available at runtime and will not be added to your uber jar.

With this dependency added, you have access to the Spring libraries and can start using the @Autowired and @Bean annotations.

The key component of the flink-spring library is the ContextRegistry class. This class provides an API that allows users to initialize a Spring context based on Spring Java configuration classes and to inject all dependencies marked as @Autowired from that context. This is done simply by calling:

ContextRegistry registry = new ContextRegistry();
registry.autowiredBean(objToInjectDependenciesTo, "configuration.package");

In the case of our FlinkPipeline application we have done this in two places.

The first time was in our main class, DataStreamJob, where we injected the concrete implementations of the Flink sink and our custom EventProducer interface, like so:

new ContextRegistry()
	.autowiredBean(new DataStreamJob(), "org.example.config")
	.run(parameterTool);

The second time was in the open() method of our custom process function, FlinkBusinessLogic, where we initialized the Spring context and injected the OrderProcessor instance from our BusinessLogic module.

Those two examples show that we can initialize and use Spring context both in the Job Manager during the job submission phase as well as in the Task Manager during the task initialization phase.

The extra cool thing about the ContextRegistry class is that it keeps a cache of already created context objects. This means that if you would like to “initialize” more than one object using the same Spring context, the context will be created only once. Having said that, keep in mind that this cache is transient, so the rule applies only within the scope of the Job Manager and within the scope of an individual task operator on a Task Manager.
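The caching behaviour can be pictured with the plain-Java sketch below. It is not the flink-spring implementation: Registry and FakeContext are hypothetical stand-ins that only model the idea of one context per configuration package, cached per registry instance.

```java
import java.util.HashMap;
import java.util.Map;

class ContextCacheSketch {

    // Stand-in for a Spring context: costly to build, holds one "singleton" bean.
    static final class FakeContext {
        final Object singletonBean = new Object();
    }

    // Hypothetical registry: one context per configuration package, cached per registry instance.
    static final class Registry {
        private final Map<String, FakeContext> contexts = new HashMap<>();
        int contextsCreated = 0;

        Object beanFrom(String configPackage) {
            FakeContext context = contexts.computeIfAbsent(configPackage, key -> {
                contextsCreated++; // only reached on a cache miss
                return new FakeContext();
            });
            return context.singletonBean;
        }
    }

    public static void main(String[] args) {
        Registry registry = new Registry();
        Object first = registry.beanFrom("org.example.config");
        Object second = registry.beanFrom("org.example.config");
        System.out.println(first == second);          // same registry: same context, same bean
        System.out.println(registry.contextsCreated); // the context was built once

        Object third = new Registry().beanFrom("org.example.config");
        System.out.println(first == third);           // separate registry: separate bean
    }
}
```

Reusing one registry instance for several objects therefore pays the context creation cost once, while a fresh registry starts with an empty cache.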

Side notes and considerations

The flink-spring library in its current state is a PoC project showing that using the Spring framework for dependency injection is possible, and fairly straightforward, when developing Flink jobs with the DataStream and Table APIs.

There are two considerations one must take into account:

  1. Currently no Spring dependencies are shaded in the flink-spring uber jar. This means that there might be a potential library version conflict between Flink and flink-spring. Despite this possibility, at the time this article was written, no such conflicts were observed.

  2. The ContextRegistry class loads the Spring context on demand from the provided configuration classes. Since the application lifecycle is not managed by Spring and the created context is short-lived (it exists only in the scope of ContextRegistry::autowiredBean), there is no global guarantee that the Spring scope of a created bean will be preserved. In other words, if we have a bean with the default singleton scope, having two ContextRegistry instances like so:

     new ContextRegistry().autowiredBean(...);
     new ContextRegistry().autowiredBean(...);

    would create two identical beans, even though the bean definition says that the scope should be singleton.

    Thanks to the cache used in the ContextRegistry implementation, the scope of the bean is preserved within an individual ContextRegistry instance.

The second point is important especially when we think about the Spring context initialization cost. You can imagine a job where every operator will initialize its own ContextRegistry() instance. Multiply this by the parallelism level of the job/task and you can easily see that we can end up with the initialization of many Spring contexts where every operator is using only a handful of created Beans. This can increase the time needed for job initialization. A good practice here would be to initialize only the required beans. This can be achieved in two ways:

  1. Annotate your configuration classes with @Lazy
  2. Split your Spring configuration into multiple files and use only the one you need for Task initialization. 
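The effect of the first option can be illustrated with a small plain-Java container that builds a bean only when it is first requested. This is a sketch of the lazy-initialization idea, not Spring's implementation; LazyContainer and the bean names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

class LazyBeansSketch {

    // Hypothetical mini-container: stores factories and builds a bean only on first request.
    static final class LazyContainer {
        private final Map<String, Supplier<Object>> factories = new LinkedHashMap<>();
        private final Map<String, Object> instances = new LinkedHashMap<>();
        final List<String> created = new ArrayList<>();

        void register(String name, Supplier<Object> factory) {
            factories.put(name, factory);
        }

        Object get(String name) {
            Supplier<Object> factory = factories.get(name);
            if (factory == null) {
                throw new IllegalArgumentException("No bean registered under: " + name);
            }
            return instances.computeIfAbsent(name, key -> {
                created.add(key); // record that this bean was actually built
                return factory.get();
            });
        }
    }

    public static void main(String[] args) {
        LazyContainer container = new LazyContainer();
        container.register("sink", Object::new);
        container.register("eventProducer", Object::new);
        container.register("businessOrderProcessor", Object::new);

        // A task that needs only one bean triggers only that bean's creation.
        container.get("businessOrderProcessor");
        System.out.println(container.created); // prints "[businessOrderProcessor]"
    }
}
```

With Spring, placing @Lazy on a @Configuration class has a comparable effect on its @Bean methods: beans are created when first needed instead of eagerly at context startup.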

Conclusion

In this blog post we have demonstrated that it is possible and fairly easy to use Spring as a dependency injection framework for building Flink jobs using our small flink-spring library.

We have also shown that you can reuse the Spring configuration classes that you already have for your other business applications to set up Flink jobs. We also showed that this can be done on both Job Manager and Task Manager nodes, and that you can inject Flink-based implementations (sinks, sources) as well as your custom business code classes.

Taking a step further, you could also swap the SessionManager implementation for one that uses Flink’s map state instead of a Java map.

We hope that this will help some of you introduce Flink to your organizations by showing that well-established industry frameworks and patterns can still be used for writing Flink jobs.

16 May 2023
