# Enriching 450M Docs Daily With a Boring Stream Processor

For our fairhair.ai platform we enrich over 450 million documents such as news articles and social posts per day, with a dependency tree of more than 20 NLP syntactic and semantic enrichment tasks. We ingest these documents as a continuous stream of data and guarantee delivery of enriched documents within 5 minutes of ingestion.

This technical feat required tight collaboration between two specialised teams: data science and platform engineering. Enabling both teams to efficiently work together around a common workflow execution engine was another problem we needed to solve. Hopefully that description fully piqued your interest because our solution (Benthos) is totally boring.

## Too Many Cooks

Some components of fairhair.ai are owned by individual teams. Data science teams have ownership of the enrichments, written as HTTP services in order to allow easy deployment and granular horizontal scaling. Platform engineering teams have ownership of the production stream pipelines, using Kafka as a message bus.

This division of responsibility provided some autonomy. However, the logic of the workflow execution engine would need to be shared by multiple teams. Enrichments are often selected based on the result of other enrichments (e.g. type of sentiment analysis chosen based on language detection), and some have flavours tailored to certain categories of document (e.g. long form editorial text versus social), making it a large and cumbersome system to manage.

Our data scientists were in the best position to describe the evolving dependency tree between components, and the logic for determining which enrichment flavours are appropriate for each document type. Our platform engineers had the expertise needed for tuning the workflow to maximize performance, observability and resiliency.

We also needed multiple deployments of the engine, each tailored to a specific team’s requirements. Our platform engineers were ultimately responsible for running the production engine as a stream processing component, from Kafka to Kafka. However, our data science teams would also need to run it regularly for their model improvements, metrics evaluation and integration testing, often on custom training and gold standard datasets and usually from S3 to S3.

We therefore needed a workflow engine that was simple enough to enable contributions from anyone regardless of their programming skills, and powerful enough to satisfy our complex workflows. It also needed to be flexible enough for any team to deploy it however they required, with the performance needed to meet our scaling and resiliency requirements in production.

## The Workflow Engine

Our solution to this problem was to use Benthos, which is a stream processor focused on solving complex tasks by breaking them down into simple stateless operations, expressed in a YAML file. Its goal is to be a solid and boring foundation for stream processing pipelines.

You can read a full guide on Benthos workflows at docs.benthos.dev/workflows. In summary, it has the ability to automatically resolve a Directed Acyclic Graph (DAG) of our workflow stages provided they are expressed as process_map processors.

For each stage we define a map that extracts parts of the source document relevant to the target enrichment, followed by the processing stages that execute the enrichment. Finally, we also define a map that places the enrichment result back into the source document. We may also choose to define conditions that determine whether a document is suited for the stage.
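
To make the shape of a stage concrete, here is a minimal Python sketch of the premap, process, postmap cycle. It is only a mental model, not how Benthos implements it: `get_path`, `set_path` and `run_stage` are hypothetical helper names, the enrichment is any callable, and a missing premap path simply raises a `KeyError`.

```python
from copy import deepcopy


def get_path(doc, path):
    """Return the value at a dot-separated path; raises KeyError if absent."""
    node = doc
    for key in path.split("."):
        node = node[key]
    return node


def set_path(doc, path, value):
    """Set a value at a dot-separated path, creating intermediate objects."""
    keys = path.split(".")
    node = doc
    for key in keys[:-1]:
        node = node.setdefault(key, {})
    node[keys[-1]] = value


def run_stage(doc, premap, enrich, postmap):
    """Run one stage against a document and return an enriched copy."""
    # Premap: build a reduced request payload from the source document.
    request = {}
    for target, source in premap.items():
        set_path(request, target, get_path(doc, source))

    # Processing: call the enrichment (an HTTP service in our case).
    result = enrich(request)

    # Postmap: place the result back into a copy of the source document.
    # A source of "." means "the whole result".
    out = deepcopy(doc)
    for target, source in postmap.items():
        value = result if source == "." else get_path(result, source)
        set_path(out, target, value)
    return out
```

The enrichment only ever sees the reduced payload, which is what lets each stage be written and read in isolation.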

Here’s an example of one of our enrichment targets expressed as a step in our flow:

    basics:
      premap:
        id: id
        language: tmp.enrichments.language.code
        title: body.title.text
        body: body.content.text
      processors:
      - http:
          parallel: true
          request:
            # The url of the target enrichment service is also set here.
            verb: POST
            headers:
              Content-Type: application/json
            backoff_on: [ 429 ]
            drop_on: [ 400 ]
            retries: 3
      postmap:
        tmp.enrichments.basics: .


From this definition Benthos is able to determine that any stages that change part of the path tmp.enrichments.language.code are a dependency of basics, and will ensure that they are executed beforehand. Similarly, any stage that premaps a value within the namespace tmp.enrichments.basics will be considered dependent on this stage.

These stages in the workflow are organised by Benthos into tiers at runtime, where stages of a tier are only dependent on tiers that come before them. Each tier is executed only after the tier beforehand is finished and stages of a tier are executed in parallel.
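
The dependency and tier resolution described above can be sketched in a few lines of Python. This is an illustration under assumptions rather than the Benthos implementation: we assume a stage depends on another whenever one of its premap source paths overlaps (as a prefix in either direction) with one of the other stage's postmap target paths, and the `sentiment` stage in the usage note is hypothetical.

```python
def paths_overlap(read_path, write_path):
    """True if a write to write_path could change what read_path observes."""
    a = read_path.split(".")
    b = write_path.split(".")
    n = min(len(a), len(b))
    return a[:n] == b[:n]


def resolve_tiers(stages):
    """Group stages into tiers.

    stages maps a stage name to (premap source paths, postmap target paths).
    Every stage in a tier depends only on stages in earlier tiers.
    """
    deps = {
        name: {
            other
            for other, (_, writes) in stages.items()
            if other != name
            and any(paths_overlap(r, w) for r in reads for w in writes)
        }
        for name, (reads, _) in stages.items()
    }

    tiers, done = [], set()
    while len(done) < len(stages):
        # A stage is ready once all of its dependencies have already run.
        tier = sorted(n for n in stages if n not in done and deps[n] <= done)
        if not tier:
            raise ValueError("cycle detected between stages")
        tiers.append(tier)
        done.update(tier)
    return tiers
```

Given a `language` stage that writes `tmp.enrichments.language`, the `basics` stage above, and a `sentiment` stage that reads from `tmp.enrichments.basics`, this yields three tiers of one stage each. Stages with no overlapping paths land in the same tier and can run in parallel.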

This guarantee allows authors of these workflow stages to ignore the overall flow and focus only on the enrichment at hand. It also allows readers of this stage to understand it outside of the context of other stages.

Finally, it clearly outlines the relevant technical behaviour of the stage. In the above example we can see at a glance that documents of a batch are sent in parallel HTTP requests to the target enrichment, and that we retry a document a maximum of three times (unless we receive a status code 400). Updating this stage is easy thanks to the power of Benthos processors. Here’s a diff that instead sends batched documents as a JSON array in a single request:

    @@ -5,4 +5,6 @@ basics:
         body: body.content.text
       processors:
    +  - archive:
    +      format: json_array
       - http:
           parallel: true
    @@ -15,4 +17,6 @@ basics:
             drop_on: [ 400 ]
             retries: 3
    +  - unarchive:
    +      format: json_array
       postmap:
         tmp.enrichments.basics: .
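
For readers unfamiliar with the idea, the effect of wrapping the request in a JSON array archive and unarchive step is roughly the following. This is a Python sketch of the concept only, with hypothetical function names; it assumes each message in the batch is a JSON document and that the enrichment responds with an array of the same shape.

```python
import json


def archive_json_array(batch):
    """Combine a batch of JSON documents (strings) into one JSON array payload."""
    return json.dumps([json.loads(message) for message in batch])


def unarchive_json_array(payload):
    """Split a JSON array payload back into one JSON document per message."""
    parsed = json.loads(payload)
    if not isinstance(parsed, list):
        raise ValueError("expected a JSON array")
    return [json.dumps(item) for item in parsed]
```

The batch of N documents becomes a single request body, and the single response is expanded back into N messages before the postmap is applied.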


After enrichments are aggregated into the tmp namespace we have a separate process that maps them into their final structure using another Meltwater technology called IDML.

### Unit Testing

A small subset of our workflow steps were complicated enough that making changes to them carried a significant risk. For these cases, Benthos supports defining unit tests for our processors.

However, although this provided us with a degree of protection it was considered an exception. The main benefit of using Benthos was to keep these steps simple and easy to reason about. Whenever workflow stages reached a level of complexity we weren’t comfortable with we took a step back and tried to simplify the enrichment instead.

## Hosted and Custom Deployments

Keeping all of this logic in configuration files allowed us to use all the same source control and collaboration tools as our regular codebase. These configurations live in a central repository, and using config references we are able to import them into our team-specific deployment configs. Let’s take a look at how that works.

Importing our enrichments config into a simple Kafka to Kafka pipeline looks like this:

    input:
      kafka_balanced:
        addresses:
        - exampleserver:9092
        topics:
        - example_input_stream
        consumer_group: benthos_consumer_group
        max_batch_count: 20

    pipeline:
      processors:
      # Import our entire enrichment flow.
      - $ref: ./enrichments.yaml#/pipeline/processors/0

    output:
      kafka:
        addresses:
        - exampleserver:9092
        topic: example_output_stream

For our data scientists, who wish to test against datasets stored in S3 as .tar.gz archives of JSON documents, it might look like this:

    input:
      s3:
        region: eu-west-1
        bucket: example-bucket

    pipeline:
      processors:
      - decompress:
          algorithm: gzip
      - unarchive:
          format: tar
      - $ref: ./enrichments.yaml#/pipeline/processors/0
      - archive:
          format: tar
      - compress:
          algorithm: gzip

    output:
      s3:
        region: eu-west-1
        bucket: another-example-bucket
        # Upload with the same key as the source archive.
        path: ${!metadata:s3_key}

It’s even possible to deploy our enrichment flow as an HTTP service, which makes it easy for solutions engineers to test against custom documents in a one-off request:

    http:
      address: 0.0.0.0:4195

    input:
      http_server:
        path: "/post"

    pipeline:
      processors:
      - $ref: ./enrichments.yaml#/pipeline/processors/0

    output:
      # Route the resulting payloads back to the source of the message.
      type: sync_response
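
As a rough mental model of what a reference such as `./enrichments.yaml#/pipeline/processors/0` does, here is a Python sketch that substitutes references in an already-parsed config. It is not the Benthos implementation: we assume the part after `#` is a JSON-pointer-style path into the referenced file, and `files` is a hypothetical mapping from file path to parsed contents, standing in for reading and parsing YAML from disk.

```python
def resolve_pointer(doc, pointer):
    """Walk a JSON-pointer-style path such as /pipeline/processors/0."""
    node = doc
    for token in pointer.lstrip("/").split("/"):
        if token == "":
            continue  # An empty pointer refers to the whole document.
        token = token.replace("~1", "/").replace("~0", "~")
        node = node[int(token)] if isinstance(node, list) else node[token]
    return node


def resolve_refs(node, files):
    """Return a copy of node with every {"$ref": "file#pointer"} substituted."""
    if isinstance(node, dict):
        if set(node) == {"$ref"}:
            path, _, pointer = node["$ref"].partition("#")
            target = resolve_pointer(files[path], pointer)
            # The imported fragment may itself contain references.
            return resolve_refs(target, files)
        return {key: resolve_refs(value, files) for key, value in node.items()}
    if isinstance(node, list):
        return [resolve_refs(value, files) for value in node]
    return node
```

Each deployment config stays a thin wrapper of inputs and outputs, while the enrichment flow itself has exactly one definition in the central repository.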


Benthos supports a wide range of inputs and outputs, including brokers for combining them at both the input and output level. This has proven extremely useful, as we often have teams that wish to share data feeds but rely on differing services. In such cases Benthos was easily able to bridge and occasionally duplicate feeds across queue systems.

### Observability

Benthos automatically reports metrics for the components you have configured and sends them to an aggregator of your choice. Our enrichment dashboards in Grafana chart latencies, throughput, status codes and so on.

This gave us a bird’s-eye view of enrichment performance and allowed us to build alerts on events such as a drop in 200 status rates, a spike in latencies, etc.
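
The logic behind an alert on 200 status rates is simple enough to sketch. In practice a rule like this lives in the alerting tool rather than in Python, and the counter shape and the 0.95 threshold below are hypothetical values chosen for illustration.

```python
def success_rate(status_counts):
    """Fraction of responses with status 200, or None if nothing was observed."""
    total = sum(status_counts.values())
    if total == 0:
        return None
    return status_counts.get(200, 0) / total


def should_alert(status_counts, min_rate=0.95):
    """True when the 200 rate for a time window falls below min_rate."""
    rate = success_rate(status_counts)
    return rate is not None and rate < min_rate
```

Here `status_counts` is the number of responses per status code seen by one enrichment during a time window, for example `{200: 90, 500: 10}`.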

### Error Handling

When something goes wrong and enrichments fail, Benthos has plenty of error handling mechanisms on offer, which allowed teams to choose how they wished to deal with failures. In production we configured a dead-letter queue for failed documents, but during integration tests our data science teams chose to simply log the errors.
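
The two policies differ only in what happens to a failed document, which the following Python sketch tries to show. It is a conceptual illustration rather than Benthos configuration: `process_batch`, `dead_letter` and `log_only` are hypothetical names, and a plain list stands in for the dead-letter queue.

```python
import logging


def process_batch(docs, enrich, on_failure):
    """Enrich each document, handing failures to the chosen policy."""
    succeeded = []
    for doc in docs:
        try:
            succeeded.append(enrich(doc))
        except Exception as err:
            on_failure(doc, err)
    return succeeded


def dead_letter(queue):
    """Production-style policy: keep the failed document for later replay."""
    def handler(doc, err):
        queue.append({"document": doc, "error": str(err)})
    return handler


def log_only(doc, err):
    """Integration-test-style policy: record the failure and move on."""
    logging.getLogger("enrichment").error(
        "enrichment failed for %r: %s", doc.get("id"), err
    )
```

Because the policy is a parameter, the same enrichment flow can be deployed with either behaviour without touching the stages themselves.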

## Conclusion

Innovation in data insights is often stunted by failing to establish a strong coupling between data science and engineering, resulting in awkward deployments and a lack of collaboration. However, getting this coupling right requires a common framework that enables everyone to focus on their specialities.

We found that Benthos was highly effective at bridging that gap. It democratised the deployment of our common workflow, allowing teams to hack away on their own test environments without impacting others, thus enabling continuous innovation. It also accommodated all of the technical requirements we had for our production environment. Finally, and most importantly, it was easy for all teams to work with.

This is because it is able to expose the simple processes that made our pipelines unique, whilst solving the more complex stream problems that aren’t specific to us out of the box. As a result Benthos has had a dramatic impact on the productivity of our teams and the quality of our platform.
