Dynamic partitioning with Amazon Data Firehose using CloudFormation
Posted: | Tags: til aws cloud

Dynamically partitioning events on Amazon Data Firehose is possible using the JQ 1.6 engine or a Lambda function for custom parsing. Using JQ expressions through the console to partition events when configuring a Firehose stream is straightforward, provided you know the JQ expression and the source event schema. I found it difficult to translate this configuration into a CloudFormation template after initially setting up the stream by clicking through the console.
This post will walk you through how I set up a Firehose stream that uses inline JQ expressions to dynamically partition events into S3. To keep testing simple, I'll be using the demo data available through the console to verify that the configuration works.
The easiest way to get started, in my opinion, especially if you're defining your own JQ expressions, is to configure the stream through the console first. This gives you the ability to test the configuration and compare the results against the CloudFormation-defined stream. In my setup using the demo data, I have a stream with:
- New line delimiter - Enabled
- Dynamic partitioning - Enabled
- Multi record deaggregation - Not enabled
- Inline parsing for JSON - Enabled
- Dynamic partitioning keys, with their names and expressions configured per the table below (you can sanity-check these locally with the jq CLI; see the example after this list)

| Key name | JQ expression |
| --- | --- |
| `sector` | `.SECTOR\| "sector=\(.)"` |
| `ticker_symbol` | `.TICKER_SYMBOL\| "ticker_symbol=\(.)"` |
- S3 bucket prefix - Apply dynamic partitioning keys, which results in the following string: `!{partitionKeyFromQuery:sector}/!{partitionKeyFromQuery:ticker_symbol}/`
- S3 bucket error output prefix - `errors/`
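Before creating the stream, you can test the JQ expressions from the table above locally with the jq CLI. The sample record below follows the shape of the console's demo stock-ticker data; the field values are made up for illustration:

```sh
# A record shaped like the console demo data (values are illustrative)
echo '{"TICKER_SYMBOL":"WMT","SECTOR":"RETAIL","CHANGE":0.45,"PRICE":61.91}' \
  | jq -r '.SECTOR| "sector=\(.)"'
# => sector=RETAIL

echo '{"TICKER_SYMBOL":"WMT","SECTOR":"RETAIL","CHANGE":0.45,"PRICE":61.91}' \
  | jq -r '.TICKER_SYMBOL| "ticker_symbol=\(.)"'
# => ticker_symbol=WMT
```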
After the stream is created and active, you can click "Start sending demo data". A few minutes later you will start to see your destination S3 bucket populate. The format of the prefixes and objects in the bucket will be something like the tree diagram below. If yours looks similar, we can continue on to defining the CloudFormation template.
sector=FINANCIAL
├── ticker_symbol=PLM
│   ├── PUT-S3-Blog-5-2024-06-17-09-00-28-0a663898-675c-3689-9e3f-9ba4c5f2a933
│   └── PUT-S3-Blog-5-2024-06-17-09-03-23-a306ba1c-aa31-394a-ac77-d35319e49d20
├── ticker_symbol=UHN
│   └── PUT-S3-Blog-5-2024-06-17-09-00-54-02b2b100-9e17-343d-9d42-cc923630315e
└── ticker_symbol=WSB
    ├── PUT-S3-Blog-5-2024-06-17-08-58-09-1f530147-5ad4-38ef-88b0-32624527fc76
    └── PUT-S3-Blog-5-2024-06-17-09-03-54-64aa3bd2-18e6-3a01-b7cf-32aa60b251b6
sector=HEALTHCARE
└── ticker_symbol=CRM
    └── PUT-S3-Blog-5-2024-06-17-08-58-40-93453273-c58c-30b4-b43e-217e0b5c8d92
sector=RETAIL
├── ticker_symbol=DFT
│   └── PUT-S3-Blog-5-2024-06-17-09-00-33-2004645f-dc6c-3b45-9080-aa83e0bb5386
└── ticker_symbol=WMT
    ├── PUT-S3-Blog-5-2024-06-17-08-59-47-3d4c7944-3512-3ea6-8e9e-d41f84465d95
    └── PUT-S3-Blog-5-2024-06-17-09-04-30-1843ce09-9186-3a51-bffb-5913e0e8e761
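If you'd rather not rely on the console's demo data generator, you can also push a test record yourself with the AWS CLI (v2). The stream name below assumes the default from the template later in this post:

```sh
# Send one demo-shaped record to the stream (adjust the stream name to yours)
aws firehose put-record \
  --delivery-stream-name blog-firehose-demo \
  --cli-binary-format raw-in-base64-out \
  --record '{"Data":"{\"TICKER_SYMBOL\":\"WMT\",\"SECTOR\":\"RETAIL\",\"CHANGE\":0.45,\"PRICE\":61.91}"}'
```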
When creating the Firehose stream resource, the `ExtendedS3DestinationConfiguration` property block is used to define the IAM role, the S3 bucket, the `DynamicPartitioningConfiguration`, the `ProcessingConfiguration`, the `Prefix`, and the `ErrorOutputPrefix`.
`DynamicPartitioningConfiguration` is used to enable dynamic partitioning, while `ProcessingConfiguration` is where you set the newline delimiter and the inline JSON parsing.
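The `DynamicPartitioningConfiguration` block itself is small; in this template it just enables the feature and sets a retry window:

```yaml
DynamicPartitioningConfiguration:
  Enabled: true
  RetryOptions:
    DurationInSeconds: 300  # how long Firehose retries delivery to the partitioned prefix
```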
You can convert the dynamic partitioning keys from the table above by placing them between curly braces `{}`: the key name goes first, and a colon `:` separates it from the JQ expression. The quotes and backslashes inside the JQ expression need to be escaped with a backslash `\`. See the `ParameterValue` below for an example.
ProcessingConfiguration:
  Enabled: true
  Processors:
    - Type: MetadataExtraction
      Parameters:
        - ParameterName: MetadataExtractionQuery
          ParameterValue: "{sector:.SECTOR| \"sector=\\(.)\",ticker_symbol:.TICKER_SYMBOL| \"ticker_symbol=\\(.)\"}"
        - ParameterName: JsonParsingEngine
          ParameterValue: "JQ-1.6"
    - Type: AppendDelimiterToRecord
      Parameters:
        - ParameterName: Delimiter
          ParameterValue: "\\n"
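Once the CloudFormation string escaping is stripped away, the query Firehose actually evaluates is the same JQ object you would type into the console:

```
{sector:.SECTOR| "sector=\(.)",ticker_symbol:.TICKER_SYMBOL| "ticker_symbol=\(.)"}
```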
Within `Prefix` you can now use the key names from the `ParameterValue` to define how you'd like to partition your objects.
Prefix: "!{partitionKeyFromQuery:sector}/!{partitionKeyFromQuery:ticker_symbol}/"
The full CloudFormation YAML file can be found below.
AWSTemplateFormatVersion: "2010-09-09"
Description: 'Create a Data Firehose stream into an S3 Bucket.'

Parameters:
  DeliveryStreamName:
    Description: Name of Data Firehose stream
    Type: String
    Default: blog-firehose-demo
  BucketName:
    Description: Name of the destination S3 Bucket
    Type: String

Resources:
  DeliveryStream:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      DeliveryStreamName: !Ref DeliveryStreamName
      DeliveryStreamType: DirectPut
      ExtendedS3DestinationConfiguration:
        RoleARN: !GetAtt deliveryRole.Arn
        BucketARN: !Join
          - ''
          - - 'arn:aws:s3:::'
            - !Ref DestinationBucket
        ErrorOutputPrefix: "errors/"
        # Partition keys extracted by the MetadataExtraction processor below
        Prefix: "!{partitionKeyFromQuery:sector}/!{partitionKeyFromQuery:ticker_symbol}/"
        DynamicPartitioningConfiguration:
          Enabled: true
          RetryOptions:
            DurationInSeconds: 300
        ProcessingConfiguration:
          Enabled: true
          Processors:
            # Inline JQ parsing: extracts the sector and ticker_symbol keys
            - Type: MetadataExtraction
              Parameters:
                - ParameterName: MetadataExtractionQuery
                  ParameterValue: "{sector:.SECTOR| \"sector=\\(.)\",ticker_symbol:.TICKER_SYMBOL| \"ticker_symbol=\\(.)\"}"
                - ParameterName: JsonParsingEngine
                  ParameterValue: "JQ-1.6"
            # New line delimiter between records
            - Type: AppendDelimiterToRecord
              Parameters:
                - ParameterName: Delimiter
                  ParameterValue: "\\n"
  FirehoseLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      RetentionInDays: 1
  FirehoseLogStream:
    Type: AWS::Logs::LogStream
    Properties:
      LogGroupName: !Ref FirehoseLogGroup
  DestinationBucket:
    Type: AWS::S3::Bucket
    Properties:
      VersioningConfiguration:
        Status: Enabled
      BucketName: !Ref BucketName
  # Role Firehose assumes to write into the destination bucket
  deliveryRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Sid: ''
            Effect: Allow
            Principal:
              Service: firehose.amazonaws.com
            Action: 'sts:AssumeRole'
            Condition:
              StringEquals:
                'sts:ExternalId': !Ref 'AWS::AccountId'
      Path: "/"
      Policies:
        - PolicyName: firehose_delivery_policy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - 's3:AbortMultipartUpload'
                  - 's3:GetBucketLocation'
                  - 's3:GetObject'
                  - 's3:ListBucket'
                  - 's3:ListBucketMultipartUploads'
                  - 's3:PutObject'
                Resource:
                  - !Join
                    - ''
                    - - 'arn:aws:s3:::'
                      - !Ref DestinationBucket
                  - !Join
                    - ''
                    - - 'arn:aws:s3:::'
                      - !Ref DestinationBucket
                      - '/*'
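Assuming the template is saved as `firehose.yaml` (the file and stack names here are placeholders), it can be deployed with the AWS CLI. `CAPABILITY_IAM` is required because the template creates an IAM role:

```sh
# Deploy the stack; choose your own globally unique bucket name
aws cloudformation deploy \
  --template-file firehose.yaml \
  --stack-name blog-firehose-demo \
  --parameter-overrides BucketName=my-unique-bucket-name \
  --capabilities CAPABILITY_IAM
```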