My team at Meltwater is responsible for tracking how our customers consume the variety of content that we offer. We also build aggregate reports to help our Sales team understand customer behavior. That requires collecting and querying billions of usage events each month. In this post, I will share which technical solution we built, and what we learned by building the system a second time.
Meltwater offers comprehensive media monitoring and analysis tools for content across online news, social media, print, broadcast, and podcasts. Our customers access this content via our Media Intelligence application. Our goal was to build a unified pipeline to collect usage data from our application, and generate automated reports periodically.
The First Solution - A Routing Service and Dedicated Databases
As our first approach, we built an event routing service to receive usage data from our Media Intelligence application and conditionally route it to an event processing pipeline, one pipeline being responsible for one report. Each pipeline consisted of an API to receive the usage data, filter the data further according to the requirements of the given report and store the filtered data in a dedicated database. An AWS Lambda function triggered the report generation and uploaded the final report to an S3 bucket.
Initially, this approach worked well for us. There were just a few reports needed so scalability wasn’t a major concern. Spinning up a new reporting solution involved replicating the reporting pipeline for each new report that we were required to submit, and adding a new entry to the routing service that points to the new pipeline. Having a dedicated pipeline for each reporting solution, allowed us to reduce the impact radius, if any service were to crash. However, as the number of content integrations grew over time, so did the number of reporting pipelines, each only slightly differing from the other in terms of the filtering rules.
Additionally, the logic for filtering events was spread across multiple components, each based on different conditions in each pipeline, making it difficult to reason about why an event was collected/dropped. We desperately needed to unify this filtering logic.
Revisiting the Design
It was clear that we needed a better solution. One that would reduce the duplication while ensuring high availability and keeping the costs low. So we set the following goals when building the new solution:
- Build as few services and write as little code as possible
- Reduce the duplication of logic, data & infrastructure
- Ensure resilience of the pipeline
- Keep the cost to a minimum
- Make the creation of new reports easier
The New Solution - Implementing a Data Lake
We decided to boil down the entire process to 4 simple steps. Collecting events, enriching them with all the information required by all reporting solutions, store them in a single data store and only filter the events in a single place at query time. Having an enrichment step in our pipeline meant that our data would constantly change structure as we integrate newer solutions. We realized that what we needed was essentially a data lake. A data lake is a repository that allows users to store structured data at any scale, without needing a predetermined schema when storing the data. Let’s take a look at how we implemented a pipeline to collect, enrich and store our structured data into a data lake and how we query this data.
Apache Kafka for Data Collection
There are a number of technologies available for real-time message streaming, such as AWS Kinesis, Apache Kafka and RabbitMQ to name a few. After evaluating different solutions, based on factors such as message size, data transfer rate, and delivery guarantees, we narrowed our options down to AWS Kinesis and Apache Kafka.
While Kinesis is a cost effective and scalable option for those who don’t wish to maintain any server instances, it requires the consumers to maintain a data store for tracking consumer offsets. Additionally, the responsibility of ensuring equitable distribution of Kinesis stream shards amongst all consumers, lies with the developer. Which meant we needed to write code to rebalance traffic, each time our consumers scaled up or down.
On the other hand, Kafka takes care of consumer offset management and equitable traffic distribution across a consumer group out of the box, by using an internal topic. But maintaining a Kafka cluster can get cumbersome. Not only does one need to manage Kafka brokers but also needs to manage additional Apache Zookeeper nodes to perform distributed coordination actions such as leader election and service discovery for Kafka brokers in the cluster. This didn’t fit well with our goal of having to maintain as few services as possible. What we needed was a managed Kafka cluster.
Managed Streaming with AWS MSK
We preferred using an AWS based solution since a lot of the services in our ecosystem already live in AWS. We came across AWS Managed Streaming for Kafka (MSK), which provides a fully managed Kafka cluster as a service. This was exactly what we were looking for. Using MSK meant that we only needed to worry about Kafka configuration, and not about managing the brokers themselves or their coordination. On the client side, we needed to build a simple Kafka client to read/write to and from the cluster. We could also scale the consumers as needed, without having to write any code for traffic distribution or have any additional data store. We created a multi-availability zone Kafka cluster, with a replication factor of 3 and a strong acknowledgement configuration on the producer, to ensure resilience. We also configured a sufficient retention period for our Kafka topics, so that if any producers/consumers were to crash, we would have enough time to bring them back up without losing any events.
Kafka Consumer/Producers for Enrichment
To enrich the usage data, we built a simple Kafka consumer and producer with Kotlin & Spring using spring-kafka. This service consumed raw usage data from one topic, and published enriched data to another topic from where it was shoveled to the data store. The enrichment service contained all the business logic in one place, so that all the necessary additional information that is required for all the reporting solutions gets added to the usage data, and the enriched usage data could be filtered differently for different reporting solutions by simply writing different queries.
AWS S3 for Storage
There are a number of factors why we chose S3 over a database.
Storing objects in S3 is significantly cheaper than using a database. We require ~3 TB storage each month. Let’s consider a simple database configuration - A single availability zone, postgres instance with instance type ‘db.t2.medium’ and SSD storage with no backups configured. The following shows the comparison of RDS against S3 (based on current AWS pricing).
|AWS RDS||AWS S3|
|Instance cost||~ 60 USD||-|
|Storage cost||~ 390 USD||~ 70 USD|
|Total monthly cost||~ 450 USD||~ 70 USD|
Long Term Storage
We needed to backup this data and store it indefinitely. For this, we used a combination of S3 Object lifecycle policy and Replication configuration. Configuring a lifecycle policy rule on the objects allows us to transition the objects to a cheaper storage class, or to delete them permanently after a desired time frame after the object creation date.
To achieve that, we added a replication configuration to copy all incoming objects to a separate bucket with a cheaper “Infrequent Access” storage class where the objects get stored indefinitely. We then added a lifecycle policy to keep only the latest data in the standard storage class to be used for querying, and delete any older data permanently from the main bucket.
Updates to the Data
Given that we didn’t need to update the data once it was stored, we didn’t need a data store with the ability to perform updates. In a worst case scenario, we still have the ability to trace a particular event down to the S3 object which contains it, and replace the object with the corrected event.
AWS Athena for Querying
Another benefit of using AWS S3 is that it allows us to use AWS Athena. Athena lets you run SQL queries on structured objects stored in S3. It does not require a schema for storing objects in S3. You only need a schema at query time, which can be created using AWS Glue. The objects in S3 can be in JSON, Apache parquet or any columnar format. It is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run. It is built on Presto - a distributed SQL query engine for big data.
Athena performed really well for our use case. Our workflow involved querying a whole month’s data (~3TB) with complex aggregations and the queries only took a few minutes to scan over 2 billion records. We further optimized our costs and performance by partitioning the data into multiple levels, so that queries could be scoped to specific partitions. For more on this topic we suggest you read Top 10 Performance Tuning Tips for Amazon Athena.
AWS SDK for Generating Reports
Athena is also capable of converting the results returned by a query to CSV, and uploading the file to a configurable S3 bucket. Which meant that we no longer had to write code to execute any database queries or convert SQL rows to CSV. For automating report generation, we simply wrapped our Athena queries in a cron job using AWS SDK for Java. The cron job ran at the start of each month and triggered our Athena queries for the previous month. Athena then took care of executing the query, converting the results to CSV and uploading them to an S3 bucket of our choice.
By modeling the entire process as a single pipeline with 4 simple Collect-Enrich-Store-Query steps, not only did we greatly reduce the number of services & infrastructure, but we also made integrating new reporting solutions much easier. From spinning up a new pipeline for each reporting solution, we boiled it down to just configuring a single Athena query per each reporting solution.
Besides the technologies we used, the most interesting aspect of building a solution for the same problem a second time was the learning that came with it. From trying to write code for solving every problem to getting more value out of less code, we matured as engineers. We learnt to break away from the “sunk cost fallacy”. Knowing when a system needs to be rebuilt from scratch, instead of continuing to build on top of a broken system, can help you avoid suboptimal outcomes and also a lot of frustration for the team building and maintaining the solution.
Have you had similar use cases? Which technologies did you use? We would love to hear about them in the comments.
Image Credits: Kaiwalya Limaye (instagram.com/__peshwa)