February 09, 2022

Enabling Highly Available Trino Clusters at Goldman Sachs

Ramesh Bhanan, Vice President; Siddhant Chadha, Associate; Sumit Halder, Vice President; and Suman Baliganahalli Narayan Murthy, Vice President - Business Platform Engineering

We have been invited to chat about the content in this blog post on the Trino Community Broadcast. Enjoy the live stream on February 17, 2022.


The Challenge

As one of the Data Platform teams at Goldman Sachs (GS), we are responsible for making accurate and timely data available to our analytics and application teams. At GS, we work with various types of data, such as transaction-related data, valuations, and product reference data from external vendors. These datasets can reside in multiple heterogeneous data sources like HDFS, S3, Oracle, Snowflake, Sybase, Elasticsearch, and MongoDB. Each of these sources presents data in a different way, and each must be handled individually. The challenge we encountered was how to consistently make these varied datasets from different sources centrally available to our data science team for analytics purposes.

A few of the goals we wanted to achieve include:

  • Reduce last mile ETL (Extract, Transform, Load) - ETL pipelines require significant effort to create and stabilize, and maintenance is costly - for example, resolving pipeline failures and performing reconciliations.
  • Access data in a unified way - a common language (SQL) to access different types of data sources.
  • Federated Joins - a way to join datasets residing in varied data sources.

The Solution

Trino, an open source distributed SQL query engine, gives users the ability to query various datasets through a single uniform platform. Users can connect to multiple data sources and perform federated joins with its connector-based architecture, eliminating ETL development and maintenance costs.
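For example, a single Trino query can join a table in Hive with a collection in MongoDB. The sketch below does this through the Trino JDBC driver; the coordinator URL, user, catalogs, and table names are placeholders rather than our actual deployment:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Properties;

public class FederatedJoinExample {
    public static void main(String[] args) throws Exception {
        // Placeholder coordinator URL and connection properties.
        Properties props = new Properties();
        props.setProperty("user", "analyst");
        props.setProperty("SSL", "true");
        String url = "jdbc:trino://trino.example.com:443/hive/default";

        try (Connection conn = DriverManager.getConnection(url, props);
             Statement stmt = conn.createStatement();
             // A single SQL statement joining a Hive table with a MongoDB collection.
             ResultSet rs = stmt.executeQuery(
                     "SELECT t.trade_id, p.product_name " +
                     "FROM hive.trades.executions t " +
                     "JOIN mongodb.reference.products p ON t.product_id = p.product_id")) {
            while (rs.next()) {
                System.out.println(rs.getString("trade_id") + " -> " + rs.getString("product_name"));
            }
        }
    }
}
```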

Integrating Trino into the Goldman Sachs Internal Ecosystem 

Our first step was to integrate Trino within the Goldman Sachs on-premise ecosystem. This meant:

  • Integration with internal authentication and authorization systems.
  • Integration with in-house tracking, monitoring, and auditing systems.
  • Integration with in-house credential stores.
  • Integration with data discovery and cataloging services.
  • Support for new data source connectors such as SingleStore, Sybase, and more.
  • Updating various existing connectors such as Elasticsearch, MongoDB, etc.

We were able to extend Trino plugins to support all of the above integrations. For example, we implemented the EventListenerFactory interface to gather query-level statistics. As part of this process, we also made sure to contribute the relevant patches back to the Trino open source community - for example, the SingleStore connector, the MongoDB connector, and additional features for Elasticsearch.
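As a rough illustration of the query-statistics hook, the sketch below implements EventListenerFactory. Exact SPI signatures vary across Trino versions, the listener name is made up, and the stdout sink stands in for our internal tracking and auditing systems:

```java
import io.trino.spi.eventlistener.EventListener;
import io.trino.spi.eventlistener.EventListenerFactory;
import io.trino.spi.eventlistener.QueryCompletedEvent;

import java.util.Map;

// Exposed to Trino through a Plugin implementation's getEventListenerFactories().
public class AuditEventListenerFactory implements EventListenerFactory {
    @Override
    public String getName() {
        // Referenced from etc/event-listener.properties (event-listener.name=gs-audit).
        return "gs-audit";
    }

    @Override
    public EventListener create(Map<String, String> config) {
        return new EventListener() {
            @Override
            public void queryCompleted(QueryCompletedEvent event) {
                // Placeholder sink: in production this would publish to tracking/auditing systems.
                System.out.printf("query=%s user=%s wallTime=%s%n",
                        event.getMetadata().getQueryId(),
                        event.getContext().getUser(),
                        event.getStatistics().getWallTime());
            }
        };
    }
}
```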

Achieving Scaling and High Availability

A typical Trino cluster consists of a coordinator and multiple worker nodes. We can scale a cluster by adding more worker nodes, but the coordinator remains a single point of failure. We then revisited our requirements to see what the best path forward was.

We wanted to achieve the following:

  • Scaling
  • High Availability
  • Multi-tenancy
  • Resource isolation
  • Ability to perform blue-green deployments

We quickly realized that for all of the above requirements, we would need a multi-cluster setup. To get around the limitations of having a single coordinator per cluster, we decided to have an Envoy-based proxy layer with a group of clusters behind it.

Trino Ecosystem at Goldman Sachs

The diagram shows how we have integrated the clusters with our in-house authentication, authorization, data cataloging, and monitoring systems. All of these systems form the operational backbone of our Trino offering. We support both cloud and on-premise data sources. We have also established connectivity with our Hadoop Distributed File System (HDFS) store through the Hive connector.

Users utilize various clients such as the CLI, JupyterHub, SQL editors, custom reporting tools, and other JDBC-based apps to connect to Trino. When a user fires a query, the request first lands on the client-side load balancer (a DNS-based LB). The LB routes the request to an underlying Envoy router. The router parses the request header to determine the user for that request and, based on the user, determines which cluster group the query should be routed to. Once it lands in a cluster group, the request is assigned to one of its child clusters. In the above example, when a user (User 1) fires a query, it gets routed to cluster group B per the routing rules defined. It can be routed to child clusters B1, B2, or B3.

The section below explores the routing logic in more detail.

Dynamic Query Routing

Main Components

Envoy Proxy

Envoy is an open source edge and service proxy, designed for cloud-native applications. Envoy provides features such as routing, traffic management, load balancing, external authorization, rate limiting, and more. We use Envoy proxy as the Trino gateway. It helps us achieve dynamic routing externally without changing Trino's default behavior.

Above: Control Plane - xDS server

  • LDS - Listener Discovery Service - using this API Envoy can dynamically add/delete/update the entire listener, including the L4/L7 filters
  • CDS - Cluster Discovery Service - using this API Envoy can dynamically add/update/delete upstream clusters
  • RDS - Route Discovery Service - using this API Envoy can add/update HTTP route tables
  • EDS - Endpoint Discovery Service - using this API Envoy can add/update cluster members

 

Cluster Groups

For our purposes, a cluster group is a logical namespace that maps to multiple Trino clusters. For example, cluster group A can map to child clusters A1 and A2. Any query landing on a cluster group is load-balanced between the child clusters. The routing layer has the intelligence to load balance the queries among the child clusters in a “round robin” fashion. We also integrated our cluster groups with cluster health check APIs to remove unhealthy child clusters while balancing the load.
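A minimal sketch of that health-aware round-robin selection, with a hypothetical ClusterGroup type standing in for our actual routing layer:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

// Hypothetical type; the real selection logic lives inside our Envoy/Router Service stack.
public class ClusterGroup {
    private final String name;
    private final List<String> childClusters;          // e.g. ["trino-b1:8443", "trino-b2:8443"]
    private final AtomicInteger nextIndex = new AtomicInteger();

    public ClusterGroup(String name, List<String> childClusters) {
        this.name = name;
        this.childClusters = childClusters;
    }

    // Round-robin over child clusters, skipping any that fail the health check.
    public String pickCluster(Predicate<String> isHealthy) {
        for (int attempts = 0; attempts < childClusters.size(); attempts++) {
            int i = Math.floorMod(nextIndex.getAndIncrement(), childClusters.size());
            String candidate = childClusters.get(i);
            if (isHealthy.test(candidate)) {
                return candidate;
            }
        }
        throw new IllegalStateException("No healthy child cluster in group " + name);
    }
}
```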

Overall there are three different types of cluster groups:

  1. Default cluster group: This is our primary cluster group. When a user is not explicitly assigned to a cluster group, their queries are routed to the default group. There can only be one default cluster group at a time.
  2. Named cluster group: There can be multiple named cluster groups. When a user is assigned to one of the named cluster groups, their queries are routed to their respective group. This helps us segment the incoming workload.
  3. Fallback cluster group: Traffic will be routed to the fallback cluster group if the default or the assigned cluster group is down or unhealthy. This helps provide resiliency in case of a sudden cluster outage.

When a user wants to perform an ad-hoc analysis and is not sure of their requirements, we route their traffic to the default cluster group. Once they have finalized their requirements and want to isolate their workload from all other traffic, we map them to a dedicated cluster group.
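Putting the three group types together, the resolution logic for a given user looks roughly like the following; ClusterGroupResolver and its inputs are hypothetical stand-ins for the configuration held by our Router Service:

```java
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;

public class ClusterGroupResolver {
    private final Map<String, String> userToGroup;   // user -> named cluster group
    private final String defaultGroup;
    private final String fallbackGroup;

    public ClusterGroupResolver(Map<String, String> userToGroup, String defaultGroup, String fallbackGroup) {
        this.userToGroup = userToGroup;
        this.defaultGroup = defaultGroup;
        this.fallbackGroup = fallbackGroup;
    }

    public String resolve(String user, Predicate<String> isGroupHealthy) {
        // Users without an explicit assignment are routed to the default group.
        String assigned = Optional.ofNullable(userToGroup.get(user)).orElse(defaultGroup);
        // If the assigned (or default) group is down or unhealthy, fall back.
        return isGroupHealthy.test(assigned) ? assigned : fallbackGroup;
    }
}
```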

Cluster Metadata Service

The Metadata Service provides the Envoy routers with all of the cluster-related configuration. It holds the definitions of cluster groups, the mappings from cluster groups to child clusters, the mappings from users to cluster groups, and more. Under the hood, it is a Spring Boot service backed by persistent storage. DevOps and cluster admins use this service to manage the clusters. The Metadata Service exposes APIs for the following operations (a bare-bones controller sketch follows the list):

  1. Cluster Group: Add/Update/Delete
  2. Cluster: Add/Update/Delete 
  3. Cluster: Map/Un-map cluster to cluster groups
  4. Cluster: Activate/Deactivate
  5. User to Cluster Group Mapping: Add/Remove
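A bare-bones sketch of what such a controller could look like; the endpoint paths, payloads, and in-memory maps below are illustrative only, while the real service persists its state and sits behind our authentication and authorization layers:

```java
import org.springframework.web.bind.annotation.*;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@RestController
@RequestMapping("/api/v1")
public class ClusterMetadataController {

    // Illustrative in-memory state; the real service is backed by persistent storage.
    private final Map<String, List<String>> groupToClusters = new ConcurrentHashMap<>();
    private final Map<String, String> userToGroup = new ConcurrentHashMap<>();

    @PostMapping("/cluster-groups/{group}")
    public void addClusterGroup(@PathVariable String group) {
        groupToClusters.putIfAbsent(group, List.of());
    }

    @PutMapping("/cluster-groups/{group}/clusters")
    public void mapClusters(@PathVariable String group, @RequestBody List<String> clusters) {
        groupToClusters.put(group, clusters);
    }

    @DeleteMapping("/cluster-groups/{group}")
    public void deleteClusterGroup(@PathVariable String group) {
        groupToClusters.remove(group);
    }

    @PutMapping("/users/{user}/cluster-group")
    public void mapUser(@PathVariable String user, @RequestBody String group) {
        userToGroup.put(user, group);
    }

    @GetMapping("/users/{user}/cluster-group")
    public String groupForUser(@PathVariable String user) {
        return userToGroup.get(user);
    }
}
```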

Router Service

Envoy Control Plane

The Envoy Control Plane is an xDS gRPC-based service and is responsible for providing dynamic configurations to Envoy. When Envoy starts, it queries the xDS APIs to fetch the upstream cluster configurations dynamically. The Control Plane periodically polls the Metadata Service to detect changes in the cluster configurations. We can add, update, or delete upstream clusters without restarting the Envoy Control Plane.
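A simplified view of that refresh loop; MetadataServiceClient, SnapshotPublisher, and ClusterConfig are hypothetical stand-ins for the Metadata Service client and the xDS snapshot cache, and the 30-second poll interval is illustrative:

```java
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical interfaces; the real control plane pushes xDS snapshots over gRPC.
interface MetadataServiceClient {
    ClusterConfig fetchClusterConfig();        // current cluster/group mappings plus a version
}

interface SnapshotPublisher {
    void publish(ClusterConfig config);        // rebuild and serve the xDS snapshot to Envoy
}

record ClusterConfig(String version, java.util.Map<String, java.util.List<String>> groupToClusters) {}

public class ControlPlaneRefresher {
    private final MetadataServiceClient metadata;
    private final SnapshotPublisher publisher;
    private volatile String lastVersion;

    public ControlPlaneRefresher(MetadataServiceClient metadata, SnapshotPublisher publisher) {
        this.metadata = metadata;
        this.publisher = publisher;
    }

    public void start() {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> {
            ClusterConfig config = metadata.fetchClusterConfig();
            // Only rebuild the snapshot when the Metadata Service reports a new version,
            // so clusters can be added/updated/deleted without any restarts.
            if (!Objects.equals(lastVersion, config.version())) {
                publisher.publish(config);
                lastVersion = config.version();
            }
        }, 0, 30, TimeUnit.SECONDS);
    }
}
```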

Upstream Cluster Selection

Envoy provides HTTP filters to parse and modify both request and response headers. We use a custom Lua filter to parse the request and extract the x-trino-user header. Then, we call the Router Service, which returns the upstream cluster address. A health check is also performed when selecting a cluster.

Challenges with Node Affinity

Unlike simple HTTP requests, which are independent, Trino clients (including the JDBC driver) follow a multi-step protocol; a minimal client-side loop is sketched after the list:

  • A POST to /v1/statement runs the query string in the POST body, and returns a JSON document containing the query results. If there are more results, the JSON document contains a nextUri URL attribute.
  • A GET to the nextUri attribute returns the next batch of query results.
  • A DELETE to nextUri terminates a running query.
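For reference, a client-side loop over this protocol might look like the following; the host and query are placeholders, and real clients such as the Trino CLI and JDBC driver handle this for you:

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class TrinoProtocolExample {
    public static void main(String[] args) throws Exception {
        HttpClient http = HttpClient.newHttpClient();
        ObjectMapper mapper = new ObjectMapper();

        // Initial POST: the query string goes in the body, the user in the x-trino-user header.
        HttpRequest post = HttpRequest.newBuilder(URI.create("https://trino.example.com/v1/statement"))
                .header("X-Trino-User", "analyst")
                .POST(HttpRequest.BodyPublishers.ofString("SELECT 1"))
                .build();
        JsonNode page = mapper.readTree(http.send(post, HttpResponse.BodyHandlers.ofString()).body());

        // Follow nextUri until the server stops returning one. Each GET is a separate HTTP request,
        // which is why the gateway must pin them all to the same cluster.
        while (page.hasNonNull("nextUri")) {
            HttpRequest next = HttpRequest.newBuilder(URI.create(page.get("nextUri").asText()))
                    .header("X-Trino-User", "analyst")
                    .GET()
                    .build();
            page = mapper.readTree(http.send(next, HttpResponse.BodyHandlers.ofString()).body());
            if (page.has("data")) {
                System.out.println(page.get("data"));
            }
        }
    }
}
```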

This means that, during the routing phase, if the initial request is routed to a cluster, all subsequent nextUri calls must also be routed to the same cluster. We solve this by keeping a query_id to cluster map in our distributed cache layer. The process flow resembles the following:

  1. The Trino client initiates a POST to /v1/statement, which lands on the Envoy gateway.
  2. on_request(): Envoy parses the headers, extracts x-trino-user, and calls the Router Service to get the upstream cluster.
  3. Envoy then sets the cluster_header with the upstream cluster name and routes the request by reading this header.
  4. on_response(): we get the response from the upstream cluster, then parse it to extract the query_id and the cluster address.
  5. We then persist the query_id to cluster address mapping in the distributed cache.

From this point onwards, all the nextUri calls are checked against this map for routing.
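Expressed as a simplified Java sketch, the affinity handling looks like this; the production logic lives in an Envoy Lua filter, and RouterClient plus the local map below are hypothetical stand-ins for the Router Service call and our distributed cache:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the Router Service lookup.
interface RouterClient {
    String upstreamClusterFor(String trinoUser);
}

public class QueryAffinityRouter {
    // In production this is a distributed cache shared by all Envoy instances, not a local map.
    private final Map<String, String> queryIdToCluster = new ConcurrentHashMap<>();
    private final RouterClient router;

    public QueryAffinityRouter(RouterClient router) {
        this.router = router;
    }

    // on_request(): for the initial POST to /v1/statement, pick the cluster for this user;
    // the result is written into the cluster_header so Envoy knows where to route.
    public String clusterForInitialRequest(String trinoUser) {
        return router.upstreamClusterFor(trinoUser);
    }

    // on_response(): remember which cluster produced this query_id.
    public void recordAffinity(String queryId, String clusterAddress) {
        queryIdToCluster.put(queryId, clusterAddress);
    }

    // Follow-up nextUri GET/DELETE calls carrying a known query_id are pinned
    // to the cluster that is executing that query.
    public String clusterForNextUri(String queryId) {
        return queryIdToCluster.get(queryId);
    }
}
```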

Summary

We use Trino for many applications, from analytics, to data quality, to reporting, and more. Within Goldman Sachs, we have tried to create an ecosystem that can help manage our Trino infrastructure in the most efficient way. With the above architecture, we have achieved that, and will continue to iterate and improve. 

Want to learn more about exciting engineering opportunities at Goldman Sachs? Explore our careers page.


See https://www.gs.com/disclaimer/global_email for important risk disclosures, conflicts of interest, and other terms and conditions relating to this blog and your reliance on information contained in it.
