We have been invited to chat about the content in this blog post on the Trino Community Broadcast. Enjoy the live stream on February 17, 2022.
As one of the Data Platform teams at Goldman Sachs (GS), we are responsible for making accurate and timely data available to our analytics and application teams. At GS, we work with various types of data, such as transaction-related data, valuations, and product reference data from external vendors. These datasets can reside in multiple heterogeneous data sources like HDFS, S3, Oracle, Snowflake, Sybase, Elasticsearch, and MongoDB. Each of these sources presents its datasets in a different way, and each must be handled individually. The challenge we encountered was how to make these varied datasets from different sources consistently and centrally available to our data science team for analytics.
A few of the goals we wanted to achieve include:
Trino, an open source distributed SQL query engine, gives users the ability to query various datasets through a single uniform platform. Users can connect to multiple data sources and perform federated joins with its connector-based architecture, eliminating ETL development and maintenance costs.
Our first step was to integrate Trino within the Goldman Sachs on-premise ecosystem. This meant:
We were able to extend Trino plugins to support all of the above integrations. For example, we implemented the EventListenerFactory interface to gather query-level statistics. As part of this process, we also made sure to contribute all of the relevant patches back to the Trino open source community - for example, the SingleStore and MongoDB connectors, and additional features for Elasticsearch.
A typical Trino cluster consists of a coordinator and multiple worker nodes. We can scale a cluster by adding more worker nodes, but the coordinator remains a single point of failure. We then revisited our requirements to see what the best path forward was.
We wanted to achieve the following:
We quickly realized that for all of the above requirements, we would need a multiple cluster setup. To get around the limitations of having a single coordinator per cluster, we decided to have an Envoy-based proxy layer with a group of clusters behind it.
The diagram shows how we have integrated the clusters with our in-house authentication, authorization, data cataloging, and monitoring systems. All of these systems form the operational backbone of our Trino offering. We support both cloud and on-premise data sources. We have also established connectivity with our Hadoop Distributed File System (HDFS) store through the Hive connector.
Users utilize various clients such as the CLI, JupyterHub, SQL editors, custom reporting tools, and other JDBC-based apps to connect to Trino. When a user fires a query, the request first lands on the client-side load balancer (DNS-based LB). The LB routes the request to an underlying Envoy router. The router parses the request headers to determine the user for that request and, based on that user, determines which cluster group the query should be routed to. Once it lands in a cluster group, the request is assigned to one of its child clusters. In the above example, when a user (User 1) fires a query, it is routed to cluster group B per the defined routing rules, and from there to child cluster B1, B2, or B3.
The section below explores the routing logic in more detail.
Envoy is an open source edge and service proxy, designed for cloud-native applications. Envoy provides features such as routing, traffic management, load balancing, external authorization, rate limiting, and more. We use Envoy as the Trino gateway. It lets us achieve dynamic routing externally, without changing Trino's default behavior.
Above: Control Plane - xDS server
For our purposes, a cluster group is a logical namespace that maps to multiple Trino clusters. For example, cluster group A can map to child clusters A1 and A2. Any query landing on a cluster group is load-balanced between the child clusters. The routing layer has the intelligence to load balance the queries among the child clusters in a “round robin” fashion. We also integrated our cluster groups with cluster health check APIs to remove unhealthy child clusters while balancing the load.
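The round-robin selection with health checks can be sketched as follows. This is a minimal illustration, not the actual implementation; the class and the `is_healthy` callback are hypothetical names, and the cluster addresses are made up.

```python
# Hypothetical sketch: round-robin over a cluster group's child clusters,
# skipping any child that fails its health check.
import itertools

class ClusterGroup:
    def __init__(self, name, child_clusters):
        self.name = name
        self._cycle = itertools.cycle(child_clusters)
        self._size = len(child_clusters)

    def pick_cluster(self, is_healthy):
        # Try each child at most once per call, in round-robin order.
        for _ in range(self._size):
            cluster = next(self._cycle)
            if is_healthy(cluster):
                return cluster
        raise RuntimeError(f"no healthy clusters in group {self.name}")

group_b = ClusterGroup("B", ["trino-b1:8443", "trino-b2:8443", "trino-b3:8443"])
# Suppose B2 is unhealthy; queries then alternate between B1 and B3.
healthy = lambda c: c != "trino-b2:8443"
picks = [group_b.pick_cluster(healthy) for _ in range(4)]
```

Keeping the iterator cycling past unhealthy children means the load stays balanced across whichever subset is currently healthy.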
Overall there are three different types of cluster groups:
When a user wants to perform an ad-hoc analysis and is not sure of their requirements, we route their traffic to the default cluster group. Once they have finalized their requirements and want to isolate their workload from all other traffic, we map them to a dedicated cluster group.
The Metadata Service provides the Envoy routers with all cluster-related configuration. It contains the mappings of cluster groups to child clusters, users to cluster groups, and more. Under the hood, it is a Spring Boot service backed by persistent storage. DevOps and cluster admins use this service to manage the clusters. The Metadata Service exposes APIs for the following operations:
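The shape of the mappings the Metadata Service maintains can be illustrated with a small in-memory model. The real service is a Spring Boot application over persistent storage; every name and address below is an assumption made for the sketch.

```python
# Illustrative in-memory model of the Metadata Service's mappings.
cluster_groups = {
    # cluster group -> child clusters
    "A": ["trino-a1:8443", "trino-a2:8443"],
    "B": ["trino-b1:8443", "trino-b2:8443", "trino-b3:8443"],
}
user_to_group = {
    # user -> cluster group (with a fallback entry for unmapped users)
    "user1": "B",
    "default": "A",
}

def group_for_user(user):
    """Resolve the cluster group for a user, falling back to the default."""
    return user_to_group.get(user, user_to_group["default"])

def clusters_in_group(group):
    """List the child clusters registered under a cluster group."""
    return cluster_groups[group]
```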
The Envoy Control Plane is an xDS gRPC-based service, and is responsible for providing dynamic configuration to Envoy. When Envoy starts, it queries the xDS APIs to fetch the upstream cluster configuration dynamically. The control plane periodically polls the Metadata Service to detect changes in the cluster configuration. We can add, update, or delete upstream clusters without restarting the Envoy Control Plane.
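The polling behavior amounts to a change-detection loop: fetch the current configuration, and push a new snapshot to Envoy only when it differs from the last one seen. A minimal sketch, assuming hypothetical `fetch_config` and `push_snapshot` callables standing in for the Metadata Service call and the xDS snapshot update:

```python
# Hedged sketch of one iteration of the control plane's refresh loop.
def maybe_push(fetch_config, push_snapshot, state):
    """Poll once; push a new snapshot only when the config changed."""
    config = fetch_config()
    version = repr(sorted(config.items()))  # cheap stand-in for a version hash
    if version != state.get("version"):
        state["version"] = version
        push_snapshot(config)
        return True   # snapshot pushed
    return False      # no change detected

state, pushed = {}, []
fetch = lambda: {"A": ["trino-a1"], "B": ["trino-b1", "trino-b2"]}
first = maybe_push(fetch, pushed.append, state)   # config is new: push
second = maybe_push(fetch, pushed.append, state)  # unchanged: no push
```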
Envoy provides HTTP filters to parse and modify both request and response headers. We use a custom Lua filter to parse the request and extract the x-trino-user header. Then, we call the Router Service, which returns the upstream cluster address. A health check is also completed while selecting the clusters.
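The filter's routing logic, sketched here in Python for readability (the production filter is Lua running inside Envoy); `router_lookup` is a hypothetical stand-in for the call to the Router Service.

```python
# Sketch: extract the x-trino-user header and resolve the upstream cluster.
def resolve_upstream(headers, router_lookup):
    user = headers.get("x-trino-user")
    if user is None:
        # Without the header we cannot pick a cluster group.
        raise ValueError("missing x-trino-user header")
    return router_lookup(user)

upstream = resolve_upstream(
    {"x-trino-user": "user1"},
    lambda user: "trino-b2:8443",  # stub Router Service response
)
```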
Unlike simple HTTP requests, which are independent of one another, Trino's JDBC protocol works as follows:
This means that, during the routing phase, if a request is routed to a cluster, all the subsequent nextUri calls must also be routed to the same cluster. We solve this by keeping a query_id-to-cluster map in our distributed cache layer. The process flow resembles the following:
From this point onwards, all the nextUri calls are checked against this map for routing.
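The sticky-routing idea above can be sketched as follows. A plain dict stands in for the distributed cache, the round-robin picker is a stub, and the query ID format is made up for illustration.

```python
# Sketch: pin all nextUri calls for a query to the cluster that took
# the first request, via a query_id -> cluster map.
import itertools

def route(query_id, cache, pick_cluster):
    cluster = cache.get(query_id)
    if cluster is None:              # first request for this query
        cluster = pick_cluster()     # load-balance across child clusters
        cache[query_id] = cluster    # pin all later nextUri calls
    return cluster

rr = itertools.cycle(["trino-b1:8443", "trino-b2:8443"]).__next__
cache = {}
first = route("20220217_0001", cache, rr)   # load-balanced pick
again = route("20220217_0001", cache, rr)   # sticky: same cluster as first
other = route("20220217_0002", cache, rr)   # new query: next cluster in rotation
```

In production the map lives in a shared distributed cache so that any Envoy router instance can resolve the pinned cluster for an in-flight query.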
We use Trino for many applications, from analytics, to data quality, to reporting, and more. Within Goldman Sachs, we have tried to create an ecosystem that can help manage our Trino infrastructure in the most efficient way. With the above architecture, we have achieved that, and will continue to iterate and improve.
Want to learn more about exciting engineering opportunities at Goldman Sachs? Explore our careers page.
See https://www.gs.com/disclaimer/global_email for important risk disclosures, conflicts of interest, and other terms and conditions relating to this blog and your reliance on information contained in it.