"""
Copyright 2019 Goldman Sachs.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicablNe law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
"""
import datetime as dt
import logging
import re
from dataclasses import dataclass
from itertools import chain
from typing import Iterable, Optional, Tuple, Union
from urllib.parse import quote
import deprecation
import numpy as np
import pandas as pd
from gs_quant.api.gs.assets import GsAssetApi
from gs_quant.api.gs.portfolios import GsPortfolioApi
from gs_quant.base import InstrumentBase
from gs_quant.common import RiskMeasure
from gs_quant.instrument import Instrument, AssetType
from gs_quant.markets import HistoricalPricingContext, OverlayMarket, PricingContext, PositionContext
from gs_quant.priceable import PriceableImpl
from gs_quant.risk import ResolvedInstrumentValues
from gs_quant.risk.results import CompositeResultFuture, PortfolioRiskResult, PortfolioPath, PricingFuture
from gs_quant.target.common import RiskPosition
from gs_quant.target.portfolios import Portfolio as MarqueePortfolio
from gs_quant.target.portfolios import Position, PositionSet, RiskRequest, PricingDateAndMarketDataAsOf
from more_itertools import unique_everseen
_logger = logging.getLogger(__name__)
[docs]@dataclass
class Portfolio(PriceableImpl):
"""A collection of instruments
Portfolio holds a collection of instruments in order to run pricing and risk scenarios
"""
[docs] def __init__(self,
priceables: Optional[Union[PriceableImpl, Iterable[PriceableImpl], dict]] = (),
name: Optional[str] = None):
"""
Creates a portfolio object which can be used to hold instruments
:param priceables: constructed with an instrument, portfolio, iterable of either, or a dictionary where
key is name and value is a priceable
"""
super().__init__()
if isinstance(priceables, dict):
priceables_list = []
for name, priceable in priceables.items():
priceable.name = name
priceables_list.append(priceable)
self.priceables = priceables_list
else:
self.priceables = priceables
self.name = name
self.__id = None
self.__quote_id = None
def _to_records(self):
def get_name(obj, idx):
if isinstance(obj, InstrumentBase) and hasattr(obj, 'type_'):
type_name = obj.type_.name if isinstance(obj.type_, AssetType) else obj.type_
else:
type_name = 'Portfolio'
return f'{type_name}_{idx}' if obj.name is None else obj.name
stack = [(None, self)]
records = []
while stack:
temp_records = []
parent, portfolio = stack.pop()
current_record = {} if len(records) == 0 else records.pop(0)
for idx, priceable in enumerate(portfolio.__priceables):
path = parent + PortfolioPath(idx) if parent is not None else PortfolioPath(idx)
priceable_name = get_name(priceable, idx)
if isinstance(priceable, Portfolio):
stack.insert(0, (path, priceable))
temp_records.append({**current_record, f'portfolio_name_{len(path) - 1}': priceable_name})
else:
temp_records.append({**current_record, 'instrument_name': priceable_name})
records.extend(temp_records)
return records
def __getitem__(self, item):
if isinstance(item, (int, slice)):
return self.__priceables[item]
elif isinstance(item, PortfolioPath):
return item(self, rename_to_parent=True)
else:
values = tuple(self[p] for it in item for p in self.paths(it)) if isinstance(item, list) else tuple(
self[p] for p in self.paths(item))
return values[0] if len(values) == 1 else values
def __contains__(self, item):
if isinstance(item, PriceableImpl):
return any(item in p.__priceables for p in self.all_portfolios + (self,))
elif isinstance(item, str):
return any(item in p.__priceables_by_name for p in self.all_portfolios + (self,))
else:
return False
def __len__(self):
return len(self.__priceables)
def __iter__(self):
return iter(self.__priceables)
def __hash__(self):
hash_code = hash(self.name) ^ hash(self.__id)
for priceable in self.__priceables:
hash_code ^= hash(priceable)
return hash_code
def __eq__(self, other):
if not isinstance(other, Portfolio):
return False
for path in self.all_paths:
try:
if path(self) != path(other):
return False
except (IndexError, TypeError):
# indexerror occurs when two portfolios are of different lengths
# typeerror: instrument is not subscriptable occurs when two portfolios are of different depths
return False
return True
def __add__(self, other):
if not isinstance(other, Portfolio):
raise ValueError('Can only add instances of Portfolio')
return Portfolio(self.__priceables + other.__priceables)
@property
def __position_context(self) -> PositionContext:
return PositionContext.current if PositionContext.current.is_entered else PositionContext.default_value()
@property
def id(self) -> str:
return self.__id
@property
def quote_id(self) -> str:
return self.__quote_id
@property
def priceables(self) -> Tuple[PriceableImpl, ...]:
return self.__priceables
@priceables.setter
def priceables(self, priceables: Union[PriceableImpl, Iterable[PriceableImpl]]):
self.__priceables = (priceables,) if isinstance(priceables, PriceableImpl) else tuple(priceables)
self.__priceables_by_name = {}
for idx, i in enumerate(self.__priceables):
if i and i.name:
self.__priceables_by_name.setdefault(i.name, []).append(idx)
@priceables.deleter
def priceables(self):
self.__priceables = None
self.__priceables_by_name = None
@property
def instruments(self) -> Tuple[Instrument, ...]:
return tuple(unique_everseen(i for i in self.__priceables if isinstance(i, Instrument)))
@property
def all_instruments(self) -> Tuple[Instrument, ...]:
instr = chain(self.instruments, chain.from_iterable(p.all_instruments for p in self.all_portfolios))
return tuple(unique_everseen(instr))
@property
def portfolios(self) -> Tuple[PriceableImpl, ...]:
return tuple(i for i in self.__priceables if isinstance(i, Portfolio))
@property
def all_portfolios(self) -> Tuple[PriceableImpl, ...]:
stack = list(self.portfolios)
portfolios = list(unique_everseen(stack))
while stack:
portfolio = stack.pop()
if portfolio in portfolios:
continue
sub_portfolios = portfolio.portfolios
portfolios.extend(sub_portfolios)
stack.extend(sub_portfolios)
return tuple(unique_everseen(portfolios))
def subset(self, paths: Iterable[PortfolioPath], name=None):
# Do our paths represent a single portfolio?
paths_tuple = tuple(paths)
if len(paths_tuple) == 1 and isinstance(self[paths_tuple[0]], Portfolio):
return self[paths_tuple[0]]
else:
return Portfolio(tuple(self[p] for p in paths_tuple), name=name)
@staticmethod
def __from_internal_positions(id_type: str, positions_id, activity_type: str):
instruments = GsPortfolioApi.get_instruments_by_position_type(id_type, positions_id, activity_type)
return Portfolio(instruments, name=positions_id)
@staticmethod
def from_eti(eti: str):
return Portfolio.__from_internal_positions('ETI', quote(eti, safe=''), 'trade')
@staticmethod
def from_book(book: str, book_type: str = 'risk', activity_type: str = 'position'):
return Portfolio.__from_internal_positions(book_type, book, activity_type)
@staticmethod
def from_asset_id(asset_id: str, date=None):
asset = GsAssetApi.get_asset(asset_id)
response = GsAssetApi.get_asset_positions_for_date(asset_id, date) if date else \
GsAssetApi.get_latest_positions(asset_id)
response = response[0] if isinstance(response, tuple) else response
positions = response.positions if isinstance(response, PositionSet) else response['positions']
instruments = GsAssetApi.get_instruments_for_positions(positions)
ret = Portfolio(instruments, name=asset.name)
ret.__id = asset_id
return ret
@staticmethod
def from_asset_name(name: str):
asset = GsAssetApi.get_asset_by_name(name)
return Portfolio.load_from_portfolio_id(asset.id)
@classmethod
def get(cls,
portfolio_id: str = None,
portfolio_name: str = None):
if portfolio_name:
portfolio = GsPortfolioApi.get_portfolio_by_name(portfolio_name)
portfolio_id = portfolio.id
position_date = PositionContext.current.position_date if PositionContext.is_entered else dt.date.today()
portfolio = GsPortfolioApi.get_portfolio(portfolio_id)
ret = Portfolio(name=portfolio.name)
ret.__id = portfolio_id
ret._get_instruments(position_date, True)
return ret
@classmethod
@deprecation.deprecated(deprecated_in='0.8.293',
details='from_portfolio_id is now deprecated, please use '
'Portfolio.get(portfolio_id=portfolio_id) instead.')
def from_portfolio_id(cls, portfolio_id: str):
return cls.get(portfolio_id=portfolio_id)
@classmethod
@deprecation.deprecated(deprecated_in='0.8.293',
details='from_portfolio_name is now deprecated, please use '
'Portfolio.get(portfolio_name=portfolio_name) instead.')
def from_portfolio_name(cls, name: str):
return cls.get(portfolio_name=name)
@staticmethod
def from_quote(quote_id: str):
instruments = GsPortfolioApi.get_instruments_by_workflow_id(quote_id)
ret = Portfolio(instruments, name=quote_id)
ret.__quote_id = quote_id
return ret
def save(self, overwrite: Optional[bool] = False):
if self.portfolios:
raise ValueError('Cannot save portfolios with nested portfolios')
if self.__id:
if not overwrite:
raise ValueError(f'Portfolio with id {id} already exists. Use overwrite=True to overwrite')
else:
if not self.name:
raise ValueError('name not set')
self.__id = GsPortfolioApi.create_portfolio(MarqueePortfolio('USD', self.name)).id
_logger.info(f'Created Marquee portfolio {self.name} with id {self.__id}')
position_set = PositionSet(
position_date=self.__position_context.position_date,
positions=tuple(Position(asset_id=GsAssetApi.get_or_create_asset_from_instrument(i))
for i in self.instruments))
if len(position_set.positions) > 0:
GsPortfolioApi.update_positions(self.__id, [position_set])
def save_as_quote(self, overwrite: Optional[bool] = False) -> str:
if self.portfolios:
raise ValueError('Cannot save portfolios with nested portfolios')
pricing_context = self._pricing_context
with pricing_context:
pricing_date = PricingContext.current.pricing_date
market = PricingContext.current.market
request = RiskRequest(
tuple(RiskPosition(instrument=i, quantity=i.instrument_quantity) for i in self.instruments),
(ResolvedInstrumentValues,),
pricing_and_market_data_as_of=(PricingDateAndMarketDataAsOf(pricing_date=pricing_date,
market=market),)
)
if self.__quote_id:
if not overwrite:
raise ValueError(f'Quote with id {self.__quote_id} already exists. Use overwrite=True to overwrite')
else:
GsPortfolioApi.update_quote(self.__quote_id, request)
_logger.info(f'Updated quote with id {self.__quote_id}')
else:
self.__quote_id = GsPortfolioApi.save_quote(request)
_logger.info(f'Created quote with id {self.__quote_id}')
return self.__quote_id
def save_to_shadowbook(self, name: str):
if self.portfolios:
raise ValueError('Cannot save portfolios with nested portfolios')
pricing_context = self._pricing_context
with pricing_context:
pricing_date = PricingContext.current.pricing_date
market = PricingContext.current.market
request = RiskRequest(
tuple(RiskPosition(instrument=i, quantity=i.instrument_quantity) for i in self.instruments),
(ResolvedInstrumentValues,),
pricing_and_market_data_as_of=(PricingDateAndMarketDataAsOf(pricing_date=pricing_date,
market=market),)
)
status = GsPortfolioApi.save_to_shadowbook(request, name)
print(f'Save to shadowbook status - {status}')
@classmethod
def from_frame(cls, data: pd.DataFrame, mappings: dict = None):
def get_value(this_row: pd.Series, attribute: str):
value = mappings.get(attribute, attribute)
return value(this_row) if callable(value) else this_row.get(value)
instruments = []
mappings = mappings or {}
data = data.replace({np.nan: None})
for row in (r for _, r in data.iterrows() if any(v for v in r.values if v is not None)):
instrument = None
for init_keys in (('asset_class', 'type'), ('$type',)):
init_values = tuple(filter(None, (get_value(row, k) for k in init_keys)))
if len(init_keys) == len(init_values):
instrument = Instrument.from_dict(dict(zip(init_keys, init_values)))
instrument = instrument.from_dict({p: get_value(row, p) for p in instrument.properties()})
break
if instrument:
instruments.append(instrument)
else:
raise ValueError('Neither asset_class/type nor $type specified')
return cls(instruments)
@classmethod
def from_csv(
cls,
csv_file: str,
mappings: Optional[dict] = None
):
data = pd.read_csv(csv_file, skip_blank_lines=True).replace({np.nan: None})
reg = re.compile(r'\.[0-9]')
dupelist = [re.sub(reg, '', word) for word in data.columns if reg.search(word)]
if len(dupelist):
raise ValueError(f'Duplicate column values {dupelist}')
return cls.from_frame(data, mappings)
def scale(self, scaling: int, in_place: bool = True):
instruments = self._get_instruments(self.__position_context.position_date, in_place, False)
if in_place:
for inst in self.all_instruments:
inst.scale(scaling, in_place)
else:
return Portfolio([inst.scale(scaling, in_place) for inst in instruments])
def append(self, priceables: Union[PriceableImpl, Iterable[PriceableImpl]]):
self.priceables += ((priceables,) if isinstance(priceables, PriceableImpl) else tuple(priceables))
def pop(self, item) -> PriceableImpl:
priceable = self[item]
self.priceables = [inst for inst in self.instruments if inst != priceable]
return priceable
def extend(self, portfolio: Iterable):
self.priceables += tuple([p for p in portfolio])
def to_frame(self, mappings: Optional[dict] = None) -> pd.DataFrame:
def to_records(portfolio: Portfolio) -> list:
records = []
for priceable in portfolio.priceables:
if isinstance(priceable, Portfolio):
records.extend(to_records(priceable))
else:
as_dict = priceable.as_dict()
if not hasattr(priceable, 'asset_class'):
as_dict['$type'] = priceable.type_
records.append(dict(chain(as_dict.items(),
(('instrument', priceable), ('portfolio', portfolio.name)))))
return records
df = pd.DataFrame.from_records(to_records(self)).set_index(['portfolio', 'instrument'])
all_columns = df.columns.to_list()
columns = sorted(c for c in all_columns if c not in ('asset_class', 'type', '$type'))
for asset_column in ('$type', 'type', 'asset_class'):
if asset_column in all_columns:
columns = [asset_column] + columns
df = df[columns]
mappings = mappings or {}
for key, value in mappings.items():
if isinstance(value, str):
df[key] = df[value]
elif callable(value):
df[key] = len(df) * [None]
df[key] = df.apply(value, axis=1)
return df
def to_csv(self, csv_file: str, mappings: Optional[dict] = None, ignored_cols: Optional[list] = None):
port_df = self.to_frame(mappings or {})
port_df = port_df[np.setdiff1d(port_df.columns, ignored_cols or [])]
port_df.reset_index(drop=True, inplace=True)
port_df.to_csv(csv_file)
@property
def all_paths(self) -> Tuple[PortfolioPath, ...]:
paths = ()
stack = [(None, self)]
while stack:
parent, portfolio = stack.pop()
for idx, priceable in enumerate(portfolio.__priceables):
path = parent + PortfolioPath(idx) if parent is not None else PortfolioPath(idx)
if isinstance(priceable, Portfolio):
stack.insert(0, (path, priceable))
else:
paths += (path,)
return paths
def paths(self, key: Union[str, PriceableImpl]) -> Tuple[PortfolioPath, ...]:
if not isinstance(key, (str, Instrument, Portfolio)):
raise ValueError('key must be a name or Instrument or Portfolio')
if isinstance(key, str):
idx = self.__priceables_by_name.get(key)
else:
idx = []
for p_idx, p in enumerate(self.__priceables):
if p == key or getattr(p, "unresolved", None) == key:
idx.append(p_idx)
paths = tuple(PortfolioPath(i) for i in idx) if idx else ()
for path, porfolio in ((PortfolioPath(i), p)
for i, p in enumerate(self.__priceables) if isinstance(p, Portfolio)):
paths += tuple(path + sub_path for sub_path in porfolio.paths(key))
return paths
def resolve(self, in_place: bool = True) -> Optional[Union[PricingFuture, PriceableImpl, dict]]:
priceables = self._get_instruments(self.__position_context.position_date, in_place, True)
pricing_context = self._pricing_context
with pricing_context:
futures = [p.resolve(in_place) for p in priceables]
if not in_place:
ret = {} if isinstance(PricingContext.current, HistoricalPricingContext) else Portfolio(name=self.name)
result_future = PricingFuture() if self._return_future else None
def cb(future: CompositeResultFuture):
if isinstance(ret, Portfolio):
ret.priceables = [f.result() for f in future.futures]
else:
priceables_by_date = {}
for future in futures:
for date, priceable in future.result().items():
priceables_by_date.setdefault(date, []).append(priceable)
for date, priceables in priceables_by_date.items():
if any(p for p in priceables if not isinstance(p, PriceableImpl)):
_logger.error(f'Error resolving on {date}, skipping that date')
else:
ret[date] = Portfolio(priceables, name=self.name)
if result_future:
result_future.set_result(ret)
CompositeResultFuture(futures).add_done_callback(cb)
return result_future or ret
def market(self) -> Union[OverlayMarket, PricingFuture, dict]:
"""
Market Data map of coordinates and values. Note that this is not yet supported on all instruments
***Examples**
>>> from gs_quant.markets.portfolio import Portfolio
>>>
>>> portfolio = Portfolio(...)
>>> market = portfolio.market()
"""
pricing_context = self._pricing_context
instruments = self._get_instruments(self.__position_context.position_date, False, False)
with pricing_context:
futures = [i.market() for i in instruments]
result_future = PricingFuture()
def cb(future: CompositeResultFuture):
def update_market_data(all_market_data, this_market_data):
for coordinate, value in this_market_data.items():
existing_value = all_market_data.setdefault(coordinate, value)
if abs(existing_value - value) > 1e-6:
raise ValueError(f'Conflicting values for {coordinate}: {existing_value} vs {value}')
results = [f.result() for f in future.futures]
is_historical = isinstance(results[0], dict)
market_data = None if is_historical else {}
overlay_markets = {} if is_historical else None
for result in results:
if market_data is not None:
update_market_data(market_data, result.market_data_dict)
else:
for market in result.values():
update_market_data(overlay_markets.setdefault(market, {}), market.market_data)
if market_data:
ret = OverlayMarket(base_market=results[0], market_data=market_data)
else:
ret = {base_market.date: OverlayMarket(base_market=base_market, market_data=market_data)
for base_market, market_data in overlay_markets.items()}
if result_future:
result_future.set_result(ret)
CompositeResultFuture(futures).add_done_callback(cb)
return result_future if self._return_future else result_future.result()
def calc(self, risk_measure: Union[RiskMeasure, Iterable[RiskMeasure]], fn=None) -> PortfolioRiskResult:
priceables = self._get_instruments(self.__position_context.position_date, False, True)
with self._pricing_context:
# PortfolioRiskResult should hold a copy of the portfolio instead of a reference to the portfolio
# this is to prevent the portfolio object within portfolioriskresult to hold a reference to the portfolio
# object should it later be modified in place (eg: resolution)
return PortfolioRiskResult(self.clone(),
(risk_measure,) if isinstance(risk_measure, RiskMeasure) else risk_measure,
[p.calc(risk_measure, fn=fn) for p in priceables])
def _get_instruments(self, position_date: dt.date, in_place: bool, return_priceables: bool = True):
if self.id:
dates_prior = list(filter(lambda date: date < position_date,
GsPortfolioApi.get_position_dates(self.id)))
if len(dates_prior) == 0:
raise ValueError('Your portfolio has no positions on the PositionContext date')
date = max(dates_prior)
response = GsPortfolioApi.get_positions_for_date(self.id, date)
positions = response.positions if response else []
instruments = GsAssetApi.get_instruments_for_positions(positions)
if in_place:
self.__priceables = instruments
return instruments
return self.__priceables if return_priceables else self.all_instruments
def clone(self, clone_instruments: bool = False):
portfolio_clone = Portfolio(
[p.clone(clone_instruments) if isinstance(p, Portfolio) else p.clone() if clone_instruments else p for p in
self.__priceables], name=self.name)
portfolio_clone.__id = self.__id
portfolio_clone.__quote_id = self.__quote_id
return portfolio_clone