AWS CloudWatch

Meter Ingestion via serverless agent for AWS CloudWatch

If you want to try out Amberflo without making significant changes to your code (or importing a new library), you can configure Amberflo's Serverless Agent for AWS CloudWatch to automatically extract meter records from your CloudWatch logs and ingest them into Amberflo as meters.

You configure our serverless agent to attach to your CloudWatch setup in your AWS VPC. You then log meter records accompanied by an Amberflo tag, so the agent can identify them. The agent automatically extracts all log entries carrying the tag and ingests them as meters, as sketched below.
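For illustration, here is a minimal sketch of how your Lambda code could emit such a tagged meter record with a plain console.log call. The tag and record fields follow the example record shown later in this guide; the meter name, customer id, and dimensions are placeholders for your own values.

'use strict';

const { randomUUID } = require('crypto');

// Minimal sketch: emit a meter record to CloudWatch Logs with the Amberflo tag.
// meterApiName, customerId and dimensions are placeholders for your own values.
const meterRecord = {
    meterApiName: 'api-calls',
    customerId: '70f1dd87-6978-4d96-a934-5a83b63cdeb1',
    meterTimeInMillis: Date.now(),
    meterValue: 1,
    uniqueId: randomUUID(),  // unique id for this record, as in the example record below
    dimensions: {
        method: 'GET',
        endpoint: 'meter-example',
    },
};

console.log('meter_record_for_stream', JSON.stringify(meterRecord));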

You can find a demo project example here. It shows a REST API served by a Lambda function and how to meter that Lambda (for instance, to track the number of API calls). In addition to the CloudWatch agent, it demonstrates other ways to meter the Lambda.

To learn more about our integration with AWS CloudWatch, please contact us.

How does it work?

Here is a step-by-step description of how the CloudWatch agent works:

  1. Your code runs and logs meter records to a CloudWatch log group (this is the default destination for an AWS Lambda);
  2. A Kinesis Stream then collects all the relevant log entries via a Subscription Filter (if you have multiple log groups, you can collect from all of them into the same Kinesis Stream);
  3. Finally, a Lambda agent (code below) consumes the log entries from the Kinesis Stream, extracts the meter records, and sends them to Amberflo for ingestion.

If you are familiar with AWS SAM or CloudFormation, you can adapt our demo project to your needs and use it to deploy the agent. We recommend the Kinesis stream method, as it is more flexible.

Otherwise, please follow the Using CloudWatch Logs subscription filters guide to set up the Kinesis stream and the CloudWatch subscription filter. Note that in the example below, the filter pattern is meter_record_for_stream.
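If you prefer to script that step, here is a minimal sketch using the AWS SDK for JavaScript. The log group name, Kinesis stream ARN, and IAM role ARN are placeholders you would replace with your own resources.

'use strict';

const AWS = require('aws-sdk');
const logs = new AWS.CloudWatchLogs({ region: 'us-west-2' });

// Placeholders: use your own log group, Kinesis stream ARN, and IAM role ARN.
logs.putSubscriptionFilter({
    logGroupName: '/aws/lambda/my-metered-function',
    filterName: 'amberflo-meter-records',
    filterPattern: 'meter_record_for_stream',
    destinationArn: 'arn:aws:kinesis:us-west-2:123456789012:stream/my-meter-stream',
    roleArn: 'arn:aws:iam::123456789012:role/cwl-to-kinesis-role',
}).promise()
    .then(() => console.log('Subscription filter created'))
    .catch(console.error);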

You can use the code below (NodeJS) for the Kinesis Stream consumer Lambda:

'use strict';

const crypto = require('crypto');
const zlib = require('zlib');
const AWS = require('aws-sdk');

const bucketName = process.env.INGEST_BUCKET_NAME;
const accessKeyId = process.env.ACCESS_KEY;
const secretAccessKey = process.env.SECRET_KEY;

// S3 client for the Amberflo-provided ingest bucket (adjust the region to match your bucket)
const s3 = new AWS.S3({
    region: 'us-west-2',
    accessKeyId,
    secretAccessKey,
});

// Tag used in the subscription filter pattern and in the log messages to mark meter records
const prefix = 'meter_record_for_stream';

exports.handler = async (event) => {
    const records = event
        .Records
        .map(r => r.kinesis.data)
        .map(d =>
            // Decompress and parse the CloudWatch payload
            JSON.parse(zlib.gunzipSync(Buffer.from(d, 'base64')).toString())
        )
        // Skip CloudWatch Logs control (health-check) messages
        .filter(m => m.messageType !== 'CONTROL_MESSAGE')
        .map(p => p
            .logEvents
            .map(x => x.message)
            .map(m => {
                const i = m.indexOf(prefix);
                if (i < 0) return;  // keep only messages containing meter records
                // slice past the tag (and the separator after it) to get the JSON payload
                return JSON.parse(m.slice(i + prefix.length + 1));
            })
            .filter(x => x)
        ).flat();

    await ingest(records);
};

// Write the batch of meter records as a JSON file to the Amberflo ingest bucket
async function ingest(records) {
    const date = new Date().toISOString().slice(0, 10);
    const key = `ingest/records/${date}/${crypto.randomBytes(20).toString('hex')}.json`;

    const params = {
        Bucket: bucketName,
        Key: key,
        Body: JSON.stringify(records),
    };
    return s3.putObject(params).promise();
}

This code will take log entries like the one below and ingest them in batches through your Amberflo-provided S3 bucket.

INFO meter_record_for_stream {
    "meterApiName": "api-calls",
    "customerId": "70f1dd87-6978-4d96-a934-5a83b63cdeb1",
    "meterTimeInMillis": 1663094105062,
    "meterValue": 1,
    "uniqueId": "c91b8860-3392-11ed-a17b-bfa2e899d2c9",
    "dimensions": {
        "method": "GET",
        "endpoint": "meter-example"
    }
}

📘 Ingest record format

If you log the meter records in a different format, you can modify the code above to marshal the records into the Amberflo ingest format.
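As a hypothetical illustration, suppose your application logs usage lines such as USAGE user=<id> calls=<n> instead of JSON. You could replace the JSON.parse step in the agent with a small mapping function like the one below; the meter name and log format here are made up for the example.

// Hypothetical example: marshal a custom 'USAGE user=<id> calls=<n>' log line
// into the Amberflo ingest record format used above.
function toMeterRecord(message) {
    const match = message.match(/USAGE user=(\S+) calls=(\d+)/);
    if (!match) return;  // not a usage line

    return {
        meterApiName: 'api-calls',  // placeholder meter name
        customerId: match[1],
        meterValue: Number(match[2]),
        meterTimeInMillis: Date.now(),
    };
}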
