Source code for gs_quant.timeseries.backtesting

"""
Copyright 2020 Goldman Sachs.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
"""

from functools import partial

from dateutil.relativedelta import relativedelta as rdelta
from pydash import chunk

from gs_quant.timeseries.econometrics import volatility, correlation
from gs_quant.timeseries.helper import _create_enum, _tenor_to_month, _month_to_tenor
from gs_quant.timeseries.measures_helper import VolReference, preprocess_implied_vol_strikes_eq
from gs_quant import timeseries as ts
from .statistics import *
from ..api.gs.assets import GsAssetApi
from ..api.gs.data import GsDataApi, MarketDataResponseFrame
from ..data.log import log_debug

_logger = logging.getLogger(__name__)

RebalFreq = _create_enum('RebalFreq', ['Daily', 'Weekly', 'Monthly'])
ReturnType = _create_enum('ReturnType', ['excess_return'])


def backtest_basket(
        series: list,
        weights: list,
        costs: list = None,
        rebal_freq: RebalFreq = RebalFreq.DAILY,
):
    num_assets = len(series)
    costs = costs or [0] * num_assets
    weights = weights or [1 / num_assets] * num_assets

    if not all(isinstance(x, pd.Series) for x in series):
        raise MqTypeError("expected a list of series")

    if len(weights) != num_assets or len(weights) != len(costs):
        raise MqValueError("series, weights, and cost lists must have the same length")

    # For all inputs which are Pandas series, get the intersection of their calendars
    cal = pd.DatetimeIndex(
        reduce(
            np.intersect1d,
            (
                curve.index
                for curve in series + weights + costs
                if isinstance(curve, pd.Series)
            ),
        )
    )

    # Reindex inputs and convert to pandas dataframes
    series = pd.concat([curve.reindex(cal) for curve in series], axis=1)
    weights = pd.concat([pd.Series(w, index=cal) for w in weights], axis=1)
    costs = pd.concat([pd.Series(c, index=cal) for c in costs], axis=1)

    if rebal_freq == RebalFreq.DAILY:
        rebal_dates = cal
    else:
        if rebal_freq == RebalFreq.WEEKLY:
            # Get hypothetical weekly rebalances
            num_rebals = ((cal[-1] - cal[0]).days) // 7
            rebal_dates = [cal[0] + i * rdelta(weeks=1) for i in range(num_rebals + 1)]
        else:
            # Get hypothetical monthly rebalances
            num_rebals = (cal[-1].year - cal[0].year) * 12 + cal[-1].month - cal[0].month
            rebal_dates = [cal[0] + i * rdelta(months=1) for i in range(num_rebals + 1)]

        # Convert the hypothetical weekly/monthly rebalance dates to actual calendar days
        rebal_dates = [min(cal[cal >= date]) for date in rebal_dates if date < max(cal)]

    # Create Units dataframe
    units = pd.DataFrame(index=cal, columns=series.columns)
    actual_weights = pd.DataFrame(index=cal, columns=series.columns)
    output = pd.Series(dtype='float64', index=cal)

    # Initialize backtest
    output.values[0] = 100
    units.values[0, ] = (
        output.values[0] * weights.values[0, ] / series.values[0, ]
    )
    actual_weights.values[0, ] = weights.values[0, ]

    # Run backtest
    prev_rebal = 0
    for i, date in enumerate(cal[1:], 1):
        # Update performance
        output.values[i] = output.values[i - 1] + np.dot(
            units.values[i - 1, ], series.values[i, ] - series.values[i - 1, ]
        )

        actual_weights.values[i, ] = (
            weights.values[prev_rebal, ] *
            (series.values[i, ] / series.values[prev_rebal, ]) *
            (output.values[prev_rebal] / output.values[i])
        )

        # Rebalance on rebal_dates
        if date in rebal_dates:
            # Compute costs
            output.values[i] -= (
                np.dot(costs.values[i, ], np.abs(weights.values[i, ] - actual_weights.values[i, ])) *
                output.values[i]
            )

            # Rebalance
            units.values[i, ] = (
                output.values[i] * weights.values[i, ] / series.values[i, ]
            )
            prev_rebal = i

            actual_weights.values[i, ] = weights.values[i, ]
        else:
            units.values[i, ] = units.values[
                i - 1,
            ]

    return output, actual_weights


[docs]@plot_function def basket_series( series: list, weights: list = None, costs: list = None, rebal_freq: RebalFreq = RebalFreq.DAILY, return_type: ReturnType = ReturnType.EXCESS_RETURN, ) -> pd.Series: """ Calculates a basket return series. :param series: list of time series of instrument prices :param weights: list of weights (defaults to evenly weight series) :param costs: list of execution costs in decimal (defaults to costs of 0) :param rebal_freq: rebalancing frequency - Daily, Weekly or Monthly (defaults to Daily) :param return_type: return type of underlying instruments - only excess return is supported :return: time series of the resulting basket **Usage** Calculates a basket return series. **Examples** Generate price series and combine them in a basket (weights 70%/30%) which rebalances monthly and assumes execution costs 5bps and 10bps each time the constituents are traded. >>> prices1 = generate_series(100) >>> prices2 = generate_series(100) >>> mybasket = basket_series([prices1, prices2], [0.7, 0.3], [0.0005, 0.001], monthly) **See also** :func:`prices` """ return backtest_basket(series, weights, costs, rebal_freq)[0]
[docs]class Basket: """ Construct a basket of stocks :param stocks: list of stock bloomberg ids :param weights: list of weights (defaults to evenly weight stocks) :param rebal_freq: rebalancing frequency - Daily or Monthly (defaults to daily) """ def __init__(self, stocks: list, weights: list = None, rebal_freq: RebalFreq = RebalFreq.DAILY): if weights and len(weights) and len(stocks) != len(weights): # make sure that they passed in an array for weig raise MqValueError("Stocks and weights must have the same length if both specified.") self.bbids = stocks self.rebal_freq = rebal_freq self.weights = weights self._marquee_ids = None self.start = DataContext.current.start_date self.end = DataContext.current.end_date self._spot_data = None self._returns = None self._actual_weights = None def _reset(self): self.start = DataContext.current.start_date self.end = DataContext.current.end_date self._spot_data = None self._returns = None self._actual_weights = None def get_marquee_ids(self): if self._marquee_ids is None: # Assets sorted by increasing rank assets = reversed(GsAssetApi.get_many_assets_data(bbid=self.bbids, fields=('id', 'bbid', 'rank'), limit=2 * len(self.bbids), order_by=['>rank'])) # If duplicate assets exist, asset_dict will contain a entry for the one with the higher rank (relevant) assets_dict = {entry['bbid']: entry['id'] for entry in assets} if len(assets_dict) != len(set(self.bbids)): not_found = set(assets_dict).symmetric_difference(self.bbids) raise MqValueError(f'Unable to find stocks: {", ".join(not_found)}') self._marquee_ids = [assets_dict[bbid] for bbid in self.bbids] return self._marquee_ids def _ensure_spot_data(self, request_id: Optional[str] = None): if self.start != DataContext.current.start_date or self.end != DataContext.current.end_date: self._reset() if self._spot_data is None: q = GsDataApi.build_market_data_query(self.get_marquee_ids(), QueryType.SPOT) log_debug(request_id, _logger, 'q %s', q) spot_data = ts.get_historical_and_last_for_measure(self.get_marquee_ids(), QueryType.SPOT, {}, request_id=request_id) dataset_ids = getattr(spot_data, 'dataset_ids', ()) df = spot_data.set_index([spot_data.index, 'assetId']) df = df[~df.index.duplicated(keep='last')].spot.unstack() df = df[self._marquee_ids] self._spot_data = MarketDataResponseFrame(df) self._spot_data.dataset_ids = dataset_ids def _ensure_backtest(self, request_id: Optional[str] = None): if self.start != DataContext.current.start_date or self.end != DataContext.current.end_date: self._reset() if self._returns is None or self._actual_weights is None: spot_df = self.get_spot_data(request_id=request_id) spot_df.dropna(inplace=True) spot_series = [spot_df[asset_id] for asset_id in spot_df] results = backtest_basket(spot_series, self.weights, rebal_freq=self.rebal_freq) self._returns = results[0] actual_weights = results[1] actual_weights.columns = self.get_marquee_ids() self._actual_weights = actual_weights def get_returns(self, request_id: Optional[str] = None): self._ensure_backtest(request_id=request_id) return self._returns def get_actual_weights(self, request_id: Optional[str] = None): self._ensure_backtest(request_id=request_id) return self._actual_weights def get_spot_data(self, request_id: Optional[str] = None): self._ensure_spot_data(request_id=request_id) return self._spot_data @requires_session @plot_method def price(self, *, real_time: bool = False, request_id: Optional[str] = None) -> pd.Series: """ Weighted average price :param real_time: whether to retrieve intraday data instead of EOD :param request_id: service request id, if any :return: time series of the average price """ if real_time: raise NotImplementedError('real-time basket price not implemented') return self.get_returns(request_id=request_id) @requires_session @plot_method def average_implied_volatility(self, tenor: str, strike_reference: VolReference, relative_strike: Real, *, real_time: bool = False, request_id: Optional[str] = None, source: Optional[str] = None) -> pd.Series: """ Weighted average implied volatility :param tenor: relative date representation of expiration date e.g. 1m :param strike_reference: reference for strike level :param relative_strike: strike relative to reference :param real_time: whether to retrieve intraday data instead of EOD :param request_id: service request id, if any :param source: name of function caller :return: time series of the average implied volatility """ if real_time: raise NotImplementedError('real-time basket implied vol not implemented') ref_string, relative_strike = preprocess_implied_vol_strikes_eq(strike_reference, relative_strike) log_debug(request_id, _logger, 'where tenor=%s, strikeReference=%s, relativeStrike=%s', tenor, ref_string, relative_strike) where = dict(tenor=tenor, strikeReference=ref_string, relativeStrike=relative_strike) tasks = [] for i, chunked_assets in enumerate(chunk(self.get_marquee_ids(), 3)): query = GsDataApi.build_market_data_query( chunked_assets, QueryType.IMPLIED_VOLATILITY, where=where, source=source, real_time=real_time) tasks.append(partial(GsDataApi.get_market_data, query, request_id)) results = ThreadPoolManager.run_async(tasks) vol_data = pd.concat(results) actual_weights = self.get_actual_weights(request_id) # Add in today's data today = datetime.date.today() if not real_time and DataContext.current.end_date >= today and \ (vol_data.empty or today not in vol_data.index.date): vol_data = ts.append_last_for_measure(vol_data, self.get_marquee_ids(), QueryType.IMPLIED_VOLATILITY, where, source=source, request_id=request_id) vol_data.index.rename('date', inplace=True) # Below transformations will throw errors if vol_data is empty if vol_data.empty: return pd.Series(dtype=float) vols = vol_data.pivot_table('impliedVolatility', ['date'], 'assetId') vols.reindex(self.get_marquee_ids(), axis=1) vols.index.name = None # Necessary when current values appended - set weights index to match vols index actual_weights = actual_weights.reindex(vols.index).fillna(method='pad') return actual_weights.mul(vols).sum(axis=1, skipna=False) @requires_session @plot_method def average_realized_volatility(self, tenor: str, returns_type: Returns = Returns.LOGARITHMIC, *, real_time: bool = False, request_id: Optional[str] = None) -> pd.Series: """ Weighted average realized volatility :param tenor: relative date representation of expiration date e.g. 1m :param returns_type: returns type :param real_time: whether to retrieve intraday data instead of EOD :param request_id: service request id, if any :return: time series of the average realized volatility """ if real_time: raise NotImplementedError('real-time basket realized vol not implemented') spot_df = self.get_spot_data(request_id=request_id) actual_weights = self.get_actual_weights(request_id=request_id) vols = [volatility(spot_df[asset_id], Window(tenor, tenor), returns_type) for asset_id in spot_df] vols = pd.concat(vols, axis=1) vols.columns = list(spot_df) # Necessary when current values appended - set weights index to match vols index actual_weights = actual_weights.reindex(vols.index).fillna(method='pad') return actual_weights.mul(vols, axis=1).sum(axis=1, skipna=False) @requires_session @plot_method def average_realized_correlation(self, w: Union[Window, int, str] = Window(None, 0), *, real_time: bool = False, request_id: Optional[str] = None) -> pd.Series: """ Weighted average realized correlation. Computes the correlation between each of the basket's constituents and returns their average, weighted by their representation in the basket. :param w: Window, int, or str: size of window and ramp up to use. e.g. Window(22, 10) where 22 is the window size and 10 the ramp up value. If w is a string, it should be a relative date like '1m', '1d', etc. Window size defaults to length of series. :param real_time: whether to retrieve intraday data instead of EOD :param request_id: service request ID, if any :return: time series of the average realized correlation """ # Calculated using method (a) from here: http://www.nematrian.com/MeasuringAverageStockCorrelation # Could be sped-up by using another estimate method, such as (b) if real_time: raise NotImplementedError('real-time basket realized corr not implemented') spot_df = self.get_spot_data(request_id=request_id) actual_weights = self.get_actual_weights(request_id=request_id) tot = pd.Series(np.zeros_like(spot_df.iloc[:, 0]), index=spot_df.index) tot_wt = pd.Series(np.zeros_like(spot_df.iloc[:, 0]), index=spot_df.index) # Iterate through and compute pairwise correlation between spot price series, # averaging at the end. for i in range(len(spot_df.columns)): for j in range(i + 1, len(spot_df.columns)): corr = correlation(spot_df.iloc[:, i], spot_df.iloc[:, j], w) wt = actual_weights.iloc[:, i] * actual_weights.iloc[:, j] tot += corr * wt tot_wt += wt return pd.to_numeric(tot / tot_wt, errors='coerce') @requires_session @plot_method def average_forward_vol(self, tenor: str, forward_start_date: str, strike_reference: VolReference, relative_strike: Real, *, real_time: bool = False, request_id: Optional[str] = None, source: Optional[str] = None) -> pd.Series: """ Weighted average forward volatility. :param tenor: relative date representation of expiration date e.g. 1m :param forward_start_date: forward start date e.g. 2m, 1y :param strike_reference: reference for strike level :param relative_strike: strike relative to reference :param real_time: whether to retrieve intraday data instead of EOD :param request_id: service request id, if any :param source: name of function caller :return: average forward volatility """ if real_time: raise NotImplementedError('real-time basket forward vol not implemented') ref_string, relative_strike = preprocess_implied_vol_strikes_eq(strike_reference, relative_strike) t1_month = _tenor_to_month(forward_start_date) t2_month = _tenor_to_month(tenor) + t1_month t1 = _month_to_tenor(t1_month) t2 = _month_to_tenor(t2_month) log_debug(request_id, _logger, 'where tenor=%s, strikeReference=%s, relativeStrike=%s', f'{t1},{t2}', ref_string, relative_strike) where = dict(tenor=[t1, t2], strikeReference=[ref_string], relativeStrike=[relative_strike]) asset_ids = self.get_marquee_ids() vol_data = ts.get_historical_and_last_for_measure(asset_ids, QueryType.IMPLIED_VOLATILITY, where, source=source, request_id=request_id) # Below transformations will throw errors if vol_data is empty if vol_data.empty: return pd.Series(dtype=float) grouped_by_asset_ids = vol_data.groupby('assetId') s = {} for asset_id, df in grouped_by_asset_ids: grouped_by_tenor = df.groupby('tenor') try: sg = grouped_by_tenor.get_group(t1)['impliedVolatility'] lg = grouped_by_tenor.get_group(t2)['impliedVolatility'] except KeyError: log_debug(request_id, _logger, 'no data for one or more tenors') series = pd.Series(dtype=float, name='forwardVol') else: series = pd.Series(sqrt((t2_month * lg ** 2 - t1_month * sg ** 2) / _tenor_to_month(tenor)), name='forwardVol') s[asset_id] = series vols = pd.DataFrame(s) actual_weights = self.get_actual_weights(request_id) # Necessary when current values appended - set weights index to match vols index actual_weights = actual_weights.reindex(pd.DatetimeIndex(vols.index)).fillna(method='pad') return actual_weights.mul(vols).sum(axis=1, skipna=False)