Stream data
Write streaming queries
Streaming queries are executed in the same way as request/response queries, but use the stream keyword instead of find.
For example, consider a Kafka topic emitting stock price updates:
model StockPrice {
   symbol : StockSymbol
   price : StockPrice
}
service KafkaService {
   stream stockPrices : Stream<StockPrice>
}
This can be queried using the following:
stream { StockPrice }
This can be combined with other standard querying tools, such as projections and mutations:
stream { StockPrice } as {
   symbol : StockSymbol
   updateReceived : Instant = now()
   currentPrice : StockPrice
   totalTradedQuantity : TotalTradedQuantity
}
call TradeBookService::saveTradeSnapshot
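The query above calls TradeBookService::saveTradeSnapshot without showing its declaration. The following is a minimal sketch of what that service might look like. It assumes the projected result maps onto a model named TradeSnapshot and that TotalTradedQuantity is an integer; both are illustrative assumptions rather than part of the original schema. Taxi marks mutating operations with the write modifier, which is why the operation is declared that way here.

```taxi
// Hypothetical semantic type; the underlying primitive is an assumption
type TotalTradedQuantity inherits Int

// Hypothetical target model; field names mirror the projection in the query
model TradeSnapshot {
   symbol : StockSymbol
   updateReceived : Instant
   currentPrice : StockPrice
   totalTradedQuantity : TotalTradedQuantity
}

service TradeBookService {
   // 'write' declares a mutation, so the operation is invoked through 'call'
   // instead of being used to look up data during a query
   write operation saveTradeSnapshot(TradeSnapshot) : TradeSnapshot
}
```

Connector-specific annotations (for example, for a database or HTTP endpoint) are left out of this sketch, since they depend on where the snapshot is stored.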
Run long-lived streaming queries
Flow’s query editor is well suited to running short-lived streaming queries. Often, however, you want a streaming query to keep running in the background.
To deploy a long-lived streaming query, define it as a named query in one of your taxonomy projects. These projects are typically checked into a Git repository.
// A sample query that streams data from a stock price stream,
// and writes to Postgres
query MySavedQuery {
   stream { StockPrice }
   call MyPostgresService::upsertStockPrice
}
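The saved query depends on a mutation exposed by MyPostgresService. As a rough sketch, assuming the stream emits StockPrice and the operation accepts and returns that same model, the service could be declared as below. The signature is hypothetical, and the database connector annotations that a real Postgres-backed service needs are omitted.

```taxi
service MyPostgresService {
   // Hypothetical signature: receives each StockPrice emitted by the stream
   // and writes it to the database
   write operation upsertStockPrice(StockPrice) : StockPrice
}
```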
Update streaming queries
Streaming queries are automatically upgraded whenever their definition changes.
Enable a streaming query
When a streaming query is detected for the first time, it is disabled by default. This prevents accidental data changes, since streaming queries often perform mutations.
You can enable a streaming query either via the UI or via an API call from the Endpoints page.
Once a streaming query has been enabled, further updates are deployed automatically and the stream keeps running.