Connect to Amazon Kinesis
Amazon Kinesis Data Streams (KDS) is a massively scalable and durable real-time data streaming service. Hazelcast can use KDS either as a data source or as a data sink, so you can combine the two to build streaming data pipelines.
In this tutorial, you will build two pipelines:
- A pipeline that takes a continuous flow of simulated tweets and pushes them into KDS.
- A pipeline that consumes the tweets and computes the traffic intensity in events per second.
Before You Begin
To complete this tutorial, you need the following:
| Prerequisites | Useful resources | 
|---|---|
| A Hazelcast cluster running in client/server mode | |
Step 1. Set up Amazon Kinesis
- To create a data stream, follow the steps in the KDS Developer Guide. Instead of StockTradeStream, name your data stream Tweets.
- To set up permissions, follow the steps in the KDS Developer Guide. For this tutorial, depending on your security constraints, it might be acceptable to enable all permissions for the needed services. Keep in mind that you are setting up a stream called Tweets instead of StockTradeStream.
- To check that everything is set up correctly, install the AWS CLI and perform some basic operations with it.
Step 2. Create a New Java Project
We’ll assume you’re using an IDE. Create a blank Java project named kinesis-tutorial and copy one of the following build files into it, either the Gradle file (build.gradle) or the Maven file (pom.xml):
build.gradle:
plugins {
    id 'java'
}
group 'org.example'
version '1.0-SNAPSHOT'
repositories.mavenCentral()
dependencies {
    implementation 'com.hazelcast:hazelcast:5.4.1'
    implementation 'com.hazelcast.jet:hazelcast-jet-kinesis:5.4.1'
}
pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>kinesis-tutorial</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.target>1.8</maven.compiler.target>
        <maven.compiler.source>1.8</maven.compiler.source>
    </properties>
    <dependencies>
        <dependency>
            <groupId>com.hazelcast</groupId>
            <artifactId>hazelcast</artifactId>
            <version>5.4.1</version>
        </dependency>
        <dependency>
            <groupId>com.hazelcast.jet</groupId>
            <artifactId>hazelcast-jet-kinesis</artifactId>
            <version>5.4.1</version>
        </dependency>
    </dependencies>
</project>
Step 3. Publish a Stream to Kinesis
This code publishes "tweets" (just some simple strings) to the Kinesis
 data stream Tweets, with varying intensity:
package org.example;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.kinesis.KinesisSinks;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.test.TestSources;
import java.util.Map.Entry;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import static com.hazelcast.jet.Util.entry;
public class TweetPublisher {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();
    pipeline.readFrom(TestSources.itemStream(3))
     .withoutTimestamps()
     .flatMap(event -> {
       ThreadLocalRandom random = ThreadLocalRandom.current();
       long count = random.nextLong(1, 10);
       Stream<Entry<String, byte[]>> tweets = LongStream.range(0, count)
           .map(l -> event.sequence() * 10 + l)
           .boxed()
           .map(l -> entry(
               Long.toString(l % 10),
               String.format("tweet-%0,4d", l).getBytes())
           );
       return Traversers.traverseStream(tweets);
     })
     .writeTo(KinesisSinks.kinesis("Tweets").build());
    JobConfig cfg = new JobConfig().setName("tweet-publisher");
    HazelcastInstance hz = Hazelcast.bootstrappedInstance();
    hz.getJet().newJob(pipeline, cfg);
  }
}
You can run this code from your IDE and it will work, but it will create its own Hazelcast member. To run it on the Hazelcast member you already started, build the JAR and submit it from the command line.
With Gradle:
gradle build
bin/hz-cli submit -c org.example.TweetPublisher build/libs/kinesis-tutorial-1.0-SNAPSHOT.jar
With Maven:
mvn package
bin/hz-cli submit -c org.example.TweetPublisher target/kinesis-tutorial-1.0-SNAPSHOT.jar
Let it run in the background while we go on to creating the next class.
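TweetPublisher emits each tweet as an entry whose key, the last digit of the tweet number, serves as the Kinesis partition key. Kinesis hashes that key with MD5 into a 128-bit integer and routes the record to the shard whose hash key range contains it. The following plain-Java sketch illustrates that mapping. It is not part of the tutorial project: the class name, the shard count, and the assumption that the shards split the hash space into equal ranges are all illustrative, and a real stream may have different ranges after resharding.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Illustrative only: models how a partition key is mapped to a shard. */
final class PartitionKeySketch {

    /** Size of the hash key space: 2^128. */
    static final BigInteger HASH_SPACE = BigInteger.ONE.shiftLeft(128);

    /** MD5 of the partition key, read as an unsigned 128-bit integer. */
    static BigInteger hashKey(String partitionKey) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            return new BigInteger(1, digest);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is not available", e);
        }
    }

    /** Shard index for the key, ASSUMING shardCount equal-sized hash key ranges. */
    static int shardIndex(String partitionKey, int shardCount) {
        BigInteger rangeSize = HASH_SPACE.divide(BigInteger.valueOf(shardCount));
        int index = hashKey(partitionKey).divide(rangeSize).intValue();
        // The last range absorbs the remainder when 2^128 is not divisible by shardCount.
        return Math.min(index, shardCount - 1);
    }

    public static void main(String[] args) {
        int shardCount = 2; // hypothetical value; use your stream's actual shard count
        for (long tweetNumber = 40; tweetNumber < 45; tweetNumber++) {
            // Same key derivation as in TweetPublisher: the last digit of the tweet number.
            String key = Long.toString(tweetNumber % 10);
            System.out.println("tweet " + tweetNumber + " -> key " + key
                    + " -> shard " + shardIndex(key, shardCount));
        }
    }
}
```

With only a handful of distinct keys, some shards of a larger stream may receive no tweets at all, which is worth keeping in mind when you interpret per-shard metrics.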
Step 4. Use Hazelcast to Analyze the Stream
This code lets Hazelcast connect to Kinesis and show how many events per second were published to the Kinesis stream at a given time:
package org.example;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.kinesis.KinesisSources;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.StreamSource;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import static com.hazelcast.jet.aggregate.AggregateOperations.counting;
import static com.hazelcast.jet.pipeline.WindowDefinition.sliding;
public class JetJob {
  static final DateTimeFormatter TIME_FORMATTER =
      DateTimeFormatter.ofPattern("HH:mm:ss:SSS");
  public static void main(String[] args) {
    StreamSource<Map.Entry<String, byte[]>> source = KinesisSources.kinesis("Tweets")
     .withInitialShardIteratorRule(".*", "LATEST", null)
     .build();
    Pipeline pipeline = Pipeline.create();
    pipeline.readFrom(source)
     .withNativeTimestamps(3_000) //allow for some lateness in KDS timestamps
     .window(sliding(1_000, 500))
     .aggregate(counting())
     .writeTo(Sinks.logger(wr -> String.format(
         "At %s Kinesis got %,d tweets per second",
         TIME_FORMATTER.format(LocalDateTime.ofInstant(
             Instant.ofEpochMilli(wr.end()), ZoneId.systemDefault())),
         wr.result())));
    JobConfig cfg = new JobConfig().setName("kinesis-traffic-monitor");
    HazelcastInstance hz = Hazelcast.bootstrappedInstance();
    hz.getJet().newJob(pipeline, cfg);
  }
}
You can run this code from your IDE and it will work, but it will create its own Hazelcast member. To run it on the Hazelcast member you already started, build the JAR and submit it from the command line.
With Gradle:
gradle build
bin/hz-cli submit -c org.example.JetJob build/libs/kinesis-tutorial-1.0-SNAPSHOT.jar
With Maven:
mvn package
bin/hz-cli submit -c org.example.JetJob target/kinesis-tutorial-1.0-SNAPSHOT.jar
Now go to the window where you started Hazelcast. Its log output will contain the output from the pipeline.
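Each log line reports the number of tweets that fell into a one-second window, and a new window ends every 500 milliseconds, so consecutive windows overlap. The plain-Java sketch below models that computation on a fixed array of timestamps. It is only an approximation for illustration: the class and method names are hypothetical, it uses UTC rather than the system time zone so that its result is the same everywhere, and it reports only non-empty windows.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

/** Plain-Java illustration of a sliding-window count; not Hazelcast code. */
final class SlidingCountSketch {

    static final DateTimeFormatter TIME_FORMATTER =
            DateTimeFormatter.ofPattern("HH:mm:ss:SSS", Locale.ROOT);

    /** Counts the timestamps that fall into [windowEnd - windowSize, windowEnd). */
    static long countInWindow(long[] timestamps, long windowEnd, long windowSize) {
        long count = 0;
        for (long ts : timestamps) {
            if (ts >= windowEnd - windowSize && ts < windowEnd) {
                count++;
            }
        }
        return count;
    }

    /** Formats one report line in the same shape as the job's log output. */
    static String formatLine(long windowEnd, long count) {
        String time = TIME_FORMATTER.format(
                LocalDateTime.ofInstant(Instant.ofEpochMilli(windowEnd), ZoneOffset.UTC));
        return String.format(Locale.ROOT, "At %s Kinesis got %,d tweets per second", time, count);
    }

    /** Evaluates every non-empty window whose end is a multiple of slideBy. */
    static List<String> report(long[] timestamps, long windowSize, long slideBy) {
        List<String> lines = new ArrayList<>();
        if (timestamps.length == 0) {
            return lines;
        }
        long min = Long.MAX_VALUE;
        long max = Long.MIN_VALUE;
        for (long ts : timestamps) {
            min = Math.min(min, ts);
            max = Math.max(max, ts);
        }
        // First window end strictly after the earliest event, last one that still contains the latest.
        long firstEnd = (Math.floorDiv(min, slideBy) + 1) * slideBy;
        long lastEnd = Math.floorDiv(max + windowSize, slideBy) * slideBy;
        for (long end = firstEnd; end <= lastEnd; end += slideBy) {
            long count = countInWindow(timestamps, end, windowSize);
            if (count > 0) {
                lines.add(formatLine(end, count));
            }
        }
        return lines;
    }

    public static void main(String[] args) {
        // Hypothetical arrival times in epoch milliseconds.
        long[] arrivals = {58_286_600L, 58_286_900L, 58_287_200L, 58_287_400L};
        report(arrivals, 1_000, 500).forEach(System.out::println);
    }
}
```

Because the window is twice as long as the slide, every tweet is counted in two consecutive windows, so the per-window numbers should not be added up to get a total.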
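If TweetPublisher was running while you were following these steps,
you’ll now get a steady stream of real-time updates. Because the source
above starts from the LATEST position in each shard, the job reports
only on tweets published after it starts. Kinesis is a replayable
source, so if you start reading from the oldest retained record instead
(the TRIM_HORIZON shard iterator type), you’ll first get a report on the
whole history, and you’ll get all the history again each time you
restart the program.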
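The job in Step 4 uses the timestamps that Kinesis assigned to the records and calls withNativeTimestamps(3_000) to tolerate records that arrive up to three seconds out of order. The sketch below is a simplified model of that idea, assuming the watermark simply trails the highest timestamp seen so far by the allowed lag. The class is hypothetical and Hazelcast's real watermark handling is more involved, so treat it as an illustration of the concept only.

```java
/** Simplified model of a watermark that trails the newest timestamp by a fixed lag. */
final class AllowedLagSketch {

    private final long allowedLagMillis;
    private long topTimestamp = Long.MIN_VALUE;

    AllowedLagSketch(long allowedLagMillis) {
        this.allowedLagMillis = allowedLagMillis;
    }

    /** Current watermark, or Long.MIN_VALUE before the first event. */
    long watermark() {
        return topTimestamp == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : topTimestamp - allowedLagMillis;
    }

    /** Returns true if the event is on time, false if it is behind the watermark. */
    boolean accept(long timestamp) {
        if (timestamp < watermark()) {
            return false; // too late: in this model the event is not counted
        }
        topTimestamp = Math.max(topTimestamp, timestamp);
        return true;
    }

    public static void main(String[] args) {
        AllowedLagSketch sketch = new AllowedLagSketch(3_000);
        // Hypothetical event timestamps in milliseconds, in arrival order.
        long[] arrivals = {10_000, 8_000, 6_999, 12_000, 8_500};
        for (long ts : arrivals) {
            System.out.println(ts + " -> " + (sketch.accept(ts) ? "counted" : "late"));
        }
    }
}
```

A larger lag tolerates more disorder but delays the results, because a window can only be reported once the watermark has passed its end.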
Sample output:
... At 16:11:27:500 Kinesis got 13 tweets per second
... At 16:11:28:000 Kinesis got 17 tweets per second
... At 16:11:28:500 Kinesis got 8 tweets per second
Step 5. Clean up
- Cancel the jobs:
  bin/hz-cli cancel tweet-publisher
  bin/hz-cli cancel kinesis-traffic-monitor
- Shut down the Hazelcast cluster:
  bin/hz-stop
- Clean up the Tweets stream in Kinesis, using the AWS Console or the CLI.
Next Steps
Learn more about the Kinesis connector to find out how to override backend parameters like region, endpoint, and security keys.