Source code for gs_quant.analytics.workspaces.workspace

"""
Copyright 2019 Goldman Sachs.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
"""
import logging
import webbrowser
from collections import deque
from typing import List, Tuple, Union, Dict

from pydash import get

from gs_quant.analytics.workspaces.components import Component, TYPE_TO_COMPONENT, RelatedLink, DataGridComponent, \
    MonitorComponent, PlotComponent, DataScreenerComponent
from gs_quant.entities.entitlements import Entitlements
from gs_quant.errors import MqValueError, MqRequestError
from gs_quant.session import GsSession
from gs_quant.target.common import Entitlements as Entitlements_

_logger = logging.getLogger(__name__)

API = '/workspaces/markets'
HEADERS = {'Content-Type': 'application/json;charset=utf-8'}


class WorkspaceCallToAction:
    """
    Call to action of a workspace, convertible to and from its dictionary form.
    """

    def __init__(
            self,
            actions: List['RelatedLink'],
            text: str,
            name: str = None):
        """
        Call to action displayed on the top right of the page.
        :param actions: link to external/internal pages, embed a mail to link, anchor links within page or notifications
        :param text: description below the link
        :param name: name of the link/button
        """
        self.actions = actions
        self.text = text
        self.name = name

    def as_dict(self):
        """
        Convert the call to action to a plain dictionary.
        :return: dict with 'actions' and 'text', plus 'name' only when a name is set
        """
        # RelatedLink objects are serialised; anything else is passed through untouched
        actions = [action.as_dict() if isinstance(action, RelatedLink) else action for action in self.actions]
        cta_dict = {'actions': actions, 'text': self.text}
        if self.name:
            cta_dict['name'] = self.name
        return cta_dict

    @classmethod
    def from_dict(cls, obj):
        """
        Build a call to action from its dictionary form.
        :param obj: dict with 'actions' and 'text' and, optionally, 'name'
        :return: the call to action described by ``obj``
        """
        actions = [RelatedLink.from_dict(action) if isinstance(action, dict) else action
                   for action in obj['actions']]
        # 'name' is optional: as_dict() leaves it out when no name is set
        return cls(actions=actions, text=obj['text'], name=obj.get('name'))


class WorkspaceTab:
    """
    Tab of a workspace that points at another workspace.
    """

    def __init__(self,
                 id_: str,
                 name: str):
        """
        Workspace Tab to connect other workspaces.
        :param id_: alias of the workspace to create a tab
        :param name: Name of the tab
        """
        self.name = name
        self.id_ = id_

    def as_dict(self):
        """
        :return: the tab as a dict with the keys 'id' and 'name'
        """
        return dict(id=self.id_, name=self.name)

    @classmethod
    def from_dict(cls, obj):
        """
        :param obj: dict holding the tab's 'id' and 'name'
        :return: the tab described by ``obj``
        """
        tab_id, tab_name = obj['id'], obj['name']
        return WorkspaceTab(id_=tab_id, name=tab_name)


def _validate_widths(components, container):
    """
    Check that the given components can share the 12 units of a row or column.
    :param components: components about to be stored; nested rows are skipped as they carry no width
    :param container: 'row' or 'column', only used in the error message
    :raises MqValueError: if there are more than 12 entries or their widths cannot fit in 12 units
    """
    if len(components) > 12:
        raise MqValueError(f'{components} exceeds the max number of columns of 12.')
    width_sum = 0
    without_width_count = 0
    for component in components:
        if isinstance(component, WorkspaceRow):
            continue
        if component.width:
            width_sum += component.width
        else:
            # A component without a width needs at least one unit
            without_width_count += 1
    if width_sum > 12:
        raise MqValueError(f'{width_sum} exceeds the max sum of widths of 12.')
    if width_sum + without_width_count > 12:
        raise MqValueError(
            f'Cannot fit all components in {container} due to given total width of {width_sum} '
            f'and {without_width_count} components without a width.')


def _layout_cells(components, count):
    """
    Build the layout cells for components that share the 12 units of a row or column.
    :param components: components, columns and rows to lay out, in order
    :param count: index of the next component to be referenced in the layout
    :return: tuple of (layout string, next unused component index)
    """
    if not components:
        return '', count
    last_index = len(components) - 1
    width_sum = sum(component.width or 0 for component in components if not isinstance(component, WorkspaceRow))

    if width_sum == 0:
        # Equally spread out; the last entry absorbs whatever 12 does not divide into
        size, remainder = divmod(12, len(components))
        spans = [size] * last_index + [size + remainder]
    else:
        unsized = sum(1 for component in components
                      if not isinstance(component, WorkspaceRow) and component.width is None)
        # Share what the explicit widths leave over between the components without a width
        default_width = (12 - width_sum) // unsized if unsized else 0
        spans = []
        used_sum = 0
        for i, component in enumerate(components):
            if isinstance(component, WorkspaceRow):
                # Rows are emitted inline and take no span of their own
                spans.append(None)
                continue
            if i == last_index and not component.width:
                span = 12 - used_sum
            elif component.width is None:
                span = default_width
            else:
                span = component.width or 1
            used_sum += span
            spans.append(span)

    cells = []
    for component, span in zip(components, spans):
        if isinstance(component, WorkspaceRow):
            sub_layout, count = component.get_layout(count)
            cells.append(sub_layout)
        elif isinstance(component, WorkspaceColumn):
            sub_layout, count = component.get_layout(count)
            cells.append(f'c{span}({sub_layout})')
        else:
            # Case: Component, referenced by its index in the flattened component list
            cells.append(f'c{span}(${count})')
            count += 1
    return ''.join(cells), count


class WorkspaceColumn:
    """
    Wrapper on a list of components in the same column.
    """

    def __init__(self,
                 components: List[Union['Component', 'WorkspaceRow']],
                 width: int = None):
        """
        :param components: List of components in the same column
        :param width: width of the column out of 12; None lets the enclosing row pick one
        """
        self.__components = []
        self.components = components
        self.__width = width

    @property
    def components(self):
        return self.__components

    @components.setter
    def components(self, value):
        """
        :param value: components to store in this column
        :raises MqValueError: if there are more than 12 entries or their widths cannot fit in 12 units
        """
        # Validate the components being assigned, not the ones they replace
        _validate_widths(value, 'column')
        self.__components = value

    @property
    def width(self):
        return self.__width

    @width.setter
    def width(self, value):
        self.__width = value

    def get_layout(self, count):
        """
        Build the layout string for the contents of this column.
        :param count: index of the next component to be referenced in the layout
        :return: tuple of (layout string, next unused component index)
        """
        return _layout_cells(self.__components, count)

    def _add_components(self, components):
        """
        Append the dictionary form of every component below this column, depth first.
        :param components: list that is extended in place
        """
        for component in self.__components:
            if isinstance(component, (WorkspaceRow, WorkspaceColumn)):
                component._add_components(components)
            else:
                components.append(component.as_dict())


class WorkspaceRow:
    """
    Wrapper on a list of components in the same row.
    """

    def __init__(self,
                 components: List[Union['Component', WorkspaceColumn]]):
        """
        :param components: List of components in the same row
        """
        self.__components = []
        self.components = components

    @property
    def components(self):
        return self.__components

    @components.setter
    def components(self, value):
        """
        :param value: components to store in this row
        :raises MqValueError: if there are more than 12 entries or their widths cannot fit in 12 units
        """
        # Validate the components being assigned, not the ones they replace
        _validate_widths(value, 'row')
        self.__components = value

    def get_layout(self, count: int) -> Tuple[str, int]:
        """
        Build the layout string of this row, wrapped in ``r(...)``.
        :param count: index of the next component to be referenced in the layout
        :return: tuple of (layout string, next unused component index)
        """
        cells, count = _layout_cells(self.__components, count)
        return f'r({cells})', count

    def _add_components(self, components):
        """
        Append the dictionary form of every component below this row, depth first.
        :param components: list that is extended in place
        """
        for component in self.__components:
            if isinstance(component, (WorkspaceRow, WorkspaceColumn)):
                component._add_components(components)
            else:
                components.append(component.as_dict())


class Workspace:
    """
    Workspace made of rows of components, persisted through the `/workspaces/markets` API.
    """

    # API path under which each persisted component type is deleted
    PERSISTED_COMPONENTS = {
        DataGridComponent: '/data/grids',
        MonitorComponent: '/monitors',
        PlotComponent: '/charts',
        DataScreenerComponent: '/data/screens',
    }

    def __init__(self,
                 name: str,
                 rows: List[WorkspaceRow] = None,
                 alias: str = None,
                 description: str = None,
                 entitlements: Union[Entitlements, Entitlements_] = None,
                 tabs: List[WorkspaceTab] = None,
                 selector_components: List[Component] = None,
                 disclaimer: str = None,
                 maintainers: List[str] = None,
                 call_to_action: Union[WorkspaceCallToAction, Dict] = None,
                 tags: List[str] = None):
        """
        :param name: name of the workspace
        :param rows: rows of components that make up the layout
        :param alias: alias of the workspace, used for lookups and in the workspace URL
        :param description: description of the workspace
        :param entitlements: entitlements of the workspace
        :param tabs: tabs pointing at other workspaces
        :param selector_components: components kept out of the layout and stored after the laid-out ones
        :param disclaimer: disclaimer stored with the workspace parameters
        :param maintainers: maintainers stored with the workspace parameters
        :param call_to_action: call to action, either as an object or already in dictionary form
        :param tags: tags of the workspace
        """
        self.__id = None
        self.__name = name
        self.__rows = rows or []
        self.__selector_components = selector_components or []
        self.__alias = alias
        self.__entitlements = entitlements
        self.__description = description
        self.__disclaimer = disclaimer
        self.__maintainers = maintainers or []
        self.__tabs = tabs or []
        self.__call_to_action = call_to_action
        self.__tags = tags or []

    @classmethod
    def get_by_id(cls, workspace_id: str) -> 'Workspace':
        """
        Fetch a workspace by id.
        :param workspace_id: id of the workspace
        :return: the workspace built from the response
        """
        resp = GsSession.current._get(f'{API}/{workspace_id}')
        return Workspace.from_dict(resp)

    @classmethod
    def get_by_alias(cls, alias: str) -> 'Workspace':
        """
        Fetch the first workspace returned for an alias.
        :param alias: alias of the workspace
        :return: the workspace built from the first result
        :raises MqValueError: if no workspace is returned for the alias
        """
        resp = get(GsSession.current._get(f'{API}?alias={alias}'), 'results.0')
        if not resp:
            raise MqValueError(f'Workspace not found with alias {alias}')
        return Workspace.from_dict(resp)

    def save(self):
        """
        Persist the workspace: update it when it already exists, create it otherwise.

        A workspace without an id is looked up by its alias first, so saving twice under the same alias
        updates the existing workspace instead of posting a second one.
        """
        if self.__id:
            GsSession.current._put(f'{API}/{self.__id}', self.as_dict(), request_headers=HEADERS)
            return
        existing_id = None
        if self.__alias:
            existing_id = get(GsSession.current._get(f'{API}?alias={self.__alias}'), 'results.0.id')
        if existing_id:
            self.__id = GsSession.current._put(f'{API}/{existing_id}', self.as_dict(),
                                               request_headers=HEADERS)['id']
        else:
            # Nothing to update (no id, and no workspace found for the alias): create it
            self.__id = GsSession.current._post(API, self.as_dict(), request_headers=HEADERS)['id']

    def open(self):
        """
        Open the workspace in the web browser.
        :raises MqValueError: if the workspace has no id yet
        """
        if self.__id is None:
            raise MqValueError('Workspace must be created or saved before opening.')
        domain = GsSession.current.domain.replace(".web", "")
        if domain == 'https://api.gs.com':
            domain = 'https://marquee.gs.com'
        url = f'{domain}/s/markets/{self.__alias or self.__id}'
        webbrowser.open(url)

    def create(self):
        """
        Post the workspace as a new one and remember the id from the response.
        """
        resp = GsSession.current._post(f'{API}', self.as_dict(), request_headers=HEADERS)
        self.__id = resp['id']

    def delete(self):
        """
        Delete the workspace.
        :raises MqValueError: if the workspace has no id
        """
        if self.__id is None:
            raise MqValueError('Workspace must have an id to be deleted.')
        resp = GsSession.current._delete(f'{API}/{self.__id}')
        # NOTE(review): the id is taken from the delete response rather than cleared - confirm this is intended
        self.__id = resp['id']

    def delete_all(self, include_tabs: bool = False):
        """
        Deletes all persisted components of the workspace.

        NOTE(review): the workspace itself is not deleted here; call delete() for that.
        :param include_tabs: whether to also delete the persisted components of the workspaces behind the tabs
        :return: None
        """
        for row in self.__rows:
            self.__delete_components(row.components)
        self.__delete_components(self.__selector_components)
        if include_tabs:
            for tab in self.__tabs:
                tab_workspace = self.get_by_alias(tab.id_)
                tab_workspace.delete_all()

    @property
    def name(self):
        return self.__name

    @name.setter
    def name(self, value):
        self.__name = value

    @property
    def alias(self):
        return self.__alias

    @alias.setter
    def alias(self, value):
        self.__alias = value

    @property
    def rows(self) -> List[WorkspaceRow]:
        return self.__rows

    @rows.setter
    def rows(self, value: List[WorkspaceRow]):
        self.__rows = value

    @property
    def entitlements(self):
        return self.__entitlements

    @entitlements.setter
    def entitlements(self, value):
        self.__entitlements = value

    @property
    def description(self):
        return self.__description

    @description.setter
    def description(self, value):
        self.__description = value

    @property
    def disclaimer(self):
        return self.__disclaimer

    @disclaimer.setter
    def disclaimer(self, value):
        self.__disclaimer = value

    @property
    def maintainers(self):
        return self.__maintainers

    @maintainers.setter
    def maintainers(self, value):
        self.__maintainers = value

    @property
    def tabs(self):
        return self.__tabs

    @tabs.setter
    def tabs(self, value):
        self.__tabs = value

    @property
    def selector_components(self):
        return self.__selector_components

    @selector_components.setter
    def selector_components(self, value):
        self.__selector_components = value

    @property
    def call_to_action(self):
        return self.__call_to_action

    @call_to_action.setter
    def call_to_action(self, value):
        self.__call_to_action = value

    @property
    def tags(self):
        return self.__tags

    @tags.setter
    def tags(self, value):
        self.__tags = value

    @property
    def id(self):
        return self.__id

    @classmethod
    def _parse(cls, layout: str, workspace_components: List[Dict]):
        """
        Turn a layout string into rows, columns and components.
        :param layout: layout made of `r(...)` rows and `cN(...)` cells, where `$i` refers to component i
        :param workspace_components: component dictionaries the `$i` references index into
        :return: list of the top level rows, columns and components of the layout, in order
        """
        current_str = ''
        outside_components = []
        stack = deque()
        for c in layout:
            current_str += c
            if c == '(':
                stack.append('(')
            elif c == ')':
                stack.pop()
                if len(stack) == 0:
                    # A top level group has just been closed
                    if current_str.startswith('c'):
                        is_component = current_str[current_str.index('(') + 1:].startswith('$')
                        if is_component:
                            # Component Case
                            scale, id_ = current_str.split('($')
                            scale = int(scale[1:])
                            id_ = int(id_[0:-1])
                            component = workspace_components[id_]
                            component_type = component['type']
                            component = TYPE_TO_COMPONENT[component_type].from_dict(component, scale)
                            outside_components.append(component)
                        else:
                            # Column Case
                            column_layout = current_str[current_str.index('(') + 1:-1]
                            components = Workspace._parse(column_layout, workspace_components)
                            width = int(current_str[1:current_str.index('(')])
                            outside_components.append(WorkspaceColumn(components, width))
                    elif current_str.startswith('r'):
                        # Row Case
                        row_layout = current_str[current_str.index('(') + 1:-1]
                        components = Workspace._parse(row_layout, workspace_components)
                        outside_components.append(WorkspaceRow(components))
                    current_str = ''
        return outside_components

    @staticmethod
    def _referenced_indices(layout: str) -> set:
        """
        Collect the component indices a layout refers to.
        :param layout: layout string in which `$i` refers to component i
        :return: set of all i that appear as `$i`
        """
        indices = set()
        position = layout.find('$')
        while position != -1:
            end = position + 1
            while end < len(layout) and layout[end].isdigit():
                end += 1
            if end > position + 1:
                indices.add(int(layout[position + 1:end]))
            position = layout.find('$', end)
        return indices

    @classmethod
    def from_dict(cls, obj):
        """
        Build a workspace from its dictionary form.
        :param obj: dict with 'name' and 'parameters' (holding 'components' and 'layout') plus optional fields
        :return: the workspace described by ``obj``, carrying its id when ``obj`` has one
        """
        params = obj['parameters']
        workspace_components = params['components']
        layout = params['layout']

        # The top level of the layout is a sequence of rows, which _parse returns as WorkspaceRow objects
        workspace_rows = Workspace._parse(layout, workspace_components)

        # The rest of the components not in the layout should be selector components
        placed = cls._referenced_indices(layout)
        selector_components = [TYPE_TO_COMPONENT[component['type']].from_dict(component)
                               for i, component in enumerate(workspace_components) if i not in placed]

        tabs = [WorkspaceTab.from_dict(tab) for tab in params.get('tabs', [])]
        workspace = Workspace(name=obj['name'],
                              rows=workspace_rows,
                              selector_components=selector_components,
                              alias=obj.get('alias'),
                              tabs=tabs,
                              entitlements=Entitlements.from_dict(obj.get('entitlements', {})),
                              description=obj.get('description'),
                              disclaimer=params.get('disclaimer'),
                              maintainers=params.get('maintainers'),
                              # Kept in dictionary form, which as_dict() writes back unchanged
                              call_to_action=params.get('callToAction'),
                              tags=obj.get('tags'))
        # Keep the id so a loaded workspace can be opened, updated and deleted
        workspace.__id = obj.get('id')
        return workspace

    def as_dict(self):
        """
        Convert the workspace to the dictionary form sent to the API.
        :return: dict with 'name' and 'parameters', plus every optional field that is set
        """
        components, count, layout = [], 0, ''
        for row in self.__rows:
            row_layout, count = row.get_layout(count)
            layout += row_layout
            for component in row.components:
                if isinstance(component, (WorkspaceRow, WorkspaceColumn)):
                    component._add_components(components)
                else:
                    components.append(component.as_dict())

        # Add the hidden components at the end
        components.extend([component.as_dict() for component in self.__selector_components])

        parameters = {
            'layout': layout,
            'components': components
        }
        if self.__maintainers:
            parameters['maintainers'] = self.__maintainers
        if self.__call_to_action:
            if isinstance(self.__call_to_action, WorkspaceCallToAction):
                parameters['callToAction'] = self.__call_to_action.as_dict()
            else:
                parameters['callToAction'] = self.__call_to_action
        if self.__tabs:
            parameters['tabs'] = [tab.as_dict() for tab in self.__tabs]
        if self.__disclaimer:
            parameters['disclaimer'] = self.__disclaimer

        dict_ = {
            'name': self.__name,
            'parameters': parameters
        }
        if self.__alias:
            dict_['alias'] = self.__alias
        if self.__entitlements:
            if isinstance(self.__entitlements, Entitlements_):
                dict_['entitlements'] = self.__entitlements.as_dict()
            elif isinstance(self.__entitlements, Entitlements):
                dict_['entitlements'] = self.__entitlements.to_dict()
            else:
                dict_['entitlements'] = self.__entitlements
        if self.__tags:
            dict_['tags'] = self.__tags
        if self.__description:
            dict_['description'] = self.__description
        return dict_

    @classmethod
    def __delete_components(cls, components: List[Component]):
        """
        Delete every persisted component in the given list, descending into rows and columns.

        A failed delete is logged as a warning and does not stop the remaining deletes.
        :param components: components, rows and columns to go through
        """
        for component in components:
            if isinstance(component, (WorkspaceRow, WorkspaceColumn)):
                cls.__delete_components(component.components)
            else:
                type_ = type(component)
                if type_ in cls.PERSISTED_COMPONENTS:
                    try:
                        GsSession.current._delete(f'{cls.PERSISTED_COMPONENTS[type_]}/{component.id_}')
                    except MqRequestError as ex:
                        _logger.warning(
                            f'Failed to delete {type_.__name__} with id {component.id_} due to {ex.message}')
def __get_layout(components, count):
    """
    Build an ``r(...)`` layout string for a list of components.

    NOTE(review): nothing in the visible source calls this function; confirm whether it is still needed.
    :param components: components, rows and columns to lay out, in order
    :param count: index of the next component to be referenced in the layout
    :return: tuple of (layout string, next unused component index)
    """
    if not components:
        # Nothing to lay out; also avoids dividing 12 by zero below
        return 'r()', count

    layout = 'r('
    last_index = len(components) - 1
    width_sum = sum(component.width or 0 for component in components if not isinstance(component, WorkspaceRow))

    if width_sum == 0:
        # Equally spread out; the last entry absorbs whatever 12 does not divide into
        size, remainder = divmod(12, len(components))
        for i, component in enumerate(components):
            span = size + remainder if i == last_index else size
            if isinstance(component, (WorkspaceRow, WorkspaceColumn)):
                sub_layout, count = __get_layout(component.components, count)
                layout += f'c{span}({sub_layout})'
            else:
                # Case: Component
                layout += f'c{span}(${count})'
                count += 1
    else:
        unsized = sum(1 for component in components if component.width is None)
        # Share what the explicit widths leave over between the components without a width
        default_width = (12 - width_sum) // unsized if unsized else 0
        used_sum = 0
        for i, component in enumerate(components):
            if i == last_index and not component.width:
                layout += f'c{12 - used_sum}(${count})'
            elif component.width is None:
                layout += f'c{default_width}(${count})'
                used_sum += default_width
            else:
                width = component.width or 1
                layout += f'c{width}(${count})'
                used_sum += width
            count += 1

    layout += ')'
    return layout, count