Integrate a database, a REST API and Kafka
Overview
In this tutorial, we’ll set up a local instance of Flow, and then see how to use Flow to link data from a REST API, a database, and a Kafka topic.
This is a step-by-step introduction for beginners which walks you through the basics of linking data sources together using Flow’s UI.
Our use case is to find a list of films and discover which online streaming service has the films available to watch.
This involves linking the Films list from our database with streaming service information from a REST API.
Finally, we’ll listen for updates on a Kafka topic, sending Protobuf messages, and join those with the database and API.
Here’s what our demo ecosystem looks like:
Prerequisites
You should have Docker and Docker Compose.
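You can quickly confirm both are available from a terminal:

```shell
# Check the prerequisites are installed
docker --version
docker compose version
```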
Start a local instance of Flow
In this tutorial, we’re going to deploy Flow, as well as a few demo projects that we’ll use throughout.
Everything you need is packaged up in a Docker Compose file to make getting started easier.
To launch, clone the repository and follow instructions in the README file:
git clone https://github.com/hazelcast/hazelcast-flow-public-demos.git
cd hazelcast-flow-public-demos/films-demo
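The exact launch steps are described in the repository’s README. Assuming the standard Docker Compose workflow, starting the stack will look something like this:

```shell
# Start Flow and the demo services in the background
# (check the README for the exact command and any extra options)
docker compose up -d
```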
After about a minute, Flow should be available at http://localhost:9021.
To make sure everything is ready to go, head to the Projects Explorer to make sure that some schemas are registered. It should look like the screenshot below. If you see a message saying there’s nothing registered, wait a few moments longer.
You now have Flow running locally, along with a handful of demo services which we’ll use in our next steps.
If you run `docker ps`, you should see a collection of Docker containers now running.
| Container Name | Part of Flow stack or Demo? | Description |
|---|---|---|
| `flow` | Flow | The core Flow server, which provides our UI and runs all our integrations for us |
| `management-center` | Flow | The Management Center, for monitoring and managing Flow |
| `films-api` | Demo | A REST API which exposes information about films. We also publish a Protobuf message to Kafka from here |
| `pg-pagila` | Demo | A Postgres DB, which contains the Postgres Pagila demo database for a fake DVD rental store |
| `kafka` | Demo | The Apache Kafka image |
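To quickly confirm that all of these containers are up, you can list just their names:

```shell
# Print the names of the running containers
docker ps --format '{{.Names}}'
```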
View the project
An empty project named films has been created for Flow to hold the connection details of our data sources and their schemas. It is a Taxi project, which is edited locally and then checked into Git once you’re ready to go to production.
- From the sidebar, click Projects
- You will see the films project listed
- Click on the project to view the details
| Field | Value |
|---|---|
| Organisation | |
| Version | |
As we progress through this tutorial, we’ll be adding connections to our database, REST API and Kafka topic to this project. You can view the Taxi schemas that Flow generates in this project.
Connect a database table
Next, we’ll add a connection to our database, and make a table available as a datasource that Flow can fetch data from.
This demo ships with an instance of the Postgres demo DB called Pagila.
Pagila contains several tables related to running a fictional DVD rental store, including details of all sorts of different films, actors, and more. We’ll use this database as part of our walkthrough.
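If you’d like to peek at the raw data before wiring it into Flow, you can query the film table directly inside the container (the connection details match those you’ll enter below):

```shell
# Optional: inspect a few rows of the Pagila film table directly
docker exec -it pg-pagila psql -U postgres -d pagila \
  -c "SELECT film_id, title, release_year FROM film LIMIT 5;"
```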
To get started, click Data sources on the sidebar, then click Add a data source (or navigate to http://localhost:9021/data-source-manager/add).
Define the database connection
First, we need to tell Flow how to connect to the database.
- From the project drop-down, select the "films" project
- Select Database Table as the data source to add
- For Connection, select Add a new connection…
- A pop-up window appears, allowing you to create a connection to our database
- Fill in the form with the details below:

| Parameter | Value |
|---|---|
| Connection name | films-database |
| Connection type | Postgres |
| Host | pg-pagila |
| Port | 5432 |
| Database | pagila |
| Username | postgres |
| Password | admin |

- Click Test connection and wait for the connection test to be successful
- Click Save.
The connection to the database has now been created, and the pop-up should close.
Select the table to import
Now that Flow has a connection to the database, we need to select the tables we want to make available for Flow to query from.
Flow will create schema files for the contents of the table. Specifically, Flow will create:
- A model for the table, defining all the fields that are present
- A series of types, which describe the content of each field
- A query service, which lets Flow run queries against the database
To import the schema:
- Add a new data source using the new `films-database` connection and select the `film` table
- Complete the form for the database table to import using the parameters below:

| Parameter | Value |
|---|---|
| Connection | films-database |
| Table | film |
| Default namespace | |
Namespaces are used to help us group related content together, like packages in Java or namespaces in C# and TypeScript.
Here, we’re providing a default namespace, which will be applied to the types, models and services Flow creates when importing this table.
- Click Configure
Flow will connect to the database, and create all the necessary schema configuration for us for the table.
Preview the imported tables
Flow now shows a preview of the types, models and services that will be created.
Click around to explore the different models, types and services that will be created. For now, the defaults that have been assigned are good enough.
- Click Save
Flow will create the necessary schema files in a local project.
Flow also creates a series of Taxi schema files that contain the schemas we’ve just imported. You can explore these files locally. You will find them in the 'taxi/src' directory.
cd taxi/
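As a rough sketch, the generated Taxi for the film table contains semantic types for each column plus a model, along these lines (names here are illustrative, not the exact generated output; the real files also carry the namespace you chose and the generated query service):

```taxi
// Illustrative sketch only - not the exact generated output
type FilmId inherits Int
type Title inherits String
type Description inherits String
type ReleaseYear inherits Int

model Film {
   film_id : FilmId
   title : Title
   description : Description
   release_year : ReleaseYear
}
```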
Taxi ships a great VS Code plugin which provides click-to-navigate, syntax highlighting, autocompletion and more.
You’ve now connected a database to Flow, and exposed one of its tables, so that Flow can run queries against it.
Connect a Swagger API
In this step, we want to tell Flow about our REST API, which exposes information about which streaming service each of our films is available on.
We’ll use Flow’s UI to import a Swagger definition of our REST API.
- Click Data Sources on the sidebar
- Once again, click Add a data source (or navigate to http://localhost:9021/data-source-manager/add)
- Select the films project as the target project
- From the drop-down list, select Swagger / Open API as the type of schema to import
- For the Swagger Source, select a URL

Fill in the form with the following values:

| Parameter | Value |
|---|---|
| Swagger source | |
| Default namespace | |
| Base url | Leave this blank |
- Click Configure
Update the service type
A preview of the imported schema is once again displayed.
This time, we do need to modify some default values.
Click on Services → `getStreamingProvidersForFilm`.
This shows the API operation that’s exposed in the Swagger spec we just imported. This API accepts the ID of a film, and returns information about the streaming services that have the film available to watch.
Now, take a look at the parameters section of the `getStreamingProvidersForFilm` service (you may need to scroll down).
Note that the input parameter, `filmId`, is typed as `Int`. Since we know that this is a FilmId (the same value that’s exposed by the Films database table), we need to update the type accordingly, so that Flow knows these two pieces of information are linked.
- Click on the `Int` link
- In the search box, type `FilmId`
- Select the FilmId type that’s shown
- Finally, click Save
Great! We’ve now exposed the Swagger API to Flow.
What just happened?
We’ve connected the Swagger schema of a REST API to Flow. Flow now knows about this service, and will make calls to it as needed.
Importantly, we’ve defined a link from the data in our database to the data in the REST API. The schema diagram shows an outline of this relationship:
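In schema terms, the link is simply that the operation’s input parameter now shares the FilmId type used by the film table. Conceptually, the imported service looks something like this (the service name, annotations and return type here are illustrative):

```taxi
// Illustrative sketch - the real schema includes the HTTP annotations from the Swagger spec
service StreamingProvidersApi {
   // Accepts the same FilmId type that the film table exposes,
   // which is what lets Flow join the two sources
   operation getStreamingProvidersForFilm(filmId : FilmId) : StreamingProvider
}
```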
Integrate services and load data
Now that everything is set up, let’s fetch and integrate some data.
List all the films in the database
Queries in Flow are written in TaxiQL. TaxiQL is a simple query language that isn’t tied to one specific underlying technology (i.e., it’s independent of databases, APIs, etc.).
This means we can write queries for data without worrying where the data is served from.
Our first query is very simple - it just finds all the films.
- Head over to the Query Builder, and select the Query Editor tab (or navigate to http://localhost:9021/query/editor)
- Paste in the below query:

find { Film[] }

- Click Run.
This query asks Flow for all Film records.
When this query is executed, Flow looks for services that expose a collection of Films, and invokes them.
In our example, this means Flow will query the database to select all available films.
There are different options to show the result of Flow queries. These are displayed as tabs under the query editor.
- Table - Ideal for tabular, two-dimensional data
- Tree - Ideal for nested data
- Raw - Raw JSON, ideal for larger result sets
- Profile - What work Flow did to produce the result. Contains information about the systems called by Flow, performance stats and lineage information
Once the query has completed, a list of records appears in the grid.
Transform the data
Flow lets you restructure data in a way that’s useful to you. Our original query returned the data as a flat list, since it’s coming from a database.
However, for our purposes (let’s say we’re building a UI) we might want to restructure the data to a subset of fields, grouped in a way that’s useful.
- Paste the below query into the Query Editor:
find { Film[] } as {
film: {
name: Title
id : FilmId
description: Description
}
productionDetails: {
released: ReleaseYear
}
}[]
- Click Run.
This time, the data has been returned structured as a tree. To see the tree data, click on the Tree tab in the results panel.
Our data has now been restructured into a tree shape. Using this approach, we can change the shape of the structure, along with field names.
In Taxi language, this is called a projection as we’re changing the shape of the output.
Combine data from our DB and REST API
Finally, let’s add in data about which streaming movie service contains each movie. This requires linking data between our database and our REST API.
As Flow is handling all of the integration for us, this is as simple as updating our query to include the provider data.
Flow works out how to call the REST API, which data to pass, and what to collect.
- Paste the below query:
find { Film[] } as {
film: {
name: Title
id : FilmId
description: Description
}
productionDetails: {
released: ReleaseYear
}
providers: StreamingProvider
}[]
- Click Run.
When the query results are returned, make sure you’re in the Tree view, as this is nested data. Note that we now have data from our database, combined with data from our REST API.
Explore the query execution
Flow has several diagnostic tools to help us see what happened.
Explore the query execution plan
In the Profiler, click to see the high level integration plan that Flow used to execute the query, showing the services that were called, and how data was resolved at a field level.
Explore the individual server requests
In the Profiler, click to see a sequence diagram of calls that have taken place to different services. Clicking on any of the rows shows the actual request and response.
Explore cell-based lineage
Flow provides detailed trace lineage for each value shown in its results.
In Tree mode, try clicking on one of the names of the streaming providers. A lineage display will open, showing the trace of how the value was derived.
- We can see that a value of Netflix was returned from an HTTP operation
- The input to that HTTP operation was a FilmId - in our example, the value 1
- Clicking on the FilmId expands the lineage graph to show where that FilmId came from
- We can see that the FilmId was returned as the result of a database query
This deep lineage is very powerful for understanding how data has flowed, and proving the provenance of data that Flow is exposing.
Run our query via curl
Although Flow’s UI is powerful, developers will want to interact with Flow through its API. That’s a topic on its own, but here is an example of running the same query through Flow’s API, using curl.
Get a JSON payload
We can use curl to get the results of our query as a JSON document.
- Copy and paste the below snippet into a shell window, and press Enter:
curl 'http://localhost:9021/api/taxiql' \
-H 'Accept: text/event-stream;charset=UTF-8' \
-H 'Content-Type: application/taxiql' \
--data-raw 'find { Film[] } as {
film: {
name: Title
id : FilmId
description: Description
}
productionDetails: {
released: ReleaseYear
}
providers: StreamingProvider
}[]'
Streaming versus batch results
The curl command streams results from Flow as soon as they’re available, because we set the Accept header to text/event-stream. This is both faster and more efficient for Flow, as it’s not holding results in memory, allowing Flow to work on arbitrarily large datasets. If you’d rather have the results as a single batch, change the Accept header to -H 'Accept: application/json'.
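For example, a batch-mode version of the simple films query looks like this (same endpoint, only the Accept header changes):

```shell
# Fetch all films as a single JSON document rather than a stream of events
curl 'http://localhost:9021/api/taxiql' \
  -H 'Accept: application/json' \
  -H 'Content-Type: application/taxiql' \
  --data-raw 'find { Film[] }'
```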
Add a Kafka streaming source
Now that we have Flow linking our Database and REST API, it’s time we add a Kafka stream into the mix.
We have a new releases topic that emits a message whenever Netflix decides to turn a beloved movie into a new TV series.
For this part of our demo, we’ll use Flow to listen for new release announcements, and join data from our REST API and Postgres DB.
Import a Protobuf schema
Our new releases topic emits a Protobuf message which Flow needs to know about.
To keep things simple in our demo, the Protobuf message is available via one of our APIs. You can view the Protobuf yourself by clicking on http://localhost:9981/proto.
For Flow (running inside the Docker Compose network), this is visible as http://films-api/proto.
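You can also fetch it from the command line on your host machine:

```shell
# View the Protobuf definition served by the films API
curl http://localhost:9981/proto
```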
Import the spec by clicking Add a data source on the front page of Flow, or by navigating to http://localhost:9021/data-source-manager/add.
- Select Protobuf as the type of schema to import
- Set the Protobuf Source as a URL
- Paste the URL: http://films-api/proto
- Click Configure

You should see a preview of a newly created model: NewFilmReleaseAnnouncement.
To ensure the `filmId` attribute in NewFilmReleaseAnnouncement uses the standard `FilmId` type used elsewhere in the company, follow these steps:
- Locate the Models table: On the left-hand side of your screen, find the table labeled "Models."
- Navigate to NewFilmReleaseAnnouncement: Within the Models table, click "Models" → "NewFilmReleaseAnnouncement." This will open the data model for NewFilmReleaseAnnouncement.
- Find the Attributes table: In the NewFilmReleaseAnnouncement data model, locate the "Attributes" table. This table lists the properties (or attributes) of the NewFilmReleaseAnnouncement model.
- Identify the `filmId` attribute: In the "Attributes" table, find the row for the filmId attribute. You’ll see the current data type displayed next to it (likely `Int`).
- Change the `filmId` type:
  - Click on the underlined `Int` type next to `filmId`. This will open a dropdown or search box.
  - In the search box, type "FilmId"
  - From the search results, select `com.hazelflix.films.filmsdatabase.film.types.FilmId`. This will update the `filmId` attribute to use the standardized `FilmId` type.
- After selecting the FilmId type, click Save to import the Protobuf definitions and save the changes.
The announcement field has been typed as `Announcement`. As there are no existing types in our company for this data, it’s fine to leave it as-is and use the newly created type.
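After these changes, the imported model is conceptually equivalent to the following Taxi (a simplified sketch; the real file may carry additional Protobuf metadata):

```taxi
// Simplified sketch of the imported Protobuf model after re-typing filmId
model NewFilmReleaseAnnouncement {
   filmId : FilmId             // the shared FilmId type, linking this message to the film table and REST API
   announcement : Announcement // newly created type - fine to leave as-is
}
```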
We’ve now imported a Protobuf schema, and linked its fields to other fields in our schema.
Import a Kafka topic
Next we need to tell Flow about the Kafka topic.
- Click the Flow logo in the navigation bar to return to the Flow home page
- Click Add a Data Source or navigate to http://localhost:9021/data-source-manager/add
- From the drop-down, select Kafka Topic
- In the Connection Name, select Add a new connection…

Fill out the form with the following details:

| Parameter | Value |
|---|---|
| Connection name | |
| Connection type | |
| Broker address | |
| Group Id | |
- Click Create. A new Kafka connection is created, and the popup closes.
Fill out the rest of the form with the following details:
| Parameter | Value |
|---|---|
| Connection name | |
| Topic | |
| Topic Offset | |
| Namespace | |
| Message Type | |
| Service and Operation Name | Leave these blank |
- Click Configure
A preview of the schema is shown.
By clicking Services → MyKafkaService → consumeFromReleases, you can see a new operation has been created which returns a Stream<NewFilmReleaseAnnouncement>.
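Roughly, the generated service looks like this (connection and topic annotations omitted; names match what the UI shows):

```taxi
// Illustrative sketch of the generated Kafka service
service MyKafkaService {
   // Emits NewFilmReleaseAnnouncement messages as they arrive on the topic
   operation consumeFromReleases() : Stream<NewFilmReleaseAnnouncement>
}
```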
Streams are a different type of operation. Rather than a request/response exchange like an HTTP operation, they expose a continuous stream of data.
Take a look around, and then click Save.
The message type NewFilmReleaseAnnouncement should be pre-populated in a drop-down menu. If it’s not visible, double check that you’ve imported the Protobuf schema correctly.
Join data from Kafka, API and our DB
It’s time to explore writing some queries that join data from across all three sources.
First, let’s start by querying our Kafka topic. Head over to the Query Editor, and paste the following query:
stream { NewFilmReleaseAnnouncement }
You should see results streaming in, which are being published to our Kafka topic.
Now, let’s enrich our Kafka stream with data from our other sources.
Cancel the running query, and paste the following:
import com.hazelflix.films.filmsdatabase.film.types.FilmId
stream { NewFilmReleaseAnnouncement } as {
// The announcement comes from our Kafka Protobuf message
news: {
announcement: NewFilmReleaseAnnouncement
}
// Grab some film information from the Database
film: {
name: Title
id : FilmId
description: Description
}
productionDetails: {
released: ReleaseYear
}
// And query the REST API to see where we can watch this
providers: StreamingProvider
}[]
In the results panel, you should see the following:
Looking in the Profiler tab, you can see the updated integration plan:
What’s next?
In this tutorial, we’ve set up Flow and used it to automatically integrate data from a Postgres Database, a REST API, and a Kafka topic with Protobuf.
Look under the hood
To get a better understanding of what’s happened under the hood, take a look at some of the files that Flow has generated during this tutorial.
| Directory | What’s there? |
|---|---|
| | The config file that lists all the projects, including the one we created. It defines where to read and write the schema files Flow created in the background. |
| | The schema project that Flow was writing schemas to |
| | A connections file defining the database and Kafka connections you imported in the UI |
| | The Taxi schemas that Flow generated for you. These are the schemas that describe the data you’ve been working with |