Source code for gs_quant.instrument.core

"""
Copyright 2019 Goldman Sachs.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
"""
import datetime as dt
import inspect
import logging
import warnings
from copy import deepcopy
from typing import Iterable, Optional, Tuple, Union

from dataclasses_json import global_config

from gs_quant.api.gs.parser import GsParserApi
from gs_quant.api.gs.risk import GsRiskApi
from gs_quant.base import get_enum_value, InstrumentBase, Priceable, Scenario
from gs_quant.common import AssetClass, AssetType, XRef, RiskMeasure
from gs_quant.markets import HistoricalPricingContext, MarketDataCoordinate, PricingContext
from gs_quant.priceable import PriceableImpl
from gs_quant.risk import FloatWithInfo, DataFrameWithInfo, SeriesWithInfo, ResolvedInstrumentValues, \
    DEPRECATED_MEASURES
from gs_quant.risk.results import ErrorValue, MultipleRiskMeasureFuture, PricingFuture, MultipleScenarioFuture
from gs_quant.target.common import MultiScenario

_logger = logging.getLogger(__name__)


class Instrument(PriceableImpl, InstrumentBase):
    PROVIDER = GsRiskApi
    __instrument_mappings = {}

    @classmethod
    def __asset_class_and_type_to_instrument(cls):
        if not cls.__instrument_mappings:
            import gs_quant.target.instrument as instrument_
            instrument_classes = [c for _, c in inspect.getmembers(instrument_, inspect.isclass) if
                                  issubclass(c, Instrument) and c is not Instrument]

            cls.__instrument_mappings[(AssetClass.Cash, AssetType.Currency)] = instrument_.Forward

            for clazz in instrument_classes:
                instrument = clazz.default_instance()
                cls.__instrument_mappings[(instrument.asset_class, instrument.type)] = clazz

        return cls.__instrument_mappings

    @property
    def provider(self):
        return self.PROVIDER

    def resolve(self, in_place: bool = True) -> Optional[Union[PriceableImpl, PricingFuture, dict]]:
        """
        Resolve non-supplied properties of an instrument

        **Examples**

        >>> from gs_quant.instrument import IRSwap
        >>>
        >>> swap = IRSwap('Pay', '10y', 'USD')
        >>> rate = swap.fixedRate

        rate is None

        >>> swap.resolve()
        >>> rate = swap.fixedRate

        rates is now the solved fixed rate
        """

        is_historical = isinstance(PricingContext.current, HistoricalPricingContext)

        def handle_result(result: Optional[Union[ErrorValue, InstrumentBase]]) -> Optional[PriceableImpl]:
            ret = None if in_place else result
            if isinstance(result, ErrorValue):
                _logger.error('Failed to resolve instrument fields: ' + result.error)
                ret = {result.risk_key.date: None} if is_historical else None
            elif result is None:
                _logger.error('Unknown error resolving instrument fields')
                ret = {dt.date.today(): self} if is_historical else self
            elif in_place:
                self.from_instance(result)

            return ret

        if in_place and is_historical:
            raise RuntimeError('Cannot resolve in place under a HistoricalPricingContext')

        if in_place and len([i for i in Scenario.path if isinstance(i, MultiScenario)]):
            raise RuntimeError('Cannot resolve in place under a MultiScenario Context')

        return self.calc(ResolvedInstrumentValues, fn=handle_result)

    def calc(self, risk_measure: Union[RiskMeasure, Iterable[RiskMeasure]], fn=None) \
            -> Union[DataFrameWithInfo, ErrorValue, FloatWithInfo, PriceableImpl, PricingFuture,
                     SeriesWithInfo, Tuple[MarketDataCoordinate, ...]]:
        """
        Calculate the value of the risk_measure

        :param risk_measure: the risk measure to compute, e.g. IRDelta (from gs_quant.risk)
        :param fn: post-processing function (optional)
        :return: a float or dataframe, depending on whether the value is scalar or structured, or a future thereof
        (depending on how PricingContext is being used)

        **Examples**

        >>> from gs_quant.instrument import IRCap
        >>> from gs_quant.risk import IRDelta
        >>>
        >>> cap = IRCap('1y', 'USD')
        >>> delta = cap.calc(IRDelta)

        delta is a dataframe

        >>> from gs_quant.instrument import EqOption
        >>> from gs_quant.risk import EqDelta
        >>>
        >>> option = EqOption('.SPX', '3m', 'ATMF', 'Call', 'European')
        >>> delta = option.calc(EqDelta)

        delta is a float

        >>> from gs_quant.markets import PricingContext
        >>>
        >>> cap_usd = IRCap('1y', 'USD')
        >>> cap_eur = IRCap('1y', 'EUR')

        >>> with PricingContext():
        >>>     usd_delta_f = cap_usd.calc(IRDelta)
        >>>     eur_delta_f = cap_eur.calc(IRDelta)
        >>>
        >>> usd_delta = usd_delta_f.result()
        >>> eur_delta = eur_delta_f.result()

        usd_delta_f and eur_delta_f are futures, usd_delta and eur_delta are dataframes
        """

        def get_inst_futures(curr_measure):
            return MultipleScenarioFuture(self, multi_scenario.scenarios,
                                          (curr_measure.pricing_context.calc(self, curr_measure),)) \
                if multi_scenario else curr_measure.pricing_context.calc(self, curr_measure)

        single_measure = isinstance(risk_measure, RiskMeasure)
        multi_scenario = next((i for i in Scenario.path if isinstance(i, MultiScenario)), None)

        with self._pricing_context:
            future = get_inst_futures(risk_measure) if single_measure else \
                MultipleRiskMeasureFuture(self, {r: get_inst_futures(r) for r in risk_measure})

        # Warn on use of deprecated measures
        def warning_on_one_line(msg, category, _filename, _lineno, _file=None, _line=None):
            return f'{category.__name__}:{msg}'

        for measure in (risk_measure,) if single_measure else risk_measure:
            if measure.name in DEPRECATED_MEASURES.keys():
                message = '{0} risk measure is deprecated. Please use {1} instead and pass in arguments to describe ' \
                          'risk measure specifics.\n'.format(measure.name, DEPRECATED_MEASURES[measure.name])
                warnings.simplefilter('once')
                warnings.formatwarning = warning_on_one_line
                warnings.warn(message, DeprecationWarning)
                warnings.simplefilter('ignore')

        if fn is not None:
            ret = PricingFuture()

            def cb(f):
                try:
                    ret.set_result(fn(f.result()))
                except Exception as e:
                    ret.set_exception(e)

            future.add_done_callback(cb)
            future = ret

        return future if self._return_future else future.result()

    @classmethod
    def from_dict(cls, values: dict):
        if not values:
            return

        instrument = cls if hasattr(cls, 'asset_class') else None
        if instrument is None:
            builder_type = values.get('$type') or values.get('builder', values.get('defn', {})).get('$type')
            values_used = values.get('builder', values.get('defn', values))
            if builder_type:
                from gs_quant_internal.base import decode_quill_value
                return decode_quill_value(values_used)

            asset_class_field = next((f for f in ('asset_class', 'assetClass') if f in values), None)
            if not asset_class_field:
                raise ValueError('assetClass/asset_class not specified')
            if 'type' not in values:
                raise ValueError('type not specified')

            asset_type = values.pop('type')
            asset_class = values.pop(asset_class_field)
            security_types = (None, '', 'Security')
            default_type = Security if asset_type in security_types and asset_class in security_types else None

            instrument = Instrument.__asset_class_and_type_to_instrument().get((
                get_enum_value(AssetClass, asset_class),
                get_enum_value(AssetType, asset_type)), default_type)

            if instrument is None:
                raise ValueError('unable to build instrument')

        return instrument.from_dict(values)

    @classmethod
    def from_quick_entry(cls, text: str, asset_class: Optional[AssetClass] = None):
        if not asset_class:
            try:
                inst = cls.default_instance()
                asset_class = inst.asset_class
            except AttributeError:
                pass

        if not asset_class:
            res = GsParserApi.get_instrument_from_text(text)
            if len(res):  # multiple instruments returned
                instrument = res.pop(0)
            else:
                raise ValueError('Could not resolve instrument')
        else:
            instrument = GsParserApi.get_instrument_from_text_asset_class(text, asset_class.value)
        try:
            return cls.from_dict(instrument)
        except AttributeError:
            raise ValueError('Invalid instrument specification')

    @classmethod
    def from_asset_ids(cls, asset_ids: Tuple[str, ...]) -> Tuple[InstrumentBase, ...]:
        from gs_quant.api.gs.assets import GsAssetApi
        instruments = GsAssetApi.get_instruments_for_asset_ids(asset_ids)

        try:
            inst = cls.default_instance()
            asset_class = inst.asset_class
            asset_type = inst.type

            if not all(i.asset_class == asset_class and i.type == asset_type for i in instruments):
                raise ValueError(f'Instrument(s) not all of type {cls.__name__}')
        except AttributeError:
            pass

        return instruments

    @classmethod
    def from_asset_id(cls, asset_id: str) -> InstrumentBase:
        return cls.from_asset_ids((asset_id,))[0]

    @staticmethod
    def compose(components: Iterable):
        return {c.risk_key.date if isinstance(c, ErrorValue) else c.resolution_key.date: c for c in components}

    def flip(self, in_place: bool = True):
        return self.scale(-1, in_place)

    def scale(self, scaling: float, in_place: bool = True):
        if scaling is None:
            return self
        if not hasattr(self, 'scale_in_place'):
            raise NotImplementedError(f'scale_in_place not implemented on {type(self).__name__}')
        if in_place:
            self.scale_in_place(scaling)
            return
        new_inst = deepcopy(self)
        new_inst.scale(scaling)
        return new_inst


class DummyInstrument(Instrument):
    def __init__(self, dummy_result: Union[str, float] = None):
        super().__init__()
        self.dummy_result = dummy_result

    @property
    def dummy_result(self) -> Union[str, float]:
        return self.__dummy_result

    @dummy_result.setter
    def dummy_result(self, value: Union[str, float]):
        self.__dummy_result = value

    @property
    def type(self) -> AssetType:
        return AssetType.Any


[docs]class Security(XRef, Instrument): """A security, specified by a well-known identifier""" def __init__(self, ticker: str = None, bbid: str = None, ric: str = None, isin: str = None, cusip: str = None, prime_id: str = None, quantity: float = 1): """ Create a security by passing one identifier only and, optionally, a quantity :param ticker: Exchange ticker :param bbid: Bloomberg identifier :param isin: International Security Number :param cusip: CUSIP :param prime_id: Prime (GS internal) identifier :param quantity: Quantity (number of contracts for exchange-traded instruments, notional for bonds) """ if len(tuple(filter(None, (f is not None for f in (ticker, bbid, isin, cusip, prime_id))))) > 1: raise ValueError('Only specify one identifier') XRef.__init__(self, ticker=ticker, bbid=bbid, ric=ric, isin=isin, cusip=cusip, prime_id=prime_id) Instrument.__init__(self) self.quantity_ = quantity
def encode_instrument(instrument: Optional[Instrument]) -> Optional[dict]: if instrument is not None: return instrument.to_dict() def encode_instruments(instruments: Optional[Iterable[Instrument]]) -> Optional[Iterable[Optional[dict]]]: if instruments is not None: return [encode_instrument(i) for i in instruments] global_config.decoders[Instrument] = Instrument.from_dict global_config.decoders[InstrumentBase] = Instrument.from_dict global_config.encoders[Instrument] = encode_instrument global_config.encoders[Priceable] = encode_instrument global_config.encoders[Optional[Priceable]] = encode_instrument global_config.encoders[Optional[Instrument]] = encode_instrument global_config.encoders[Optional[InstrumentBase]] = encode_instrument