Source code for gs_quant.markets.securities

"""
Copyright 2019 Goldman Sachs.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
"""
import calendar
import datetime
import datetime as dt
import json
import logging
import threading
import time
from abc import ABCMeta, abstractmethod
from collections import defaultdict
from copy import deepcopy
from enum import auto, Enum
from functools import partial
from typing import Tuple, Generator, Iterable, Optional, Dict, List, Union

import backoff
import cachetools
import pandas as pd
import pytz
from dateutil.relativedelta import relativedelta
from pydash import get

from gs_quant.api.gs.assets import GsAsset, AssetParameters, AssetType as GsAssetType, Currency, GsIdType, GsAssetApi
from gs_quant.api.gs.data import GsDataApi
from gs_quant.api.utils import ThreadPoolManager
from gs_quant.base import get_enum_value
from gs_quant.common import DateLimit
from gs_quant.data import DataMeasure, DataFrequency, Dataset
from gs_quant.data.coordinate import DataDimensions
from gs_quant.data.coordinate import DateOrDatetime
from gs_quant.data.core import IntervalFrequency, DataAggregationOperator
from gs_quant.entities.entity import Entity, EntityIdentifier, EntityType, PositionedEntity
from gs_quant.errors import MqValueError, MqTypeError, MqRequestError
from gs_quant.json_encoder import JSONEncoder
from gs_quant.markets import PricingContext
from gs_quant.markets.indices_utils import BasketType, IndicesDatasets
from gs_quant.session import GsSession
from gs_quant.target.common import AssetClass
from gs_quant.target.data import DataQuery

_logger = logging.getLogger(__name__)


class ExchangeCode(Enum):
    """Exchange enumeration

    Exchange codes representing global venues where Securities are listed and traded

    """

    NASDAQ = "NASD"  # Nasdaq Global Stock Market
    NYSE = "NYSE"  # New York Stock Exchange


[docs]class AssetType(Enum): """Asset type enumeration Enumeration of different types of asset or security. """ #: Index which tracks an evolving portfolio of securities, and can be traded through cash or derivatives markets INDEX = "Index" #: Exchange traded fund which tracks an evolving portfolio of securities and is listed on an exchange to be #: traded as a security ETF = "ETF" #: Bespoke basket which provides exposure to a customized collection of assets with levels published daily; can be #: traded on swap and rebalanced programmatically CUSTOM_BASKET = "Custom Basket" #: Bespoke basket which provides exposure to a customized collection of assets with levels published daily; #: basket composition maintained by Goldman Sachs Investment Research RESEARCH_BASKET = "Research Basket" #: Listed equities which provide access to equity holding in a company and participation in dividends and other #: distributions in common, preferred or other variants which provide different investor rights STOCK = "Single Stock" #: Standardized listed contract which provides delivery of an asset at a pre-defined forward date and can be #: settled in cash or physical form FUTURE = "Future" #: FX cross or currency pair CROSS = "Cross" #: Currency CURRENCY = "Currency" #: Rate RATE = "Rate" #: Cash CASH = "Cash" #: Weather Index WEATHER_INDEX = "Weather Index" #: Swap SWAP = "Swap" #: Swaption SWAPTION = "Swaption" #: Option OPTION = "Option" #: Binary BINARY = "Binary" #: Commodity Reference Price COMMODITY_REFERENCE_PRICE = "Commodity Reference Price" # COMMODITY NATURAL GAS Hub COMMODITY_NATURAL_GAS_HUB = "Commodity Natural Gas Hub" # COMMODITY EU NATURAL GAS Hub COMMODITY_EU_NATURAL_GAS_HUB = "Commodity EU Natural Gas Hub" #: Commodity Power Node COMMODITY_POWER_NODE = "Commodity Power Node" #: Commodity Power Aggregated Nodes COMMODITY_POWER_AGGREGATED_NODES = "Commodity Power Aggregated Nodes" #: Bond BOND = "Bond" #: Future Market FUTURE_MARKET = "Future Market" #: Future Contract FUTURE_CONTRACT = "Future Contract" #: Commodity COMMODITY = "Commodity" #: Crypto CRYPTOCURRENCY = "Cryptocurrency" #: Forward FORWARD = "Forward" #: Fund FUND = "Fund" #: Default Swap DEFAULT_SWAP = "Default Swap" #: Systematic Hedging SYSTEMATIC_HEDGING = 'Systematic Hedging' #: Access ACCESS = 'Access' #: Risk Premia RISK_PREMIA = 'Risk Premia' #: Multi Asset Allocation MULTI_ASSET_ALLOCATION = 'Multi-Asset Allocation' # Sec Master types ADR = 'ADR' GDR = 'GDR' DUTCH_CERT = 'Dutch Cert' NYRS = 'NY Reg Shrs' RECEIPT = 'Receipt' UNIT = 'Unit' MUTUAL_FUND = 'Mutual Fund' RIGHT = 'Right' PREFERRED = 'Preferred' MISC = 'Misc.' REIT = 'REIT' PRIVATE_COMP = 'Private Comp' PREFERENCE = 'Preference' LIMITED_PARTNERSHIP = 'Ltd Part' TRACKING_STOCK = 'Tracking Stk' ROYALTY_TRUST = 'Royalty Trst' CLOSED_END_FUND = 'Closed-End Fund' OPEN_END_FUND = 'Open-End Fund' FUND_OF_FUNDS = 'Fund of Funds' MLP = 'MLP' STAPLED_SECURITY = 'Stapled Security' SAVINGS_SHARE = 'Savings Share' EQUITY_WRT = 'Equity WRT' # ETF already defined SAVINGS_PLAN = 'Savings Plan' EQUITY_INDEX = 'Equity Index' COMMON_STOCK = 'Common Stock'
[docs]class AssetIdentifier(EntityIdentifier): """Asset type enumeration Enumeration of different security identifiers """ MARQUEE_ID = "MQID" #: Goldman Sachs Marquee identifier code (MA4B66MW5E27UAHKG34) REUTERS_ID = "RIC" #: Thompson Reuters Instrument Code (RIC), (GS.N) BLOOMBERG_ID = "BBID" #: Bloomberg identifier and exchange code (GS UN) BLOOMBERG_COMPOSITE_ID = "BCID" #: Bloomberg composite identifier and exchange code (GS US) CUSIP = "CUSIP" #: Committee on Uniform Security Identification Procedures code (38141G104) ISIN = "ISIN" #: International Securities Identification Number (US38141G1040) SEDOL = "SEDOL" #: LSE Stock Exchange Daily Official List code (2407966) TICKER = "TICKER" #: Exchange ticker (GS) PLOT_ID = "PLOT_ID" #: ID for Marquee PlotTool GSID = "GSID"
class SecurityIdentifier(EntityIdentifier): GSID = "gsid" RCIC = "rcic" RIC = "ric" ID = "id" CUSIP = "cusip" CUSIP8 = "cusip8" SEDOL = "sedol" ISIN = "isin" TICKER = "ticker" BBID = "bbid" BCID = "bcid" GSS = "gss" PRIMEID = "primeId" BBG = "bbg" ASSET_ID = "assetId" ANY = "identifiers" class ReturnType(Enum): """Index return type Represents different index return types or funding models """ EXCESS_RETURN = "Excess Return" # Returns are excess of funding rate in denominated currency TOTAL_RETURN = "Total Return" # Returns are inclusive of funding rate in denominated currency
[docs]class Asset(Entity, metaclass=ABCMeta):
[docs] def __init__(self, id_: str, asset_class: AssetClass, name: str, exchange: Optional[str] = None, currency: Optional[str] = None, parameters: AssetParameters = None, entity: Optional[Dict] = None): super().__init__(id_, EntityType.ASSET, entity=entity) self.__id = id_ self.asset_class = asset_class self.name = name self.exchange = exchange self.currency = currency self.parameters = parameters self.entity = entity
def get_marquee_id(self): return self.__id def get_url(self) -> str: """ Retrieve url to asset's product page on Marquee """ env = '-dev-ext.web' if 'dev' in get(GsSession, 'current.domain', '') else '' env = '-qa' if 'qa' in get(GsSession, 'current.domain', '') else env return f'https://marquee{env}.gs.com/s/products/{self.get_marquee_id()}/summary' def get_identifiers(self, as_of: dt.date = None) -> dict: """ Get asset identifiers :param as_of: As of date for query :return: dict of identifiers **Usage** Get asset identifiers as of a given date. Where the identifiers are temporal (and can change over time), this function will return the identifiers as of that point in time. If no date is provided as a parameter, will use the current PricingContext. **Examples** Get current asset identifiers: >>> gs = SecurityMaster.get_asset("GS", AssetIdentifier.TICKER) >>> gs.get_identifiers() Get identifiers as of 1Jan18: >>> gs.get_identifiers(dt.date(2018,1,1)) Use PricingContext to determine as of date: >>> with PricingContext(dt.date(2018,1,1)) as ctx: >>> gs.get_identifiers() **See also** :class:`AssetIdentifier` :func:`get_asset` """ if not as_of: current = PricingContext.current if not current.is_entered: with current: as_of = current.pricing_date else: as_of = current.pricing_date if isinstance(as_of, dt.datetime): as_of = as_of.date() valid_ids = set(item.value for item in AssetIdentifier) xrefs = GsAssetApi.get_asset_xrefs(self.get_marquee_id()) identifiers = {} for xref in xrefs: start_date = xref.startDate end_date = xref.endDate if start_date <= as_of <= end_date: identifiers = {k.upper(): v for k, v in xref.identifiers.as_dict().items() if k.upper() in valid_ids} return identifiers @cachetools.cached(cachetools.TTLCache(256, 600), lambda s, id_type, as_of=None: cachetools.keys.hashkey(s.get_marquee_id(), id_type, as_of), threading.RLock()) def get_identifier(self, id_type: AssetIdentifier, as_of: dt.date = None): """ Get asset identifier :param as_of: As of date for query :param id_type: requested id type :return: identifier value **Usage** Get asset identifier as of a given date. Where the identifiers are temporal (and can change over time), this function will return the identifier as of that point in time. If no date is provided as a parameter, will use the current PricingContext. **Examples** Get current SEDOL: >>> import datetime as dt >>> >>> gs = SecurityMaster.get_asset("GS", AssetIdentifier.TICKER) >>> gs.get_identifier(AssetIdentifier.SEDOL) Get SEDOL as of 1Jan18: >>> gs.get_identifier(AssetIdentifier.SEDOL, as_of=dt.date(2018,1,1)) Use PricingContext to determine as of date: >>> with PricingContext(dt.date(2018,1,1)) as ctx: >>> gs.get_identifier(AssetIdentifier.SEDOL) **See also** :class:`AssetIdentifier` :func:`get_asset_identifiers` """ if id_type == AssetIdentifier.MARQUEE_ID: return self.get_marquee_id() ids = self.get_identifiers(as_of=as_of) return ids.get(id_type.value) def get_data_series(self, measure: DataMeasure, dimensions: Optional[DataDimensions] = None, frequency: Optional[DataFrequency] = None, start: Optional[DateOrDatetime] = None, end: Optional[DateOrDatetime] = None, dates: List[dt.date] = None, operator: DataAggregationOperator = None) -> pd.Series: """ Get asset series :param measure: measure to get as series :param dimensions: dimensions to query (e.g. tenor) :param frequency: data frequency to query :param start: start of the series :param end: end of the series :return: timeseries of given measure **Usage** Get a given timeseries for the asset **Examples** Get close price series: >>> from gs_quant.markets.securities import SecurityMaster >>> from gs_quant.data import DataMeasure >>> >>> gs = SecurityMaster.get_asset("GS", AssetIdentifier.TICKER) >>> gs.get_data_series(DataMeasure.CLOSE_PRICE) **See also** :class:`DataMeasure` """ coordinate = self.get_data_coordinate(measure, dimensions, frequency) if coordinate is None: raise MqValueError(f"No data coordinate found for parameters: {measure, dimensions, frequency}") elif coordinate.dataset_id is None: raise MqValueError(f"Measure '{measure.value}' not found for asset: {self.__id}") return coordinate.get_series(start=start, end=end, dates=dates, operator=operator) def get_latest_close_price(self) -> float: coordinate = self.get_data_coordinate(DataMeasure.CLOSE_PRICE, None, DataFrequency.DAILY) if coordinate is None: raise MqValueError(f"No data co-ordinate found for these parameters: \ {DataMeasure.CLOSE_PRICE, None, DataFrequency.DAILY}") return coordinate.last_value() def get_close_price_for_date(self, date: dt.date) -> pd.Series: return self.get_data_series(DataMeasure.CLOSE_PRICE, None, DataFrequency.DAILY, date, date) def get_close_prices(self, start: dt.date = DateLimit.LOW_LIMIT.value, end: dt.date = dt.date.today()) -> pd.Series: """ Get close price series :return: timeseries of close prices **Usage** Get close prices for an asset **Examples** Get close price series: >>> from gs_quant.markets.securities import SecurityMaster >>> >>> gs = SecurityMaster.get_asset("GS", AssetIdentifier.TICKER) >>> gs.get_close_prices() **See also** :class:`DataMeasure` :func:`get_data_series` """ return self.get_data_series(DataMeasure.CLOSE_PRICE, None, DataFrequency.DAILY, start, end) def get_hloc_prices(self, start: dt.date = DateLimit.LOW_LIMIT.value, end: dt.date = dt.date.today(), interval_frequency: IntervalFrequency = IntervalFrequency.DAILY) -> pd.DataFrame: """ Get high, low, open, close (hloc) prices :return: dataframe indexed by datetimes bucketed by the given interval_frequency with High, Low, Open, and Close columns **Usage** Get high, low, open, and close prices for an asset for the given interval frequency. **Examples** Get hloc price series: >>> from gs_quant.markets.securities import SecurityMaster >>> >>> gs = SecurityMaster.get_asset("GS", AssetIdentifier.TICKER) >>> gs.get_hloc_prices() **See also** :class:`DataMeasure` :func:`get_close_prices` """ if self.asset_class == AssetClass.Equity: if interval_frequency == IntervalFrequency.DAILY: dates = None use_field = False elif interval_frequency == IntervalFrequency.MONTHLY: d = dt.date(start.year, start.month, 1) dates = [d, dt.date(d.year, d.month, calendar.monthrange(d.year, d.month)[-1])] d += relativedelta(months=1) while d < end: dates.append(dt.date(d.year, d.month, calendar.monthrange(d.year, d.month)[-1])) d += relativedelta(months=1) dates.append(end) use_field = True else: raise MqValueError(f'Unsupported IntervalFrequency {interval_frequency.value} for get_hloc_prices') tasks = [ partial(self.get_data_series, DataMeasure.ADJUSTED_HIGH_PRICE, None, DataFrequency.DAILY, start, end, dates=dates, operator=DataAggregationOperator.MAX if use_field else None), partial(self.get_data_series, DataMeasure.ADJUSTED_LOW_PRICE, None, DataFrequency.DAILY, start, end, dates=dates, operator=DataAggregationOperator.MIN if use_field else None), partial(self.get_data_series, DataMeasure.ADJUSTED_OPEN_PRICE, None, DataFrequency.DAILY, start, end, dates=dates, operator=DataAggregationOperator.FIRST if use_field else None), partial(self.get_data_series, DataMeasure.ADJUSTED_CLOSE_PRICE, None, DataFrequency.DAILY, start, end, dates=dates, operator=DataAggregationOperator.LAST if use_field else None) ] results = ThreadPoolManager.run_async(tasks) df = pd.DataFrame({'high': results[0], 'low': results[1], 'open': results[2], 'close': results[3]}) elif self.asset_class == AssetClass.FX: if interval_frequency != IntervalFrequency.DAILY: raise MqValueError('Unsupported IntervalFrequency for FX asset class.') ds = Dataset('FX_HLOC') df = ds.get_data(start=start, end=end, assetId=self.get_marquee_id()) df = df.drop(columns=['assetId', 'updateTime']).reindex(columns=['high', 'low', 'open', 'close']) else: raise MqValueError('Unsupported AssetClass for HLOC data.') return df.dropna() @abstractmethod def get_type(self) -> AssetType: """Overridden by sub-classes to return security type""" @classmethod def entity_type(cls) -> EntityType: return EntityType.ASSET @property def data_dimension(self) -> str: return 'assetId' @classmethod def get(cls, id_value: str, id_type: AssetIdentifier, as_of: Union[dt.date, dt.datetime] = None, exchange_code: ExchangeCode = None, asset_type: AssetType = None, sort_by_rank: bool = False) -> Optional['Asset']: asset = SecurityMaster.get_asset(id_value, id_type, as_of, exchange_code, asset_type, sort_by_rank) return asset
class SecMasterAsset(Asset): def __init__(self, id_: str, asset_type: AssetType, asset_class: AssetClass, name: str, exchange: Optional[str] = None, currency: Optional[str] = None, parameters: AssetParameters = None, entity: Optional[Dict] = None): Asset.__init__(self, id_, asset_class=asset_class, name=name, exchange=exchange, currency=currency, parameters=parameters, entity=entity) self.__asset_type = asset_type self.__cached_identifiers = None def get_type(self) -> AssetType: return self.__asset_type def get_marquee_id(self): marquee_id = self.get_identifier(SecurityIdentifier.ASSET_ID) self.__id = marquee_id # Updates Marquee Id in case it changes from context change if marquee_id is None: current = PricingContext.current if not current.is_entered: with current: current_pricing_date = current.pricing_date else: current_pricing_date = current.pricing_date raise MqValueError( f"Current SecMasterAsset does not have a Marquee Id as of {current_pricing_date}. " f"Perhaps asset did not exist at that time, or is a not an exchange-level asset.") return marquee_id def get_identifier(self, id_type: Union[AssetIdentifier, SecurityIdentifier], as_of: dt.date = None): # Add an exception since original get_identifier() takes id_type: AssetIdentifier if not isinstance(id_type, SecurityIdentifier): raise MqTypeError( f"""Expected id_type: SecurityIdentifier.enum for Assets sourced from SecurityMaster. Received: {id_type}""") if id_type == SecurityIdentifier.GSID: return self.entity['identifiers'].get(SecurityIdentifier.GSID.value) if id_type == SecurityIdentifier.ID: return self.entity['id'] ids = self.get_identifiers(as_of=as_of) return ids.get(id_type.value, None) def get_identifiers(self, as_of: dt.date = None) -> dict: # Cache identifiers if not already there if self.__cached_identifiers is None: self.__load_identifiers() # Retrieve from cached identifiers if as_of is None: current = PricingContext.current if not current.is_entered: with current: as_of = current.pricing_date else: as_of = current.pricing_date identifiers = dict() for id_type in SecurityIdentifier: id_history = self.__cached_identifiers.get(id_type.value) if id_history is not None: for xref in id_history: if xref["start_date"] <= as_of <= xref["end_date"]: identifiers[id_type.value] = xref["value"] break # Add GSID and ID as it is not exposed in Get Identifiers History identifiers[SecurityIdentifier.ID.value] = self.entity.get('id') identifiers[SecurityIdentifier.GSID.value] = self.entity.get('identifiers').get(SecurityIdentifier.GSID.value) # Mainly for currencies, where assetId is not exposed in Get Identifiers History if SecurityIdentifier.ASSET_ID.value not in identifiers and self.__asset_type == AssetType.CURRENCY: identifiers[SecurityIdentifier.ASSET_ID.value] = self.entity.get("identifiers").get("assetId") # TODO: BCID and BBID are not exposed in Get Identifiers History. return identifiers def get_data_series(self, measure: DataMeasure, dimensions: Optional[DataDimensions] = None, frequency: Optional[DataFrequency] = None, start: Optional[DateOrDatetime] = None, end: Optional[DateOrDatetime] = None, dates: List[dt.date] = None, operator: DataAggregationOperator = None) -> pd.Series: """ Will be also called by Asset.get_close_prices(), Asset.get_close_price_for_date(). """ coordinate = self.get_data_coordinate(measure, dimensions, frequency) if coordinate is None: raise MqValueError(f"No data coordinate found for parameters:{measure, dimensions, frequency}") range_start, range_end = coordinate.get_range(start, end) if self.__is_validate_range(start=range_start, end=range_end): with PricingContext(range_start): return super(SecMasterAsset, self).get_data_series(measure=measure, dimensions=dimensions, frequency=frequency, start=range_start, end=range_end, dates=dates, operator=operator) def get_hloc_prices(self, start: dt.date = DateLimit.LOW_LIMIT.value, end: dt.date = dt.date.today(), interval_frequency: IntervalFrequency = IntervalFrequency.DAILY) -> pd.DataFrame: if self.__is_validate_range(start=start, end=end): with PricingContext(start): return super(SecMasterAsset, self).get_hloc_prices(start=start, end=end, interval_frequency=interval_frequency) def __is_validate_range(self, start: DateOrDatetime, end: DateOrDatetime = dt.date.today()) -> bool: """ Validates that only one Marquee Id exist in start and end. - This function will return True if only one Marquee id exists in range. - This function will raise MqValueError if either many Marquee Ids exists in or none exist in input range or the Id at start_date is not equal to id at end_date. Example: __cached_identifiers = {"assetId" : [{start: 2020-01-01, end: 2020-01-05, value: "marqueeId1"} {start: 2020-01-06, end: 2020-10-10, value: "marqueeId2"}]} args = {start: 2020-01-01, end: 2020-01-07} return: MqValueError """ if self.__cached_identifiers is None: self.__load_identifiers() if isinstance(start, datetime.datetime): start_date = start.date else: start_date = start if isinstance(end, datetime.datetime): end_date = end.date else: end_date = end with PricingContext(start_date): start_marquee_id = self.get_marquee_id() with PricingContext(end_date): end_marquee_id = self.get_marquee_id() if start_marquee_id is None or end_marquee_id is None or start_marquee_id != end_marquee_id: raise MqValueError( f"Asset's Marquee Id is either none or different. start:[{start_date}->{start_marquee_id}] to " f"end=[{end_date}->{end_marquee_id}].") marquee_id_xref = self.__cached_identifiers.get(SecurityIdentifier.ASSET_ID.value) marquee_ids = set() output_range_start = None output_range_end = None overlap_ranges = defaultdict(list) for xref in marquee_id_xref: if end_date < xref['start_date'] or start_date > xref['end_date']: # Skip xrefs that are outside range continue marquee_id = xref.get("value") range_start = max(start_date, xref['start_date']) range_end = min(end_date, xref['end_date']) if range_start <= range_end: marquee_ids.add(marquee_id) overlap_ranges[marquee_id].append([range_start.strftime("%Y-%m-%d"), range_end.strftime("%Y-%m-%d")]) output_range_start = min(output_range_start, range_start) if output_range_start is not None else output_range_start output_range_end = max(output_range_end, range_end) if output_range_end is not None else output_range_end if len(marquee_ids) > 1: raise MqValueError( f"Asset has multiple Marquee ids between [start,end]=[{start_date},{end_date}] due to corporate " f"actions. Try limiting the range over a single Marquee id. Marquee Ids found: {overlap_ranges}.") if len(marquee_ids) == 0: raise MqValueError( f"Asset was not assigned Marquee Id over range [start,end]=[{start_date},{end_date}]. " f"Perhaps asset did not exist at that range, or is a not an exchange-level asset.") return True def __load_identifiers(self) -> None: if self.__cached_identifiers is None: r = GsSession.current._get(f'/markets/securities/{self.entity["id"]}/identifiers') results = r['results'] xrefs = defaultdict(list) for temporal_xref in results: id_type = temporal_xref['type'] xref_dict = { "start_date": datetime.datetime.strptime(temporal_xref['startDate'], "%Y-%m-%d").date(), "update_date": temporal_xref['updateTime'], "value": temporal_xref['value'] } if temporal_xref['endDate'] == "9999-99-99": xref_dict['end_date'] = datetime.datetime.max.date() else: xref_dict['end_date'] = datetime.datetime.strptime(temporal_xref['endDate'], "%Y-%m-%d").date() xrefs[id_type].append(xref_dict) self.__cached_identifiers = xrefs
[docs]class Stock(Asset): """Base Security Type Represents a financial asset which can be held in a portfolio, or has an observable price fixing which can be referenced in a derivative transaction """
[docs] def __init__(self, id_: str, name: str, exchange: Optional[str] = None, currency: Optional[Currency] = None, entity: Optional[Dict] = None): Asset.__init__(self, id_, AssetClass.Equity, name, exchange, currency, entity=entity)
def get_type(self) -> AssetType: return AssetType.STOCK def get_currency(self) -> Optional[Currency]: return self.currency def get_thematic_beta(self, basket_identifier: str, start: dt.date = DateLimit.LOW_LIMIT.value, end: dt.date = dt.date.today()) -> pd.DataFrame: response = GsAssetApi.resolve_assets(identifier=[basket_identifier], fields=['id', 'type'], limit=1)[basket_identifier] _id, _type = get(response, '0.id'), get(response, '0.type') if len(response) == 0 or _id is None: raise MqValueError(f'Basket could not be found using identifier {basket_identifier}.') if _type not in BasketType.to_list(): raise MqValueError(f'Asset {basket_identifier} of type {_type} is not a Custom or Research Basket.') query = DataQuery(where={'gsid': self.get_identifier(AssetIdentifier.GSID, end), 'basketId': _id}, start_date=start, end_date=end) response = GsDataApi.query_data(query=query, dataset_id=IndicesDatasets.THEMATIC_FACTOR_BETAS_STANDARD.value) df = [] for r in response: df.append({'date': r['date'], 'gsid': r['gsid'], 'basketId': r['basketId'], 'thematicBeta': r['beta']}) df = pd.DataFrame(df) return df.set_index('date')
class Cross(Asset): """Base Security Type Represents a financial asset which can be held in a portfolio, or has an observable price fixing which can be referenced in a derivative transaction """ def __init__(self, id_: str, name: str, entity: Optional[Dict] = None, asset_class: Optional[Union[AssetClass, str]] = AssetClass.FX): if isinstance(asset_class, str): asset_class = get_enum_value(AssetClass, asset_class) Asset.__init__(self, id_, asset_class, name, entity=entity) def get_type(self) -> AssetType: return AssetType.CROSS class Future(Asset): """Future Security Type Represents a standardized listed contract which provides delivery of an asset at a pre-defined forward date and can be settled in cash or physical form """ def __init__(self, id_: str, asset_class: Union[AssetClass, str], name: str, currency: Optional[Currency] = None, entity: Optional[Dict] = None): if isinstance(asset_class, str): asset_class = get_enum_value(AssetClass, asset_class) Asset.__init__(self, id_, asset_class, name, currency=currency, entity=entity) def get_type(self) -> AssetType: return AssetType.FUTURE def get_currency(self) -> Optional[Currency]: return self.currency class Currency(Asset): """Base Security Type Represents a financial asset which can be held in a portfolio, or has an observable price fixing which can be referenced in a derivative transaction """ def __init__(self, id_: str, name: str, entity: Optional[Dict] = None): Asset.__init__(self, id_, AssetClass.Cash, name, entity=entity) def get_type(self) -> AssetType: return AssetType.CURRENCY class Rate(Asset): """Base Security Type Represents a financial asset which can be held in a portfolio, or has an observable price fixing which can be referenced in a derivative transaction """ def __init__(self, id_: str, name: str, entity: Optional[Dict] = None): Asset.__init__(self, id_, AssetClass.Rates, name, entity=entity) def get_type(self) -> AssetType: return AssetType.RATE class Cash(Asset): """Cash Security Type Represents a financial asset which can be held in a portfolio, or has an observable price fixing which can be referenced in a derivative transaction """ def __init__(self, id_: str, name: str, entity: Optional[Dict] = None): Asset.__init__(self, id_, AssetClass.Cash, name, entity=entity) def get_type(self) -> AssetType: return AssetType.CASH class WeatherIndex(Asset): """Weather Index Type Represents an underlying index on a weather derivative, including where the data (e.g. CPD) has been collected, an actual physical reference point (weather station) and various fall back arrangements. """ def __init__(self, id_: str, name: str, entity: Optional[Dict] = None): Asset.__init__(self, id_, AssetClass.Commod, name, entity=entity) def get_type(self) -> AssetType: return AssetType.WEATHER_INDEX class CommodityReferencePrice(Asset): """Commodity Reference Price Represents an underlying index for commodities in the event that no ISDA Commodity Reference Price exists. Includes base, details, unit, currency and exchange id or publication etc. """ def __init__(self, id_: str, name: str, entity: Optional[Dict] = None): Asset.__init__(self, id_, AssetClass.Commod, name, entity=entity) def get_type(self) -> AssetType: return AssetType.COMMODITY_REFERENCE_PRICE class CommodityNaturalGasHub(Asset): """Commodity Natural Gas Hub Represents a distinct location in commodity Natural Gas markets """ def __init__(self, id_: str, name: str, entity: Optional[Dict] = None): Asset.__init__(self, id_, AssetClass.Commod, name, entity=entity) def get_type(self) -> AssetType: return AssetType.COMMODITY_NATURAL_GAS_HUB class CommodityEUNaturalGasHub(Asset): """Commodity EU Natural Gas Hub Represents a virtual/physical hub in EU Natural Gas markets """ def __init__(self, id_: str, name: str, entity: Optional[Dict] = None): Asset.__init__(self, id_, AssetClass.Commod, name, entity=entity) def get_type(self) -> AssetType: return AssetType.COMMODITY_EU_NATURAL_GAS_HUB class Cryptocurrency(Asset): """Cryptocurrency Represents a cryptocurrency """ def __init__(self, id_: str, asset_class: Union[AssetClass, str], name: str, entity: Optional[Dict] = None): Asset.__init__(self, id_, asset_class, name, entity=entity) def get_type(self) -> AssetType: return AssetType.CRYPTOCURRENCY class CommodityPowerNode(Asset): """Commodity Power Node Represents a distinct location in commodity power markets """ def __init__(self, id_: str, name: str, entity: Optional[Dict] = None): Asset.__init__(self, id_, AssetClass.Commod, name, entity=entity) def get_type(self) -> AssetType: return AssetType.COMMODITY_POWER_NODE class CommodityPowerAggregatedNodes(Asset): """Commodity Power Aggregated Nodes Represents a group of locations in commodity power markets """ def __init__(self, id_: str, name: str, entity: Optional[Dict] = None): Asset.__init__(self, id_, AssetClass.Commod, name, entity=entity) def get_type(self) -> AssetType: return AssetType.COMMODITY_POWER_AGGREGATED_NODES class Commodity(Asset): """Commodity Represents a commodity. """ def __init__(self, id_: str, name: str, entity: Optional[Dict] = None): Asset.__init__(self, id_, AssetClass.Commod, name, entity=entity) def get_type(self) -> AssetType: return AssetType.COMMODITY class Bond(Asset): """Bond Represents a bond. """ def __init__(self, id_: str, name: str, asset_class: AssetClass = AssetClass.Credit, entity: Optional[Dict] = None): Asset.__init__(self, id_, asset_class, name, entity=entity) def get_type(self) -> AssetType: return AssetType.BOND class Fund(Asset): """Fund Represents a fund. """ def __init__(self, id_: str, name: str, asset_class: AssetClass, entity: Optional[Dict] = None): Asset.__init__(self, id_, asset_class, name, entity=entity) def get_type(self) -> AssetType: return AssetType.FUND class FutureMarket(Asset): """Future Market Represents a future market """ def __init__(self, id_: str, asset_class: Union[AssetClass, str], name: str, entity: Optional[Dict] = None): if isinstance(asset_class, str): asset_class = get_enum_value(AssetClass, asset_class) Asset.__init__(self, id_, asset_class, name, entity=entity) def get_type(self) -> AssetType: return AssetType.FUTURE_MARKET class FutureContract(Asset): """Future Contract Represents a future contract """ def __init__(self, id_: str, asset_class: Union[AssetClass, str], name: Union[AssetClass, str], entity: Optional[Dict] = None): if isinstance(asset_class, str): asset_class = get_enum_value(AssetClass, asset_class) Asset.__init__(self, id_, asset_class, name, entity=entity) def get_type(self) -> AssetType: return AssetType.FUTURE_CONTRACT class Swap(Asset): """Swap Instrument Type Represents a Swap Instrument """ def __init__(self, id_: str, asset_class: Union[AssetClass, str], name: str, entity: Optional[Dict] = None): if isinstance(asset_class, str): asset_class = get_enum_value(AssetClass, asset_class) Asset.__init__(self, id_, asset_class, name, entity=entity) def get_type(self) -> AssetType: return AssetType.SWAP class Option(Asset): """Option Instrument Type Represents an Option Instrument """ def __init__(self, id_: str, asset_class: Union[AssetClass, str], name: str, entity: Optional[Dict] = None): if isinstance(asset_class, str): asset_class = get_enum_value(AssetClass, asset_class) Asset.__init__(self, id_, asset_class, name, entity=entity) def get_type(self) -> AssetType: return AssetType.OPTION class Forward(Asset): def __init__(self, id_: str, asset_class: Union[AssetClass, str], name: str, entity: Optional[Dict] = None): if isinstance(asset_class, str): asset_class = get_enum_value(AssetClass, asset_class) Asset.__init__(self, id_, asset_class, name, entity=entity) def get_type(self) -> AssetType: return AssetType.FORWARD class ETF(Asset, PositionedEntity): """ETF Asset ETF which tracks an evolving portfolio of securities, and can be traded on exchange """ def __init__(self, id_: str, asset_class: AssetClass, name: str, exchange: Optional[str] = None, currency: Optional[Currency] = None, entity: Optional[Dict] = None): Asset.__init__(self, id_, asset_class, name, exchange, currency, entity=entity) PositionedEntity.__init__(self, id_, EntityType.ASSET) def get_type(self) -> AssetType: return AssetType.ETF def get_currency(self) -> Optional[Currency]: return self.currency class Swaption(Asset): """Swaption Represents a swaption. """ def __init__(self, id_: str, name: str, entity: Optional[Dict] = None): Asset.__init__(self, id_, AssetClass.Rates, name, entity=entity) def get_type(self) -> AssetType: return AssetType.SWAPTION class Binary(Asset): """Binary Represents a binary. """ def __init__(self, id_: str, name: str, asset_class: AssetClass, entity: Optional[Dict] = None): Asset.__init__(self, id_, asset_class, name, entity=entity) def get_type(self) -> AssetType: return AssetType.BINARY class DefaultSwap(Asset): """DefaultSwap Represents a default swap. """ def __init__(self, id_: str, name: str, entity: Optional[Dict] = None): Asset.__init__(self, id_, AssetClass.Credit, name, entity=entity) def get_type(self) -> AssetType: return AssetType.DEFAULT_SWAP class XccySwapMTM(Asset): """XccySwapMTM Represents a cross-currency mark-to-market swap. """ def __init__(self, id_: str, name: str, entity: Optional[Dict] = None): Asset.__init__(self, id_, AssetClass.Rates, name, entity=entity) def get_type(self) -> AssetType: return AssetType.XccySwapMTM class MutualFund(Asset): """MutualFund Represents a mutual fund asset. """ def __init__(self, id_: str, name: str, asset_class: AssetClass, entity: Optional[Dict] = None): Asset.__init__(self, id_, asset_class, name, entity=entity) def get_type(self) -> AssetType: return AssetType.MUTUAL_FUND class SecurityMasterSource(Enum): ASSET_SERVICE = auto() SECURITY_MASTER = auto() class Security: def __init__(self, json: dict): for k, v in json.items(): if k == 'identifiers': self._ids = {inner_k: inner_v for inner_k, inner_v in v.items()} else: setattr(self, k, v) def __str__(self): return str({k: v for k, v in self.__dict__.items() if not k.startswith("_")}) def get_identifiers(self): return deepcopy(self._ids) @backoff.on_exception(backoff.expo, MqRequestError, giveup=lambda e: e.status != 429) def _get_with_retries(url, payload): return GsSession.current._get(url, payload=payload)
[docs]class SecurityMaster: """Security Master The SecurityMaster class provides an interface to security lookup functions. This allows querying and retrieval of different security types (assets) based on a variety of different identifiers through point-in-time lookups. Uses the current PricingContext to provide as of dates if optional arguments are not provided. Will return the relevant asset subclass depending on the type of the security **See also** :class:`Asset` """ _source = SecurityMasterSource.ASSET_SERVICE _page_size = 1000 @classmethod def __gs_asset_to_asset(cls, gs_asset: GsAsset) -> Asset: asset_type = gs_asset.type.value asset_entity: Dict = json.loads(json.dumps(gs_asset.as_dict(), cls=JSONEncoder)) if asset_type in (GsAssetType.Single_Stock.value,): return Stock(gs_asset.id, gs_asset.name, gs_asset.exchange, gs_asset.currency, entity=asset_entity) if asset_type in (GsAssetType.ETF.value,): return ETF(gs_asset.id, gs_asset.assetClass, gs_asset.name, gs_asset.exchange, gs_asset.currency, entity=asset_entity) if asset_type in ( GsAssetType.Index.value, GsAssetType.Access.value, GsAssetType.Multi_Asset_Allocation.value, GsAssetType.Risk_Premia.value, GsAssetType.Systematic_Hedging.value): from gs_quant.markets.index import Index return Index(gs_asset.id, gs_asset.assetClass, gs_asset.name, gs_asset.exchange, gs_asset.currency, entity=asset_entity) if asset_type in ( GsAssetType.Custom_Basket.value, GsAssetType.Research_Basket.value): from gs_quant.markets.baskets import Basket return Basket(gs_asset=gs_asset) if asset_type in (GsAssetType.Future.value,): return Future(gs_asset.id, gs_asset.assetClass, gs_asset.name, gs_asset.currency, entity=asset_entity) if asset_type in (GsAssetType.Cross.value,): return Cross(gs_asset.id, gs_asset.name, entity=asset_entity, asset_class=gs_asset.assetClass) if asset_type in (GsAssetType.Currency.value,): return Currency(gs_asset.id, gs_asset.name, entity=asset_entity) if asset_type in (GsAssetType.Rate.value,): return Rate(gs_asset.id, gs_asset.name, entity=asset_entity) if asset_type in (GsAssetType.Cash.value,): return Cash(gs_asset.id, gs_asset.name, entity=asset_entity) if asset_type in (GsAssetType.WeatherIndex.value,): return WeatherIndex(gs_asset.id, gs_asset.name, entity=asset_entity) if asset_type in (GsAssetType.Swap.value,): return Swap(gs_asset.id, gs_asset.assetClass, gs_asset.name, entity=asset_entity) if asset_type in (GsAssetType.Option.value,): return Option(gs_asset.id, gs_asset.assetClass, gs_asset.name, entity=asset_entity) if asset_type in (GsAssetType.CommodityReferencePrice.value,): return CommodityReferencePrice(gs_asset.id, gs_asset.name, entity=asset_entity) if asset_type in (GsAssetType.CommodityNaturalGasHub.value,): return CommodityNaturalGasHub(gs_asset.id, gs_asset.name, entity=asset_entity) if asset_type in (GsAssetType.CommodityEUNaturalGasHub.value,): return CommodityEUNaturalGasHub(gs_asset.id, gs_asset.name, entity=asset_entity) if asset_type in (GsAssetType.CommodityPowerNode.value,): return CommodityPowerNode(gs_asset.id, gs_asset.name, entity=asset_entity) if asset_type in (GsAssetType.CommodityPowerAggregatedNodes.value,): return CommodityPowerAggregatedNodes(gs_asset.id, gs_asset.name, entity=asset_entity) if asset_type in (GsAssetType.Bond.value,): return Bond(gs_asset.id, gs_asset.name, gs_asset.assetClass or AssetClass.Credit, entity=asset_entity) if asset_type in (GsAssetType.Commodity.value,): return Commodity(gs_asset.id, gs_asset.name, entity=asset_entity) if asset_type in (GsAssetType.FutureMarket.value,): return FutureMarket(gs_asset.id, gs_asset.assetClass, gs_asset.name, entity=asset_entity) if asset_type in (GsAssetType.FutureContract.value,): return FutureContract(gs_asset.id, gs_asset.assetClass, gs_asset.name, entity=asset_entity) # workaround as casing is being migrated if asset_type == GsAssetType.Cryptocurrency.value: return Cryptocurrency(gs_asset.id, gs_asset.assetClass, gs_asset.name, entity=asset_entity) if asset_type == GsAssetType.Forward.value: return Forward(gs_asset.id, gs_asset.assetClass, gs_asset.name, entity=asset_entity) if asset_type == GsAssetType.Fund.value: return Fund(gs_asset.id, gs_asset.name, gs_asset.assetClass, entity=asset_entity) if asset_type == GsAssetType.Default_Swap.value: return DefaultSwap(gs_asset.id, gs_asset.asset_class, entity=asset_entity) if asset_type == GsAssetType.Swaption.value: return Swaption(gs_asset.id, gs_asset.name, entity=asset_entity) if asset_type == GsAssetType.Binary.value: return Binary(gs_asset.id, gs_asset.name, gs_asset.assetClass, entity=asset_entity) if asset_type == GsAssetType.XccySwapMTM.value: return XccySwapMTM(gs_asset.id, gs_asset.name, entity=asset_entity) if asset_type == GsAssetType.Mutual_Fund.value: return MutualFund(gs_asset.id, gs_asset.name, gs_asset.asset_class, entity=asset_entity) raise TypeError(f'unsupported asset type {asset_type}') @classmethod def __asset_type_to_gs_types(cls, asset_type: AssetType) -> Tuple[GsAssetType, ...]: asset_map = { AssetType.STOCK: (GsAssetType.Single_Stock,), AssetType.INDEX: ( GsAssetType.Index, GsAssetType.Multi_Asset_Allocation, GsAssetType.Risk_Premia, GsAssetType.Access), AssetType.ETF: (GsAssetType.ETF, GsAssetType.ETN), AssetType.CUSTOM_BASKET: (GsAssetType.Custom_Basket,), AssetType.RESEARCH_BASKET: (GsAssetType.Research_Basket,), AssetType.FUTURE: (GsAssetType.Future,), AssetType.RATE: (GsAssetType.Rate,), } return asset_map.get(asset_type) @classmethod def set_source(cls, source: SecurityMasterSource): cls._source = source @classmethod def get_asset(cls, id_value: str, id_type: Union[AssetIdentifier, SecurityIdentifier], as_of: Union[dt.date, dt.datetime] = None, exchange_code: ExchangeCode = None, asset_type: AssetType = None, sort_by_rank: bool = True, fields: Optional[List[str]] = None) -> Asset: """ Get an asset by identifier and identifier type :param id_value: identifier value :param id_type: identifier type :param exchange_code: exchange code :param asset_type: asset type :param as_of: As of date for query :param sort_by_rank: whether to sort assets by rank. This flag is ignored when using SecMasterContext :param fields: asset fields to return :return: Asset object or None **Usage** Get asset object using a specified identifier and identifier type. Where the identifiers are temporal (and can change over time), will use the current MarketContext to evaluate based on the specified date. **Examples** Get asset by bloomberg id: >>> gs = SecurityMaster.get_asset("GS UN", AssetIdentifier.BLOOMBERG_ID) Get asset by ticker and exchange code: >>> gs = SecurityMaster.get_asset("GS", AssetIdentifier.TICKER, exchange_code=ExchangeCode.NYSE) Get asset by ticker and asset type: >>> spx = SecurityMaster.get_asset("SPX", AssetIdentifier.TICKER, asset_type=AssetType.INDEX) **See also** :class:`AssetIdentifier` :func:`get_many_assets` """ if cls._source == SecurityMasterSource.SECURITY_MASTER: if not isinstance(id_type, SecurityIdentifier): raise MqTypeError('expected a security identifier') if exchange_code or asset_type: raise NotImplementedError('argument not implemented for Security Master (supported in Asset Service)') return cls._get_security_master_asset(id_value, id_type, as_of=as_of, fields=fields) if not as_of: current = PricingContext.current if not current.is_entered: with current: as_of = current.pricing_date else: as_of = current.pricing_date if isinstance(as_of, dt.date): as_of = dt.datetime.combine(as_of, dt.time(0, 0), pytz.utc) if id_type is AssetIdentifier.MARQUEE_ID: gs_asset = GsAssetApi.get_asset(id_value) return cls.__gs_asset_to_asset(gs_asset) query = {id_type.value.lower(): id_value} if exchange_code is not None: query['exchange'] = exchange_code.value if asset_type is not None: query['type'] = [t.value for t in cls.__asset_type_to_gs_types(asset_type)] if sort_by_rank: results = GsAssetApi.get_many_assets(as_of=as_of, return_type=dict, order_by=['>rank'], **query) result = get(results, '0') if result: result = GsAsset.from_dict(result) else: results = GsAssetApi.get_many_assets(as_of=as_of, **query) result = next(iter(results), None) if result: return cls.__gs_asset_to_asset(result) @classmethod def _get_security_master_asset(cls, id_value: str, id_type: SecurityIdentifier, as_of: Union[dt.date, dt.datetime] = None, fields: Optional[List[str]] = None) -> SecMasterAsset: as_of = as_of or datetime.datetime(2100, 1, 1) type_ = id_type.value params = { type_: id_value, 'asOfDate': as_of.strftime('%Y-%m-%d') # TODO: update endpoint to take times } if fields is not None: request_fields = { 'identifiers', 'assetClass', 'type', 'currency', 'exchange', 'id' } request_fields.update(fields) params['fields'] = request_fields r = GsSession.current._get('/markets/securities', payload=params) if r['totalResults'] == 0: return None asset_dict = r['results'][0] asset_id = asset_dict['identifiers'].get("assetId", None) # Converting dict to Asset Class asset_name = asset_dict.get('name', None) asset_exchange = asset_dict.get("exchange").get("name", None) if "exchange" in asset_dict else None asset_currency = asset_dict.get('currency', None) try: asset_type = AssetType(asset_dict['type']) asset_class = AssetClass(asset_dict['assetClass']) return SecMasterAsset(id_=asset_id, asset_type=asset_type, asset_class=asset_class, name=asset_name, exchange=asset_exchange, currency=asset_currency, entity=asset_dict) except ValueError: raise NotImplementedError(f"Not yet implemented for AssetType={asset_dict['type']}, " f"AssetClass={asset_dict['assetClass']}.") @classmethod def get_identifiers(cls, id_values: List[str], id_type: SecurityIdentifier, as_of: datetime.datetime = None, start: datetime.datetime = None, end: datetime.datetime = None) -> dict: """ Get identifiers for given assets. :param id_values: identifier values e.g. ['GS UN'] :param id_type: identifier type e.g. BBID :param as_of: point in time to use for resolving given ids to assets :param start: restrict results to ids updated after this time :param end: restrict results to ids updated before this time :return: dict from IDs (of id_type) to available identifiers """ if cls._source != SecurityMasterSource.SECURITY_MASTER: raise NotImplementedError("method not available when using Asset Service") as_of = as_of or datetime.datetime.now() start = start or datetime.datetime(1970, 1, 1) end = end or datetime.datetime(2100, 1, 1) type_ = id_type.value params = { type_: id_values, 'fields': ['id', 'identifiers'], 'asOfDate': as_of.strftime('%Y-%m-%d') # TODO: update endpoint to take times } r = GsSession.current._get('/markets/securities', payload=params) id_map = {} for asset in r['results']: id_map[asset['identifiers'][type_]] = asset['id'] if len(id_map) == 0: return {} output = {} for k, v in id_map.items(): r = GsSession.current._get(f'/markets/securities/{v}/identifiers') piece = [] for e in r['results']: time_str = e['updateTime'].split('.')[0] if time_str.endswith('Z'): time_str = time_str[0:-1] time = datetime.datetime.strptime(time_str, '%Y-%m-%dT%H:%M:%S') if start <= time <= end: piece.append(e) output[k] = piece return output @staticmethod def asset_type_to_str(asset_class: AssetClass, asset_type: AssetType): if asset_type == AssetType.STOCK: return "Common Stock" if asset_type == AssetType.INDEX and asset_class == AssetClass.Equity: return "Equity Index" return asset_type.value @classmethod def get_all_identifiers_gen(cls, class_: AssetClass = None, types: Optional[List[AssetType]] = None, as_of: datetime.datetime = None, *, id_type: SecurityIdentifier = SecurityIdentifier.ID, use_offset_key=True, sleep=0.5) -> Generator[dict, None, None]: """ Get identifiers for all matching assets. Returns a generator iterator so that the caller can load each page of results using next(). :param class_: if not None, restrict results to assets of given class e.g. Equity :param types: if not None, restrict results to given types e.g. Stock :param as_of: point in time for which identifiers are fetched :param id_type: identifier type to use for keys of results :param use_offset_key: whether to use offset keys for pagination (required for large result sets) :param sleep: seconds to sleep between API calls (to avoid server-side throttling) :return: a generator iterator that yields dicts from id (of the id_type) to available identifiers """ if cls._source != SecurityMasterSource.SECURITY_MASTER: raise NotImplementedError("method not available when using Asset Service") as_of = as_of or datetime.datetime.now() if types is not None: p = partial(cls.asset_type_to_str, class_) types = set(map(p, types)) params = { 'fields': ['id', 'identifiers', 'assetClass', 'type'], 'asOfDate': as_of.date(), 'limit': cls._page_size, 'type': types } while True: r = _get_with_retries('/markets/securities', params) if r['totalResults'] == 0: return output = {} for e in r['results']: # TODO: perform assetClass filtering on server side once assetClass is supported (just Equties is # supported for now) if class_ is None or e['assetClass'] == class_.value: box = e['identifiers'] box['id'] = e['id'] # copy top-level security id into result key = box[id_type.value] if key in box: _logger.debug(f'encountered duplicate key {key}') output[key] = box yield output if use_offset_key: if 'offsetKey' not in r: return params['offsetKey'] = r['offsetKey'] else: params['offset'] = params.get('offset', 0) + cls._page_size if params['offset'] + params['limit'] > 10000: _logger.warning('reached result size limit; enable use of offset keys to retrieve all results') return time.sleep(sleep) @classmethod def get_all_identifiers(cls, class_: AssetClass = None, types: Optional[List[AssetType]] = None, as_of: datetime.datetime = None, *, id_type: SecurityIdentifier = SecurityIdentifier.ID, use_offset_key=True, sleep=0.5) -> Dict[str, dict]: """ Get identifiers for all matching assets. :param class_: if not None, restrict results to assets of given class e.g. Equity :param types: if not None, restrict results to given types e.g. Stock :param as_of: point in time for which identifiers are fetched :param id_type: identifier type to use for keys of results :param use_offset_key: whether to use offset keys for pagination (required for large result sets) :param sleep: seconds to sleep between API calls (to avoid server-side throttling) :return: dict from id (of the id_type) to available identifiers """ gen = cls.get_all_identifiers_gen(class_, types, as_of, id_type=id_type, use_offset_key=use_offset_key, sleep=sleep) accumulator = dict() while True: try: accumulator.update(next(gen)) except StopIteration: return accumulator @classmethod def map_identifiers( cls, input_type: SecurityIdentifier, ids: Iterable[str], output_types: Iterable[SecurityIdentifier] = frozenset([SecurityIdentifier.GSID]), start_date: datetime.date = None, end_date: datetime.date = None, as_of_date: datetime.date = None ) -> Dict[datetime.date, dict]: """ Map to other identifier types, from given IDs. :param input_type: type of input IDs :param ids: security IDs :param output_types: types of IDs to map to :param start_date: first as-of date (defaults to current date) :param end_date: last as-of date (defaults to current date) :param as_of_date: an exact as-of date for mapping :return: dict containing mappings for as-of date(s) **Examples** Get CUSIP for GS UN: >>> result = SecurityMaster.map_identifiers(SecurityIdentifier.BBID, ["GS UN"], [SecurityIdentifier.CUSIP]) Get Bloomberg ticker for 104563 as-of a past date: >>> result = SecurityMaster.map_identifiers(SecurityIdentifier.GSID, ["104563"], [SecurityIdentifier.BBG], ... as_of_date=datetime.date(2021, 4, 19)) """ if isinstance(ids, str): raise MqTypeError("expected an iterable of strings e.g. list of strings") def get_asset_id_type(type_: SecurityIdentifier): try: return GsIdType[type_.value] except KeyError: raise MqValueError(f'unsupported type {type_.value}') if cls._source == SecurityMasterSource.ASSET_SERVICE: output_types = list(output_types) if len(output_types) != 1: raise MqValueError('provide exactly one output type') if (start_date or end_date) is not None: raise MqValueError('use as_of_date instead of start_date and/or end_date') if as_of_date is None: as_of_date = datetime.date.today() input_type = get_asset_id_type(input_type) output_type = get_asset_id_type(output_types[0]) as_of = None if as_of_date is None else datetime.datetime.combine(as_of_date, datetime.time(tzinfo=pytz.UTC)) result = GsAssetApi.map_identifiers(input_type, output_type, list(ids), as_of=as_of, multimap=True) if len(result) == 0: return result inner = {k: {output_type.name: v} for k, v in result.items()} return {as_of.strftime('%Y-%m-%d'): inner} assert cls._source == SecurityMasterSource.SECURITY_MASTER params = { input_type.value: list(ids), 'toIdentifiers': [identifier.value for identifier in output_types], 'compact': True } if as_of_date is not None: if (start_date or end_date) is not None: raise MqValueError('provide (start date / end date) or as-of date, but not both') params['startDate'] = as_of_date params['endDate'] = as_of_date if start_date is not None: params['startDate'] = start_date if end_date is not None: params['endDate'] = end_date r = _get_with_retries('/markets/securities/map', params) results = r['results'] if isinstance(results, dict): return results output = dict() date_format = '%Y-%m-%d' date_delta = datetime.timedelta(days=1) for row in results: current = datetime.datetime.strptime(row['startDate'], date_format) end = datetime.datetime.strptime(row['endDate'], date_format) while current <= end: outer = output.setdefault(current, dict()) inner = outer.setdefault(row["input"], dict()) output_type = row["outputType"] output_value = row["outputValue"] if output_type == "ric": if SecurityIdentifier.RIC in output_types: values = inner.setdefault('ric', []) if output_value not in values: values.append(output_value) if SecurityIdentifier.ASSET_ID in output_types and "assetId" in row: values = inner.setdefault('assetId', []) asset_id = row['assetId'] if asset_id not in values: values.append(asset_id) elif output_type == "bbg": if SecurityIdentifier.BBG in output_types: if SecurityIdentifier.BBG.value not in inner: inner[SecurityIdentifier.BBG.value] = [] inner[SecurityIdentifier.BBG.value].append(output_value) if SecurityIdentifier.BBID in output_types: exchange = row.get('exchange') if SecurityIdentifier.BBID.value not in inner: inner[SecurityIdentifier.BBID.value] = [] if exchange is not None: inner[SecurityIdentifier.BBID.value].append(f"{output_value} {exchange}") else: inner[SecurityIdentifier.BBID.value].append(f"{output_value}") if SecurityIdentifier.BCID in output_types: composite_exchange = row.get('compositeExchange') if composite_exchange is not None: if SecurityIdentifier.BCID.value not in inner: inner[SecurityIdentifier.BCID.value] = [] inner[SecurityIdentifier.BCID.value].append(f"{output_value} {composite_exchange}") else: if SecurityIdentifier(output_type) in output_types: if output_type not in inner: inner[output_type] = [] if output_value not in inner[output_type]: inner[output_type].append(output_value) current += date_delta # much faster to run strftime (once for each date) at the end return {k.strftime(date_format): v for k, v in output.items()}