Kafka as a data source
Flow can stream data from a Kafka topic as part of a query, as well as write data back to Kafka.
Define a connection to your Kafka broker
Kafka connections are stored in your connections.conf config file, under the kafka element.
The connection specifies how to connect to a broker, but not the specifics of actual topics.
(Those are specified in the @KafkaOperation() metadata on a Taxi operation definition.)
kafka {
   my-kafka-connection {
      connectionName=my-kafka-connection
      connectionParameters {
         brokerAddress="localhost:29092,localhost:39092"
         groupId="flow"
      }
   }
   another-kafka-connection {
      connectionName=another-kafka-connection
      connectionParameters {
         brokerAddress="localhost:29092,localhost:39092"
         groupId="flow"
      }
   }
}
The following configuration options are supported under connectionParameters:

Config option | Purpose
---|---
brokerAddress | A comma-separated list of broker addresses to use when connecting to the Kafka broker
groupId | The groupId to use when connecting to a topic
Additional Kafka connection properties
In addition to brokerAddress and groupId (which are shorthand for bootstrap.servers and group.id respectively), you can provide any of the Kafka consumer and producer config settings.
For example, to define a secure connection:
kafka {
   my-kafka-connection {
      connectionName=my-kafka-connection
      connectionParameters {
         brokerAddress="localhost:29092,localhost:39092"
         groupId="flow"
         "security.protocol"="SASL_PLAINTEXT"
         "sasl.mechanism"="PLAIN"
         "sasl.jaas.config"="org.apache.kafka.common.security.plain.PlainLoginModule required username='myKafkaUser' password="${KAFKA_PASSWORD}";"
      }
   }
}
Troubleshoot connection problems
Use env variables in auth settings
It’s common in Kafka configs for usernames and passwords to be embedded within settings, such as sasl.jaas.config.
It’s recommended that usernames and passwords are read from env variables, which can lead to tricky quoting and escaping issues. The solution is to rely on HOCON string concatenation. For example:
"sasl.jaas.config"="org.apache.kafka.common.security.plain.PlainLoginModule required username="${KAFKA_USERNAME}" password="${KAFKA_PASSWORD}";"
Pay close attention to how the " characters in the setting are paired - i.e., no escaping, and no using ' to nest inner values.
For example:
kafka {
   my-kafka-connection {
      // ... omitted for brevity
      connectionParameters {
         "sasl.jaas.config"="org.apache.kafka.common.security.plain.PlainLoginModule required username="${KAFKA_USERNAME}" password="${KAFKA_PASSWORD}";"
      }
   }
}
Query a Kafka topic
A Kafka topic can be queried using a standard Taxi query, including enriching data from Kafka with other services, and joining multiple streams together.
Expose a topic
Kafka topics are declared in Taxi as simple operations which return a stream of data.
// 1: Add the required imports
import flow.kafka.KafkaService
import flow.kafka.KafkaOperation

// 2: Annotate the service as a `KafkaService`.
@KafkaService( connectionName = "market-prices" )
service MyKafkaService {
   // 3: Annotate the operation as a `@KafkaOperation`
   @KafkaOperation( topic = "stockPrices", offset = "earliest" )
   // The topic is declared as a stream, and returns a Stream<>
   stream streamStockPrices():Stream<StockPrice>
}
- Add the required imports
- Annotate the service with @KafkaService
  - The connectionName parameter should match a connection defined in the connections config file
- Annotate the operation as a @KafkaOperation, specifying the topic and offset
- The return type should be a Stream<> of the defined message type
Keys, headers and metadata
The message key, message headers, and Kafka-specific metadata (such as offset, partition and timestamp) can be provided into a Taxi model using annotations.
Message key
The message key can be accessed using the flow.kafka.KafkaMessageKey annotation on a field:
import flow.kafka.KafkaMessageKey

model Movie {
   // The key from Kafka will be read into the id property
   @KafkaMessageKey
   id: MovieId inherits String
   title: Title inherits String
}

// Rest of the kafka topic declaration continues...
@KafkaService(connectionName = "moviesConnection")
service MovieService {
   @KafkaOperation(topic = "movies", offset = "earliest")
   stream streamMovieQuery: Stream<Movie>
}
Kafka metadata
Kafka metadata (such as offset, partition and timestamp) can be accessed using the flow.kafka.KafkaMessageMetadata annotation on a field.
KafkaMessageMetadata takes a single parameter: the metadata type you wish to read. This is defined by the KafkaMetadataType enum, which has the following values:
enum KafkaMetadataType {
   Partition,
   Offset,
   Timestamp,
   TimestampType
}
For example:
import flow.kafka.KafkaMessageMetadata
import flow.kafka.KafkaMetadataType

model Movie {
   @KafkaMessageMetadata(KafkaMetadataType.Offset)
   offset: Int
   @KafkaMessageMetadata(KafkaMetadataType.Timestamp)
   timestamp: Long
   @KafkaMessageMetadata(KafkaMetadataType.TimestampType)
   timestampType: String
   @KafkaMessageMetadata(KafkaMetadataType.Partition)
   partition: Int
   // Other fields continue...
   title: Title inherits String
}

// Rest of the Kafka topic declaration continues...
@KafkaService(connectionName = "moviesConnection")
service MovieService {
   @KafkaOperation(topic = "movies", offset = "earliest")
   stream streamMovieQuery: Stream<Movie>
}
Headers
Kafka supports including arbitrary message headers along with the message; these are often used for things like correlation keys.
These headers can be accessed using the flow.kafka.KafkaHeader annotation:
import flow.kafka.KafkaHeader

model Movie {
   @KafkaHeader("correlationId")
   correlationId: CorrelationId inherits String
   title: Title inherits String
}

// Rest of the Kafka topic declaration continues...
@KafkaService(connectionName = "moviesConnection")
service MovieService {
   @KafkaOperation(topic = "movies", offset = "earliest")
   stream streamMovieQuery: Stream<Movie>
}
Control deserialization
Message deserialization is defined by the model type being exposed. By default, models are expected to be JSON.
However, this can be controlled by annotating the model with a format annotation.
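For example, a model can carry a format annotation to indicate that its messages arrive in a format other than JSON. The sketch below is illustrative only: the @Csv annotation name, and the Trade model and its types, are placeholders rather than Flow's documented API, so check the format annotations available in your Flow schema libraries.

// Illustrative sketch: @Csv stands in for whichever format
// annotation your Flow installation provides
@Csv
model Trade {
   symbol : TradeSymbol inherits String
   quantity : TradeQuantity inherits Int
}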
Example queries
Stream data from Kafka
// Invokes the `streamStockPrices` stream declared above
stream { StockPrice }
Enrich data from Kafka with other data sources
Data from a Kafka topic can be projected to enrich it with data from other sources.
Data requested that is not present on the Kafka payload is looked up from other sources, using Flow’s standard projections.
stream { StockPrice } as {
   ticker : StockTicker                 // available on the Kafka topic
   lastTradedPrice : LastTradedPrice    // looked up from another data source
}[]
Filter Kafka streams
This example reads all messages from the Kafka topic, but only emits those with a stock ticker of AAPL on the resulting stream:
stream { StockPrice.filterEach( ( StockTicker ) -> StockTicker == 'AAPL' ) }
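Filtering can also be combined with a projection. The query below is a minimal sketch composing the filterEach and projection constructs shown above, reusing the same StockTicker and LastTradedPrice types from the earlier examples:

// Filter to AAPL messages, then project each one,
// looking up LastTradedPrice from another data source
stream { StockPrice.filterEach( ( StockTicker ) -> StockTicker == 'AAPL' ) } as {
   ticker : StockTicker
   lastTradedPrice : LastTradedPrice
}[]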
Stream from Kafka to a database
Streams from Kafka can be inserted into a database (or any other writable source, such as Hazelcast or Dynamo) using a mutating query.
As with all mutating queries, it’s not necessary for the data from Kafka to align with the format of the data being written to the data destination.
Flow will automatically adapt the query result to the required persistence format, which may involve projections and even calling additional services if needed.
// First, ensure that your data destination exposes a writeable data source
// Full config omitted for brevity
service MyDatabaseService {
   @UpsertOperation
   write operation updateStockPrices(StockPriceSnapshot):StockPriceSnapshot
}

// Then, define a streaming query.
// In this example, the data format for StockPrice coming off of Kafka
// is different from the data being written to our database (StockPriceSnapshot)
// so Flow transforms the data automatically
stream { StockPrice }
call MyDatabaseService::updateStockPrices
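For reference, a writable database source is declared much like the Kafka services on this page. The sketch below assumes a @DatabaseService annotation taking a connectionName, mirroring @KafkaService; that annotation name is an assumption rather than something this page documents, so refer to the database connector documentation for the exact declaration.

// Sketch only: @DatabaseService and its connectionName parameter
// are assumptions mirroring @KafkaService, not confirmed API
@DatabaseService( connectionName = "my-database-connection" )
service MyDatabaseService {
   @UpsertOperation
   write operation updateStockPrices(StockPriceSnapshot):StockPriceSnapshot
}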
Write to a Kafka topic
To make a topic writable, declare a write operation in a Kafka service:
// 1: Add the required imports
import flow.kafka.KafkaService
import flow.kafka.KafkaOperation

// 2: Annotate the service as a `KafkaService`.
@KafkaService( connectionName = "market-prices" )
service MyKafkaService {
   // ...other kafka topics omitted...

   // 3: Annotate the operation as a `@KafkaOperation`
   @KafkaOperation( topic = "stockPrices", offset = "earliest" )
   // The operation is declared as a write operation
   write operation publishStockPrice(StockPrice):StockPrice
}
Examples
Write a static value onto a Kafka topic
given { stockPrice : StockPrice =
   {
      symbol : 'AAPL',
      price : 12.00203,
   }
}
call MyKafkaService::publishStockPrice
Consume from one Kafka topic, and write to another topic
This example streams data from one Kafka topic, enriches each message, and republishes it to another topic.
@KafkaService( connectionName = "market-prices" )
service MyKafkaService {
   @KafkaOperation( topic = "stockPrices" )
   stream prices : Stream<StockPrice>

   @KafkaOperation( topic = "enrichedPrices" )
   write operation publishEnrichedPrices(EnrichedStockPrice):EnrichedStockPrice
}
The following query will consume from the stockPrices topic and, for each message, transform it to an EnrichedStockPrice, invoking any other services required to inject the required data.
stream { StockPrice }
// The input parameter to publishEnrichedPrices
// is an EnrichedStockPrice, so each incoming
// StockPrice message is transformed to an
// EnrichedStockPrice payload, and published onto the
// enrichedPrices topic
call MyKafkaService::publishEnrichedPrices
Build a REST API that publishes to Kafka
This is a full example, where we create an HTTP endpoint accepting a POST request with a ticker symbol.
type StockSymbol inherits String

// The inbound request sent over HTTP requesting a stock price
model StockPricePublicationRequest {
   ticker : StockSymbol
}

// The message we'll be publishing to Kafka
parameter model StockPriceUpdate {
   ticker : StockSymbol
   currentPrice : StockPrice
}

closed model CurrentStockPrice {
   price : StockPrice
}

service PriceService {
   @HttpOperation(url="http://fakeurl/prices/{symbol}", method = "GET")
   operation getCurrentPrice(@PathVariable("symbol") symbol:StockSymbol):CurrentStockPrice
}

@KafkaService( connectionName = "market-prices" )
service MyKafkaService {
   @KafkaOperation( topic = "stockPrices", offset = "earliest" )
   write operation publishStockPrice(StockPriceUpdate):StockPriceUpdate
}

@HttpOperation(path = "/api/q/publishStockPrice", method = "POST")
query MySavedQuery(@RequestBody request:StockPricePublicationRequest) {
   given { request }
   call MyKafkaService::publishStockPrice
}
The above example works as follows:

- A POST request is sent to /api/q/publishStockPrice with a body of: { "ticker" : "AAPL" }
- The query asks for publishStockPrice to be called, which means a StockPriceUpdate must be constructed
- To build a StockPriceUpdate, the currentPrice : StockPrice is required, which is available from the price field of the CurrentStockPrice object returned from getCurrentPrice
- A request to http://fakeurl/prices/AAPL is issued to discover the current stock price, returning: { "price" : 117.34 }
- Finally, we have enough information to build a StockPriceUpdate, so the message is published to Kafka: { "ticker" : "AAPL", "currentPrice" : 117.34 }