# Source code for gs_quant.risk.core

"""
Copyright 2019 Goldman Sachs.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
"""
import datetime as dt
import itertools
from abc import ABCMeta, abstractmethod
from concurrent.futures import Future
from copy import copy
from dataclasses import dataclass, fields
from typing import Iterable, Optional, Tuple, Union, Dict

import pandas as pd
from dataclasses_json import dataclass_json

import gs_quant
from gs_quant.base import RiskKey
from gs_quant.config import DisplayOptions
from gs_quant.datetime import point_sort_order

# Per-column key functions consulted by sort_values(): a column whose name appears
# here is ordered by point_sort_order(value) instead of by its raw value.
__column_sort_fns = {
    'label1': point_sort_order,
    'mkt_point': point_sort_order,
    'point': point_sort_order
}
# Default sort columns (in priority order) used by sort_risk() when `by` is not given.
__risk_columns = ('date', 'time', 'mkt_type', 'mkt_asset', 'mkt_class', 'mkt_point')


class ResultInfo(metaclass=ABCMeta):
    """Abstract holder for the metadata that accompanies a risk result.

    Keeps the risk key, unit, error and request id supplied at construction and
    exposes each of them through a read-only property. Concrete subclasses must
    provide ``raw_value``.
    """

    def __init__(
            self,
            risk_key: RiskKey,
            unit: Optional[dict] = None,
            error: Optional[Union[str, dict]] = None,
            request_id: Optional[str] = None
    ):
        # Stored under name-mangled attributes (``_ResultInfo__*``); the pandas
        # subclasses in this module register those mangled names as internal names.
        self.__request_id = request_id
        self.__error = error
        self.__unit = unit
        self.__risk_key = risk_key

    @property
    @abstractmethod
    def raw_value(self):
        ...

    @property
    def risk_key(self) -> RiskKey:
        return self.__risk_key

    @property
    def unit(self) -> dict:
        """The units of this result"""
        return self.__unit

    @property
    def error(self) -> Union[str, dict]:
        """Any error associated with this result"""
        return self.__error

    @property
    def request_id(self) -> Optional[str]:
        """The request Id associated with this result"""
        return self.__request_id

    @staticmethod
    def composition_info(components: Iterable):
        """Collect the pieces needed to compose several dated results into one.

        :param components: results (or errors) each exposing ``risk_key``
        :return: a tuple ``(dates, values, errors, risk_key, unit)`` where ``dates`` and
            ``values`` are parallel lists for the successful components, ``errors`` maps
            a date to the failed component, ``risk_key`` is ``historical_risk_key`` of
            the first component's key and ``unit`` is the first truthy unit seen
        :raises ValueError: if a component's market location differs from the first one's
        """
        from gs_quant.markets.markets import historical_risk_key

        composed_key = None
        composed_unit = None
        dates, values, errors = [], [], {}

        for item in components:
            item_date = item.risk_key.date
            if composed_key is None:
                composed_key = historical_risk_key(item.risk_key)

            if composed_key.market.location != item.risk_key.market.location:
                raise ValueError('Cannot compose results with different markets')

            if isinstance(item, (ErrorValue, Exception)):
                # Failures are recorded by date and contribute no value or unit.
                errors[item_date] = item
                continue

            values.append(item.raw_value)
            dates.append(item_date)
            composed_unit = composed_unit or item.unit

        return dates, values, errors, composed_key, composed_unit


class ErrorValue(ResultInfo):
    """Result placeholder that carries the error of a failed calculation.

    ``raw_value`` is always ``None``; the error itself is available through the
    inherited ``error`` property.
    """

    def __init__(self, risk_key: RiskKey, error: Union[str, dict], request_id: Optional[str] = None):
        super().__init__(risk_key, error=error, request_id=request_id)

    def __repr__(self):
        # repr() must return a str, but the error may be a dict: always stringify.
        return str(self.error)

    def __getattr__(self, item):
        # only called if self.item doesn't exist
        # Read the stored error straight from the instance dict instead of via the
        # ``error`` property: on an instance whose __init__ has not run (for example
        # while copy/pickle probe for optional hooks) the property would fail, land
        # back here and recurse without bound.
        error = self.__dict__.get('_ResultInfo__error')
        raise AttributeError(f'ErrorValue object has no attribute {item}.  Error was {error}')

    @property
    def raw_value(self):
        return None

    def _to_records(self, extra_dict, display_options: DisplayOptions = None):
        """Return a single record: ``extra_dict`` plus this object under ``'value'``.

        ``display_options`` is accepted for signature compatibility and is not used.
        """
        return [{**extra_dict, 'value': self}]


class UnsupportedValue(ResultInfo):
    """Result placeholder whose repr and raw value are the text 'Unsupported Value'."""

    def __init__(self, risk_key: RiskKey, request_id: Optional[str] = None):
        super().__init__(risk_key, request_id=request_id)

    def __repr__(self):
        return 'Unsupported Value'

    @property
    def raw_value(self):
        return 'Unsupported Value'

    def _to_records(self, extra_dict, display_options: DisplayOptions = None):
        """Return one record for this value, or no records when NA values are hidden.

        :param extra_dict: entries merged into the record
        :param display_options: overrides ``gs_quant.config.display_options`` when given
        :raises TypeError: if ``display_options`` is given but is not a DisplayOptions
        """
        if display_options is None:
            options = gs_quant.config.display_options
        elif isinstance(display_options, DisplayOptions):
            options = display_options
        else:
            raise TypeError("display_options must be of type DisplayOptions")

        if not options.show_na:
            return []

        return [{**extra_dict, 'value': self}]


class ScalarWithInfo(ResultInfo, metaclass=ABCMeta):
    """Abstract base for scalar results that also carry ResultInfo metadata."""

    def __init__(self,
                 risk_key: RiskKey,
                 value: Union[float, str],
                 unit: Optional[dict] = None,
                 error: Optional[Union[str, dict]] = None,
                 request_id: Optional[str] = None):
        # NOTE(review): this is invoked on `value`, not on `self` - kept exactly as is.
        float.__init__(value)
        ResultInfo.__init__(self, risk_key, unit=unit, error=error, request_id=request_id)

    @property
    @abstractmethod
    def raw_value(self):
        ...

    @staticmethod
    def compose(components: Iterable):
        """Build a date-indexed SeriesWithInfo from scalar results.

        Components that failed are passed on as the series' ``error`` mapping.
        """
        dates, values, errors, risk_key, unit = ResultInfo.composition_info(components)
        date_index = pd.DatetimeIndex(dates).date
        series = pd.Series(index=date_index, data=values)
        return SeriesWithInfo(series, risk_key=risk_key, unit=unit, error=errors)

    def _to_records(self, extra_dict, display_options: DisplayOptions = None):
        """Return a single record: ``extra_dict`` plus this scalar under ``'value'``."""
        record = {**extra_dict, 'value': self}
        return [record]


class FloatWithInfo(ScalarWithInfo, float):
    """A float that also carries ResultInfo metadata (risk key, unit, error, request id)."""

    def __new__(cls,
                risk_key: RiskKey,
                value: Union[float, str],
                unit: dict = None,
                error: Optional[str] = None,
                request_id: Optional[str] = None):
        # Only `value` takes part in building the float; the metadata is stored by __init__.
        return float.__new__(cls, value)

    @property
    def raw_value(self) -> float:
        return float(self)

    def __repr__(self):
        if self.error:
            return self.error
        return float.__repr__(self)

    def __add__(self, other):
        """Add two results with equal units, or fall back to plain float addition.

        :raises ValueError: if ``other`` is a FloatWithInfo with a different unit
        """
        if not isinstance(other, FloatWithInfo):
            return super(FloatWithInfo, self).__add__(other)

        if self.unit == other.unit:
            merged_key = combine_risk_key(self.risk_key, other.risk_key)
            total = self.raw_value + other.raw_value
            return FloatWithInfo(merged_key, total, self.unit)

        raise ValueError('FloatWithInfo unit mismatch')

    def __mul__(self, other):
        """Multiply by another result or a plain number; the product keeps this unit."""
        if not isinstance(other, FloatWithInfo):
            return FloatWithInfo(self.risk_key, self.raw_value * other, self.unit)

        merged_key = combine_risk_key(self.risk_key, other.risk_key)
        product = self.raw_value * other.raw_value
        return FloatWithInfo(merged_key, product, self.unit)

    def to_frame(self):
        # A scalar has no tabular form here: the object itself is returned.
        return self


class StringWithInfo(ScalarWithInfo, str):
    """A str that also carries ResultInfo metadata (risk key, unit, error, request id)."""

    def __new__(cls,
                risk_key: RiskKey,
                value: Union[float, str],
                unit: Optional[dict] = None,
                error: Optional[str] = None,
                request_id: Optional[str] = None):
        # Only `value` takes part in building the str; the metadata is stored by __init__.
        return str.__new__(cls, value)

    @property
    def raw_value(self) -> str:
        return str(self)

    def __repr__(self):
        if self.error:
            return self.error
        return str.__repr__(self)


class SeriesWithInfo(pd.Series, ResultInfo):
    """A pandas Series that also carries ResultInfo metadata."""

    # Register the name-mangled ResultInfo attributes (one per ResultInfo property)
    # as pandas internal names.
    _internal_names = pd.DataFrame._internal_names + [
        '_ResultInfo__' + name for name in dir(ResultInfo) if isinstance(getattr(ResultInfo, name), property)
    ]
    _internal_names_set = set(_internal_names)

    def __init__(
            self,
            *args,
            risk_key: Optional[RiskKey] = None,
            unit: Optional[dict] = None,
            error: Optional[Union[str, dict]] = None,
            request_id: Optional[str] = None,
            **kwargs
    ):
        pd.Series.__init__(self, *args, **kwargs)
        ResultInfo.__init__(self, risk_key, unit=unit, error=error, request_id=request_id)

    def __repr__(self):
        if not self.error:
            return pd.Series.__repr__(self)
        return pd.Series.__repr__(self) + "\nErrors: " + str(self.error)

    @property
    def _constructor(self):
        return SeriesWithInfo

    @property
    def _constructor_expanddim(self):
        return DataFrameWithInfo

    @property
    def raw_value(self) -> pd.Series:
        return pd.Series(self)

    @staticmethod
    def compose(components: Iterable):
        """Build a date-indexed SeriesWithInfo from the given components.

        Components that failed are passed on as the series' ``error`` mapping.
        """
        dates, values, errors, risk_key, unit = ResultInfo.composition_info(components)
        date_index = pd.DatetimeIndex(dates).date
        series = pd.Series(index=date_index, data=values)
        return SeriesWithInfo(series, risk_key=risk_key, unit=unit, error=errors)

    def _to_records(self, extra_dict, display_options: DisplayOptions = None):
        """Return one record per entry with keys ``'dates'`` and ``'value'`` plus ``extra_dict``."""
        frame = pd.DataFrame(self).reset_index()
        frame.columns = ['dates', 'value']
        return [dict(record, **{**extra_dict}) for record in frame.to_dict('records')]

    def __mul__(self, other):
        # Multiply as a plain Series, then stamp this object's metadata onto the result.
        product = pd.Series.__mul__(self, other)
        ResultInfo.__init__(
            product,
            risk_key=self.risk_key,
            unit=self.unit,
            error=self.error,
            request_id=self.request_id,
        )
        return product

    def copy_with_resultinfo(self, deep=True):
        """Return a copy of the data wrapped in a new SeriesWithInfo with the same metadata."""
        return SeriesWithInfo(
            self.raw_value.copy(deep=deep),
            risk_key=self.risk_key,
            unit=self.unit,
            error=self.error,
            request_id=self.request_id,
        )


class DataFrameWithInfo(pd.DataFrame, ResultInfo):
    """A pandas DataFrame that also carries ResultInfo metadata."""

    # Register the name-mangled ResultInfo attributes (one per ResultInfo property)
    # as pandas internal names.
    _internal_names = pd.DataFrame._internal_names + \
                      ['_ResultInfo__' + i for i in dir(ResultInfo) if isinstance(getattr(ResultInfo, i), property)]
    _internal_names_set = set(_internal_names)

    def __init__(
            self,
            *args,
            risk_key: Optional[RiskKey] = None,
            unit: Optional[dict] = None,
            error: Optional[Union[str, dict]] = None,
            request_id: Optional[str] = None,
            **kwargs
    ):
        pd.DataFrame.__init__(self, *args, **kwargs)
        ResultInfo.__init__(self, risk_key, unit=unit, error=error, request_id=request_id)

    def __repr__(self):
        if self.error:
            # The property is `error`; `errors` does not exist and made repr() raise.
            return pd.DataFrame.__repr__(self) + "\nErrors: " + str(self.error)
        return pd.DataFrame.__repr__(self)

    @property
    def _constructor(self):
        return DataFrameWithInfo

    @property
    def _constructor_sliced(self):
        return SeriesWithInfo

    @property
    def raw_value(self) -> pd.DataFrame:
        """The data as a plain DataFrame.

        When the first index value is a ``datetime.date`` the index is named
        ``'dates'`` and moved into a regular column.
        """
        if self.empty:
            return pd.DataFrame(self)
        df = self.copy()
        if isinstance(self.index.values[0], dt.date):
            df.index.name = 'dates'
            df.reset_index(inplace=True)
        return pd.DataFrame(df)

    @staticmethod
    def compose(components: Iterable):
        """Concatenate dated frames into one DataFrameWithInfo indexed by ``'date'``.

        Components that failed are passed on as the frame's ``error`` mapping.
        """
        dates, values, errors, risk_key, unit = ResultInfo.composition_info(components)
        df = pd.concat(v.assign(date=d) for d, v in zip(dates, values)).set_index('date')

        return DataFrameWithInfo(df, risk_key=risk_key, unit=unit, error=errors)

    def to_frame(self):
        return self

    def _to_records(self, extra_dict, display_options: DisplayOptions = None):
        """Return one record per row, each merged with ``extra_dict``.

        An empty frame yields a single ``'value': None`` record when NA values are
        shown and no records otherwise.

        :raises TypeError: if the frame is empty and ``display_options`` is given but
            is not a DisplayOptions
        """
        if self.empty:
            if display_options is not None and not isinstance(display_options, DisplayOptions):
                raise TypeError("display_options must be of type DisplayOptions")

            options = display_options if display_options is not None else gs_quant.config.display_options
            show_na = options.show_na

            return [{**extra_dict, 'value': None}] if show_na else []

        return [dict(item, **{**extra_dict}) for item in self.raw_value.to_dict('records')]

    def copy_with_resultinfo(self, deep=True):
        """Return a copy of the data wrapped in a new DataFrameWithInfo with the same metadata."""
        return DataFrameWithInfo(self.raw_value.copy(deep=deep), risk_key=self.risk_key, unit=self.unit,
                                 error=self.error, request_id=self.request_id)

    def filter_by_coord(self, coordinate):
        """Return the rows matching every non-None field of ``coordinate``.

        String fields are matched by equality, any other field value by ``isin``.
        """
        from gs_quant.markets import MarketDataCoordinate
        df = self.copy_with_resultinfo()
        for att in [i.name for i in fields(MarketDataCoordinate)]:
            wanted = getattr(coordinate, att)
            if wanted is None:
                continue
            if isinstance(wanted, str):
                df = df[getattr(df, att) == wanted]
            else:
                df = df[getattr(df, att).isin(wanted)]
        return df


@dataclass_json
@dataclass
class MQVSValidationTarget:
    """A target entry of an MQVSValidatorDefn; every field is optional and defaults to None."""
    env: Optional[str] = None
    operator: Optional[str] = None
    mqGroups: Optional[Tuple[str, ...]] = None
    # NOTE(review): annotated as a 1-tuple, unlike the variadic Tuple[str, ...] used by
    # the sibling fields - confirm whether Tuple[str, ...] was intended.
    users: Optional[Tuple[str]] = None
    assetClasses: Optional[Tuple[str, ...]] = None
    assets: Optional[Tuple[str, ...]] = None
    legTypes: Optional[Tuple[str, ...]] = None
    legFields: Optional[Dict[str, str]] = None


@dataclass_json
@dataclass
class MQVSValidatorDefn:
    """Definition of a validator: its type, the targets it applies to and its arguments.

    The three group fields are optional and default to None.
    """
    validatorType: str
    targets: Tuple[MQVSValidationTarget, ...]
    args: Dict[str, str]
    groupId: Optional[str] = None
    groupIndex: Optional[int] = None
    groupMethod: Optional[str] = None

class MQVSValidatorDefnsWithInfo(ResultInfo):
    """A tuple of MQVSValidatorDefn objects together with ResultInfo metadata."""
    validators: Tuple[MQVSValidatorDefn, ...]

    def __init__(self,
                 risk_key: RiskKey,
                 value: Union[MQVSValidatorDefn, Tuple[MQVSValidatorDefn]],
                 unit: Optional[dict] = None,
                 error: Optional[Union[str, dict]] = None,
                 request_id: Optional[str] = None):
        """Store ``value`` as a tuple of validators.

        A single MQVSValidatorDefn is wrapped in a 1-tuple and a tuple is stored as
        given. Anything else (including None) results in an empty tuple, so
        ``validators`` / ``raw_value`` are always defined.
        """
        ResultInfo.__init__(self, risk_key, unit=unit, error=error, request_id=request_id)
        if isinstance(value, MQVSValidatorDefn):
            self.validators = (value,)
        elif isinstance(value, tuple):
            self.validators = value
        else:
            # Previously the attribute was left unset here and raw_value raised
            # AttributeError.
            self.validators = ()

    @property
    def raw_value(self):
        return self.validators


def aggregate_risk(results: Iterable[Union[DataFrameWithInfo, Future]],
                   threshold: Optional[float] = None,
                   allow_heterogeneous_types: bool = False) -> pd.DataFrame:
    """
    Combine the results of multiple InstrumentBase.calc() calls, into a single result

    :param results: An iterable of Dataframes and/or Futures (returned by InstrumentBase.calc())
    :param threshold: exclude values whose absolute value falls below this threshold
    :param allow_heterogeneous_types: allow Series to be converted to DataFrames before aggregating
    :return: A Dataframe with the aggregated results

    **Examples**

    >>> from gs_quant.instrument import IRCap, IRFloor
    >>> from gs_quant.markets import PricingContext
    >>> from gs_quant.risk import IRDelta, IRVega
    >>>
    >>> cap = IRCap('5y', 'GBP')
    >>> floor = IRFloor('5y', 'GBP')
    >>> instruments = (cap, floor)
    >>>
    >>> with PricingContext():
    >>>     delta_f = [inst.calc(IRDelta) for inst in instruments]
    >>>     vega_f = [inst.calc(IRVega) for inst in (cap, floor)]
    >>>
    >>> delta = aggregate_risk(delta_f, threshold=0.1)
    >>> vega = aggregate_risk(vega_f)

    delta_f and vega_f are lists of futures, where the result will be a Dataframe
    delta and vega are Dataframes, representing the merged risk of the individual instruments
    """

    def as_frame(item):
        # Futures are resolved first; a Series is only transposed into a one-row
        # frame when heterogeneous inputs are allowed.
        resolved = item.result() if isinstance(item, Future) else item
        if isinstance(resolved, pd.Series) and allow_heterogeneous_types:
            return pd.DataFrame(resolved.raw_value).T
        return resolved.raw_value

    frames = [as_frame(item) for item in results]
    combined = pd.concat(frames).fillna(0)

    # Sum 'value' over every distinct combination of the remaining columns.
    group_columns = [column for column in combined.columns if column != 'value']
    combined = combined.groupby(group_columns, as_index=False).sum()

    if threshold is not None:
        combined = combined[combined.value.abs() > threshold]

    return sort_risk(combined)
ResultType = Union[None, dict, tuple, DataFrameWithInfo, FloatWithInfo, SeriesWithInfo]


def aggregate_results(results: Iterable[ResultType], allow_mismatch_risk_keys=False,
                      allow_heterogeneous_types=False) -> ResultType:
    """Aggregate several results of the same kind into a single result.

    :param results: the results to aggregate
    :param allow_mismatch_risk_keys: do not fail when results have different risk keys
    :param allow_heterogeneous_types: do not fail when results differ in type from the first
    :return: ``None`` for empty input, otherwise a value whose type follows the first
        result: dicts are aggregated per key, tuples are merged into a tuple of unique
        items, floats and series are summed and frames go through ``aggregate_risk``.
        ``None`` is also returned when the first result is of none of those types
    :raises ValueError: if a result is in error, or on a type, unit or risk key mismatch
    :raises Exception: an exception found among ``results`` is re-raised as is
    """
    unit = None
    risk_key = None
    results = tuple(results)

    if not results:
        return None

    for result in results:
        if isinstance(result, Exception):
            # Re-raise the actual failure rather than a bare, message-less Exception.
            raise result

        if result.error:
            raise ValueError('Cannot aggregate results in error')

        if not allow_heterogeneous_types and not isinstance(result, type(results[0])):
            raise ValueError(f'Cannot aggregate heterogeneous types: {type(result)} vs {type(results[0])}')

        if result.unit:
            if unit and unit != result.unit:
                raise ValueError('Cannot aggregate results with different units')

            unit = unit or result.unit

        if not allow_mismatch_risk_keys and risk_key and risk_key != result.risk_key:
            raise ValueError('Cannot aggregate results with different pricing keys')

        risk_key = risk_key or result.risk_key

    inst = next(iter(results))
    if isinstance(inst, dict):
        return dict((k, aggregate_results([r[k] for r in results])) for k in inst.keys())
    elif isinstance(inst, tuple):
        return tuple(set(itertools.chain.from_iterable(results)))
    elif isinstance(inst, FloatWithInfo):
        return FloatWithInfo(risk_key, sum(results), unit=unit)
    elif isinstance(inst, SeriesWithInfo):
        return SeriesWithInfo(sum(results), risk_key=risk_key, unit=unit)
    elif isinstance(inst, DataFrameWithInfo):
        return DataFrameWithInfo(aggregate_risk(results, allow_heterogeneous_types=allow_heterogeneous_types),
                                 risk_key=risk_key, unit=unit)

    return None
def subtract_risk(left: DataFrameWithInfo, right: DataFrameWithInfo) -> pd.DataFrame:
    """Subtract bucketed risk. Dimensions must be identical

    :param left: Results to subtract from
    :param right: Results to subtract
    :return: the aggregated risk of ``left`` and the negated ``right``

    **Examples**

    >>> from gs_quant.datetime.date import business_day_offset
    >>> from gs_quant.instrument import IRSwap
    >>> from gs_quant.markets import PricingContext
    >>> from gs_quant.risk import IRDelta
    >>> import datetime as dt
    >>>
    >>> ir_swap = IRSwap('Pay', '10y', 'USD')
    >>> delta_today = ir_swap.calc(IRDelta)
    >>>
    >>> with PricingContext(pricing_date=business_day_offset(dt.date.today(), -1, roll='preceding')):
    >>>     delta_yday_f = ir_swap.calc(IRDelta)
    >>>
    >>> delta_diff = subtract_risk(delta_today, delta_yday_f.result())
    """
    assert (left.columns.names == right.columns.names)
    # `columns.names` holds the names of the column index levels, not the column
    # labels, so the membership test has to be made against `columns` itself.
    assert ('value' in left.columns)

    # Negate a copy so the caller's `right` keeps its values.
    right_negated = copy(right)
    right_negated.value *= -1

    return aggregate_risk((left, right_negated))
def sort_values(data: Iterable, columns: Tuple[str, ...], by: Tuple[str, ...]) -> Iterable:
    """Sort rows by the given columns, using per-column key functions where registered.

    :param data: the rows; each row is indexed by column position
    :param columns: the column names, in row order
    :param by: the columns to sort by, in priority order; names not in ``columns`` are ignored
    :return: a sorted list of the rows
    """
    positions = tuple(columns.index(name) for name in by if name in columns)

    # Key function per column position; None means "compare the raw value".
    key_fns = [None] * len(columns)
    for pos in positions:
        key_fns[pos] = __column_sort_fns.get(columns[pos])

    def row_key(row) -> tuple:
        parts = []
        for pos in positions:
            fn = key_fns[pos]
            if fn:
                # A falsy key (e.g. None) is replaced by 0.
                parts.append(fn(row[pos]) or 0)
            else:
                parts.append(row[pos])
        return tuple(parts)

    return sorted(data, key=row_key)
def sort_risk(df: pd.DataFrame, by: Tuple[str, ...] = __risk_columns) -> pd.DataFrame:
    """
    Sort bucketed risk

    :param df: Input Dataframe
    :param by: Columns to sort by
    :return: A sorted Dataframe
    """
    columns = tuple(df.columns)
    rows = (row for _, row in df.iterrows())
    data = sort_values(rows, columns, by)

    # Output column order: the sort columns that are present first, then every
    # other column not already listed.
    ordered = [name for name in by if name in columns]
    for name in columns:
        if name not in ordered:
            ordered.append(name)

    result = pd.DataFrame.from_records(data, columns=columns)[ordered]
    if 'date' in result:
        result = result.set_index('date')

    return result
def combine_risk_key(key_1: RiskKey, key_2: RiskKey) -> RiskKey:
    """
    Combine two risk keys (key_1, key_2) into a new RiskKey

    A field keeps its value when both keys agree on it and becomes None otherwise.

    :type key_1: RiskKey
    :type key_2: RiskKey
    """

    def shared(field_name: str):
        candidate = getattr(key_1, field_name)
        if candidate == getattr(key_2, field_name):
            return candidate
        return None

    field_names = ("provider", "date", "market", "params", "scenario", "risk_measure")
    return RiskKey(*[shared(name) for name in field_names])