Stateful Stream Processing: Apache Flink State Backends

This episode of our Flink Friday Tip explores stateful stream processing, and more precisely the different state backends available in Apache Flink. In the following sections, we present the three state backends of Apache Flink, their limitations, and when to use each of them depending on your requirements.
With stateful stream processing, when a developer enables checkpointing for a Flink application, state is persisted to prevent data loss and ensure full recovery in case of failure. The state backend you choose for your application determines both how and where that state is persisted.


Apache Flink comes with three available state backends: the MemoryStateBackend, the FsStateBackend, and the RocksDBStateBackend.

The MemoryStateBackend

The MemoryStateBackend is an internal state backend that maintains state on the Java heap. Key/value state and window operators hold hash tables that store the values and timers.
When your application checkpoints, this backend takes a snapshot of your state and sends it to Apache Flink’s JobManager, which stores it on the Java heap as well.
The MemoryStateBackend is configured by default to use asynchronous snapshots. Asynchronous snapshots avoid blocking the processing pipeline, which could otherwise cause backpressure in your streaming application.
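
The idea behind an asynchronous snapshot can be sketched in plain Java. This is a toy model and not Flink's implementation: the class AsyncSnapshotSketch, its methods, and the counter state are hypothetical names chosen for illustration. The operator takes a quick point-in-time copy of its state, hands the copy to a background task, and keeps processing records.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Toy model of an asynchronous snapshot; not Flink's implementation. */
class AsyncSnapshotSketch {
    // Hypothetical keyed state: key -> running count, held on the Java heap.
    private final Map<String, Long> state = new HashMap<>();

    /** Processes one record by incrementing the count for its key. */
    void update(String key) {
        state.merge(key, 1L, Long::sum);
    }

    /**
     * Synchronous part: copy the state as it is right now.
     * Asynchronous part: a background task "persists" the copy.
     */
    CompletableFuture<Map<String, Long>> snapshotAsync() {
        Map<String, Long> copy = new HashMap<>(state);
        // A real backend would ship the copy to the JobManager or a file system here.
        return CompletableFuture.supplyAsync(() -> Collections.unmodifiableMap(copy));
    }

    public static void main(String[] args) {
        AsyncSnapshotSketch operator = new AsyncSnapshotSketch();
        operator.update("a");
        CompletableFuture<Map<String, Long>> pending = operator.snapshotAsync();
        operator.update("a"); // processing continues while the snapshot is in flight
        System.out.println("snapshot = " + pending.join());
    }
}
```

The snapshot reflects the state at the moment it was triggered, while later updates only affect the live state.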

What to look out for when using the MemoryStateBackend:
  • The size of each individual state is limited to 5 MB by default. You can increase this limit in the MemoryStateBackend constructor.
  • Regardless of the maximum state size you configure, the state cannot be larger than the Akka frame size (see the Flink configuration documentation for details).
  • The aggregate state must fit into the JobManager memory.
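
The interplay of the first two limits can be written down as a small calculation. The sketch below is illustrative only: the class and method names are hypothetical, and the 10 MB Akka frame size is an assumed value, so check your own cluster configuration.

```java
/** Toy size check for a heap-backed snapshot; names and numbers are illustrative. */
class MemoryBackendLimits {
    // 5 MB default per-state limit, as stated in the list above.
    static final long DEFAULT_MAX_STATE_SIZE = 5L * 1024 * 1024;
    // Assumed Akka frame size for this example only.
    static final long ASSUMED_AKKA_FRAME_SIZE = 10L * 1024 * 1024;

    /** The usable limit is the smaller of the configured maximum and the frame size. */
    static long effectiveLimit(long configuredMax, long akkaFrameSize) {
        return Math.min(configuredMax, akkaFrameSize);
    }

    /** True if a single serialized state of the given size can be snapshotted. */
    static boolean fits(long stateBytes, long configuredMax, long akkaFrameSize) {
        return stateBytes <= effectiveLimit(configuredMax, akkaFrameSize);
    }

    public static void main(String[] args) {
        long raised = 64L * 1024 * 1024; // raising the maximum to 64 MB...
        // ...still leaves the frame size as the effective ceiling.
        System.out.println(effectiveLimit(raised, ASSUMED_AKKA_FRAME_SIZE)); // 10485760
        // An 8 MB state does not fit under the 5 MB default.
        System.out.println(fits(8L * 1024 * 1024, DEFAULT_MAX_STATE_SIZE, ASSUMED_AKKA_FRAME_SIZE)); // false
    }
}
```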

When to use the MemoryStateBackend:
  • Use the MemoryStateBackend for local development and debugging, since the amount of state it can hold is limited.
  • The MemoryStateBackend best fits stateful stream processing applications that hold little state, such as jobs that consist only of record-at-a-time functions (Map, FlatMap, or Filter) or that use the Kafka consumer.


The FsStateBackend

The FsStateBackend is configured with a file system URL (type, address, path). Example URLs are:

  • “hdfs://namenode:40010/flink/checkpoints” or
  • “s3://flink/checkpoints”.

When you choose the FsStateBackend, the in-flight data is held in the TaskManager’s memory. On checkpoint, this backend writes the state snapshots into files in the configured file system and directory, and stores only minimal metadata in the JobManager’s memory or in ZooKeeper (for high-availability cases).
The FsStateBackend is configured by default to provide asynchronous snapshots in order to avoid blocking the processing pipeline while writing state checkpoints. The feature can be disabled by instantiating a FsStateBackend with the corresponding boolean flag in the constructor set to false, e.g.:

new FsStateBackend(path, false);
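
The checkpoint flow described in this section (working state in memory, snapshots written to files, only a small handle kept centrally) can be sketched without any Flink dependency. This is a toy model: the class FileCheckpointSketch, the "chk-" file naming, and the line-based file format are assumptions made for illustration and do not reflect Flink's on-disk format.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of a file-based checkpoint; not Flink's on-disk format. */
class FileCheckpointSketch {
    /** Writes the state as "key=value" lines and returns only the file path as the handle. */
    static Path checkpoint(Map<String, Long> state, Path checkpointDir, long checkpointId) {
        List<String> lines = new ArrayList<>();
        for (Map.Entry<String, Long> entry : new TreeMap<>(state).entrySet()) {
            lines.add(entry.getKey() + "=" + entry.getValue());
        }
        Path file = checkpointDir.resolve("chk-" + checkpointId); // hypothetical naming
        try {
            Files.createDirectories(checkpointDir);
            Files.write(file, lines, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return file; // the small piece of metadata a coordinator would keep
    }

    /** Rebuilds the in-memory state from the file referenced by the handle. */
    static Map<String, Long> restore(Path handle) {
        Map<String, Long> state = new TreeMap<>();
        try {
            for (String line : Files.readAllLines(handle, StandardCharsets.UTF_8)) {
                int sep = line.lastIndexOf('=');
                state.put(line.substring(0, sep), Long.parseLong(line.substring(sep + 1)));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return state;
    }

    public static void main(String[] args) {
        Path dir = Paths.get(System.getProperty("java.io.tmpdir"), "state-backend-sketch");
        Map<String, Long> state = new TreeMap<>();
        state.put("user-1", 3L);
        Path handle = checkpoint(state, dir, 1L);
        System.out.println("restored equals original: " + restore(handle).equals(state));
    }
}
```

Recovery only needs the handle: the state itself is read back from the file system, which is why the metadata kept by the coordinator can stay small.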

When to use the FsStateBackend:
  • The FsStateBackend best fits Apache Flink stateful stream processing jobs that handle large state, long windows, or large key/value states.
  • The FsStateBackend is best suited for every high-availability setup.


The RocksDBStateBackend

The RocksDBStateBackend is configured with a file system URL (type, address, path), as in the examples below:

  • “hdfs://namenode:40010/flink/checkpoints” or
  • “s3://flink/checkpoints”.

The RocksDBStateBackend holds in-flight data on local disk using a RocksDB database. On checkpoint, either the entire RocksDB database is checkpointed into the configured file system or, for jobs with very large state, only incremental diffs are written. At the same time, Apache Flink stores some minimal metadata in the JobManager’s memory or in ZooKeeper (for high-availability cases). The RocksDBStateBackend is configured by default to perform asynchronous snapshots.
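
The difference between a full and an incremental checkpoint can be illustrated with a toy model. The sketch assumes that the local database consists of immutable files, so a file that was uploaded once never needs to be uploaded again. The class name, the bookkeeping set, and the file names are hypothetical and only serve the illustration.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

/** Toy model of full vs. incremental checkpoints over immutable local files. */
class IncrementalCheckpointSketch {
    // Files already present in the checkpoint file system (hypothetical bookkeeping).
    private final Set<String> uploaded = new HashSet<>();

    /** Full checkpoint: every local file is uploaded again. Returns the uploaded files. */
    Set<String> fullCheckpoint(Set<String> localFiles) {
        uploaded.addAll(localFiles);
        return new TreeSet<>(localFiles);
    }

    /** Incremental checkpoint: uploads only files that no earlier checkpoint uploaded. */
    Set<String> incrementalCheckpoint(Set<String> localFiles) {
        Set<String> delta = new TreeSet<>(localFiles);
        delta.removeAll(uploaded);
        uploaded.addAll(delta);
        return delta;
    }

    public static void main(String[] args) {
        IncrementalCheckpointSketch checkpoints = new IncrementalCheckpointSketch();
        Set<String> before = new HashSet<>(Arrays.asList("001.sst", "002.sst"));
        Set<String> after = new HashSet<>(Arrays.asList("001.sst", "002.sst", "003.sst"));
        System.out.println(checkpoints.incrementalCheckpoint(before)); // both files are new
        System.out.println(checkpoints.incrementalCheckpoint(after));  // only the added file
    }
}
```

With large state that changes slowly, the second kind of checkpoint transfers far less data, at the cost of later checkpoints depending on files written by earlier ones.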

What to look out for when using the RocksDBStateBackend:
  • RocksDB’s maximum supported size per key and per value is 2^31 bytes each, because RocksDB’s JNI bridge API is based on byte[].
  • Note that states that use merge operations, such as ListState, can accumulate value sizes greater than 2^31 bytes over time. Once that happens, any subsequent retrieval of the value fails.
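
A quick back-of-the-envelope calculation shows how an append-only value can reach this limit. The sketch takes the 2^31-byte figure from the list above; the element size and append rate are hypothetical numbers, and the class is not part of any library.

```java
/** Back-of-the-envelope arithmetic for the 2^31-byte value limit quoted above. */
class ValueSizeLimitSketch {
    static final long VALUE_LIMIT_BYTES = 1L << 31; // 2^31 bytes = 2 GiB

    /** Number of equally sized serialized elements that fit into one value. */
    static long elementsUntilLimit(long serializedElementBytes) {
        return VALUE_LIMIT_BYTES / serializedElementBytes;
    }

    /** True if appending would push the accumulated value past the limit. */
    static boolean wouldExceed(long currentValueBytes, long appendBytes) {
        return currentValueBytes + appendBytes > VALUE_LIMIT_BYTES;
    }

    public static void main(String[] args) {
        // Hypothetical workload: 1 KiB elements appended to a list under a single key.
        long elements = elementsUntilLimit(1024);
        System.out.println(elements);        // 2097152
        // At 1,000 appends per second, the limit is reached after this many seconds
        // (roughly 35 minutes).
        System.out.println(elements / 1000); // 2097
    }
}
```

If a single key can plausibly collect that many elements, consider bounding the list or clearing it regularly.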

When to use the RocksDBStateBackend:
  • The RocksDBStateBackend best fits Apache Flink stateful stream processing jobs that handle large state, long windows, or large key/value states.
  • The RocksDBStateBackend is best suited for every high-availability setup.
  • The RocksDBStateBackend is currently the only state backend that supports incremental checkpointing for your stateful stream processing application.

When using RocksDB, state size is limited only by the amount of disk space available, which makes the RocksDBStateBackend a great choice for managing very large state. The tradeoff is that all state accesses and retrievals require serialization (or deserialization) to cross the JNI boundary. This might reduce an application's throughput compared to the on-heap backends presented above.
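
The serialization tradeoff can be made concrete with a toy comparison. This is plain Java and does not use RocksDB or Flink: HeapCounter stands in for an on-heap backend that keeps live objects, and BytesCounter stands in for a store that only holds byte[] values. All names are hypothetical. In a real RocksDB-backed store the keys are serialized as well.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

/** Toy contrast between on-heap object state and byte-oriented state. */
class SerializedAccessSketch {
    /** On-heap style: the map holds live objects, so updates need no serialization. */
    static final class HeapCounter {
        private final Map<String, Long> counts = new HashMap<>();

        long increment(String key) {
            return counts.merge(key, 1L, Long::sum);
        }
    }

    /** Byte-oriented style: every read deserializes and every write serializes. */
    static final class BytesCounter {
        // Stand-in for a store that only understands byte[] values.
        private final Map<String, byte[]> store = new HashMap<>();
        int serdeCalls = 0; // counts serialize + deserialize invocations

        long increment(String key) {
            byte[] raw = store.get(key);
            long next = (raw == null ? 0L : deserialize(raw)) + 1;
            store.put(key, serialize(next));
            return next;
        }

        private byte[] serialize(long value) {
            serdeCalls++;
            return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
        }

        private long deserialize(byte[] bytes) {
            serdeCalls++;
            return ByteBuffer.wrap(bytes).getLong();
        }
    }

    public static void main(String[] args) {
        HeapCounter heap = new HeapCounter();
        BytesCounter bytes = new BytesCounter();
        for (int i = 0; i < 4; i++) {
            heap.increment("k");
            bytes.increment("k");
        }
        // The first write only serializes; every later update deserializes and serializes.
        System.out.println("serde calls for 4 updates: " + bytes.serdeCalls); // 7
    }
}
```

Both counters return the same values, but the byte-oriented one pays a conversion cost on nearly every access, which is the overhead to weigh against the ability to hold state larger than memory.
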
The different state backends serve different requirements, so choose one after careful consideration and planning, before you start developing your application. This ensures that the state backend you pick suits both the application and the business requirements. If you have questions or want additional training on Apache Flink’s state backends, sign up for our Apache Flink Public Training or contact us for more information.
