Use AWS services
Before you begin using AWS services, you need to define a connection to AWS in your connections.conf
file. Read more about defining AWS connections here.
DynamoDB
DynamoDB services can only be declared in Taxi.
Querying by key is supported, as are write operations.
To expose a DynamoDB service, the following two annotations are required:
- Add a @flow.aws.dynamo.Table annotation to a model
- Add a @flow.aws.dynamo.DynamoService annotation to the service
import flow.aws.dynamo.DynamoService
import flow.aws.dynamo.Table
// connectionName must match the defined connection.
@Table( connectionName = "myAws" , tableName = "reviews" )
model Review {
@Id
movieId : MovieId
score: ReviewScore inherits Int
}
@DynamoService
service DynamoService {
// Exposes query operations that return Review[]
table reviews: Review[]
// allows upsert calls
@UpsertOperation
write operation saveReview(Review):Review
}
Write operations (such as upserts) can only be invoked in mutations.
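For example, with the schema above in place, reviews can be fetched by key in a query, and the upsert can be invoked in a mutation. This is a minimal sketch: the literal movie id and the OtherReviewSource model are assumed purely for illustration.
// Fetch a single Review by its key (the id value and its type are illustrative)
find { Review( MovieId == 123 ) }

// Invoke the upsert as a mutation, writing data discovered from a
// hypothetical OtherReviewSource model into the DynamoDB table
find { OtherReviewSource[] }
call DynamoService::saveReview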
Lambda
Flow can invoke a Lambda, either to discover data in a query, or in a mutation.
A service must have the AwsLambdaService annotation, passing the configured connection.
Operations have the LambdaOperation annotation.
import flow.aws.lambda.AwsLambdaService
import flow.aws.lambda.LambdaOperation
@AwsLambdaService( connectionName = "myAws" )
service MovieDb {
@LambdaOperation(name = "streamingprovider")
operation movieQuery(@RequestBody StreamingProviderRequest): StreamingProviderResponse
}
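With the above in place, a query can ask for a StreamingProviderResponse and Flow will invoke the Lambda to produce it. The sketch below is illustrative only: the shape of StreamingProviderRequest (a movieId field) is assumed, not defined in the example above.
// Supply the request as a fact (the field name and value are assumed
// for illustration), then ask for the Lambda's response type
given { request : StreamingProviderRequest = { movieId : 123 } }
find { StreamingProviderResponse }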
S3
Flow can use S3 as a source for both reading and writing data.
Read from S3
To declare a data source that exposes data, take the following steps:
- Declare a service annotated with @S3Service
- Within that service, declare an operation with @S3Operation, providing the bucket name
- The operation should take a single parameter of type FilenamePattern, indicating the filename (or pattern) to read from
Be sure to either add the imports for S3Service, S3Operation and FilenamePattern (as shown below), or use their fully qualified names.
Here’s an example:
import flow.aws.s3.S3Service
import flow.aws.s3.S3Operation
import flow.aws.s3.FilenamePattern
@S3Service(connectionName = "MyAwsConnection")
service AwsBucketService {
@S3Operation(bucket = "MyBucket")
operation readBucket(filename:FilenamePattern = "films.csv"):Film[]
}
Filename patterns when reading from S3
When reading files from S3, you can either specify a single filename (e.g., films.csv),
or a pattern, such as film*.csv, *.csv, or even just * to read everything in the bucket.
When working with a pattern, Flow will read and combine all matching files, treating them as a single response.
For information about supported file formats, see file formats.
Examples - Reading from S3
Each of the examples below works with the following model of content stored on S3, so we’re defining it once here for brevity:
import flow.formats.Csv
import flow.aws.s3.S3Service
import flow.aws.s3.S3Operation
import flow.aws.s3.FilenamePattern
// Define the format we're reading.
// This is a CSV file, so the @Csv annotation is used.
// (Note it's also imported at the top of the file)
@Csv
type TradeSummary {
symbol : Symbol inherits String
open : OpenPrice inherits Decimal
high : HighPrice inherits Decimal
close : ClosePrice inherits Decimal
}
Read a single file
@S3Service(connectionName = "MyAwsConnection")
service AwsBucketService {
@S3Operation(bucket = "MyTrades")
operation readBucket(filename:FilenamePattern = "trades.csv"):TradeSummary[]
}
With the above schema, we can issue a simple query to return the contents of trades.csv:
find { TradeSummary[] }
Read the contents of multiple files
@S3Service(connectionName = "MyAwsConnection")
service AwsBucketService {
@S3Operation(bucket = "MyTrades")
// This will read all files ending in csv present in the bucket
operation readBucket(filename:FilenamePattern = "*.csv"):TradeSummary[]
}
With the above schema, we can issue a simple query to return the contents of all *.csv
files in the bucket:
find { TradeSummary[] }
Read files and expose as an HTTP endpoint
Using the same service definition as shown above, we can expose the contents of our *.csv
files
with a query published as an HTTP endpoint:
import taxi.http.HttpOperation
@HttpOperation(url = "/api/q/trades", method = "GET")
find { TradeSummary[] }
For more information, see publish queries as endpoints.
Read files and publish to Kafka
This example shows how to read a CSV file from S3, and publish each row as an individual message to Kafka, as a JSON object.
First, we’ll declare our Kafka broker and associated message format:
import flow.kafka.KafkaService
import flow.kafka.KafkaOperation
// The message format we're publishing to Kafka.
// Because there's no format defined, it's JSON by default
model TradeSummaryEvent {
ticker : Symbol
// field names and structure are different, but the
// types are the same as on our source model.
prices: {
openPrice : OpenPrice
highPrice : HighPrice
closePrice : ClosePrice
}
}
// Declare our Kafka service and operation
@KafkaService( connectionName = "market-prices" )
service MyKafkaService {
// Define an operation that writes to Kafka
@KafkaOperation( topic = "tradeRecords" )
write operation publishTrades(TradeSummaryEvent):TradeSummaryEvent
}
With the above in place, we can write a query that reads from S3, transforms from CSV to our JSON format, and writes it out to Kafka.
find { TradeSummary[] }
call MyKafkaService::publishTrades
In the above example, Flow detects that the inbound model (TradeSummary) is different from the destination format (TradeSummaryEvent) and handles the transformation for us.
In our example, that’s simply converting from CSV to JSON and restructuring the message. However, the transformation could be richer, performing tasks such as calling services to discover data.
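The same transformation can also be spelled out explicitly by projecting to the destination model in the query itself; a minimal sketch using the models defined above:
find { TradeSummary[] }
as TradeSummaryEvent[]
call MyKafkaService::publishTrades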
Finally, we might want to expose an HTTP POST operation to trigger this update:
import taxi.http.HttpOperation
@HttpOperation(url = "/api/q/publishTradeUpdates", method = "POST")
find { TradeSummary[] }
call MyKafkaService::publishTrades
For more information about working with Kafka, including defining connections to brokers, see our dedicated docs on Kafka.
Read files and save to a database
This example shows how to read a CSV file from S3, and save each row as a record to a database.
First, we’ll define our database table, and associated service:
import flow.jdbc.Table
import flow.jdbc.DatabaseService
import flow.jdbc.InsertOperation
@Table(connection = "trades-database", schema = "public" , table = "trades" )
type TradeSummaryRecord {
symbol : Symbol
open : OpenPrice
high : HighPrice
close : ClosePrice
timestamp : Instant = now()
}
@DatabaseService(connection = "trades-database")
service TradesDatabase {
@InsertOperation
write operation saveTradeSummary(TradeSummaryRecord):TradeSummaryRecord
}
With the above in place, we can write a query that reads from S3, transforms from CSV to our database format, and performs the database inserts:
find { TradeSummary[] }
call TradesDatabase::saveTradeSummary
In the above example, Flow detects that the inbound model (TradeSummary) is different from the destination format (TradeSummaryRecord) and handles the transformation for us.
In our example, that’s simply converting each CSV row into the structure of our database record. However, the transformation could be richer, performing tasks such as calling services to discover data.
Finally, we might want to expose an HTTP POST operation to trigger this update:
import taxi.http.HttpOperation
@HttpOperation(url = "/api/q/publishTradeUpdates", method = "POST")
find { TradeSummary[] }
call TradesDatabase::saveTradeSummary
For more information about working with databases, including defining connections to databases, and the support for different types of databases, see Databases.
Write to S3
To declare an operation that can write data to S3, take the following steps:
- Declare a service annotated with @S3Service
- Within that service, declare a write operation with @S3Operation, providing the bucket name
- The operation should take two parameters:
  - One with a @RequestBody annotation, which contains the contents to be written
  - One of type FilenamePattern, which defines the filename to write to
Be sure to either add the imports for S3Service, S3Operation, RequestBody and FilenamePattern (as shown below), or use their fully qualified names.
Here’s an example:
import flow.aws.s3.S3Service
import flow.aws.s3.S3Operation
import flow.aws.s3.FilenamePattern
import flow.aws.s3.RequestBody
@S3Service(connectionName = "MyAwsConnection")
service AwsBucketService {
@S3Operation(bucket = "MyBucket")
write operation writeToS3(@RequestBody films:Film[], filename:FilenamePattern = "films.csv"):Film[]
}
Examples - Writing to S3
Each of the examples below works with the following model of content stored on S3, so we’re defining it once here for brevity:
import flow.formats.Csv
// Define the format we're writing.
// This is a CSV file, so the @Csv annotation is used.
// (Note it's also imported at the top of the file)
@Csv
type TradeSummary {
symbol : Symbol inherits String
open : OpenPrice inherits Decimal
high : HighPrice inherits Decimal
close : ClosePrice inherits Decimal
}
Fetch from an API and write the results to S3 as a CSV
This example shows data being fetched from a REST API (exposed as JSON) and stored on S3.
As part of the operation, we’ll transform a tree-like JSON structure into a flattened CSV file.
First, we’ll define the API and its response object:
import taxi.http.HttpOperation

model StockPriceUpdate {
ticker : Symbol
// field names and structure are different, but the
// types are the same as on our TradeSummary model.
prices: {
openPrice : OpenPrice
highPrice : HighPrice
closePrice : ClosePrice
}
}
service ApiService {
@HttpOperation(url="http://myApi.com/prices", method = "GET")
operation getPrices():StockPriceUpdate[]
}
And we’ll define a write operation on S3 to store the content:
import flow.aws.s3.S3Service
import flow.aws.s3.S3Operation
import flow.aws.s3.FilenamePattern
import flow.aws.s3.RequestBody
@S3Service( connectionName = "myAwsConnection" )
service AwsBucketService {
@S3Operation(bucket = "trades")
write operation writeTradeSummary(@RequestBody payload: TradeSummary[], filename: FilenamePattern = "trades.csv"):TradeSummary[]
}
Given the above, we can use the following query to read from our API, transform the data, and write to our S3 bucket:
find { StockPriceUpdate[] }
call AwsBucketService::writeTradeSummary
This will result in the data returned from our API call being converted to CSV and written to trades.csv in our S3 bucket.
If we’d like to set the filename within our query, we could:
given { filename : FilenamePattern = 'todaysTrades.csv' }
find { StockPriceUpdate[] }
call AwsBucketService::writeTradeSummary
This time, the output is written to todaysTrades.csv.
S3 file formats
In the above examples, our content has been stored in S3 as CSV.
This is because the model used in our operations is annotated with @Csv, as shown in the following excerpt:
import flow.formats.Csv
@Csv
type TradeSummary {
// ... omitted
}
@S3Service( connectionName = "myAwsConnection" )
service AwsBucketService {
// reading CSV
@S3Operation(bucket = "MyTrades")
// This operation returns a collection of
// TradeSummary objects, which are defined with @Csv
operation readBucket(filename:FilenamePattern = "*.csv"):TradeSummary[]
// writing CSV
@S3Operation(bucket = "trades")
write operation writeTradeSummary(
// The request body is a collection
// of trade summaries, which are configured as CSV
@RequestBody payload: TradeSummary[],
filename: FilenamePattern = "trades.csv"
):TradeSummary[]
}
The format can be any supported format, such as Avro, XML, CSV (or any other character-delimited file), or even Protobuf.
If no format is defined, JSON is used as the default.
For more information, see Data formats.
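For example, to store the same trade data as JSON instead, simply omit the format annotation. The TradeSummaryJson model below is hypothetical, shown only to illustrate the default:
// No format annotation, so content is read and written as JSON (the default)
type TradeSummaryJson {
   symbol : Symbol
   open : OpenPrice
   high : HighPrice
   close : ClosePrice
}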
SQS
Consume events
Flow can subscribe to a stream of data from SQS.
import flow.aws.sqs.SqsService
import flow.aws.sqs.SqsOperation
@SqsService( connectionName = "moviesConnection" )
service MovieService {
@SqsOperation( queue = "movies" )
operation streamMovieQuery():Stream<Movie>
}
This can then be queried using a standard stream query:
stream { Movie }
// as ...
Publish events
Flow can publish to a queue using a mutation:
import flow.aws.sqs.SqsService
import flow.aws.sqs.SqsOperation
@SqsService( connectionName = "moviesConnection" )
service MovieService {
@SqsOperation( queue = "movies" )
write operation publishMovieEvent(Movie):Movie
}
Publishing events can only be invoked in mutations.
Example: Consuming from one SQS queue and publishing to another
import flow.aws.sqs.SqsService
import flow.aws.sqs.SqsOperation
@SqsService( connectionName = "moviesConnection" )
service MovieService {
@SqsOperation(queue = "newReleases" )
operation newReleases():Stream<Movie>
@SqsOperation( queue = "moviesToReview" )
write operation publishMovieEvent(Movie):Movie
}
// Query: consume from the new releases queue, and publish to
// a "movies to review" queue
stream { Movie }
call MovieService::publishMovieEvent