Build event streams
This tutorial shows how to build bespoke, enriched event streams from Kafka.
Describe the data sources
Our demo has a few services running, which we’ll join together to create a bespoke event stream:
- Kafka Service: A topic that publishes a message whenever a new review is published
- FilmsDb: A database containing a list of films
- ListingsApi: A REST API that tells us where our movies are currently playing
Our services share a taxonomy used to describe the elements we can fetch:
namespace reviews {
   type FilmId inherits String
   type FilmTitle inherits String
   type ReviewText inherits String
   type PerformanceDate inherits DateTime
   type TheatreName inherits String
}
Add the Kafka broker
Configure the Kafka connection in connections.conf:
kafka {
   "my-kafka" {
      connectionName=my-kafka
      connectionParameters {
         brokerAddress="kafka:19092"
         groupId=flow
      }
   }
}
Describe the Kafka topic
Next, add some Taxi metadata to the message written onto our Kafka topic:
@lang.taxi.formats.ProtobufMessage(packageName = "", messageName = "NewReviewPostedMessage")
model NewReviewPostedMessage {
   @lang.taxi.formats.ProtobufField(tag = 1, protoType = "int32") filmId : reviews.FilmId
   @lang.taxi.formats.ProtobufField(tag = 2, protoType = "string") reviewText : reviews.ReviewText
}
Finally, declare the Kafka topic:
@flow.kafka.KafkaService(connectionName = "my-kafka")
service KafkaService {
   @flow.kafka.KafkaOperation(topic = "reviews", offset = "latest")
   stream reviews : Stream<NewReviewPostedMessage>
}
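Before enriching anything, it can help to confirm that messages are arriving from the topic. A minimal sketch, assuming Flow accepts a stream query with no projection, is to stream the raw model:

```taxi
// Streams each NewReviewPostedMessage as it is published, with no enrichment.
// Assumption: omitting the `as { ... }` projection returns the source model unchanged.
stream { NewReviewPostedMessage }
```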
Create an event stream
With a Kafka topic named reviews, a database named FilmsDb, and a REST API named ListingsApi connected, we can create a dedicated event stream containing only the data we need by submitting the following query to Flow:
stream { NewReviewPostedMessage } as {
   // We define the field names that matter to us.
   // Flow matches on data types, as field names often differ between systems.
   id : FilmId
   review : ReviewText
   name : FilmTitle // Fetched by a database call
   nextPerformances : FilmListing[] // Fetched by a REST API call
}
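For this query to resolve, Flow needs to know which services can turn a FilmId into a FilmTitle and into a FilmListing[]. The tutorial does not show those schemas, so the following is only a sketch of what they might look like. The Film and FilmListing models, their field names, the films table, and the getListings operation are all hypothetical, and the connector annotations that would bind them to a real database and HTTP endpoint are omitted:

```taxi
// Hypothetical shape of a row in FilmsDb; gives Flow a path from FilmId to FilmTitle.
model Film {
   id : reviews.FilmId
   title : reviews.FilmTitle
}

// Hypothetical shape of one entry returned by ListingsApi.
model FilmListing {
   theatre : reviews.TheatreName
   date : reviews.PerformanceDate
}

service FilmsDb {
   // Assumed table declaration; database connector annotations omitted.
   table films : Film[]
}

service ListingsApi {
   // Assumed operation; HTTP annotations omitted.
   operation getListings(filmId : reviews.FilmId) : FilmListing[]
}
```

With schemas along these lines, the name field could be filled from the table and nextPerformances from the operation, even though neither source uses those field names, because the match is made on types such as FilmTitle and FilmListing.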
Specify the groupId for Kafka subscriptions
When Flow subscribes to a Kafka topic for a streaming query, it uses the groupId connection parameter defined in connections.conf to set the group id of the corresponding Kafka consumer.
kafka {
   "myKafkaBroker" {
      connectionName=myKafkaBroker
      connectionParameters {
         brokerAddress="localhost:9092"
         groupId=flow
      }
   }
}
In the above configuration, the groupId is set to flow. However, you can override the groupId value in your queries by using the @StreamConsumer annotation. Here is an example:
@StreamConsumer(id = "reviewStreamGroup")
stream { NewReviewPostedMessage } as {
   id : FilmId
   review : ReviewText
   name : FilmTitle // Fetched by a database call
   nextPerformances : FilmListing[] // Fetched by a REST API call
}
The Kafka consumer created for the above streaming query will set the group id to reviewStreamGroup.
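Kafka delivers each message once per consumer group, so two queries that should each see every review need distinct ids. As a sketch, assuming a second query runs alongside the one above (the id reviewAuditGroup is made up for illustration):

```taxi
// Hypothetical second query with its own consumer group.
// Its id differs from reviewStreamGroup, so Kafka tracks its offsets separately.
@StreamConsumer(id = "reviewAuditGroup")
stream { NewReviewPostedMessage } as {
   id : FilmId
   review : ReviewText
}
```

If both queries used the same id instead, they would join the same consumer group and Kafka would divide the topic's partitions between them.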