Source code for gs_quant.timeseries.helper

"""
Copyright 2019 Goldman Sachs.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
"""
import datetime as dt
import inspect
import logging
import os
import re
from enum import Enum, IntEnum
from functools import wraps, partial
from typing import Optional, Union, List, Iterable, Callable

import numpy as np
import pandas as pd

from gs_quant.api.gs.data import QueryType
from gs_quant.api.utils import ThreadPoolManager
from gs_quant.data import DataContext, Dataset
from gs_quant.datetime.relative_date import RelativeDate
from gs_quant.entities.entity import EntityType
from gs_quant.errors import MqValueError, MqRequestError
from gs_quant.timeseries.measure_registry import register_measure

ENABLE_DISPLAY_NAME = 'GSQ_ENABLE_MEASURE_DISPLAY_NAME'
USE_DISPLAY_NAME = os.environ.get(ENABLE_DISPLAY_NAME) == "1"
_logger = logging.getLogger(__name__)

try:
    from quant_extensions.timeseries.rolling import rolling_apply
except ImportError as e:
    _logger.debug('unable to import rolling_apply extension: %s', e)

    def rolling_apply(s: pd.Series, offset: pd.DateOffset, function: Callable[[np.ndarray], float]) -> pd.Series:
        if isinstance(s.index, pd.DatetimeIndex):
            values = [function(s.loc[(s.index > (idx - offset)) & (s.index <= idx)]) for idx in s.index]
        else:
            values = [function(s.loc[(s.index > (idx - offset).date()) & (s.index <= idx)]) for idx in s.index]
        return pd.Series(values, index=s.index, dtype=np.double)


def _create_enum(name, members):
    return Enum(name, {n.upper(): n.lower() for n in members}, module=__name__)


def _create_int_enum(name, mappings):
    return IntEnum(name, {k.upper(): v for k, v in mappings.items()})


def _to_offset(tenor: str) -> pd.DateOffset:
    import re
    matcher = re.fullmatch('(\\d+)([hdwmy])', tenor)
    if not matcher:
        raise MqValueError('invalid tenor ' + tenor)

    ab = matcher.group(2)
    if ab == 'h':
        name = 'hours'
    elif ab == 'd':
        name = 'days'
    elif ab == 'w':
        name = 'weeks'
    elif ab == 'm':
        name = 'months'
    else:
        assert ab == 'y'
        name = 'years'

    kwarg = {name: int(matcher.group(1))}
    return pd.DateOffset(**kwarg)


def _tenor_to_month(relative_date: str) -> int:
    matcher = re.fullmatch('([1-9]\\d*)([my])', relative_date)
    if matcher:
        mag = int(matcher.group(1))
        return mag if matcher.group(2) == 'm' else mag * 12
    raise MqValueError('invalid input: relative date must be in months or years')


Interpolate = _create_enum('Interpolate', ['intersect', 'step', 'nan', 'zero', 'time'])
Returns = _create_enum('Returns', ['simple', 'logarithmic', 'absolute'])
SeriesType = _create_enum('SeriesType', ['prices', 'returns'])
CurveType = _create_enum('CurveType', ['prices', 'excess_returns'])


[docs]class Window: """ Create a Window with size and ramp up to use. :param w: window size :param r: ramp up value. Defaults to the window size. :return: new window object **Usage** The window size and ramp up value can either the number of observations or a string representation of the time period. **Examples** Window size is :math:`22` observations and the ramp up value is :math:`10`: >>> Window(22, 10) Window size is one month and the ramp up size is one week: >>> Window('1m', '1w') """ def __init__(self, w: Union[int, str, None] = None, r: Union[int, str, None] = None): self.w = w self.r = w if r is None else r def as_dict(self): return { 'w': self.w, 'r': self.r } @classmethod def from_dict(cls, obj): return Window(w=obj.get('w'), r=obj.get('r'))
def _check_window(series_length: int, window: Window): if series_length > 0 and isinstance(window.w, int) and isinstance(window.r, int): if window.w <= 0: raise MqValueError('Window value must be greater than zero.') if window.r > series_length or window.r < 0: raise MqValueError('Ramp value must be less than the length of the series and greater than zero.') def apply_ramp(x: pd.Series, window: Window) -> pd.Series: _check_window(len(x), window) if isinstance(window.w, int) and window.w > len(x): # does not restrict window size when it is a DataOffset return pd.Series(dtype=float) if isinstance(window.r, pd.DateOffset): if np.issubdtype(x.index, dt.date): return x.loc[(x.index[0] + window.r).date():] else: return x.loc[(x.index[0] + window.r).to_pydatetime():] else: return x[window.r:] def normalize_window(x: Union[pd.Series, pd.DataFrame], window: Union[Window, int, str, None], default_window: int = None) -> Window: if default_window is None: default_window = len(x) if isinstance(window, int): window = Window(window, window) elif isinstance(window, str): window = Window(_to_offset(window), _to_offset(window)) else: if window is None: window = Window(default_window, 0) else: if isinstance(window.w, str): window = Window(_to_offset(window.w), window.r) if isinstance(window.r, str): window = Window(window.w, _to_offset(window.r)) if window.w is None: window = Window(default_window, window.r) _check_window(default_window, window) return window def plot_function(fn): # Indicates that fn should be exported to plottool as a pure function. fn.plot_function = True return fn def plot_session_function(fn): fn.plot_function = True fn.requires_session = True return fn def check_forward_looking(pricing_date, source, name="function"): if pricing_date is not None or source != 'plottool': return if DataContext.current.end_date <= dt.date.today(): msg = (f'{name}() requires a forward looking date range e.g. [0d, 3y]. ' 'Please update the date range via the date picker.') raise MqValueError(msg) def plot_measure(asset_class: tuple, asset_type: Optional[tuple] = None, dependencies: Optional[List[QueryType]] = tuple(), asset_type_excluded: Optional[tuple] = None, display_name: Optional[str] = None): # Indicates that fn should be exported to plottool as a member function / pseudo-measure. # Set category to None for no restrictions, else provide a tuple of allowed values. def decorator(fn): assert isinstance(asset_class, tuple) assert len(asset_class) >= 1 assert asset_type is None or isinstance(asset_type, tuple) assert asset_type_excluded is None or isinstance(asset_type_excluded, tuple) assert asset_type is None or asset_type_excluded is None fn.plot_measure = True fn.entity_type = EntityType.ASSET fn.asset_class = asset_class fn.asset_type = asset_type fn.asset_type_excluded = asset_type_excluded fn.dependencies = dependencies if USE_DISPLAY_NAME: fn.display_name = display_name multi_measure = register_measure(fn) multi_measure.entity_type = EntityType.ASSET return multi_measure else: return fn return decorator def plot_measure_entity(entity_type: EntityType, dependencies: Optional[Iterable[QueryType]] = tuple()): def decorator(fn): assert isinstance(entity_type, EntityType) if dependencies is not None: assert isinstance(dependencies, Iterable) assert all(isinstance(x, QueryType) for x in dependencies) fn.plot_measure_entity = True fn.entity_type = entity_type fn.dependencies = tuple(dependencies) # immutable return fn return decorator def requires_session(fn): fn.requires_session = True return fn def plot_method(fn): # Indicates that fn should be exported to plottool as a method. fn.plot_method = True # Allows fn to accept and ignore real_time argument even if it is not defined in the signature @wraps(fn) def ignore_extra_argument(*args, **kwargs): for arg in ('real_time', 'interval', 'time_filter'): if arg not in inspect.signature(fn).parameters: kwargs.pop(arg, None) return fn(*args, **kwargs) return ignore_extra_argument def log_return(logger: logging.Logger, message): def outer(fn): @wraps(fn) def inner(*args, **kwargs): response = fn(*args, **kwargs) logger.debug('%s: %s', message, response) return response return inner return outer def get_df_with_retries(fetcher, start_date, end_date, exchange, retries=1): """ Loads results from Data Service by calling fetcher function. Shifts query date range back by business days until result is not empty or retry limit reached. This is a fallback feature in case a data upload is late. Measure implementations should be written such that retries are usually not required. :param fetcher: a no-argument function runs a data query and returns a DataFrame :param start_date: initial start date for query :param end_date: initial end date for query :param exchange: exchange to use for holiday calendar :param retries: maximum number of retries :return: DataFrame """ retries = max(retries, 0) while retries > -1: with DataContext(start_date, end_date): result = fetcher() if not result.empty: break kwargs = {'exchanges': [exchange]} if exchange else {} # no need to include any part of the previous date range since it's known to be empty end_date = RelativeDate('-1b', base_date=start_date).apply_rule(**kwargs) start_date = end_date retries -= 1 return result def get_dataset_data_with_retries(dataset: Dataset, *, start: dt.date, end: dt.date, count: int = 0, max_retries: int = 5, **kwargs) -> pd.DataFrame: try: data = dataset.get_data(start=start, end=end, **kwargs) except MqRequestError as e: if 'Number of rows returned by your query is more than maximum allowed' in e.message and count < max_retries: mid = start + (end - start) / 2 count += 1 first_half = partial(get_dataset_data_with_retries, dataset, start=start, end=mid, count=count, **kwargs) mid = mid + dt.timedelta(days=1) second_half = partial(get_dataset_data_with_retries, dataset, start=mid, end=end, count=count, **kwargs) results = ThreadPoolManager.run_async([first_half, second_half]) first_half_results, second_half_results = results[0], results[1] data = pd.concat([first_half_results, second_half_results]).sort_index() else: raise e return data def _month_to_tenor(months: int) -> str: return f'{months // 12}y' if months % 12 == 0 else f'{months}m' def _split_where_conditions(where): la = [dict()] for k, v in where.items(): lb = [] while len(la) > 0: temp = la.pop() if isinstance(v, list): for cv in v: clone = temp.copy() clone[k] = [cv] lb.append(clone) else: lb.append(dict(**temp, **{k: v})) la = lb return la def _pandas_roll(s: pd.Series, window_str: str, method_name: str): return getattr(s.rolling(window_str), method_name)() def rolling_offset(s: pd.Series, offset: pd.DateOffset, function: Callable[[np.ndarray], float], method_name: str = None) -> pd.Series: """ Perform rolling window calculations. If offset has a fixed frequency and method name is provided, will use `Series.rolling< https://pandas.pydata.org/docs/reference/api/pandas.Series.rolling.html>`_ for best performance. :param s: time series :param offset: window size as a date offset :param function: function to call on each window :param method_name: name of method to call on each window (must be a method on Rolling object) :return: result time series """ # frequencies that can be passed to Series.rolling fixed = { 'hour': 'H', 'hours': 'H', 'day': 'D', 'days': 'D' } if method_name and len(offset.kwds) == 1: freq, count = offset.kwds.popitem() if freq in fixed: window_str = f'{count}{fixed[freq]}' if np.issubdtype(s.index, np.datetime64): return _pandas_roll(s, window_str, method_name) else: t = s.copy(deep=False) t.index = pd.to_datetime(t.index) # needed for Series.rolling return pd.Series(_pandas_roll(t, window_str, method_name), index=s.index) return rolling_apply(s, offset, function)