Data Lake is the firm's big data platform. It consists of a proprietary data store, services, and infrastructure components that house and process petabytes of data every day. As of today, the Data Lake contains a 650+ node HDFS cluster with 200 TB of memory, 30K+ cores, and more than a hundred thousand datasets. Data flowing in and out of the Data Lake is crucial for enabling four key business segments of the firm - Global Markets, Consumer and Wealth Management, Asset Management, and Investment Banking.

Data Lake has two types of users - Producers and Consumers. Producers send data that is ingested into Data Lake; each batch of data is merged into the existing snapshot of data for that data store. Consumers can subscribe to multiple data stores and have the required data exported to multiple database technologies, including cloud (also called virtual warehouses).
The Data Lake platform scales to run hundreds of thousands of ingestions and more than a million exports per day (and we are constantly growing!). Fast processing at this scale is a financial necessity - speed is everything! Each of these ingestions and exports is time-sensitive, and the workloads they process vary tremendously in volume. For instance, the same producer can send anywhere from zero to millions of rows depending on the number of trades executed in that hour. The most difficult question to answer operationally has been: which particular ingestion or export is delayed, and could it delay downstream consumer deadlines? If we could proactively monitor for these, we could intervene manually and prevent operational misses. This question led us to estimate the time taken for each ingestion and export using a machine learning model, and subsequently to build an anomaly detection model on top of those predictions to find outliers.
We started with the simplest possible model - a static threshold (e.g. 2 hours) based on the 90th percentile duration of all jobs. If an ingestion takes longer to complete than the static threshold, it is marked as delayed. This approach proved ineffective because batches with larger volumes of data (asymmetric workloads) always took longer than the static threshold.
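To make the limitation concrete, here is a minimal sketch of a static-threshold check. The durations are made-up numbers, and the helper names `percentile` and `flag_delayed` are hypothetical; this is an illustration of the idea, not the code we ran.

```python
import math

def percentile(values, pct):
    """Nearest-rank percentile of a non-empty list (pct in 0..100)."""
    ordered = sorted(values)
    rank = max(1, math.ceil(pct * len(ordered) / 100))
    return ordered[rank - 1]

def flag_delayed(durations_min, threshold_min):
    """Return the indices of jobs whose duration exceeds the static threshold."""
    return [i for i, d in enumerate(durations_min) if d > threshold_min]

# Hypothetical historical durations (minutes) across all jobs.
history = [1, 2, 2, 3, 3, 4, 5, 6, 8, 120]
static_threshold = percentile(history, 90)

# Job 0 is a healthy but very large batch; job 1 is a small batch.
# The large batch is flagged purely because of its size.
new_jobs = [90, 2]
delayed = flag_delayed(new_jobs, static_threshold)
print(static_threshold, delayed)
```

One threshold for every job cannot tell "slow because something is wrong" apart from "slow because the batch is large", which is the asymmetric workload problem described above.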
We then considered training simple statistical models to adjust the threshold for each dataset based on previous run times for that particular dataset. However, previous runs for a dataset may all share a particular feature set, such as ten rows sent every day in a batch. These models fail when a new batch in that dataset changes the parameters, such as sending millions of rows instead of ten. When this happens, previous runs of that dataset cannot be relied upon (the asymmetric workloads issue again).
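A small sketch shows why history alone is not enough. We assume here, purely for illustration, a per-dataset threshold of mean plus two sample standard deviations; the post does not commit to a specific statistical model, and the numbers are invented.

```python
from statistics import mean, stdev

def per_dataset_threshold(past_durations_min, k=2.0):
    """Threshold = mean + k * sample standard deviation of past run times."""
    return mean(past_durations_min) + k * stdev(past_durations_min)

# Hypothetical history: this dataset has only ever sent about ten rows per batch.
past = [0.5, 0.6, 0.5, 0.7, 0.6]
threshold = per_dataset_threshold(past)

# A legitimate batch with millions of rows takes 12 minutes. It is flagged
# even though nothing is wrong, because the history never saw this workload.
is_flagged = 12.0 > threshold
print(round(threshold, 2), is_flagged)
```

The threshold only knows about run times, not about the features (such as row count) that explain them, so any change in workload looks like an anomaly.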
We then moved on to identifying the feature set (~20+ features) that affects these processing times in close collaboration with the Data Lake Developers. Some of the more important features identified are:
Interestingly, we realized that there are daily windows when the resources/infrastructure components of Data Lake are processing big workloads, leading to higher wait times for the ingest/export jobs.
We then explored supervised machine learning techniques (trained on millions of historical ingestion samples with ~20+ features) to predict the ingestion time, as the Baseline Model that we tried before didn't work very well. We experimented with:
Comparing the Baseline Model with the Decision Tree Regressor for each store, the Decision Tree Regressor had a better MSE for 80% of the stores; performance was similar for the rest.
We found that the advantage of training a Decision Tree Regressor model for every store (hundreds of thousands of models) is that each model captures the micro trends of varying features in its store really well. However, the number of training samples varied vastly, since every store sends batches at a different frequency. We encountered data sparsity issues for low-frequency stores that send only 20-30 batches in a year; predictions for these stores were not relevant, since the underlying code and infrastructure change over time as the Lake grows.
We then tried using one big Decision Tree Regressor trained on millions of ingestions (excluding failures) across all stores. It showed 8% lower MSE than the per-store Decision Tree Regressors. This model also achieved an overall R2 score of 0.92 on a test set (using an 80/20 train-test split), which is quite accurate. We also found other advantages with this model:
Notice the difference in predictions for the three models as staging row count (number of rows sent in that batch) changes.
We also explored other machine learning techniques, such as regression, gradient boosted decision trees, and neural networks. These gave either lower or similar accuracy, but at the cost of interpretability. Easy visualization of the buckets (splits) based on features is very useful for understanding which features drive the processing times. We therefore chose the big Decision Tree Regressor as the final model for our use case.
Below is a snippet of the big Decision Tree Regressor on ingestions. As can be seen from the first split in the tree, any batch with fewer than 8 million total rows gets ingested in 0.58 minutes on average; otherwise, it gets ingested in 5.47 minutes on average.
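To illustrate how a regression tree arrives at a split like this, here is a minimal sketch of a single split (a "decision stump") on one feature. It picks the row-count threshold that minimises the squared error and predicts the average duration on each side. The data is a toy example and the functions `best_split` and `predict` are hypothetical; the real model has many features and many levels.

```python
from statistics import mean

def sse(values):
    """Sum of squared errors around the mean (0 for an empty list)."""
    if not values:
        return 0.0
    m = mean(values)
    return sum((v - m) ** 2 for v in values)

def best_split(rows, minutes):
    """Find the row-count threshold that minimises total squared error.

    Returns (threshold, mean_left, mean_right), where the left side holds
    batches with rows < threshold.
    """
    pairs = sorted(zip(rows, minutes))
    best = None
    for i in range(1, len(pairs)):
        if pairs[i - 1][0] == pairs[i][0]:
            continue  # cannot split between identical feature values
        threshold = (pairs[i - 1][0] + pairs[i][0]) / 2
        left = [m for _, m in pairs[:i]]
        right = [m for _, m in pairs[i:]]
        cost = sse(left) + sse(right)
        if best is None or cost < best[0]:
            best = (cost, threshold, mean(left), mean(right))
    if best is None:
        raise ValueError("need at least two distinct row counts to split")
    return best[1:]

def predict(row_count, split):
    """Predict the duration as the average of the side the batch falls on."""
    threshold, mean_left, mean_right = split
    return mean_left if row_count < threshold else mean_right

# Toy data: total rows per batch and observed ingestion time in minutes.
rows = [10, 1_000, 50_000, 2_000_000, 9_000_000, 12_000_000, 20_000_000]
minutes = [0.4, 0.5, 0.6, 0.8, 5.0, 5.5, 6.0]
split = best_split(rows, minutes)
print(split, predict(100, split), predict(10_000_000, split))
```

A full tree repeats this search recursively on each side and across all features, which is why the resulting buckets are easy to read off and explain.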
We define the happiness of the Data Lake by how well we perform in real-time against the expectations set by the model for every batch. The Data Lake is considered "happy" if we are able to process all incoming batches within the duration predicted by the model. Conceptually, we are creating an Anomaly Detection (AD) Model (explained later) based on the predictions/expectations of the Decision Tree Regressor Model.
We use these model(s) for:
We used two times the standard deviation at the leaf node of the Decision Tree Regressor to calculate the model threshold. This contains the volume of breaches generated: by Chebyshev's inequality, at most 25% of values in any distribution with finite variance can lie two or more standard deviations from the mean. We also kept a hard_buffer of 5 minutes in the AD model, because we do not want to alert when a batch deviates from the norm by only a few minutes.
A Data Lake client is happy if their batch finishes within the threshold predicted by the AD model. Since the model takes into account all the big data features and asymmetric workloads sent by the client (millions of rows or tens of rows and other feature sets), we believe it is reasonable to set client expectations based on it.
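As a rough sketch of the threshold calculation, assume the threshold is the leaf's average duration plus two standard deviations plus the hard_buffer. The exact way these terms are combined is our simplification for illustration, as are the helper names and the definition of breach minutes as time spent beyond the threshold.

```python
from statistics import mean, pstdev

HARD_BUFFER_MIN = 5.0  # the 5-minute hard_buffer mentioned above
K = 2.0                # number of standard deviations

def leaf_threshold(leaf_durations_min, k=K, hard_buffer=HARD_BUFFER_MIN):
    """Threshold for a leaf: mean + k * std + hard_buffer (assumed combination)."""
    prediction = mean(leaf_durations_min)   # what the tree predicts for this leaf
    spread = pstdev(leaf_durations_min)     # standard deviation within the leaf
    return prediction + k * spread + hard_buffer

def breach_minutes(actual_min, threshold_min):
    """Minutes by which a batch exceeded its threshold (0 if it did not)."""
    return max(0.0, actual_min - threshold_min)

# Hypothetical training durations (minutes) that landed in one leaf.
leaf = [4.0, 5.0, 6.0, 5.0, 5.0]
threshold = leaf_threshold(leaf)

print(round(threshold, 2))                        # roughly 11.26
print(round(breach_minutes(15.0, threshold), 2))  # breached
print(breach_minutes(9.0, threshold))             # within threshold
```

Because the spread is taken per leaf, batches that fall into a noisy bucket get a wider allowance than batches in a very predictable one.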
Happiness Metric:
We measure the percentage of clients who are happy/unhappy in real-time. Any dip in the happiness graph below indicates that some clients are affected by Data Lake lags. A major dip indicates a systemic issue in Data Lake which would generate internal alerts for the team. We also have different levels of views like plant (overall) view and warehouse view. Using these views, we can easily drill down into clients and batches affected in real-time with the help of the prediction made for those batches and respective breach minutes. We also have a developer focus view for suggesting long term improvements for Data Lake Developers through a list of stores showing high breach minutes.
*Warehouse View colored in green to red by the severity (SLI) of any particular breach in ingestion for that warehouse.
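The happiness percentage and the per-warehouse view described above could be computed along these lines. This is a simplified sketch: the record fields (`client`, `warehouse`, `actual_min`, `threshold_min`) are hypothetical, and we assume a client counts as unhappy if any of its batches breached its threshold.

```python
from collections import defaultdict

def happiness_pct(batches):
    """Percentage of clients with no batch exceeding its threshold."""
    happy_by_client = {}
    for b in batches:
        ok = b["actual_min"] <= b["threshold_min"]
        happy_by_client[b["client"]] = happy_by_client.get(b["client"], True) and ok
    if not happy_by_client:
        return 100.0  # no batches means nobody is unhappy
    return 100.0 * sum(happy_by_client.values()) / len(happy_by_client)

def happiness_by_warehouse(batches):
    """Happiness percentage computed separately for each warehouse."""
    grouped = defaultdict(list)
    for b in batches:
        grouped[b["warehouse"]].append(b)
    return {w: happiness_pct(bs) for w, bs in grouped.items()}

# Hypothetical batches observed in the current window.
batches = [
    {"client": "a", "warehouse": "w1", "actual_min": 3.0, "threshold_min": 10.0},
    {"client": "a", "warehouse": "w1", "actual_min": 12.0, "threshold_min": 10.0},
    {"client": "b", "warehouse": "w1", "actual_min": 1.0, "threshold_min": 6.0},
    {"client": "c", "warehouse": "w2", "actual_min": 2.0, "threshold_min": 7.0},
    {"client": "d", "warehouse": "w2", "actual_min": 8.0, "threshold_min": 9.0},
]
overall = happiness_pct(batches)
by_warehouse = happiness_by_warehouse(batches)
print(overall, by_warehouse)
```

Here the plant (overall) view shows 75% happiness, while the warehouse view pinpoints `w1` as the place to drill down.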
Overall, these models have proven to be extremely useful. They continue to generate accurate alerts and prevent delays. We run with >99% happiness on most days for both ingestions and exports!
We are happy to see these models improving our operational efficiency and user happiness and continue to be excited about applying machine learning in the Big Data space.
Do you enjoy solving interesting data engineering challenges? Our team is growing and we have several exciting opportunities!