Dynamic partitioning with Amazon Data Firehose using CloudFormation

Posted: | Tags: til aws cloud

Dynamically partitioning events on Amazon Data Firehose is possible using the jq 1.6 engine or a Lambda function for custom parsing. Using JQ expressions through the console to partition events when configuring a Firehose stream is straightforward, provided you know the JQ expression and the source event schema. I found it difficult to translate this configuration into a CloudFormation template after initially setting up the stream by clicking through the console.

This post will walk you through how I set up a Firehose stream that uses inline JQ dynamic partitioning into S3. To keep testing simple I’ll be using the demo data available through the console to verify that the configuration works.
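For reference, the console’s demo data consists of stock-ticker records with TICKER_SYMBOL, SECTOR, CHANGE and PRICE fields (the values in the record below are invented). The JQ engine evaluates an expression against each incoming record, and you can preview what it produces locally with the jq CLI:

echo '{"TICKER_SYMBOL":"WMT","SECTOR":"RETAIL","CHANGE":-0.5,"PRICE":60.0}' \
  | jq -r '.SECTOR| "sector=\(.)"'
# sector=RETAIL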

The easiest way to get started, in my opinion, especially if you’re defining your own JQ expressions, is to configure the stream through the console first. This gives you the ability to test the configuration and compare the results to the CloudFormation-defined stream. In my setup using the demo data I have a stream with:

  • New line delimiter - Enabled
  • Dynamic partitioning - Enabled
  • Multi record deaggregation - Not enabled
  • Inline parsing for JSON - Enabled
  • Dynamic partitioning keys with their names and expressions configured per the table below (you can test the combined query locally, as shown after this list)
    Key name         JQ expression
    sector           .SECTOR| "sector=\(.)"
    ticker_symbol    .TICKER_SYMBOL| "ticker_symbol=\(.)"
  • S3 bucket prefix > Apply dynamic partitioning keys. This results in the following string: !{partitionKeyFromQuery:sector}/!{partitionKeyFromQuery:ticker_symbol}/
  • S3 bucket error output prefix - errors/
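Before creating the stream you can sanity-check the expressions locally. Combining both keys into one JQ object mirrors the shape the MetadataExtractionQuery takes later in the template; here is a quick check with the jq CLI against a demo-style record:

echo '{"TICKER_SYMBOL":"WMT","SECTOR":"RETAIL","CHANGE":-0.5,"PRICE":60.0}' \
  | jq -c '{sector:.SECTOR| "sector=\(.)",ticker_symbol:.TICKER_SYMBOL| "ticker_symbol=\(.)"}'
# {"sector":"sector=RETAIL","ticker_symbol":"ticker_symbol=WMT"}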

After the stream is created and active you can click “Start sending demo data”. A few minutes later your destination S3 bucket will start to populate. The format of the prefixes and objects in the bucket will look something like the tree diagram below. If yours does too, we can continue on to defining our CloudFormation template.

sector=FINANCIAL
├── ticker_symbol=PLM
│   ├── PUT-S3-Blog-5-2024-06-17-09-00-28-0a663898-675c-3689-9e3f-9ba4c5f2a933
│   └── PUT-S3-Blog-5-2024-06-17-09-03-23-a306ba1c-aa31-394a-ac77-d35319e49d20
├── ticker_symbol=UHN
│   └── PUT-S3-Blog-5-2024-06-17-09-00-54-02b2b100-9e17-343d-9d42-cc923630315e
└── ticker_symbol=WSB
    ├── PUT-S3-Blog-5-2024-06-17-08-58-09-1f530147-5ad4-38ef-88b0-32624527fc76
    └── PUT-S3-Blog-5-2024-06-17-09-03-54-64aa3bd2-18e6-3a01-b7cf-32aa60b251b6
sector=HEALTHCARE
└── ticker_symbol=CRM
    └── PUT-S3-Blog-5-2024-06-17-08-58-40-93453273-c58c-30b4-b43e-217e0b5c8d92
sector=RETAIL
├── ticker_symbol=DFT
│   └── PUT-S3-Blog-5-2024-06-17-09-00-33-2004645f-dc6c-3b45-9080-aa83e0bb5386
└── ticker_symbol=WMT
    ├── PUT-S3-Blog-5-2024-06-17-08-59-47-3d4c7944-3512-3ea6-8e9e-d41f84465d95
    └── PUT-S3-Blog-5-2024-06-17-09-04-30-1843ce09-9186-3a51-bffb-5913e0e8e761
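A useful shortcut for the translation step: the Firehose API will hand back exactly what the console created. Since the CLI prints JSON, the MetadataExtractionQuery even comes out with its quotes and backslashes escaped in the same form the CloudFormation template needs. A sketch with the AWS CLI and jq, assuming your console-created stream is named PUT-S3-Blog-5:

aws firehose describe-delivery-stream \
  --delivery-stream-name PUT-S3-Blog-5 \
  | jq '.DeliveryStreamDescription.Destinations[0].ExtendedS3DestinationDescription.ProcessingConfiguration'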

When creating the Firehose stream resource, the ExtendedS3DestinationConfiguration property block is used to define the IAM role, the S3 bucket, the DynamicPartitioningConfiguration, the ProcessingConfiguration, the Prefix and the ErrorOutputPrefix.

DynamicPartitioningConfiguration enables dynamic partitioning, while ProcessingConfiguration is where you set the newline delimiter and the inline JSON parsing. You can convert the dynamic partitioning keys from the table above by placing them between curly braces {}; the key name goes first, and a colon : separates it from the JQ expression. Quotes and backslashes within the JQ expression need to be escaped with a backslash \. See the ParameterValue below for an example.

        ProcessingConfiguration:
          Enabled: true
          Processors:
            - Type: MetadataExtraction
              Parameters:
              - ParameterName: MetadataExtractionQuery
                ParameterValue: "{sector:.SECTOR| \"sector=\\(.)\",ticker_symbol:.TICKER_SYMBOL| \"ticker_symbol=\\(.)\"}"
              - ParameterName: JsonParsingEngine
                ParameterValue: "JQ-1.6"
            - Type: AppendDelimiterToRecord
              Parameters:
              - ParameterName: Delimiter
                ParameterValue: "\\n"

Within Prefix you can now use the key names from the ParameterValue to define how you’d like to partition your objects.

        Prefix: "!{partitionKeyFromQuery:sector}/!{partitionKeyFromQuery:ticker_symbol}/"

The full CloudFormation YAML file can be found below.

AWSTemplateFormatVersion: "2010-09-09"

Description: 'Create a Data Firehose stream into an S3 Bucket.'

Parameters:
  DeliveryStreamName:
    Description: Name of Data Firehose stream
    Type: String
    Default: blog-firehose-demo

  BucketName:
    Description: Name of the destination S3 Bucket
    Type: String

Resources:
  DeliveryStream:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      DeliveryStreamName: !Ref DeliveryStreamName
      DeliveryStreamType: DirectPut
      ExtendedS3DestinationConfiguration:
        RoleARN: !GetAtt DeliveryRole.Arn
        BucketARN: !Join
          - ''
          - - 'arn:aws:s3:::'
            - !Ref DestinationBucket
        ErrorOutputPrefix: "errors/"
        Prefix: "!{partitionKeyFromQuery:sector}/!{partitionKeyFromQuery:ticker_symbol}/"
        CloudWatchLoggingOptions:
          Enabled: true
          LogGroupName: !Ref FirehoseLogGroup
          LogStreamName: !Ref FirehoseLogStream
        DynamicPartitioningConfiguration:
          Enabled: true
          RetryOptions:
            DurationInSeconds: 300
        ProcessingConfiguration:
          Enabled: true
          Processors:
            - Type: MetadataExtraction
              Parameters:
              - ParameterName: MetadataExtractionQuery
                ParameterValue: "{sector:.SECTOR| \"sector=\\(.)\",ticker_symbol:.TICKER_SYMBOL| \"ticker_symbol=\\(.)\"}"
              - ParameterName: JsonParsingEngine
                ParameterValue: "JQ-1.6"
            - Type: AppendDelimiterToRecord
              Parameters:
              - ParameterName: Delimiter
                ParameterValue: "\\n"

  FirehoseLogGroup:
    Type: AWS::Logs::LogGroup
    Properties: 
      RetentionInDays: 1

  FirehoseLogStream:      
    Type: AWS::Logs::LogStream
    Properties: 
      LogGroupName: !Ref FirehoseLogGroup

  DestinationBucket:
    Type: AWS::S3::Bucket
    Properties:
      VersioningConfiguration:
        Status: Enabled
      BucketName: !Ref BucketName

  DeliveryRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Sid: ''
            Effect: Allow
            Principal:
              Service: firehose.amazonaws.com
            Action: 'sts:AssumeRole'
            Condition:
              StringEquals:
                'sts:ExternalId': !Ref 'AWS::AccountId'
      Path: "/"
      Policies:
        - PolicyName: firehose_delivery_policy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - 's3:AbortMultipartUpload'
                  - 's3:GetBucketLocation'
                  - 's3:GetObject'
                  - 's3:ListBucket'
                  - 's3:ListBucketMultipartUploads'
                  - 's3:PutObject'
                Resource:
                  - !Join
                    - ''
                    - - 'arn:aws:s3:::'
                      - !Ref DestinationBucket
                  - !Join
                    - ''
                    - - 'arn:aws:s3:::'
                      - !Ref DestinationBucket
                      - '/*'
              - Effect: Allow
                Action: 'logs:PutLogEvents'
                Resource: !GetAtt FirehoseLogGroup.Arn
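To deploy the template and smoke-test the stream end to end, something like the following works. The template file name and bucket name are placeholders, and CAPABILITY_IAM is required because the template creates an IAM role:

aws cloudformation deploy \
  --template-file firehose-dynamic-partitioning.yaml \
  --stack-name blog-firehose-demo \
  --capabilities CAPABILITY_IAM \
  --parameter-overrides BucketName=my-unique-destination-bucket

# Send a single demo-style record, wait out the buffering interval,
# then check that the partitioned prefixes appear
aws firehose put-record \
  --delivery-stream-name blog-firehose-demo \
  --cli-binary-format raw-in-base64-out \
  --record '{"Data":"{\"TICKER_SYMBOL\":\"WMT\",\"SECTOR\":\"RETAIL\",\"CHANGE\":-0.5,\"PRICE\":60.0}"}'
aws s3 ls s3://my-unique-destination-bucket/ --recursive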
