
Real-Time Data Processing at Scale: How to Ensure Data Integrity

Albert De Watrigant
Published on
18/6/2024
Have you ever wanted to build a serverless cloud architecture capable of streaming your data, only to find data gaps between the input and the output of your pipeline? If so, this article is for you: we'll walk through our use case of an architecture capable of processing 3,000 requests per second.

While many data use cases can be solved with a batch solution, some situations require a streaming solution. With Google Cloud, you can build one with serverless services, paying only for the resources you actually use. In this article, we'll look at how such a Google Cloud architecture works: it receives HTTPS requests as input, processes them as soon as they arrive, and writes the results to a data warehouse. We'll focus on the part of the architecture that guarantees the number of messages written as output matches the number received as input.

To make things clearer, here's what our architecture does:

With more details and the GCP tools involved, here's what it looks like:

General operation

In our architecture, incoming requests are received by a Cloud Function, which reads each request, checks that its format is valid, then forwards the information as a message published to a Pub/Sub topic. We chose a Cloud Function because the code to execute is very simple, and the service automatically scales its number of instances with incoming traffic, without any intervention on our part.
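
To make this more concrete, here is a minimal sketch of what such an ingestion Cloud Function could look like in Python. The topic name, environment variables and the `event_id` field used for validation are illustrative assumptions, not our production code:

```python
# Minimal sketch of the ingestion Cloud Function (names and fields are
# illustrative assumptions, not the actual production code).
import json
import os

import functions_framework
from google.cloud import pubsub_v1

# Hypothetical project and topic names.
PROJECT_ID = os.environ.get("GCP_PROJECT", "my-project")
TOPIC_ID = os.environ.get("TOPIC_ID", "ingest-events")

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC_ID)


@functions_framework.http
def ingest(request):
    """Validate the incoming request and forward it to Pub/Sub."""
    payload = request.get_json(silent=True)
    # Basic format check: reject anything that is not JSON or lacks an ID.
    if payload is None or "event_id" not in payload:
        return ("Invalid payload", 400)

    # Publish asynchronously; the client batches messages under the hood.
    publisher.publish(topic_path, json.dumps(payload).encode("utf-8"))
    return ("Accepted", 204)
```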

We therefore created a "Push" subscription between the Pub/Sub topic and the Cloud Run service. We chose the "Push" configuration to preserve the streaming nature of our architecture: messages are delivered to Cloud Run as soon as they are published. The Cloud Run service then processes each message, which may take a little longer than the Cloud Function if necessary, and writes its output to a BigQuery table.
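
Below is a minimal sketch of what the Cloud Run side could look like, assuming a Flask app that receives the standard Pub/Sub push envelope (a JSON body with a base64-encoded `message.data` field) and streams rows into BigQuery with `insert_rows_json`. The table name and row fields are assumptions made for illustration:

```python
# Minimal sketch of the Cloud Run service receiving Pub/Sub "push" deliveries
# and writing rows to BigQuery (table name and fields are illustrative).
import base64
import json
import os

from flask import Flask, request
from google.cloud import bigquery

app = Flask(__name__)
bq_client = bigquery.Client()
TABLE_ID = os.environ.get("BQ_TABLE", "my-project.my_dataset.events")


@app.route("/", methods=["POST"])
def handle_push():
    envelope = request.get_json(silent=True)
    if not envelope or "message" not in envelope:
        return ("Bad Request: not a Pub/Sub push message", 400)

    message = envelope["message"]
    payload = json.loads(base64.b64decode(message["data"]).decode("utf-8"))

    # Whatever transformation the pipeline needs would happen here.
    row = {"event_id": payload["event_id"], "raw_payload": json.dumps(payload)}

    errors = bq_client.insert_rows_json(TABLE_ID, [row])
    if errors:
        # A non-2xx response makes Pub/Sub redeliver the message later.
        return (f"BigQuery insert failed: {errors}", 500)
    return ("", 204)
```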

Architectural strengths

By separating acquisition from processing, we ensure that the request sent by the user is handled as quickly as possible. Processing can then take a little longer, as its latency is less critical: if the workload ever gets too heavy, the Pub/Sub subscription retains the messages and keeps redelivering them until Cloud Run has processed them.

In this way, we can guarantee that the user will not experience any noticeable latency, even during traffic peaks. We can also be sure that all "valid" messages will be processed by our architecture, since Pub/Sub guarantees that at least one occurrence of each message is delivered to its recipient (Cloud Run in our case).

Problems encountered

After a few days of testing, we realized that our architecture was indeed processing all incoming messages, but that some messages appeared several times in the BigQuery table. In other words, the same message was being processed more than once by our Cloud Run service. Why? After some investigation, we discovered that the majority of duplicates arrived in the BigQuery table during traffic peaks. During these surges in requests, the number of Cloud Run instances increases rapidly, so the Pub/Sub subscription can deliver the same message to different instances, since the service guarantees AT LEAST one delivery of each message. It is possible to restrict Pub/Sub to exactly-once delivery, but only with a "Pull" subscription, which would effectively turn the pipeline back into a batch architecture.

To solve the problem of duplicate messages in our database, we had two choices: either filter messages during processing, discarding those that had already been processed, or clean up the database with an SQL query run at a certain frequency. We decided to implement the first solution, mainly because we wanted to stay as close to real time as possible, whereas a scheduled SQL query would have forced us back into batch processing. Moreover, running such a query regularly on a large volume of data could have generated significant costs.

Solution

To solve this problem, we added new GCP services, which you can see in our architecture diagram.

We set up a Memorystore instance, Google Cloud's managed Redis service. It acts as a cache: as soon as a message is successfully processed, we write its message ID as a key in Memorystore. Whenever a new message arrives, we first query the Memorystore instance to check whether its message ID is already present. If it is, we skip the message, since it has already been processed by Cloud Run. If the ID is not present, we process the message and then write its ID to the instance, marking it as processed.

When you write a key to Memorystore, you can also give it an expiry time. In our case, we set this value to 15 minutes, as there was no need to keep a message ID any longer than that.
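
As an illustration, here is a minimal sketch of that deduplication logic with the Python `redis` client; the host, key prefix and function names are hypothetical:

```python
# Minimal sketch of the deduplication logic against Memorystore (Redis).
# The host, key prefix and function names are illustrative, not production code.
import os

import redis

redis_client = redis.Redis(
    host=os.environ.get("REDIS_HOST", "10.0.0.3"),  # Memorystore private IP
    port=6379,
)

DEDUP_TTL_SECONDS = 15 * 60  # keep message IDs for 15 minutes only


def already_processed(message_id: str) -> bool:
    """Check whether this message ID was seen in the last 15 minutes."""
    return redis_client.exists(f"msg:{message_id}") == 1


def mark_processed(message_id: str) -> None:
    """Record the message ID once processing has succeeded."""
    redis_client.set(f"msg:{message_id}", 1, ex=DEDUP_TTL_SECONDS)


# Note: a single atomic call such as
#   redis_client.set(f"msg:{message_id}", 1, nx=True, ex=DEDUP_TTL_SECONDS)
# combines the check and the write, closing the small race window between two
# instances that receive the same message at the same time.
```

In the Cloud Run handler, `already_processed` would be called before any work is done, and `mark_processed` right after the BigQuery insert succeeds.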

We also used a VPC to secure the connection between Cloud Run and the Memorystore instance. To further strengthen security, we enabled the Memorystore instance's authentication requirement (AUTH), which means a secret string is needed to communicate with it. To store this string securely, we placed it in Google Cloud's Secret Manager and read it directly from the Cloud Run code.
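
For illustration, here is one way the AUTH string could be fetched from Secret Manager at startup and passed to the Redis client; the project name, secret name and Redis host are hypothetical:

```python
# Minimal sketch of fetching the Memorystore AUTH string from Secret Manager
# at startup (project, secret name and host are illustrative assumptions).
import os

import redis
from google.cloud import secretmanager


def get_redis_auth_string() -> str:
    client = secretmanager.SecretManagerServiceClient()
    name = "projects/my-project/secrets/redis-auth-string/versions/latest"
    response = client.access_secret_version(request={"name": name})
    return response.payload.data.decode("utf-8")


redis_client = redis.Redis(
    host=os.environ.get("REDIS_HOST", "10.0.0.3"),
    port=6379,
    password=get_redis_auth_string(),  # AUTH is enabled on the instance
)
```

Cloud Run can also expose a Secret Manager secret directly as an environment variable or mounted file, which avoids this extra API call in your own code.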

Why did we use Redis rather than another database? First, we wanted a key-value store that lets us look up a key very quickly. Second, since we only needed these keys for a limited time, we wanted a database that supports setting an expiry on keys. That's why we chose Google Cloud's Memorystore service. A word of caution: Memorystore was worthwhile in our case because of our large data volume, but its use must be weighed against your own use case, since even in its minimal configuration the service costs around $70 per month.

Results

After several weeks of testing, our architecture processed an average of 1,500 requests per second, with peaks at 3,000. We observed that our duplicate-checking system with Memorystore did not increase the latency of requests processed by Cloud Run at all. We also found that the system detected between 5,000 and 15,000 duplicate messages per day on average, with peaks at 300,000 per day, out of a total of around 70 million messages per day. Most importantly, each message is now written to the database only once.

Possible improvements

Several aspects of our architecture could be modified or improved.

If your first collection step requires you to tune many parameters (number of concurrent requests, CPU and memory per instance, etc.) and you want to use Docker as a deployment tool, then replacing our Cloud Function with a Cloud Run service might be a better fit for your use case.

If you want your collection point to receive requests from external users who are geographically dispersed, consider setting up a Load Balancer between the users and your collection point (Cloud Run or Cloud Function). A Load Balancer also lets you easily integrate Cloud Armor (Google Cloud's WAF) and manage your subdomain names.

Finally, if your data processing is light or even non-existent, and you would rather avoid Docker to simplify your deployment, you can replace our Cloud Run service with a Cloud Function.
