Apache Kafka Connector

The Kafka connector allows you to stream, filter, and transform events between Hazelcast clusters and Kafka.

Apache Kafka is a popular distributed, persistent log store that is a good fit for stream processing systems. Data in Kafka is organized into topics, and each topic consists of one or more partitions stored in the Kafka cluster.

Installing the Connector

This connector is included in the full distribution of Hazelcast.

If you’re using the slim distribution, you must add the hazelcast-jet-kafka module to your member’s classpath.

Permissions

If you use Hazelcast Enterprise, your clients may need permissions to use this connector. For details, see Securing Jobs.

Configuration Options

Use these options to configure the Kafka connector.

To read from Kafka as a source, the only requirements are to provide deserializers and a topic name:

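import com.hazelcast.jet.kafka.KafkaSources;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Properties;

// Consumer settings that are passed through to the Kafka client
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("key.deserializer", StringDeserializer.class.getCanonicalName());
props.setProperty("value.deserializer", StringDeserializer.class.getCanonicalName());
props.setProperty("auto.offset.reset", "earliest");

Pipeline p = Pipeline.create();
p.readFrom(KafkaSources.kafka(props, "topic"))
 .withNativeTimestamps(0) // use the Kafka record timestamps, with zero allowed lag
 .writeTo(Sinks.logger());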

The topics and partitions are distributed across the Hazelcast cluster, so that each member is responsible for reading a subset of the data.

To write to Kafka as a sink, the only requirements are the serializers:

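import com.hazelcast.jet.kafka.KafkaSinks;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sources;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import static com.hazelcast.jet.Util.entry;

// Producer settings that are passed through to the Kafka client
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("key.serializer", StringSerializer.class.getCanonicalName());
props.setProperty("value.serializer", StringSerializer.class.getCanonicalName());

Pipeline p = Pipeline.create();
p.readFrom(Sources.files("home/logs"))
 .map(line -> LogParser.parse(line)) // LogParser is an application class that is not shown here
 .map(log -> entry(log.service(), log.message())) // entry key and value become the record key and value
 .writeTo(KafkaSinks.kafka(props, "topic"));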

Fault Tolerance

One of the most important features of using Kafka as a source is that data can be replayed, which enables fault tolerance. If the job has a processing guarantee configured, Hazelcast periodically saves the current offsets internally and replays from the saved offsets when the job is restarted. In this mode, Hazelcast tracks and commits offsets itself, without interacting with the consumer groups feature of Kafka.
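
The save-and-replay behavior can be pictured with a small conceptual model. The sketch below is not Hazelcast's internal implementation: the class OffsetReplaySketch and its methods are hypothetical, and a plain list stands in for a single Kafka partition. It only illustrates why records read after the last saved offset are read again after a restart.

```java
import java.util.ArrayList;
import java.util.List;

/** Conceptual model only; hypothetical names, not Hazelcast internals. */
final class OffsetReplaySketch {
    private final List<String> log;  // stands in for one Kafka partition
    private int currentOffset;       // next offset to read
    private int snapshotOffset;      // offset saved by the last snapshot

    OffsetReplaySketch(List<String> log) {
        this.log = log;
    }

    /** Reads up to max records and advances the current offset. */
    List<String> poll(int max) {
        List<String> out = new ArrayList<>();
        while (out.size() < max && currentOffset < log.size()) {
            out.add(log.get(currentOffset++));
        }
        return out;
    }

    /** Saves the current offset, as a periodic snapshot would. */
    void snapshot() {
        snapshotOffset = currentOffset;
    }

    /** Simulates a job restart: reading resumes from the saved offset. */
    void restart() {
        currentOffset = snapshotOffset;
    }

    public static void main(String[] args) {
        OffsetReplaySketch source = new OffsetReplaySketch(List.of("a", "b", "c", "d"));
        source.poll(2);     // reads a and b
        source.snapshot();  // saved offset is now 2
        source.poll(1);     // reads c, which no snapshot covers yet
        source.restart();   // failure: fall back to offset 2
        System.out.println(source.poll(2)); // c is read a second time, then d
    }
}
```

In this model, record c is delivered twice, which is why the guarantee applied downstream determines whether such a replay is observable.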

If processing guarantees are disabled, the source starts reading from the default offsets determined by the auto.offset.reset property. You can enable offset committing by assigning a group.id, enabling automatic offset commits with enable.auto.commit, and configuring auto.commit.interval.ms in the given properties. Refer to the Kafka documentation for descriptions of these properties.
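
As a minimal sketch of the settings named above, the helper below adds the three Kafka consumer properties to an existing Properties object. The class name AutoCommitProps, the method withAutoCommit, the group name "my-group", and the 5000 ms interval are illustrative assumptions. Only the property keys come from Kafka.

```java
import java.util.Properties;

/** Hypothetical helper; only the property keys are Kafka's own. */
final class AutoCommitProps {
    /** Returns a copy of base with consumer-group auto-commit settings added. */
    static Properties withAutoCommit(Properties base, String groupId, long intervalMs) {
        Properties props = new Properties();
        props.putAll(base);
        props.setProperty("group.id", groupId);           // consumer group that owns the offsets
        props.setProperty("enable.auto.commit", "true");  // let the Kafka client commit offsets
        props.setProperty("auto.commit.interval.ms", Long.toString(intervalMs));
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("bootstrap.servers", "localhost:9092");
        base.setProperty("auto.offset.reset", "earliest");

        Properties props = withAutoCommit(base, "my-group", 5000);
        System.out.println(props.getProperty("group.id"));
    }
}
```

The resulting Properties object would then be passed to the Kafka source in place of the plain one, together with the deserializer settings.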

Transactional Guarantees

As a sink, the Kafka connector provides exactly-once guarantees at the cost of using Kafka transactions. Hazelcast commits the produced records after each snapshot is completed. This greatly increases the latency because consumers see the records only after they are committed.

If you use the at-least-once guarantee, records are visible immediately, but some records may be duplicated if a failure occurs. You can also configure a job in exactly-once mode and lower the guarantee for a particular Kafka sink only.
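
The trade-off between the two guarantees can be illustrated with a toy model. The sketch below is neither Hazelcast nor Kafka code: SinkVisibilitySketch is a hypothetical class in which a "pending" list stands in for an open Kafka transaction and a "visible" list for what a consumer reading only committed records would see.

```java
import java.util.ArrayList;
import java.util.List;

/** Conceptual model of sink visibility; hypothetical, not Hazelcast or Kafka code. */
final class SinkVisibilitySketch {
    private final boolean exactlyOnce;
    private final List<String> pending = new ArrayList<>(); // written in an open transaction
    private final List<String> visible = new ArrayList<>(); // readable by consumers

    SinkVisibilitySketch(boolean exactlyOnce) {
        this.exactlyOnce = exactlyOnce;
    }

    /** Exactly-once buffers the record; at-least-once exposes it immediately. */
    void write(String record) {
        (exactlyOnce ? pending : visible).add(record);
    }

    /** A completed snapshot commits everything written since the previous one. */
    void snapshotCompleted() {
        visible.addAll(pending);
        pending.clear();
    }

    /** A failure aborts the open transaction; committed records stay. */
    void fail() {
        pending.clear();
    }

    List<String> visible() {
        return new ArrayList<>(visible);
    }

    /** Writes a, snapshots, writes b, fails, then replays b after the restart. */
    static List<String> run(boolean exactlyOnce) {
        SinkVisibilitySketch sink = new SinkVisibilitySketch(exactlyOnce);
        sink.write("a");
        sink.snapshotCompleted();
        sink.write("b");
        sink.fail();
        sink.write("b"); // replayed because it came after the last snapshot
        sink.snapshotCompleted();
        return sink.visible();
    }

    public static void main(String[] args) {
        System.out.println("exactly-once:  " + run(true));
        System.out.println("at-least-once: " + run(false));
    }
}
```

In this model the exactly-once run ends with each record visible once, but nothing is visible until a snapshot completes. The at-least-once run exposes records at once and ends with b visible twice.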

Schema Registry

Kafka is often used together with Confluent Schema Registry as a repository of types. To use the schema registry, add its settings to the Properties object and, if you are using Avro, use the KafkaAvroSerializer or KafkaAvroDeserializer:

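// KafkaAvroDeserializer comes from the io.confluent.kafka.serializers package
properties.put("value.deserializer", KafkaAvroDeserializer.class);
properties.put("specific.avro.reader", true); // deserialize into generated Avro classes
properties.put("schema.registry.url", schemaRegistryUrl); // URL of your Schema Registry instance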

Keep in mind that once the record is deserialized, Jet still needs to know how to serialize and deserialize the record internally.

Version Compatibility

The Kafka sink and source are based on Kafka version 2.2.0. This means that the Kafka connector works with any client and broker at version 1.0.0 or later.

Heterogeneous Messages

This connector supports heterogeneous messages. For example, say you have these messages in your topic:

{"name":"Alice","age":42}
{"name":"Bob","age":43,"petName":"Zaz"}

If you map the column petName, it will have the value null for the first message (Alice), which has no petName field. This scenario is supported, and the Avro format behaves similarly.
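
The effect of a missing field can be sketched in plain Java. The example below does not use the connector: HeterogeneousRowsSketch and its project method are hypothetical, and each message is modeled as a map of field names to values. It shows only the rule described above, that a mapped column absent from a message yields null.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/** Hypothetical model of column mapping; it does not use the connector. */
final class HeterogeneousRowsSketch {
    /** Projects a message onto a fixed column list; absent fields become null. */
    static Object[] project(Map<String, Object> message, List<String> columns) {
        Object[] row = new Object[columns.size()];
        for (int i = 0; i < columns.size(); i++) {
            row[i] = message.get(columns.get(i)); // null when the field is absent
        }
        return row;
    }

    public static void main(String[] args) {
        List<String> columns = List.of("name", "age", "petName");
        Map<String, Object> alice = Map.of("name", "Alice", "age", 42);
        Map<String, Object> bob = Map.of("name", "Bob", "age", 43, "petName", "Zaz");

        System.out.println(Arrays.toString(project(alice, columns))); // [Alice, 42, null]
        System.out.println(Arrays.toString(project(bob, columns)));   // [Bob, 43, Zaz]
    }
}
```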