On the crest of streams with Flink
Stream processing and Apache Kafka
Stream processing with Apache Kafka is an essential piece of our infrastructure: it enables a data-driven engineering culture and lets us move forward with new projects quickly.
We rely heavily on Kafka to process the millions of data points that stream to us every day. We need to provide easy-to-use ad-hoc analytics, aggregate these data points as they stream into our systems, and run arbitrary transformations to compute features and variables used later in the decisioning process, as well as to alert on events and anomalies.
Apache Flink and AWS Kinesis Data Analytics
Apache Flink is a scalable and fault-tolerant processing framework for streams of data, built on the idea that it should not be hard to express computations (like AVG or GROUP BY) while still being able to scale indefinitely in a fault-tolerant manner.
At its core, it is a JVM-based framework developed specifically for stateful computations over data streams. The project has an active open source community and is used by, and receives contributions from, many companies that need to process large amounts of data.
Kinesis Data Analytics was announced by AWS in November 2019.
It is an AWS-managed runtime environment for Apache Flink applications. Since then, running Apache Flink on AWS Kinesis Data Analytics has been gaining momentum, thanks to its important features:
- support for checkpoints and snapshots, which allows easy recovery from the last offset we reached in Kafka (see the sketch after this list)
- batch data sources and streaming data sources are treated the same way, so we can use the same implementation for the backfill and streaming parts of our pipeline
- low-latency computations, with autoscaling provided by AWS Kinesis Data Analytics
- a Java-based runtime with out-of-the-box integration with Apache Kafka, Kinesis Data Streams, and Apache Avro.
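Kinesis Data Analytics configures checkpointing through the application settings rather than in code, but as a rough illustration of the same mechanism, a self-managed Flink job would enable it along these lines (the 60-second interval is just an assumed example):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Take a consistent checkpoint every 60 seconds with exactly-once guarantees;
// on failure the job restarts from the last completed checkpoint, which also
// restores the Kafka offsets that were committed as part of it.
env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);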
Flink connector for Kafka
In this article, I will share our experience and key learnings from using Amazon Kinesis Data Analytics and Flink to process Kafka events encoded with Apache Avro, and discuss how to implement a custom deserialization schema in Flink and why you might need one.
As the driving use case, consider a solution that is responsible for a) ingesting trace events from different services by consuming data from different Kafka topics, b) doing some lightweight data massaging, and c) persisting the data for later ad-hoc analysis.
Flink provides an Apache Kafka connector for reading data from and writing data to Kafka topics out of the box, using FlinkKafkaConsumer.
In order to use FlinkKafkaConsumer you normally configure:
- The topic name (or a list of topic names)
- A DeserializationSchema for deserializing the data from Kafka
- Properties for the Kafka consumer, e.g.

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "");
properties.setProperty("group.id", "");
Example of using FlinkKafkaConsumer in your application:

DataStream<String> stream = env.addSource(
    new FlinkKafkaConsumer<>(
        "topic",
        new SimpleStringSchema(),
        properties));
In order to define how to turn the binary data in Kafka into Java/Scala objects, you need to select an implementation of DeserializationSchema. Flink provides the following schemas out of the box:
- JsonNodeDeserializationSchema (from the org.apache.flink:flink-json library), which turns serialized JSON into an ObjectNode object
- AvroDeserializationSchema (from the org.apache.flink:flink-avro library), which reads data serialized with Avro using a statically provided Avro schema
- ConfluentRegistryAvroDeserializationSchema (from the org.apache.flink:flink-avro-confluent-registry library), which reads data serialized with Avro using a statically provided reader schema and looks up the writer’s schema in the schema registry.
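For example, when the reader schema is known up front, a registry-aware consumer can be wired up roughly as follows; this is a minimal sketch, and the topic name, schema, and registry URL are illustrative assumptions rather than values from our setup:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

// Hypothetical reader schema known at compile time
Schema readerSchema = new Schema.Parser().parse(
    "{\"type\":\"record\",\"name\":\"Trace\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}");

DataStream<GenericRecord> avroStream = env.addSource(
    new FlinkKafkaConsumer<>(
        "traces",                                          // hypothetical topic name
        ConfluentRegistryAvroDeserializationSchema.forGeneric(
            readerSchema, "http://schema-registry:8081"),  // assumed registry URL
        properties));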
Custom DeserializationSchema
The main challenges that required us to implement a custom deserialization schema for our use case were the following:
- One service can emit events using different (though backward-compatible) versions of a type into the same topic
- Multiple services can emit events with different trace types into the same Kafka topic
- All data points from a trace event should be persisted in the database, so it is not technically feasible to stick to a particular reader schema
- The existing issue FLINK-11030 (“Cannot use Avro logical types with ConfluentRegistryAvroDeserializationSchema”, https://issues.apache.org/jira/browse/FLINK-11030) affects us, as our producers use Avro logical types.
Luckily, Flink provides a good abstraction for custom deserialization logic in the form of the KafkaDeserializationSchema interface, which you need to implement in order to provide your custom logic for turning the binary data from Kafka into Java/Scala objects.
public interface KafkaDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {

    /**
     * Method to decide whether the element signals end of stream ...
     */
    boolean isEndOfStream(T nextElement);

    /**
     * Deserializes the Kafka record.
     */
    T deserialize(ConsumerRecord<byte[], byte[]> record);
}
For our particular use case it was enough to use the widely adopted io.confluent.kafka.serializers.KafkaAvroDeserializer from the io.confluent:kafka-avro-serializer library.
class GenericKafkaDeserializationSchema implements KafkaDeserializationSchema<GenericRecord> {

    // Initialization of the KafkaAvroDeserializer (e.g. with the schema registry
    // location) is omitted here for brevity; see the sketch below.
    private KafkaAvroDeserializer deserializer;

    @Override
    public GenericRecord deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) {
        return (GenericRecord) deserializer.deserialize(
            consumerRecord.topic(), consumerRecord.value());
    }

    @Override
    public boolean isEndOfStream(GenericRecord nextElement) {
        return false;
    }
}
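One practical detail: KafkaAvroDeserializer itself is not Serializable, so it cannot simply be built on the client and shipped to the task managers as a pre-populated field. A common workaround is to keep only the schema registry URL as serializable state and create the deserializer lazily on first use. The class below is a hypothetical variant of the schema above illustrating that pattern (the class name, registry URL handling, and the fallback to a generic type are assumptions, not our exact production code):

import java.util.Collections;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

class LazyGenericKafkaDeserializationSchema implements KafkaDeserializationSchema<GenericRecord> {

    private final String schemaRegistryUrl;               // serializable configuration only
    private transient KafkaAvroDeserializer deserializer; // rebuilt on the task manager

    LazyGenericKafkaDeserializationSchema(String schemaRegistryUrl) {
        this.schemaRegistryUrl = schemaRegistryUrl;
    }

    @Override
    public GenericRecord deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) {
        if (deserializer == null) {
            // Build the Confluent deserializer lazily, where it is actually used
            deserializer = new KafkaAvroDeserializer();
            deserializer.configure(
                Collections.singletonMap("schema.registry.url", schemaRegistryUrl),
                false); // isKey = false, we deserialize record values
        }
        return (GenericRecord) deserializer.deserialize(
            consumerRecord.topic(), consumerRecord.value());
    }

    @Override
    public boolean isEndOfStream(GenericRecord nextElement) {
        return false;
    }

    @Override
    public TypeInformation<GenericRecord> getProducedType() {
        // Falls back to a generic (Kryo-based) type; see the performance discussion below
        return TypeExtractor.getForClass(GenericRecord.class);
    }
}

Such a schema can then be passed to FlinkKafkaConsumer in the same way SimpleStringSchema was in the earlier example.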
Important performance optimizations
A Flink job exchanges data records between its operators. Records need to be serialized to bytes first, because they may be sent not only to another instance in the same JVM but also to a separate process. Also, Flink’s off-heap state backend is based on a local embedded RocksDB instance, which is implemented in native C++ code and thus also needs a transformation into bytes on every state access.
Please read the Flink Serialization Tuning post for a detailed explanation of why wire and state serialization alone can easily cost a lot of your job’s performance if not executed correctly.
Apache Flink’s out-of-the-box serialization can be roughly divided into the following groups:
- Flink-provided special serializers — for basic types
- POJOs — a public, standalone class with a public no-argument constructor and all non-static, non-transient fields in the class hierarchy either public or with public getter and setter methods (see the sketch after this list)
- Generic types — user-defined data types that are not recognized as a POJO and are therefore serialized via Kryo.
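As a minimal illustration of the POJO rules above, a class like the following hypothetical TraceEvent would be handled by Flink’s efficient POJO serializer rather than Kryo:

// Public, standalone, with a public no-arg constructor, and every field either
// public or exposed via a public getter/setter pair: Flink treats this as a POJO.
public class TraceEvent {

    public String serviceName;
    public long timestampMillis;

    private double durationMillis;

    public TraceEvent() {}

    public double getDurationMillis() {
        return durationMillis;
    }

    public void setDurationMillis(double durationMillis) {
        this.durationMillis = durationMillis;
    }
}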
Flink offers built-in support for the Apache Avro serialization framework. However, it is important to note that Avro’s GenericRecord types cannot, unfortunately, be used automatically, since they require the user to specify a schema. In our case that is not possible, because we have to use GenericRecord in our custom DeserializationSchema precisely due to the lack of a strictly defined Avro schema on the producer side.
Without type information, Flink will fall back to Kryo for GenericRecord serialization which would serialize the schema into every record, over and over again. As a result, the serialized form will be bigger and more costly to create. We have observed a huge performance drop in terms of increased CPU load when dealing with records backed by complex Avro schemas.
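One way to catch this kind of silent Kryo fallback early, a suggestion on our side rather than anything specific to Kinesis Data Analytics, is to make Flink fail fast whenever a generic type would be used:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Throws an exception at job construction time whenever an operator would need
// Kryo for a generic type, instead of silently degrading performance.
env.getConfig().disableGenericTypes();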
Since Avro’s Schema class is not serializable, it cannot be sent around as is. You can work around this by converting it to a String and parsing it back when needed, or you can improve your KafkaDeserializationSchema by returning Tuple2 or Row entries instead.
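The first workaround can be as small as the sketch below: keep the schema as a String field and lazily parse it back where it is needed (the SchemaHolder class here is purely illustrative):

import java.io.Serializable;

import org.apache.avro.Schema;

public class SchemaHolder implements Serializable {

    private final String schemaString;  // Strings serialize fine
    private transient Schema schema;    // Avro's Schema does not, so rebuild it lazily

    public SchemaHolder(Schema schema) {
        this.schemaString = schema.toString();
    }

    public Schema getSchema() {
        if (schema == null) {
            schema = new Schema.Parser().parse(schemaString);
        }
        return schema;
    }
}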
The example below demonstrates the optimizations done in one of our projects, where the KafkaDeserializationSchema functionality was combined with the mapper operator so that GenericRecord values never have to be serialized between those operators. This was certainly a huge (performance) advantage.
@Override
public Tuple2<Boolean, Row> deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) {
    try {
        GenericRecord deserializedRecord = (GenericRecord) deserializer.deserialize(
            consumerRecord.topic(), consumerRecord.value());
        return mapper.map(deserializedRecord);
    } catch (Throwable t) {
        //...
    }
}

@Override
public TypeInformation<Tuple2<Boolean, Row>> getProducedType() {
    return Types.TUPLE(Types.BOOLEAN, Types.ROW(Types.STRING, ...));
}
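Downstream, the boolean flag makes it easy to separate successfully mapped rows from failures without any further Avro-aware processing. A rough sketch of what that wiring could look like (rowDeserializationSchema and the Row layout are assumptions for illustration):

DataStream<Tuple2<Boolean, Row>> parsed = env.addSource(
    new FlinkKafkaConsumer<>("topic", rowDeserializationSchema, properties));

// Keep only successfully mapped records and drop the flag, so that only
// compact Row values travel further down the pipeline.
DataStream<Row> rows = parsed
    .filter(t -> t.f0)
    .map(t -> t.f1)
    .returns(Types.ROW(Types.STRING, Types.LONG)); // illustrative row layout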
Additional optimizations
You should strongly consider using a Schema Registry client with client-side caching, e.g. CachedSchemaRegistryClient.
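A minimal sketch of building the Confluent deserializer on top of such a cached client (the registry URL and the cache capacity of 1000 are placeholder assumptions):

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;

// Cache up to 1000 schema lookups locally instead of calling
// the registry for every deserialized record.
SchemaRegistryClient registryClient =
    new CachedSchemaRegistryClient("http://schema-registry:8081", 1000);

KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer(registryClient);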
Also, be aware of potential internal optimizations for KafkaAvroDeserializer related to caching DatumReader instances, fixed in the scope of this issue (the fixed version had not yet been released at the time of writing). The actual gains from the fix vary somewhat based on hardware and the Java version used, but are generally between ~3x and ~8x. The cause is expensive DatumReader and DatumWriter objects being instantiated per record serialized or deserialized, due to a lack of caching. The result is a lot of wasted CPU resources, as well as potentially capping pipeline throughput.
Conclusion
- Flink provides good out-of-the-box primitives to work with Kafka and Avro
- Flink provides a good abstraction for custom deserialization logic
- We can process large amounts of data efficiently
- You can always find ways to optimize