Hazelcast as a data source
In addition to using Hazelcast as a cache, Flow can read data from and write data to Hazelcast, and can also treat it as a streaming data source.
Add a Hazelcast data source
To enable Hazelcast as a data source, first define a connection in your connections.conf file:
hazelcast {
   myHazelcast {
      connectionName = myHazelcast
      addresses = ["hazelcast:5701"]
   }
}
Assuming you are using Docker Compose to run Flow, here is an example of how to configure an external Hazelcast container:
hazelcast:
  image: "docker.io/hazelcast/hazelcast:latest"
  healthcheck:
    test: curl --fail http://localhost:5701/hazelcast/health || exit 1
    interval: 30s
    timeout: 5s
    retries: 3
  ports:
    - "5701:5701"
  environment:
    HZ_MANAGEMENTCENTER_SCRIPTINGENABLED: true
    HZ_MANAGEMENTCENTER_CONSOLEENABLED: true
Supported formats
When values are written to Hazelcast, they must be serialized.
The following formats are supported. Note that because Flow does not have access to custom Java classes, serialization formats that require classloading (such as Serializable, Externalizable and Custom Serialization) are not supported.
Compact (preferred)
Compact Serialization is the preferred format when using Flow to read/write to Hazelcast.
To enable Compact serialization, add a @CompactObject annotation to your Taxi definition:
import flow.hazelcast.CompactObject
import flow.hazelcast.HazelcastMap
@HazelcastMap(name = "films")
@CompactObject
closed model Film {
   @Id
   filmId: FilmId inherits Int
   // ...omitted...
}
Write data to Hazelcast
To write data into a Hazelcast map, define a Hazelcast service in your Taxi project, with a write operation:
import flow.hazelcast.HazelcastService
import flow.hazelcast.HazelcastMap
import flow.hazelcast.UpsertOperation
import flow.hazelcast.CompactObject
// The HazelcastService annotation should specify the
// name of the connection defined in your connections.conf
@HazelcastService(connectionName = "myHazelcast")
service HazelcastService {
   // Expose a write operation, and annotate it
   // with UpsertOperation, to define the writing behavior
   @UpsertOperation
   write operation upsert(Film):Film
}
type Language inherits String

// Define the name of the map to use. If not present in Hazelcast,
// the map will be created
@HazelcastMap(name = "films")
// specify the serialization format
@CompactObject
closed model Film {
   // A field annotated with @Id becomes the key.
   // Composite keys are not supported
   @Id
   filmId : FilmId inherits Int
   title : Title inherits String
   languages: Language[]
}
With your map declared, use a mutation query to insert data:
Examples
Insert a static value into Hazelcast
// inserting a static value into Hazelcast
given { film : Film =
   {
      filmId : 123,
      title : "Star Wars",
      languages : ["English", "American"]
   }
}
call HazelcastService::upsert
Store a Query Result in Hazelcast
When writing the result of a query to Hazelcast, the query's result format does not need to match the format of the persisted value.
Flow will automatically adapt the query result to the required persistence format, which may involve projections and even calling additional services if needed.
find { NetflixFilms[] } as {
   // projection not shown
}
call HazelcastService::upsert
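For example, the query result can be projected directly to the Film model before the write. The sketch below assumes a hypothetical NetflixFilms source model; Flow adapts the projected results to the persisted Film format:
// A sketch: NetflixFilms is a hypothetical source model.
// Results are projected onto the Film model (the persisted format)
// before being written via the upsert operation.
find { NetflixFilms[] } as Film[]
call HazelcastService::upsert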
Query data from Hazelcast
Flow supports querying from Hazelcast using both direct key lookups and rich query criteria.
Define a map to query
To query a map, you first define a service that exposes your Hazelcast map.
Maps are exposed using Taxi's table declaration, as this indicates a data source that supports rich querying.
Here’s a complete example:
import flow.hazelcast.HazelcastService
import flow.hazelcast.HazelcastMap
import flow.hazelcast.CompactObject
// The HazelcastService annotation should specify the
// name of the connection defined in your connections.conf
@HazelcastService(connectionName = "myHazelcast")
service HazelcastService {
   // Table is a shorthand to declare
   // a data source that supports rich querying.
   table films : Film[]
}
// Define the name of the map to query.
@HazelcastMap(name = "films")
// specify the serialization format
@CompactObject
closed model Film {
   // A field annotated with @Id becomes the key, which
   // is used when performing key lookups
   @Id
   filmId : FilmId inherits Int
   title : Title inherits String
   languages: Language[]
}
Write queries
Once a Hazelcast map is exposed, it can be queried as a standard data source, including being used as a data source when projecting and joining data from other sources (such as APIs, Kafka topics, or databases).
Here are some sample queries:
Fetch a value with a specific key
If criteria are defined against the key (the field annotated with @Id), then a key lookup is performed:
// Assuming FilmId is annotated as @Id in the Film model
// as shown...
model Film {
   @Id
   filmId : FilmId inherits Int
   // ..snip..
}
// Elsewhere, writing a query...
find { Film( FilmId == 123 ) }
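Criteria against non-key fields are also supported, in which case Flow queries the map rather than performing a key lookup. The example below is a sketch, using the Title field from the Film model above:
// A sketch: fetch all films matching a non-key criterion
find { Film[]( Title == "Star Wars" ) }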
Stream data from Hazelcast
Hazelcast maps can be treated as data streams, where inserts and updates are emitted as a stream of events that can be queried using Flow.
The following events trigger the current state of the record to be written to the event stream:
- Entry Added
- Entry Updated
Declare a map as a stream
To stream updates from a map, you first define a service that exposes your Hazelcast map as a stream.
Here's a complete example:
import flow.hazelcast.HazelcastService
import flow.hazelcast.HazelcastMap
import flow.hazelcast.CompactObject
// The HazelcastService annotation should specify the
// name of the connection defined in your connections.conf
@HazelcastService(connectionName = "myHazelcast")
service HazelcastService {
   stream films : Stream<Film>
}
// Define the name of the map to query.
@HazelcastMap(name = "films")
// specify the serialization format
@CompactObject
closed model Film {
   // A field annotated with @Id becomes the key, which
   // is used when performing key lookups
   @Id
   filmId : FilmId inherits Int
   title : Title inherits String
   languages: Language[]
}
Write streaming queries
Once a Hazelcast map is exposed as a stream, it can be queried as a standard data source, including being used as a data source when projecting and joining data from other sources (such as APIs, Kafka topics, or databases).
When testing, it's useful to combine these queries with writing data to Hazelcast, so that change events produce values on the stream.
Here are some sample queries:
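As a minimal sketch (assuming the Film model and the films stream declared above), a streaming query subscribes to the change events emitted by the map:
// A sketch: subscribe to change events from the films map.
// Each added or updated entry is emitted as a Film value.
stream { Film }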