"""
Copyright 2019 Goldman Sachs.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
"""
from typing import Optional, Union
import pandas as pd
from pandas import Series
from gs_quant.analytics.core.processor import BaseProcessor, DataCoordinateOrProcessor, DateOrDatetimeOrRDate
from gs_quant.analytics.core.processor_result import ProcessorResult
from gs_quant.timeseries import returns
from gs_quant.timeseries.statistics import percentiles, percentile, Window, mean, sum_, std, var, cov, zscores
class PercentilesProcessor(BaseProcessor):
    """Processor that applies ``percentiles`` to series ``a``, optionally against a second series ``b``."""

    def __init__(self,
                 a: DataCoordinateOrProcessor,
                 *,
                 b: Optional[DataCoordinateOrProcessor] = None,
                 start: Optional[DateOrDatetimeOrRDate] = None,
                 end: Optional[DateOrDatetimeOrRDate] = None,
                 w: Union[Window, int] = Window(None, 0),
                 **kwargs):
        """ PercentilesProcessor

        :param a: DataCoordinate or BaseProcessor for the first series
        :param b: DataCoordinate or BaseProcessor for the second series (optional)
        :param start: start date or time used in the underlying data query
        :param end: end date or time used in the underlying data query
        :param w: Window or int: size of window and ramp up to use. e.g. Window(22, 10) where 22 is the window size
            and 10 the ramp up value. If w is a string, it should be a relative date like '1m', '1d', etc.
            Window size defaults to length of series.
        """
        super().__init__(**kwargs)
        self.children['a'] = a
        self.children['b'] = b
        self.start = start
        self.end = end
        self.w = w

    def process(self) -> ProcessorResult:
        """Compute the result from the children data currently available and store it in ``self.value``.

        :return: a successful ProcessorResult holding the output of ``percentiles``, or a failed ProcessorResult
            carrying an explanatory message when ``a`` is missing/unsuccessful or ``b`` was supplied but failed.
        """
        a_data = self.children_data.get('a')
        if not isinstance(a_data, ProcessorResult):
            self.value = ProcessorResult(False, "PercentilesProcessor does not have 'a' series yet")
            return self.value
        if not a_data.success:
            self.value = ProcessorResult(False, "PercentilesProcessor does not have 'a' series values yet")
            return self.value

        b_data = self.children_data.get('b')
        # Need to check if the child node b was set in the first place.
        if self.children.get('b') and isinstance(b_data, ProcessorResult):
            if b_data.success:
                result = percentiles(a_data.data, b_data.data, w=self.w)
                self.value = ProcessorResult(True, result)
            else:
                # An error message is not a successful result, so flag it as a failure.
                self.value = ProcessorResult(False, 'PercentilesProcessor: b is not a valid series.')
        else:
            # No usable second series (b unset, or its data is not a ProcessorResult): use a alone.
            result = percentiles(a_data.data, w=self.w)
            self.value = ProcessorResult(True, result)
        return self.value

    def get_plot_expression(self):
        """Plot expressions are not implemented for this processor; returns None."""
        pass
class PercentileProcessor(BaseProcessor):
    """Processor that applies ``percentile`` to series ``a`` for a given percentile ``n``."""

    def __init__(self,
                 a: DataCoordinateOrProcessor,
                 *,
                 n: float,
                 start: Optional[DateOrDatetimeOrRDate] = None,
                 end: Optional[DateOrDatetimeOrRDate] = None,
                 w: Union[Window, int] = None,
                 **kwargs):
        """ PercentileProcessor

        :param a: DataCoordinate or BaseProcessor for the series
        :param n: Percentile
        :param start: start date or time used in the underlying data query
        :param end: end date or time used in the underlying data query
        :param w: Window or int: size of window and ramp up to use. e.g. Window(22, 10) where 22 is the window size
            and 10 the ramp up value. If w is a string, it should be a relative date like '1m', '1d', etc.
            Window size defaults to length of series.
        """
        super().__init__(**kwargs)
        self.children['a'] = a
        self.n = n
        self.start = start
        self.end = end
        self.w = w

    def process(self) -> ProcessorResult:
        """Compute the percentile from the data available for ``a`` and store it in ``self.value``.

        A numeric window larger than the series is clamped to the series length. The output of ``percentile``
        is wrapped in a ``pd.Series`` when it is not one already.

        :return: a successful ProcessorResult holding a Series, or a failed one with an explanatory message.
        """
        a_data = self.children_data.get('a')
        if not isinstance(a_data, ProcessorResult):
            self.value = ProcessorResult(False, "PercentileProcessor does not have 'a' series yet")
            return self.value
        if not a_data.success:
            self.value = ProcessorResult(False, "PercentileProcessor does not have 'a' series values yet")
            return self.value

        series_length = len(a_data.data)
        window = None
        if self.w:
            if isinstance(self.w, (Window, str)):
                # Window instances and relative-date strings are forwarded as-is rather than being
                # ordered against an integer length.
                window = self.w
            else:
                window = self.w if self.w <= series_length else series_length
        result = percentile(a_data.data, self.n, w=window)
        if not isinstance(result, pd.Series):
            result = pd.Series(result)
        self.value = ProcessorResult(True, result)
        return self.value

    def get_plot_expression(self):
        """Plot expressions are not implemented for this processor; returns None."""
        pass
class MeanProcessor(BaseProcessor):
    """Processor that applies ``mean`` to series ``a`` over an optional rolling window."""

    def __init__(self,
                 a: DataCoordinateOrProcessor,
                 *,
                 start: Optional[DateOrDatetimeOrRDate] = None,
                 end: Optional[DateOrDatetimeOrRDate] = None,
                 w: Union[Window, int] = None,
                 **kwargs):
        """ MeanProcessor

        :param a: DataCoordinate or BaseProcessor for the series
        :param start: start date or time used in the underlying data query
        :param end: end date or time used in the underlying data query
        :param w: Window or int: size of window and ramp up to use. e.g. Window(22, 10) where 22 is the window size
            and 10 the ramp up value. If w is a string, it should be a relative date like '1m', '1d', etc.
            Window size defaults to length of series.

        **Usage**

        Calculates `arithmetic mean <https://en.wikipedia.org/wiki/Arithmetic_mean>`_ of the series over a rolling
        window

        If window is not provided, computes rolling mean over the full series. If the window size is greater than the
        available data, will return mean of available values.
        """
        super().__init__(**kwargs)
        self.children['a'] = a
        self.start = start
        self.end = end
        self.w = w

    def process(self) -> ProcessorResult:
        """Compute the mean from the data available for ``a`` and store it in ``self.value``.

        :return: a successful ProcessorResult holding the output of ``mean``, or a failed one with an
            explanatory message when ``a`` is missing or unsuccessful.
        """
        a_data = self.children_data.get('a')
        if not isinstance(a_data, ProcessorResult):
            self.value = ProcessorResult(False, "MeanProcessor does not have 'a' series yet")
            return self.value
        if not a_data.success:
            self.value = ProcessorResult(False, "MeanProcessor does not have 'a' series values yet")
            return self.value

        series_length = len(a_data.data)
        window = None
        if self.w:
            if isinstance(self.w, (Window, str)):
                # Window instances and relative-date strings are forwarded as-is rather than being
                # ordered against an integer length.
                window = self.w
            else:
                # A numeric window larger than the series is clamped to the series length.
                window = self.w if self.w <= series_length else series_length
        result = mean(a_data.data, w=window)
        self.value = ProcessorResult(True, result)
        return self.value

    def get_plot_expression(self):
        """Plot expressions are not implemented for this processor; returns None."""
        pass
class SumProcessor(BaseProcessor):
    """Processor that applies ``sum_`` to series ``a`` over an optional rolling window."""

    def __init__(self,
                 a: DataCoordinateOrProcessor,
                 *,
                 start: Optional[DateOrDatetimeOrRDate] = None,
                 end: Optional[DateOrDatetimeOrRDate] = None,
                 w: Union[Window, int] = None,
                 **kwargs):
        """ SumProcessor

        :param a: DataCoordinate or BaseProcessor for the series
        :param start: start date or time used in the underlying data query
        :param end: end date or time used in the underlying data query
        :param w: Window or int: size of window and ramp up to use. e.g. Window(22, 10) where 22 is the window size
            and 10 the ramp up value. If w is a string, it should be a relative date like '1m', '1d', etc.
            Window size defaults to length of series.

        **Usage**

        Calculate the sum of observations over a given rolling window.

        If window is not provided, computes sum over the full series. If the window size is greater than the available
        data, will return sum of available values.
        """
        super().__init__(**kwargs)
        self.children['a'] = a
        self.start = start
        self.end = end
        self.w = w

    def process(self) -> ProcessorResult:
        """Compute the sum from the data available for ``a`` and store it in ``self.value``.

        :return: a successful ProcessorResult holding the output of ``sum_``, or a failed one with an
            explanatory message when ``a`` is missing or unsuccessful.
        """
        a_data = self.children_data.get('a')
        if not isinstance(a_data, ProcessorResult):
            self.value = ProcessorResult(False, "SumProcessor does not have 'a' series yet")
            return self.value
        if not a_data.success:
            self.value = ProcessorResult(False, "SumProcessor does not have 'a' series values yet")
            return self.value

        series_length = len(a_data.data)
        window = None
        if self.w:
            if isinstance(self.w, (Window, str)):
                # Window instances and relative-date strings are forwarded as-is rather than being
                # ordered against an integer length.
                window = self.w
            else:
                # A numeric window larger than the series is clamped to the series length.
                window = self.w if self.w <= series_length else series_length
        result = sum_(a_data.data, w=window)
        self.value = ProcessorResult(True, result)
        return self.value

    def get_plot_expression(self):
        """Plot expressions are not implemented for this processor; returns None."""
        pass
class StdDevProcessor(BaseProcessor):
    """Processor that applies ``std`` to series ``a`` over an optional rolling window."""

    def __init__(self,
                 a: DataCoordinateOrProcessor,
                 *,
                 start: Optional[DateOrDatetimeOrRDate] = None,
                 end: Optional[DateOrDatetimeOrRDate] = None,
                 w: Union[Window, int] = Window(None, 0),
                 **kwargs):
        """ StdDevProcessor

        :param a: DataCoordinate or BaseProcessor for the first series
        :param start: start date or time used in the underlying data query
        :param end: end date or time used in the underlying data query
        :param w: Window or int: size of window and ramp up to use. e.g. Window(22, 10) where 22 is the window size
            and 10 the ramp up value. If w is a string, it should be a relative date like '1m', '1d', etc.
            Window size defaults to length of series.

        **Usage**

        Provides `unbiased estimator <https://en.wikipedia.org/wiki/Unbiased_estimation_of_standard_deviation>`_ of
        `sample standard deviation <https://en.wikipedia.org/wiki/Standard_deviation>`_ over a rolling window

        If window is not provided, computes standard deviation over the full series
        """
        super().__init__(**kwargs)
        self.children['a'] = a
        self.start = start
        self.end = end
        self.w = w

    def process(self) -> ProcessorResult:
        """Compute the standard deviation from the data available for ``a`` and store it in ``self.value``.

        :return: a successful ProcessorResult holding the output of ``std``, or a failed one with an
            explanatory message when ``a`` is missing or unsuccessful.
        """
        a_data = self.children_data.get('a')
        if not isinstance(a_data, ProcessorResult):
            self.value = ProcessorResult(False, "StdDevProcessor does not have 'a' series yet")
            return self.value
        if not a_data.success:
            self.value = ProcessorResult(False, "StdDevProcessor does not have 'a' series values yet")
            return self.value

        series_length = len(a_data.data)
        window = None
        if self.w:
            if isinstance(self.w, (Window, str)):
                # The default w is a Window instance; Windows and relative-date strings are forwarded
                # as-is rather than being ordered against an integer length.
                window = self.w
            else:
                # A numeric window larger than the series is clamped to the series length.
                window = self.w if self.w <= series_length else series_length
        result = std(a_data.data, w=window)
        self.value = ProcessorResult(True, result)
        return self.value

    def get_plot_expression(self):
        """Plot expressions are not implemented for this processor; returns None."""
        pass
class VarianceProcessor(BaseProcessor):
    """Processor that applies ``var`` to series ``a`` over an optional rolling window."""

    def __init__(self,
                 a: DataCoordinateOrProcessor,
                 *,
                 start: Optional[DateOrDatetimeOrRDate] = None,
                 end: Optional[DateOrDatetimeOrRDate] = None,
                 w: Union[Window, int] = Window(None, 0),
                 **kwargs):
        """ VarianceProcessor

        :param a: DataCoordinate or BaseProcessor for the first series
        :param start: start date or time used in the underlying data query
        :param end: end date or time used in the underlying data query
        :param w: Window or int: size of window and ramp up to use. e.g. Window(22, 10) where 22 is the window size
            and 10 the ramp up value. If w is a string, it should be a relative date like '1m', '1d', etc.
            Window size defaults to length of series.

        **Usage**

        Provides `unbiased estimator <https://en.wikipedia.org/wiki/Unbiased_estimation_of_standard_deviation>`_ of
        `sample variance <https://en.wikipedia.org/wiki/Variance>`_ over a rolling window

        If window is not provided, computes variance over the full series
        """
        super().__init__(**kwargs)
        self.children['a'] = a
        self.start = start
        self.end = end
        self.w = w

    def process(self) -> ProcessorResult:
        """Compute the variance from the data available for ``a`` and store it in ``self.value``.

        :return: a successful ProcessorResult holding the output of ``var``, or a failed one with an
            explanatory message when ``a`` is missing or unsuccessful.
        """
        a_data = self.children_data.get('a')
        if not isinstance(a_data, ProcessorResult):
            self.value = ProcessorResult(False, "VarianceProcessor does not have 'a' series yet")
            return self.value
        if not a_data.success:
            self.value = ProcessorResult(False, "VarianceProcessor does not have 'a' series values yet")
            return self.value

        series_length = len(a_data.data)
        window = None
        if self.w:
            if isinstance(self.w, (Window, str)):
                # The default w is a Window instance; Windows and relative-date strings are forwarded
                # as-is rather than being ordered against an integer length.
                window = self.w
            else:
                # A numeric window larger than the series is clamped to the series length.
                window = self.w if self.w <= series_length else series_length
        result = var(a_data.data, w=window)
        self.value = ProcessorResult(True, result)
        return self.value

    def get_plot_expression(self):
        """Plot expressions are not implemented for this processor; returns None."""
        pass
class CovarianceProcessor(BaseProcessor):
    """Processor that applies ``cov`` to the pair of series ``a`` and ``b``."""

    def __init__(self,
                 a: DataCoordinateOrProcessor,
                 b: DataCoordinateOrProcessor,
                 *,
                 start: Optional[DateOrDatetimeOrRDate] = None,
                 end: Optional[DateOrDatetimeOrRDate] = None,
                 w: Union[Window, int] = Window(None, 0),
                 **kwargs):
        """ CovarianceProcessor

        :param a: DataCoordinate or BaseProcessor for the first series
        :param b: DataCoordinate or BaseProcessor for the second series
        :param start: start date or time used in the underlying data query
        :param end: end date or time used in the underlying data query
        :param w: Window or int: size of window and ramp up to use. e.g. Window(22, 10) where 22 is the window size
            and 10 the ramp up value. If w is a string, it should be a relative date like '1m', '1d', etc.
            Window size defaults to length of series.

        **Usage**

        Provides `unbiased estimator <https://en.wikipedia.org/wiki/Unbiased_estimation_of_standard_deviation>`_ of
        `sample co-variance <https://en.wikipedia.org/wiki/Covariance>`_ over a rolling window:

        If window is not provided, computes covariance over the full series
        """
        super().__init__(**kwargs)
        self.children['a'] = a
        self.children['b'] = b
        self.start = start
        self.end = end
        self.w = w

    def process(self) -> ProcessorResult:
        """Compute the covariance from the data available for ``a`` and ``b`` and store it in ``self.value``.

        :return: a successful ProcessorResult holding the output of ``cov`` when both series are available
            and successful; otherwise a failed ProcessorResult with an explanatory message.
        """
        a_data = self.children_data.get('a')
        if not isinstance(a_data, ProcessorResult):
            self.value = ProcessorResult(False, "CovarianceProcessor does not have 'a' series yet")
            return self.value
        if not a_data.success:
            self.value = ProcessorResult(False, "CovarianceProcessor does not have 'a' series values yet")
            return self.value

        b_data = self.children_data.get('b')
        # Need to check if the child node b was set in the first place.
        if self.children.get('b') and isinstance(b_data, ProcessorResult):
            if b_data.success:
                result = cov(a_data.data, b_data.data, w=self.w)
                self.value = ProcessorResult(True, result)
            else:
                # An error message is not a successful result, so flag it as a failure.
                self.value = ProcessorResult(False, "CovarianceProcessor does not have 'b' series values yet.")
        else:
            # Covariance needs both series; without a usable b there is nothing to compute.
            self.value = ProcessorResult(False, 'CovarianceProcessor: b is not a valid series.')
        return self.value

    def get_plot_expression(self):
        """Plot expressions are not implemented for this processor; returns None."""
        pass
class ZscoresProcessor(BaseProcessor):
    """Processor that applies ``zscores`` to series ``a`` over an optional rolling window."""

    def __init__(self,
                 a: DataCoordinateOrProcessor,
                 *,
                 start: Optional[DateOrDatetimeOrRDate] = None,
                 end: Optional[DateOrDatetimeOrRDate] = None,
                 w: Union[Window, int] = None,
                 **kwargs):
        """ ZscoresProcessor

        :param a: DataCoordinate or BaseProcessor for the series
        :param start: start date or time used in the underlying data query
        :param end: end date or time used in the underlying data query
        :param w: Window or int: size of window and ramp up to use. e.g. Window(22, 10) where 22 is the window size
            and 10 the ramp up value. If w is a string, it should be a relative date like '1m', '1d', etc.
            Window size defaults to length of series.

        **Usage**

        Calculate `standard score <https://en.wikipedia.org/wiki/Standard_score>`_ of each value in series over given
        window. Standard deviation and sample mean are computed over the specified rolling window, then element is
        normalized to provide a rolling z-score

        If window is not provided, computes z-score relative to mean and standard deviation over the full series
        """
        super().__init__(**kwargs)
        self.children['a'] = a
        self.start, self.end = start, end
        self.w = w

    def process(self):
        """Run ``zscores`` on the data available for ``a``; store the outcome in ``self.value`` and return it."""
        a_data = self.children_data.get('a')
        if not isinstance(a_data, ProcessorResult):
            self.value = ProcessorResult(False, "ZscoresProcessor does not have 'a' series yet")
        elif not a_data.success:
            self.value = ProcessorResult(False, "ZscoresProcessor does not have 'a' series values yet")
        else:
            # An unset window is replaced by Window(None, 0) before calling zscores.
            window = self.w if self.w is not None else Window(None, 0)
            scores = zscores(a_data.data, w=window)
            self.value = ProcessorResult(True, scores)
        return self.value

    def get_plot_expression(self):
        """No plot expression is provided; returns None."""
        pass
class StdMoveProcessor(BaseProcessor):
    """Processor that expresses the latest return of series ``a`` in units of the standard deviation of
    its earlier returns."""

    def __init__(self,
                 a: DataCoordinateOrProcessor,
                 *,
                 start: Optional[DateOrDatetimeOrRDate] = None,
                 end: Optional[DateOrDatetimeOrRDate] = None,
                 w: Union[Window, int] = None,
                 **kwargs):
        """ StdMoveProcessor: Returns normalized by std deviation of series a

        :param a: DataCoordinate or BaseProcessor for the first series
        :param start: start date or time used in the underlying data query
        :param end: end date or time used in the underlying data query
        :param w: Window or int: size of window and ramp up to use. e.g. Window(22, 10) where 22 is the window size
            and 10 the ramp up value. If w is a string, it should be a relative date like '1m', '1d', etc.
            Window size defaults to length of series.
        """
        super().__init__(**kwargs)
        self.children['a'] = a
        self.start = start
        self.end = end
        self.w = w

    def process(self) -> ProcessorResult:
        """Compute ``latest return / std of prior returns`` and store it in ``self.value``.

        :return: a successful ProcessorResult holding a one-element Series with the ratio, or a failed
            ProcessorResult when ``a`` is missing/unsuccessful or the ratio cannot be computed (too few
            points, NaN inputs, or zero standard deviation).
        """
        a_data = self.children_data.get('a')
        if not isinstance(a_data, ProcessorResult):
            self.value = ProcessorResult(False, "StdMoveProcessor does not have 'a' series yet")
            return self.value
        if not a_data.success:
            self.value = ProcessorResult(False, "StdMoveProcessor does not have 'a' series values yet")
            return self.value

        data_series = a_data.data
        if len(data_series) < 2:
            # At least two observations are needed to form the latest return.
            self.value = ProcessorResult(False, "StdMoveProcessor returns a NaN")
            return self.value

        # Latest move: the return between the final two observations.
        change_series = returns(data_series.tail(2))
        change = change_series.iloc[-1] if len(change_series) else None

        # Standard deviation of the returns that precede the latest observation
        # (the final value is excluded via head(-1)).
        returns_series = returns(data_series.head(-1))
        std_series = std(returns_series, w=Window(None, 0) if self.w is None else self.w)
        std_result = std_series.iloc[-1] if len(std_series) else None

        # pd.notna rejects both None and NaN, so a NaN ratio is never reported as a success.
        if pd.notna(change) and pd.notna(std_result) and std_result != 0:
            self.value = ProcessorResult(True, pd.Series([change / std_result]))
        else:
            self.value = ProcessorResult(False, "StdMoveProcessor returns a NaN")
        return self.value

    def get_plot_expression(self):
        """Plot expressions are not implemented for this processor; returns None."""
        pass
class CompoundGrowthRate(BaseProcessor):
    """Processor computing ``(last / first) ** (1 / n) - 1`` for series ``a``."""

    def __init__(self,
                 a: DataCoordinateOrProcessor,
                 *,
                 start: Optional[DateOrDatetimeOrRDate] = None,
                 end: Optional[DateOrDatetimeOrRDate] = None,
                 n: Optional[float] = None,
                 **kwargs):
        """ CompoundGrowthRate: indicates the growth rate over given time period n

        :param a: DataCoordinate or BaseProcessor of series
        :param start: start date or time used in the underlying data query
        :param end: end date or time used in the underlying data query
        :param n: Number of periods over which the growth is compounded; must be set to a non-zero value
            before ``process`` can produce a result
        """
        super().__init__(**kwargs)
        self.children['a'] = a
        self.start = start
        self.end = end
        self.n = n

    def process(self) -> ProcessorResult:
        """Compute the compound growth rate from the data available for ``a`` and store it in ``self.value``.

        :return: a successful ProcessorResult holding a one-element Series with the growth rate, or a failed
            ProcessorResult when ``a`` is missing/unsuccessful/empty or ``n`` is unset or zero.
        """
        a_data = self.children_data.get('a')
        if not isinstance(a_data, ProcessorResult):
            self.value = ProcessorResult(False, "CompoundGrowthRate does not have 'a' series yet")
            return self.value
        if not a_data.success:
            self.value = ProcessorResult(False, "CompoundGrowthRate does not have 'a' series values yet")
            return self.value

        data_series = a_data.data
        if len(data_series) == 0:
            # No first/last observation to compare.
            self.value = ProcessorResult(False, "CompoundGrowthRate does not have 'a' series values yet")
            return self.value
        if not self.n:
            # n is the exponent denominator: None or 0 would raise instead of yielding a result.
            self.value = ProcessorResult(False, "CompoundGrowthRate requires a non-zero 'n'")
            return self.value

        growth = (data_series.iloc[-1] / data_series.iloc[0]) ** (1 / self.n) - 1
        self.value = ProcessorResult(True, Series([growth]))
        return self.value

    def get_plot_expression(self):
        """Plot expressions are not implemented for this processor; returns None."""
        pass