Java sample to backfill meters from MongoDB

Java sample to backfill meters from historical data stored in MongoDB into Amberflo.

The sample will perform the following tasks:

  1. Read entries from MongoDB collection. In this sample, events are ingested as meters into Amberflo.
  2. For each entry, ingest an event meter into Amberflo. Ensure the create time is used as ingested event time in UTC. Use the entry id as uniqueId of the ingested meter to prevent duplicated ingested records if the code were to run multiple times.

Maven dependencies (check for latest versions):

<dependencies>
  <dependency>
    <groupId>io.amberflo</groupId>
    <artifactId>metering-java-client</artifactId>
    <version>1.3.6</version>
  </dependency>
  <dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>mongo-java-driver</artifactId>
    <version>3.5.0</version>
  </dependency>
  <dependency>
    <groupId>com.stripe</groupId>
    <artifactId>stripe-java</artifactId>
    <version>20.52.0</version>
  </dependency>
</dependencies>
package com.amberflo.backfill;

import com.amberflo.metering.ingest.MeteringContext;
import com.amberflo.metering.ingest.meter_message.Domain;
import com.amberflo.metering.ingest.meter_message.MeterMessage;
import com.amberflo.metering.ingest.meter_message.MeterMessageBuilder;
import com.amberflo.metering.ingest.meter_message.Region;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import org.bson.Document;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.HashMap;
import java.util.Map;

import static com.amberflo.metering.ingest.MeteringContext.metering;

public class BackfillMongoDbData {
    //TODO: set the database connection, queries, and Amberflo info
    private static final String MONGO_HOST = "sample-db.mongo.host";
    private static final int MONGO_PORT = 27017;
    private static final String MONGO_USER = "sample-user";
    private static final String MONGO_PASS = "pass";
    private static final String DB_NAME = "sampleDb";
    private final static String AMBERFLO_API_KEY = "amberflo-api-key";
    //END TODO

    private final static String EVENT_CREATE = "event_create";

    public static void main(final String[] args) throws Exception {
        MeteringContext
                .createOrReplaceContext(
                        AMBERFLO_API_KEY,
                        "events-api",
                        Domain.Prod,
                        Region.US_West,
                        1,
                        10
                );

        // Mongodb connection string.
        final String connectionString = String.format("mongodb://%s:%s@%s:%s", MONGO_USER, MONGO_PASS, MONGO_HOST, MONGO_PORT);
        final MongoClientURI uri = new MongoClientURI(connectionString);

        // Connecting to the mongodb server using the given client uri.
        final MongoClient mongoClient = new MongoClient(uri);

        // Fetching the database from the mongodb.
        final MongoDatabase db = mongoClient.getDatabase(DB_NAME);

        // Fetching the collection from the mongodb.
        final MongoCollection<Document> col = db.getCollection("events");

        // Performing a read operation on the collection.
        final FindIterable<Document> fi = col.find();
        final MongoCursor<Document> cursor = fi.iterator();
        int counter = 1;
        try {
            while (cursor.hasNext()) {
                final Document document = cursor.next();
                if (document == null) {
                    continue;
                }
                //example of a nested document
                final Document eventData = (Document) document.get("eventData");
                final String eventSourceType = eventData.get("event_source_type").toString();
                final String eventType = eventData.get("event_type").toString();
                final String meterName = EVENT_CREATE;
                final String customerId = eventData.get("customer_id").toString();
                final String id = document.get("_id").toString();
                
                //Converting timestamp stored as double epoch
                final Long eventTimeEpoch = ((Double) Double.parseDouble(document.get("created_time_epoch_utc").toString())).longValue();
                final LocalDateTime eventTime = Instant.ofEpochMilli(eventTimeEpoch).atZone(ZoneId.of("UTC")).toLocalDateTime();

                System.out.println(String.format("Ingesting: event_source_type=%s, customerId=%s, id=%s, eventTime=%s, eventType=%s, meterName %s",
                        eventSourceType, customerId, id, eventTime, eventType, meterName));
                System.out.println(document.toJson());

                final Map<String, String> dimensions = new HashMap();
                dimensions.put("event_source_type", eventSourceType);
                dimensions.put("event_type", eventType);

                final MeterMessage meterMessage = MeterMessageBuilder
                        .createInstance(meterName, eventTime, customerId)
                        .setMeterValue(1)
                        .setUniqueId(id)
                        .setDimensionsMap(dimensions)
                        .build();

                metering().meter(meterMessage);

                if (counter % 100 == 0) {
                    Thread.sleep(5000);
                }
                counter++;
            }
            System.out.println("Total meters ingested: " + counter);
        } finally {
            cursor.close();
            mongoClient.close();
            MeteringContext.flushAndClose();
            System.out.println("done!");
        }
    }
}