from __future__ import print_function
import random
import matplotlib
import numpy as np
import pandas as pd
import scipy.stats
import sklearn.cluster
import sklearn.covariance
import sklearn.manifold
from matplotlib import pyplot as plt # noqa
from pandas.core.base import PandasObject
from scipy.optimize import minimize
from scipy.stats import t
from sklearn.utils import resample
from tabulate import tabulate
from . import utils
from .utils import fmtn, fmtp, fmtpn, get_freq_name
try:
import seaborn as sns
sns.set(style="ticks", palette="Set2")
except ImportError:
pass
# module level variable, can be different for non traditional markets (eg. crypto - 360)
TRADING_DAYS_PER_YEAR = 252
[docs]class GroupStats(dict):
"""
GroupStats enables one to compare multiple series side by side.
It is a wrapper around a dict of {price.name: PerformanceStats} and
provides many convenience methods.
The order of the series passed in will be preserved.
Individual PerformanceStats objects can be accessed via index
position or name via the [] accessor.
Args:
* prices (Series): Multiple price series to be compared.
Attributes:
* stats (DataFrame): Dataframe containing stats for each
series provided. Stats in rows, series in columns.
* lookback_returns (DataFrame): Returns for diffrent
lookback periods (1m, 3m, 6m, ytd...)
Period in rows, series in columns.
* prices (DataFrame): The merged and rebased prices.
"""
def __init__(self, *prices):
names = []
for p in prices:
if isinstance(p, pd.DataFrame):
names.extend(p.columns)
elif isinstance(p, pd.Series):
names.append(p.name)
else:
print("else")
names.append(getattr(p, "name", "n/a"))
self._names = names
# store original prices
self._prices = merge(*prices).dropna()
# proper ordering
self._prices = self._prices[self._names]
# check for duplicate columns
if len(self._prices.columns) != len(set(self._prices.columns)):
raise ValueError(
"One or more data series provided",
"have same name! Please provide unique names",
)
self._start = self._prices.index[0]
self._end = self._prices.index[-1]
# calculate stats for entire series
self._update(self._prices)
def __getitem__(self, key):
if isinstance(key, int):
# if type(key) == int:
return self[self._names[key]]
else:
return self.get(key)
def _update(self, data):
self._calculate(data)
self._update_stats()
def _calculate(self, data):
self.prices = data
for c in data.columns:
prc = data[c]
self[c] = PerformanceStats(prc)
def _stats(self):
stats = [
("start", "Start", "dt"),
("end", "End", "dt"),
("rf", "Risk-free rate", "p"),
(None, None, None),
("total_return", "Total Return", "p"),
("daily_sharpe", "Daily Sharpe", "n"),
("daily_sortino", "Daily Sortino", "n"),
("cagr", "CAGR", "p"),
("max_drawdown", "Max Drawdown", "p"),
("calmar", "Calmar Ratio", "n"),
(None, None, None),
("mtd", "MTD", "p"),
("three_month", "3m", "p"),
("six_month", "6m", "p"),
("ytd", "YTD", "p"),
("one_year", "1Y", "p"),
("three_year", "3Y (ann.)", "p"),
("five_year", "5Y (ann.)", "p"),
("ten_year", "10Y (ann.)", "p"),
("incep", "Since Incep. (ann.)", "p"),
(None, None, None),
("daily_sharpe", "Daily Sharpe", "n"),
("daily_sortino", "Daily Sortino", "n"),
("daily_mean", "Daily Mean (ann.)", "p"),
("daily_vol", "Daily Vol (ann.)", "p"),
("daily_skew", "Daily Skew", "n"),
("daily_kurt", "Daily Kurt", "n"),
("best_day", "Best Day", "p"),
("worst_day", "Worst Day", "p"),
(None, None, None),
("monthly_sharpe", "Monthly Sharpe", "n"),
("monthly_sortino", "Monthly Sortino", "n"),
("monthly_mean", "Monthly Mean (ann.)", "p"),
("monthly_vol", "Monthly Vol (ann.)", "p"),
("monthly_skew", "Monthly Skew", "n"),
("monthly_kurt", "Monthly Kurt", "n"),
("best_month", "Best Month", "p"),
("worst_month", "Worst Month", "p"),
(None, None, None),
("yearly_sharpe", "Yearly Sharpe", "n"),
("yearly_sortino", "Yearly Sortino", "n"),
("yearly_mean", "Yearly Mean", "p"),
("yearly_vol", "Yearly Vol", "p"),
("yearly_skew", "Yearly Skew", "n"),
("yearly_kurt", "Yearly Kurt", "n"),
("best_year", "Best Year", "p"),
("worst_year", "Worst Year", "p"),
(None, None, None),
("avg_drawdown", "Avg. Drawdown", "p"),
("avg_drawdown_days", "Avg. Drawdown Days", "n"),
("avg_up_month", "Avg. Up Month", "p"),
("avg_down_month", "Avg. Down Month", "p"),
("win_year_perc", "Win Year %", "p"),
("twelve_month_win_perc", "Win 12m %", "p"),
]
return stats
def _update_stats(self):
# lookback returns dataframe
self.lookback_returns = pd.DataFrame(
{x.lookback_returns.name: x.lookback_returns for x in self.values()}
)
self.stats = pd.DataFrame({x.name: x.stats for x in self.values()})
def _get_default_plot_title(self, freq, kind):
if freq is None:
return "%s" % kind
else:
return "%s %s" % (get_freq_name(freq), kind)
[docs] def set_riskfree_rate(self, rf):
"""
Set annual `risk-free rate <https://www.investopedia.com/terms/r/risk-freerate.asp>`_ property and calculate properly annualized
monthly and daily rates. Then performance stats are recalculated.
Affects only those instances of PerformanceStats that are children of
this GroupStats object.
Args:
* rf (float, Series): Annual risk-free rate or risk-free rate price series
"""
for key in self._names:
self[key].set_riskfree_rate(rf)
# calculate stats for entire series
self._update_stats()
[docs] def set_date_range(self, start=None, end=None):
"""
Update date range of stats, charts, etc. If None then
the original date range is used. So to reset to the original
range, just call with no args.
Args:
* start (date): start date
* end (end): end date
"""
start = self._start if start is None else pd.to_datetime(start)
end = self._end if end is None else pd.to_datetime(end)
self._update(self._prices.loc[start:end])
[docs] def display(self):
"""
Display summary stats table.
"""
data = []
first_row = ["Stat"]
first_row.extend(self._names)
data.append(first_row)
stats = self._stats()
for stat in stats:
k, n, f = stat
# blank row
if k is None:
row = [""] * len(data[0])
data.append(row)
continue
row = [n]
for key in self._names:
raw = getattr(self[key], k)
# if rf is a series print nan
if k == "rf" and not type(raw) == float:
row.append(np.nan)
elif f is None:
row.append(raw)
elif f == "p":
row.append(fmtp(raw))
elif f == "n":
row.append(fmtn(raw))
elif f == "dt":
row.append(raw.strftime("%Y-%m-%d"))
else:
raise NotImplementedError("unsupported format %s" % f)
data.append(row)
print(tabulate(data, headers="firstrow"))
[docs] def display_lookback_returns(self):
"""
Displays the current lookback returns for each series.
"""
return self.lookback_returns.apply(lambda x: x.map("{:,.2%}".format), axis=1)
[docs] def plot(self, freq=None, figsize=(15, 5), title=None, logy=False, **kwargs):
"""
Helper function for plotting the series.
Args:
* freq (str): Data frequency used for display purposes.
Refer to pandas docs for valid freq strings.
* figsize ((x,y)): figure size
* title (str): Title if default not appropriate
* logy (bool): log-scale for y axis
* kwargs: passed to pandas' plot method
"""
if title is None:
title = self._get_default_plot_title(freq, "Equity Progression")
ser = self._get_series(freq).rebase()
return ser.plot(figsize=figsize, logy=logy, title=title, **kwargs)
[docs] def plot_scatter_matrix(self, freq=None, title=None, figsize=(10, 10), **kwargs):
"""
Wrapper around pandas' scatter_matrix.
Args:
* freq (str): Data frequency used for display purposes.
Refer to pandas docs for valid freq strings.
* figsize ((x,y)): figure size
* title (str): Title if default not appropriate
* kwargs: passed to pandas' scatter_matrix method
"""
if title is None:
title = self._get_default_plot_title(freq, "Return Scatter Matrix")
plt.figure()
ser = self._get_series(freq).to_returns().dropna()
if hasattr(pd, "scatter_matrix"):
pd.scatter_matrix(ser, figsize=figsize, **kwargs)
else:
pd.plotting.scatter_matrix(ser, figsize=figsize, **kwargs)
return plt.suptitle(title)
[docs] def plot_histograms(self, freq=None, title=None, figsize=(10, 10), **kwargs):
"""
Wrapper around pandas' hist.
Args:
* freq (str): Data frequency used for display purposes.
Refer to pandas docs for valid freq strings.
* figsize ((x,y)): figure size
* title (str): Title if default not appropriate
* kwargs: passed to pandas' hist method
"""
if title is None:
title = self._get_default_plot_title(freq, "Return Histogram Matrix")
plt.figure()
ser = self._get_series(freq).to_returns().dropna()
ser.hist(figsize=figsize, **kwargs)
return plt.suptitle(title)
[docs] def plot_correlation(self, freq=None, title=None, figsize=(12, 6), **kwargs):
"""
Utility function to plot correlations.
Args:
* freq (str): Pandas data frequency alias string
* title (str): Plot title
* figsize (tuple (x,y)): figure size
* kwargs: passed to Pandas' plot_corr_heatmap function
"""
if title is None:
title = self._get_default_plot_title(freq, "Return Correlation Matrix")
rets = self._get_series(freq).to_returns().dropna()
return rets.plot_corr_heatmap(title=title, figsize=figsize, **kwargs)
def _get_series(self, freq):
if freq is None:
return self.prices
if freq == "y":
freq = "a"
return self.prices.asfreq(freq, "ffill")
[docs] def to_csv(self, sep=",", path=None):
"""
Returns a CSV string with appropriate formatting.
If path is not None, the string will be saved to file
at path.
Args:
* sep (char): Separator
* path (str): If None, CSV string returned. Else file
written to specified path.
"""
data = []
first_row = ["Stat"]
first_row.extend(self._names)
data.append(sep.join(first_row))
stats = self._stats()
for stat in stats:
k, n, f = stat
# blank row
if k is None:
row = [""] * len(data[0])
data.append(sep.join(row))
continue
row = [n]
for key in self._names:
raw = getattr(self[key], k)
if f is None:
row.append(raw)
elif f == "p":
row.append(fmtp(raw))
elif f == "n":
row.append(fmtn(raw))
elif f == "dt":
row.append(raw.strftime("%Y-%m-%d"))
else:
raise NotImplementedError("unsupported format %s" % f)
data.append(sep.join(row))
res = "\n".join(data)
if path is not None:
with open(path, "w") as fl:
fl.write(res)
else:
return res
[docs]def to_returns(prices):
"""
Calculates the simple arithmetic returns of a price series.
Formula is: (t1 / t0) - 1
Args:
* prices: Expects a price series
"""
return prices / prices.shift(1) - 1
[docs]def to_log_returns(prices):
"""
Calculates the log returns of a price series.
Formula is: ln(p1/p0)
Args:
* prices: Expects a price series
"""
return np.log(prices / prices.shift(1))
[docs]def to_price_index(returns, start=100):
"""
Returns a price index given a series of returns.
Args:
* returns: Expects a return series
* start (number): Starting level
Assumes arithmetic returns.
Formula is: cumprod (1+r)
"""
return (returns.replace(to_replace=np.nan, value=0) + 1).cumprod() * start
[docs]def rebase(prices, value=100):
"""
Rebase all series to a given intial value.
This makes comparing/plotting different series
together easier.
Args:
* prices: Expects a price series
* value (number): starting value for all series.
"""
return prices / prices.iloc[0] * value
[docs]def calc_perf_stats(prices):
"""
Calculates the performance statistics given an object.
The object should be a Series of prices.
A PerformanceStats object will be returned containing all the stats.
Args:
* prices (Series): Series of prices
"""
return PerformanceStats(prices)
[docs]def calc_stats(prices):
"""
Calculates performance stats of a given object.
If object is Series, a PerformanceStats object is
returned. If object is DataFrame, a GroupStats object
is returned.
Args:
* prices (Series, DataFrame): Set of prices
"""
if isinstance(prices, pd.Series):
return PerformanceStats(prices)
elif isinstance(prices, pd.DataFrame):
return GroupStats(*[prices[x] for x in prices.columns])
else:
raise NotImplementedError("Unsupported type")
[docs]def to_drawdown_series(prices):
"""
Calculates the `drawdown <https://www.investopedia.com/terms/d/drawdown.asp>`_ series.
This returns a series representing a drawdown.
When the price is at all time highs, the drawdown
is 0. However, when prices are below high water marks,
the drawdown series = current / hwm - 1
The max drawdown can be obtained by simply calling .min()
on the result (since the drawdown series is negative)
Method ignores all gaps of NaN's in the price series.
Args:
* prices (Series or DataFrame): Series of prices.
"""
# make a copy so that we don't modify original data
drawdown = prices.copy()
# Fill NaN's with previous values
drawdown = drawdown.fillna(method="ffill")
# Ignore problems with NaN's in the beginning
drawdown[np.isnan(drawdown)] = -np.Inf
# Rolling maximum
if isinstance(drawdown, pd.DataFrame):
roll_max = pd.DataFrame()
for col in drawdown:
roll_max[col] = np.maximum.accumulate(drawdown[col])
else:
roll_max = np.maximum.accumulate(drawdown)
drawdown = drawdown / roll_max - 1.0
return drawdown
[docs]def calc_mtd(daily_prices, monthly_prices):
"""
Calculates mtd return of a price series.
Use daily_prices if prices are only available from same month
else use monthly_prices
"""
if len(monthly_prices) == 1:
return daily_prices[-1] / daily_prices[0] - 1
else:
return daily_prices[-1] / monthly_prices[-2] - 1
[docs]def calc_ytd(daily_prices, yearly_prices):
"""
Calculates ytd return of a price series.
Use daily_prices if prices are only available from same year
else use yearly_prices
"""
if len(yearly_prices) == 1:
return daily_prices[-1] / daily_prices[0] - 1
else:
return daily_prices[-1] / yearly_prices[-2] - 1
[docs]def calc_max_drawdown(prices):
"""
Calculates the max drawdown of a price series. If you want the
actual drawdown series, please use to_drawdown_series.
"""
return (prices / prices.expanding(min_periods=1).max()).min() - 1
[docs]def drawdown_details(drawdown, index_type=pd.DatetimeIndex):
"""
Returns a data frame with start, end, days (duration) and
drawdown for each drawdown in a drawdown series.
.. note::
days are actual calendar days, not trading days
Args:
* drawdown (pandas.Series): A drawdown Series
(can be obtained w/ drawdown(prices).
Returns:
* pandas.DataFrame -- A data frame with the following
columns: start, end, days, drawdown.
"""
is_zero = drawdown == 0
# find start dates (first day where dd is non-zero after a zero)
start = ~is_zero & is_zero.shift(1)
start = list(start[start == True].index) # NOQA
# find end dates (first day where dd is 0 after non-zero)
end = is_zero & (~is_zero).shift(1)
end = list(end[end == True].index) # NOQA
if len(start) == 0: # start.empty
return None
# drawdown has no end (end period in dd)
if len(end) == 0: # end.empty
end.append(drawdown.index[-1])
# if the first drawdown start is larger than the first drawdown end it
# means the drawdown series begins in a drawdown and therefore we must add
# the first index to the start series
if start[0] > end[0]:
start.insert(0, drawdown.index[0])
# if the last start is greater than the end then we must add the last index
# to the end series since the drawdown series must finish with a drawdown
if start[-1] > end[-1]:
end.append(drawdown.index[-1])
result = pd.DataFrame(
columns=("Start", "End", "Length", "drawdown"), index=range(0, len(start))
)
for i in range(0, len(start)):
dd = drawdown[start[i] : end[i]].min()
if index_type is pd.DatetimeIndex:
result.iloc[i] = (start[i], end[i], (end[i] - start[i]).days, dd)
else:
result.iloc[i] = (start[i], end[i], (end[i] - start[i]), dd)
return result
[docs]def calc_cagr(prices):
"""
Calculates the `CAGR (compound annual growth rate) <https://www.investopedia.com/terms/c/cagr.asp>`_ for a given price series.
Args:
* prices (pandas.Series): A Series of prices.
Returns:
* float -- cagr.
"""
start = prices.index[0]
end = prices.index[-1]
return (prices.iloc[-1] / prices.iloc[0]) ** (1 / year_frac(start, end)) - 1
[docs]def calc_risk_return_ratio(returns):
"""
Calculates the return / risk ratio. Basically the
`Sharpe ratio <https://www.investopedia.com/terms/s/sharperatio.asp>`_ without factoring in the `risk-free rate <https://www.investopedia.com/terms/r/risk-freerate.asp>`_.
"""
return calc_sharpe(returns)
[docs]def calc_sharpe(returns, rf=0.0, nperiods=None, annualize=True):
"""
Calculates the `Sharpe ratio <https://www.investopedia.com/terms/s/sharperatio.asp>`_
(see `Sharpe vs. Sortino <https://www.investopedia.com/ask/answers/010815/what-difference-between-sharpe-ratio-and-sortino-ratio.asp>`_).
If rf is non-zero and a float, you must specify nperiods. In this case, rf is assumed
to be expressed in yearly (annualized) terms.
Args:
* returns (Series, DataFrame): Input return series
* rf (float, Series): `Risk-free rate <https://www.investopedia.com/terms/r/risk-freerate.asp>`_ expressed as a yearly (annualized) return or return series
* nperiods (int): Frequency of returns (252 for daily, 12 for monthly,
etc.)
"""
if nperiods is None:
nperiods = infer_freq(returns)
if isinstance(rf, float) and rf != 0 and nperiods is None:
raise Exception("Must provide nperiods if rf != 0")
er = returns.to_excess_returns(rf, nperiods=nperiods)
std = np.std(er, ddof=1)
res = np.divide(er.mean(), std)
if annualize:
if nperiods is None:
nperiods = 1
return res * np.sqrt(nperiods)
else:
return res
[docs]def calc_prob_mom(returns, other_returns):
"""
`Probabilistic momentum <http://cssanalytics.wordpress.com/2014/01/28/are-simple-momentum-strategies-too-dumb-introducing-probabilistic-momentum/>`_ (see `momentum investing <https://www.investopedia.com/terms/m/momentum_investing.asp>`_)
Basically the "probability or confidence that one asset
is going to outperform the other".
Source:
http://cssanalytics.wordpress.com/2014/01/28/are-simple-momentum-strategies-too-dumb-introducing-probabilistic-momentum/ # NOQA
"""
return t.cdf(returns.calc_information_ratio(other_returns), len(returns) - 1)
[docs]def calc_total_return(prices):
"""
Calculates the total return of a series.
last / first - 1
"""
return (prices.iloc[-1] / prices.iloc[0]) - 1
[docs]def year_frac(start, end):
"""
Similar to excel's yearfrac function. Returns
a year fraction between two dates (i.e. 1.53 years).
Approximation using the average number of seconds
in a year.
Args:
* start (datetime): start date
* end (datetime): end date
"""
if start > end:
raise ValueError("start cannot be larger than end")
# obviously not perfect but good enough
return (end - start).total_seconds() / (31557600)
[docs]def merge(*series):
"""
Merge Series and/or DataFrames together.
Returns a DataFrame.
"""
dfs = []
for s in series:
if isinstance(s, pd.DataFrame):
dfs.append(s)
elif isinstance(s, pd.Series):
tmpdf = pd.DataFrame({s.name: s})
dfs.append(tmpdf)
else:
raise NotImplementedError("Unsupported merge type")
return pd.concat(dfs, axis=1)
[docs]def drop_duplicate_cols(df):
"""
Removes duplicate columns from a dataframe
and keeps column w/ longest history
"""
names = set(df.columns)
for n in names:
if len(df[n].shape) > 1:
# get subset of df w/ colname n
sub = df[n]
# make unique colnames
sub.columns = ["%s-%s" % (n, x) for x in range(sub.shape[1])]
# get colname w/ max # of data
keep = sub.count().idxmax()
# drop all columns of name n from original df
del df[n]
# update original df w/ longest col with name n
df[n] = sub[keep]
return df
[docs]def to_monthly(series, method="ffill", how="end"):
"""
Convenience method that wraps asfreq_actual
with 'M' param (method='ffill', how='end').
"""
return series.asfreq_actual("M", method=method, how=how)
[docs]def asfreq_actual(series, freq, method="ffill", how="end", normalize=False):
"""
Similar to pandas' asfreq but keeps the actual dates.
For example, if last data point in Jan is on the 29th,
that date will be used instead of the 31st.
"""
orig = series
is_series = False
if isinstance(series, pd.Series):
is_series = True
name = series.name if series.name else "data"
orig = pd.DataFrame({name: series})
# add date column
t = pd.concat(
[orig, pd.DataFrame({"dt": orig.index.values}, index=orig.index.values)], axis=1
)
# fetch dates
dts = t.asfreq(freq=freq, method=method, how=how, normalize=normalize)["dt"]
res = orig.loc[dts.values]
if is_series:
return res[name]
else:
return res
[docs]def calc_inv_vol_weights(returns):
"""
Calculates weights proportional to inverse volatility of each column.
Returns weights that are inversely proportional to the column's
volatility resulting in a set of portfolio weights where each position
has the same level of volatility.
Note, that assets with returns all equal to NaN or 0 are excluded from
the portfolio (their weight is set to NaN).
Returns:
Series {col_name: weight}
"""
# calc vols
vol = np.divide(1.0, np.std(returns, ddof=1))
vol[np.isinf(vol)] = np.NaN
volsum = vol.sum()
return np.divide(vol, volsum)
[docs]def calc_mean_var_weights(
returns, weight_bounds=(0.0, 1.0), rf=0.0, covar_method="ledoit-wolf", options=None
):
"""
Calculates the mean-variance weights given a DataFrame of returns.
Args:
* returns (DataFrame): Returns for multiple securities.
* weight_bounds ((low, high)): Weigh limits for optimization.
* rf (float): `Risk-free rate <https://www.investopedia.com/terms/r/risk-freerate.asp>`_ used in utility calculation
* covar_method (str): Covariance matrix estimation method.
Currently supported:
- `ledoit-wolf <http://www.ledoit.net/honey.pdf>`_
- standard
* options (dict): options for minimizing, e.g. {'maxiter': 10000 }
Returns:
Series {col_name: weight}
"""
def fitness(weights, exp_rets, covar, rf):
# portfolio mean
mean = sum(exp_rets * weights)
# portfolio var
var = np.dot(np.dot(weights, covar), weights)
# utility - i.e. sharpe ratio
util = (mean - rf) / np.sqrt(var)
# negative because we want to maximize and optimizer
# minimizes metric
return -util
n = len(returns.columns)
# expected return defaults to mean return by default
exp_rets = returns.mean()
# calc covariance matrix
if covar_method == "ledoit-wolf":
covar = sklearn.covariance.ledoit_wolf(returns)[0]
elif covar_method == "standard":
covar = returns.cov()
else:
raise NotImplementedError("covar_method not implemented")
weights = np.ones([n]) / n
bounds = [weight_bounds for i in range(n)]
# sum of weights must be equal to 1
constraints = {"type": "eq", "fun": lambda W: sum(W) - 1.0}
optimized = minimize(
fitness,
weights,
(exp_rets, covar, rf),
method="SLSQP",
constraints=constraints,
bounds=bounds,
options=options,
)
# check if success
if not optimized.success:
raise Exception(optimized.message)
# return weight vector
return pd.Series({returns.columns[i]: optimized.x[i] for i in range(n)})
def _erc_weights_slsqp(x0, cov, b, maximum_iterations, tolerance):
"""
Calculates the equal risk contribution / risk parity weights given
a DataFrame of returns.
Args:
* x0 (np.array): Starting asset weights.
* cov (np.array): covariance matrix.
* b (np.array): Risk target weights. By definition target total risk contributions are all equal which makes this redundant.
* maximum_iterations (int): Maximum iterations in iterative solutions.
* tolerance (float): Tolerance level in iterative solutions.
Returns:
np.array {weight}
You can read more about ERC at
http://thierry-roncalli.com/download/erc.pdf
"""
def fitness(weights, covar):
# total risk contributions
# trc = weights*np.matmul(covar,weights)/np.sqrt(np.matmul(weights.T,np.matmul(covar,weights)))
# instead of using the true definition for trc we will use the optimization on page 5
trc = weights * np.matmul(covar, weights)
n = len(trc)
# sum of squared differences of total risk contributions
sse = 0.0
for i in range(n):
for j in range(n):
# switched from squared deviations to absolute deviations to avoid numerical instability
sse += np.abs(trc[i] - trc[j])
# minimizes metric
return sse
# nonnegative
bounds = [(0, None) for i in range(len(x0))]
# sum of weights must be equal to 1
constraints = {"type": "eq", "fun": lambda W: sum(W) - 1.0}
options = {"maxiter": maximum_iterations}
optimized = minimize(
fitness,
x0,
(cov),
method="SLSQP",
constraints=constraints,
bounds=bounds,
options=options,
tol=tolerance,
)
# check if success
if not optimized.success:
raise Exception(optimized.message)
# return weight vector
return optimized.x
def _erc_weights_ccd(x0, cov, b, maximum_iterations, tolerance):
"""
Calculates the equal risk contribution / risk parity weights given
a DataFrame of returns.
Args:
* x0 (np.array): Starting asset weights.
* cov (np.array): covariance matrix.
* b (np.array): Risk target weights.
* maximum_iterations (int): Maximum iterations in iterative solutions.
* tolerance (float): Tolerance level in iterative solutions.
Returns:
np.array {weight}
Reference:
Griveau-Billion, Theophile and Richard, Jean-Charles and Roncalli,
Thierry, A Fast Algorithm for Computing High-Dimensional Risk Parity
Portfolios (2013).
Available at SSRN: https://ssrn.com/abstract=2325255
"""
n = len(x0)
x = x0.copy()
var = np.diagonal(cov)
ctr = cov.dot(x)
sigma_x = np.sqrt(x.T.dot(ctr))
for iteration in range(maximum_iterations):
for i in range(n):
alpha = var[i]
beta = ctr[i] - x[i] * alpha
gamma = -b[i] * sigma_x
x_tilde = (-beta + np.sqrt(beta * beta - 4 * alpha * gamma)) / (2 * alpha)
x_i = x[i]
ctr = ctr - cov[i] * x_i + cov[i] * x_tilde
sigma_x = sigma_x * sigma_x - 2 * x_i * cov[i].dot(x) + x_i * x_i * var[i]
x[i] = x_tilde
sigma_x = np.sqrt(
sigma_x + 2 * x_tilde * cov[i].dot(x) - x_tilde * x_tilde * var[i]
)
# check convergence
if np.power((x - x0) / x.sum(), 2).sum() < tolerance:
return x / x.sum()
x0 = x.copy()
# no solution found
raise ValueError(
"No solution found after {0} iterations.".format(maximum_iterations)
)
[docs]def calc_erc_weights(
returns,
initial_weights=None,
risk_weights=None,
covar_method="ledoit-wolf",
risk_parity_method="ccd",
maximum_iterations=100,
tolerance=1e-8,
):
"""
Calculates the equal risk contribution / risk parity weights given a
DataFrame of returns.
Args:
* returns (DataFrame): Returns for multiple securities.
* initial_weights (list): Starting asset weights [default inverse vol].
* risk_weights (list): Risk target weights [default equal weight].
* covar_method (str): Covariance matrix estimation method.
Currently supported:
- `ledoit-wolf <http://www.ledoit.net/honey.pdf>`_ [default]
- standard
* risk_parity_method (str): Risk parity estimation method.
Currently supported:
- ccd (cyclical coordinate descent)[default]
- slsqp (scipy's implementation of sequential least squares programming)
* maximum_iterations (int): Maximum iterations in iterative solutions.
* tolerance (float): Tolerance level in iterative solutions.
Returns:
Series {col_name: weight}
"""
n = len(returns.columns)
# calc covariance matrix
if covar_method == "ledoit-wolf":
covar = sklearn.covariance.ledoit_wolf(returns)[0]
elif covar_method == "standard":
covar = returns.cov().values
else:
raise NotImplementedError("covar_method not implemented")
# initial weights (default to inverse vol)
if initial_weights is None:
inv_vol = 1.0 / np.sqrt(np.diagonal(covar))
initial_weights = inv_vol / inv_vol.sum()
# default to equal risk weight
if risk_weights is None:
risk_weights = np.ones(n) / n
# calc risk parity weights matrix
if risk_parity_method == "ccd":
# cyclical coordinate descent implementation
erc_weights = _erc_weights_ccd(
initial_weights, covar, risk_weights, maximum_iterations, tolerance
)
elif risk_parity_method == "slsqp":
# scipys slsqp optimizer
erc_weights = _erc_weights_slsqp(
initial_weights, covar, risk_weights, maximum_iterations, tolerance
)
else:
raise NotImplementedError("risk_parity_method not implemented")
# return erc weights vector
return pd.Series(erc_weights, index=returns.columns, name="erc")
[docs]def get_num_days_required(
offset, period="d", perc_required=0.90, annualization_factor=252
):
"""
Estimates the number of days required to assume that data is OK.
Helper function used to determine if there are enough "good" data
days over a given period.
Args:
* offset (DateOffset): Offset (lookback) period.
* period (str): Period string.
* perc_required (float): percentage of number of days
expected required.
"""
x = pd.to_datetime("2010-01-01")
delta = x - (x - offset)
# convert to 'trading days' - rough guestimate
days = delta.days * 0.69
if period == "d":
req = days * perc_required
elif period == "m":
req = (days / 20) * perc_required
elif period == "y":
req = (days / annualization_factor) * perc_required
else:
raise NotImplementedError("period not supported. Supported periods are d, m, y")
return req
[docs]def calc_clusters(returns, n=None, plot=False):
"""
Calculates the clusters based on k-means
clustering.
Args:
* returns (pd.DataFrame): DataFrame of returns
* n (int): Specify # of clusters. If None, this
will be automatically determined
* plot (bool): Show plot?
Returns:
* dict with structure: {cluster# : [col names]}
"""
# calculate correlation
corr = returns.corr()
# calculate dissimilarity matrix
diss = 1 - corr
# scale down to 2 dimensions using MDS
# (multi-dimensional scaling) using the
# dissimilarity matrix
mds = sklearn.manifold.MDS(dissimilarity="precomputed")
xy = mds.fit_transform(diss)
def routine(k):
# fit KMeans
km = sklearn.cluster.KMeans(n_clusters=k)
km_fit = km.fit(xy)
labels = km_fit.labels_
centers = km_fit.cluster_centers_
# get {ticker: label} mappings
mappings = dict(zip(returns.columns, labels))
# print % of var explained
totss = 0
withinss = 0
# column average fot totss
avg = np.array([np.mean(xy[:, 0]), np.mean(xy[:, 1])])
for idx, lbl in enumerate(labels):
withinss += sum((xy[idx] - centers[lbl]) ** 2)
totss += sum((xy[idx] - avg) ** 2)
pvar_expl = 1.0 - withinss / totss
return mappings, pvar_expl, labels
if n:
result = routine(n)
else:
n = len(returns.columns)
n1 = int(np.ceil(n * 0.6666666666))
for i in range(2, n1 + 1):
result = routine(i)
if result[1] > 0.9:
break
if plot:
fig, ax = plt.subplots()
ax.scatter(xy[:, 0], xy[:, 1], c=result[2], s=90)
for i, txt in enumerate(returns.columns):
ax.annotate(txt, (xy[i, 0], xy[i, 1]), size=14)
# sanitize return value
tmp = result[0]
# map as such {cluster: [list of tickers], cluster2: [...]}
inv_map = {}
for k, v in tmp.items():
inv_map[v] = inv_map.get(v, [])
inv_map[v].append(k)
return inv_map
[docs]def calc_ftca(returns, threshold=0.5):
"""
Implementation of David Varadi's `Fast Threshold Clustering Algorithm (FTCA) <http://cssanalytics.wordpress.com/2013/11/26/fast-threshold-clustering-algorithm-ftca/>`_.
http://cssanalytics.wordpress.com/2013/11/26/fast-threshold-clustering-algorithm-ftca/ # NOQA
More stable than k-means for clustering purposes.
If you want more clusters, use a higher threshold.
Args:
* returns - expects a pandas dataframe of returns where
each column is the name of a given security.
* threshold (float): Threshold parameter - use higher value
for more clusters. Basically controls how similar
(correlated) series have to be.
Returns:
dict of cluster name (a number) and list of securities in cluster
"""
# cluster index (name)
i = 0
# correlation matrix
corr = returns.corr()
# remaining securities to cluster
remain = list(corr.index.copy())
n = len(remain)
res = {}
while n > 0:
# if only one left then create cluster and finish
if n == 1:
i += 1
res[i] = remain
n = 0
# if not then we have some work to do
else:
# filter down correlation matrix to current remain
cur_corr = corr[remain].loc[remain]
# get mean correlations, ordered
mc = cur_corr.mean().sort_values()
# get lowest and highest mean correlation
low = mc.index[0]
high = mc.index[-1]
# case if corr(high,low) > threshold
if corr[high][low] > threshold:
i += 1
# new cluster for high and low
res[i] = [low, high]
remain.remove(low)
remain.remove(high)
rmv = []
for x in remain:
avg_corr = (corr[x][high] + corr[x][low]) / 2.0
if avg_corr > threshold:
res[i].append(x)
rmv.append(x)
[remain.remove(x) for x in rmv]
n = len(remain)
# otherwise we are creating two clusters - one for high
# and one for low
else:
# add cluster with HC
i += 1
res[i] = [high]
remain.remove(high)
remain.remove(low)
rmv = []
for x in remain:
if corr[x][high] > threshold:
res[i].append(x)
rmv.append(x)
[remain.remove(x) for x in rmv]
i += 1
res[i] = [low]
rmv = []
for x in remain:
if corr[x][low] > threshold:
res[i].append(x)
rmv.append(x)
[remain.remove(x) for x in rmv]
n = len(remain)
return res
[docs]def limit_weights(weights, limit=0.1):
"""
Limits weights and redistributes excedent amount
proportionally.
ex:
- weights are {a: 0.7, b: 0.2, c: 0.1}
- call with limit=0.5
- excess 0.2 in a is ditributed to b and c
proportionally.
- result is {a: 0.5, b: 0.33, c: 0.167}
Args:
* weights (Series): A series describing the weights
* limit (float): Maximum weight allowed
"""
if 1.0 / limit > len(weights):
raise ValueError("invalid limit -> 1 / limit must be <= len(weights)")
if isinstance(weights, dict):
weights = pd.Series(weights)
if np.round(weights.sum(), 1) != 1.0:
raise ValueError(
"Expecting weights (that sum to 1) - sum is %s" % weights.sum()
)
res = np.round(weights.copy(), 4)
to_rebalance = (res[res > limit] - limit).sum()
ok = res[res < limit]
ok += (ok / ok.sum()) * to_rebalance
res[res > limit] = limit
res[res < limit] = ok
if any(x > limit for x in res):
return limit_weights(res, limit=limit)
return res
[docs]def random_weights(n, bounds=(0.0, 1.0), total=1.0):
"""
Generate pseudo-random weights.
Returns a list of random weights that is of length
n, where each weight is in the range bounds, and
where the weights sum up to total.
Useful for creating random portfolios when benchmarking.
Args:
* n (int): number of random weights
* bounds ((low, high)): bounds for each weight
* total (float): total sum of the weights
"""
low = bounds[0]
high = bounds[1]
if high < low:
raise ValueError("Higher bound must be greater or " "equal to lower bound")
if n * high < total or n * low > total:
raise ValueError("solution not possible with given n and bounds")
w = [0] * n
tgt = -float(total)
for i in range(n):
rn = n - i - 1
rhigh = rn * high
rlow = rn * low
lowb = max(-rhigh - tgt, low)
highb = min(-rlow - tgt, high)
rw = random.uniform(lowb, highb)
w[i] = rw
tgt += rw
random.shuffle(w)
return w
[docs]def plot_heatmap(
data,
title="Heatmap",
show_legend=True,
show_labels=True,
label_fmt=".2f",
vmin=None,
vmax=None,
figsize=None,
label_color="w",
cmap="RdBu",
**kwargs
):
"""
Plot a heatmap using matplotlib's pcolor.
Args:
* data (DataFrame): DataFrame to plot. Usually small matrix (ex.
correlation matrix).
* title (string): Plot title
* show_legend (bool): Show color legend
* show_labels (bool): Show value labels
* label_fmt (str): Label format string
* vmin (float): Min value for scale
* vmax (float): Max value for scale
* cmap (string): Color map
* kwargs: Passed to matplotlib's pcolor
"""
fig, ax = plt.subplots(figsize=figsize)
heatmap = ax.pcolor(data, vmin=vmin, vmax=vmax, cmap=cmap)
# for some reason heatmap has the y values backwards....
ax.invert_yaxis()
if title is not None:
plt.title(title)
if show_legend:
fig.colorbar(heatmap)
if show_labels:
vals = data.values
for x in range(data.shape[0]):
for y in range(data.shape[1]):
plt.text(
x + 0.5,
y + 0.5,
format(vals[y, x], label_fmt),
horizontalalignment="center",
verticalalignment="center",
color=label_color,
)
plt.yticks(np.arange(0.5, len(data.index), 1), data.index)
plt.xticks(np.arange(0.5, len(data.columns), 1), data.columns)
return plt
[docs]def plot_corr_heatmap(data, **kwargs):
"""
Plots the correlation heatmap for a given DataFrame.
"""
return plot_heatmap(data.corr(), vmin=-1, vmax=1, **kwargs)
[docs]def rollapply(data, window, fn):
"""
Apply a function fn over a rolling window of size window.
Args:
* data (Series or DataFrame): Series or DataFrame
* window (int): Window size
* fn (function): Function to apply over the rolling window.
For a series, the return value is expected to be a single
number. For a DataFrame, it shuold return a new row.
Returns:
* Object of same dimensions as data
"""
res = data.copy()
res[:] = np.nan
n = len(data)
if window > n:
return res
for i in range(window - 1, n):
res.iloc[i] = fn(data.iloc[i - window + 1 : i + 1])
return res
def _winsorize_wrapper(x, limits):
"""
Wraps scipy winsorize function to drop na's
"""
if isinstance(x, pd.Series):
if x.count() == 0:
return x
notnanx = ~np.isnan(x)
x[notnanx] = scipy.stats.mstats.winsorize(x[notnanx], limits=limits)
return x
else:
return scipy.stats.mstats.winsorize(x, limits=limits)
[docs]def winsorize(x, axis=0, limits=0.01):
"""
`Winsorize <https://en.wikipedia.org/wiki/Winsorizing>`_ values based on limits
"""
# operate on copy
x = x.copy()
if isinstance(x, pd.DataFrame):
return x.apply(_winsorize_wrapper, axis=axis, args=(limits,))
else:
return pd.Series(_winsorize_wrapper(x, limits).values, index=x.index)
[docs]def rescale(x, min=0.0, max=1.0, axis=0):
"""
Rescale values to fit a certain range [min, max]
"""
def innerfn(x, min, max):
return np.interp(x, [np.min(x), np.max(x)], [min, max])
if isinstance(x, pd.DataFrame):
return x.apply(
innerfn,
axis=axis,
args=(
min,
max,
),
)
else:
return pd.Series(innerfn(x, min, max), index=x.index)
[docs]def annualize(returns, durations, one_year=365.0):
"""
Annualize returns using their respective durations.
Formula used is:
(1 + returns) ** (1 / (durations / one_year)) - 1
"""
return np.power(1.0 + returns, 1.0 / (durations / one_year)) - 1.0
[docs]def deannualize(returns, nperiods):
"""
Convert return expressed in annual terms on a different basis.
Args:
* returns (float, Series, DataFrame): Return(s)
* nperiods (int): Target basis, typically 252 for daily, 12 for
monthly, etc.
"""
if nperiods is None:
nperiods = infer_freq(returns)
return np.power(1 + returns, 1.0 / nperiods) - 1.0
[docs]def infer_freq(data):
"""
Infer the most likely frequency given the input index. If the frequency is
uncertain or index is not DateTime like, just return None
Args:
* data (DataFrame, Series): Any timeseries dataframe or series
"""
try:
return pd.infer_freq(data.index, warn=False)
except Exception:
return None
[docs]def infer_nperiods(data, annualization_factor=None):
if annualization_factor is None:
annualization_factor = TRADING_DAYS_PER_YEAR
freq = infer_freq(data)
if freq is None:
return None
def whole_periods_str_to_nperiods(freq):
if freq == "Y" or freq == "A":
return 1
if freq == "M":
return 12
if freq == "D":
return annualization_factor
if freq == "H":
return annualization_factor * 24
if freq == "T":
return annualization_factor * 24 * 60
if freq == "S":
return annualization_factor * 24 * 60 * 60
return None
""
if len(freq) == 1:
return whole_periods_str_to_nperiods(freq)
else:
try:
if freq.startswith("A"):
return 1
else:
whole_periods_str = freq[-1]
num_str = freq[:-1]
num = int(num_str)
return num * whole_periods_str_to_nperiods(whole_periods_str)
except KeyboardInterrupt:
raise
except BaseException:
return None
return None
[docs]def calc_sortino_ratio(returns, rf=0.0, nperiods=None, annualize=True):
"""
Calculates the `Sortino ratio <https://www.investopedia.com/terms/s/sortinoratio.asp>`_ given a series of returns
(see `Sharpe vs. Sortino <https://www.investopedia.com/ask/answers/010815/what-difference-between-sharpe-ratio-and-sortino-ratio.asp>`_).
Args:
* returns (Series or DataFrame): Returns
* rf (float, Series): `Risk-free rate <https://www.investopedia.com/terms/r/risk-freerate.asp>`_ expressed in yearly (annualized) terms or return series.
* nperiods (int): Number of periods used for annualization. Must be
provided if rf is non-zero and rf is not a price series
"""
if isinstance(rf, float) and rf != 0 and nperiods is None:
raise Exception("nperiods must be set if rf != 0 and rf is not a price series")
if nperiods is None:
nperiods = infer_freq(returns)
er = returns.to_excess_returns(rf, nperiods=nperiods)
negative_returns = np.minimum(er[1:], 0.0)
std = np.std(negative_returns, ddof=1)
with np.errstate(divide="ignore"):
res = np.divide(er.mean(), std)
if annualize:
if nperiods is None:
nperiods = 1
return res * np.sqrt(nperiods)
return res
[docs]def to_excess_returns(returns, rf, nperiods=None):
"""
Given a series of returns, it will return the excess returns over rf.
Args:
* returns (Series, DataFrame): Returns
* rf (float, Series): `Risk-Free rate(s) <https://www.investopedia.com/terms/r/risk-freerate.asp>`_ expressed in annualized term or return series
* nperiods (int): Optional. If provided, will convert rf to different
frequency using deannualize only if rf is a float
Returns:
* excess_returns (Series, DataFrame): Returns - rf
"""
if nperiods is None:
nperiods = infer_freq(returns)
if isinstance(rf, float) and nperiods is not None:
_rf = deannualize(rf, nperiods)
else:
_rf = rf
return returns - _rf
[docs]def calc_calmar_ratio(prices):
"""
Calculates the `Calmar ratio <https://www.investopedia.com/terms/c/calmarratio.asp>`_ given a series of prices
Args:
* prices (Series, DataFrame): Price series
"""
return np.divide(prices.calc_cagr(), abs(prices.calc_max_drawdown()))
[docs]def to_ulcer_index(prices):
"""
Converts from prices -> `Ulcer index <https://www.investopedia.com/terms/u/ulcerindex.asp>`_
See https://en.wikipedia.org/wiki/Ulcer_index
Args:
* prices (Series, DataFrame): Prices
"""
dd = prices.to_drawdown_series()
return np.divide(np.sqrt(np.sum(np.power(dd, 2))), dd.count())
[docs]def resample_returns(returns, func, seed=0, num_trials=100):
"""
Resample the returns and calculate any statistic on every new sample.
https://en.wikipedia.org/wiki/Resampling_(statistics)
:param returns (Series, DataFrame): Returns
:param func: Given the resampled returns calculate a statistic
:param seed: Seed for random number generator
:param num_trials: Number of times to resample and run the experiment
:return: Series of resampled statistics
"""
# stats = []
if type(returns) is pd.Series:
stats = pd.Series(index=range(num_trials))
elif type(returns) is pd.DataFrame:
stats = pd.DataFrame(index=range(num_trials), columns=returns.columns)
else:
raise (TypeError("returns needs to be a Series or DataFrame!"))
n = returns.shape[0]
for i in range(num_trials):
random_indices = resample(returns.index, n_samples=n, random_state=seed + i)
stats.loc[i] = func(returns.loc[random_indices])
return stats
[docs]def extend_pandas():
"""
Extends pandas' PandasObject (Series, Series,
DataFrame) with some functions defined in this file.
This facilitates common functional composition used in quant
finance.
Ex:
prices.to_returns().dropna().calc_clusters()
(where prices would be a DataFrame)
"""
PandasObject.to_returns = to_returns
PandasObject.to_log_returns = to_log_returns
PandasObject.to_price_index = to_price_index
PandasObject.rebase = rebase
PandasObject.calc_perf_stats = calc_perf_stats
PandasObject.to_drawdown_series = to_drawdown_series
PandasObject.calc_max_drawdown = calc_max_drawdown
PandasObject.calc_cagr = calc_cagr
PandasObject.calc_total_return = calc_total_return
PandasObject.as_percent = utils.as_percent
PandasObject.as_format = utils.as_format
PandasObject.to_monthly = to_monthly
PandasObject.asfreq_actual = asfreq_actual
PandasObject.drop_duplicate_cols = drop_duplicate_cols
PandasObject.calc_information = calc_information_ratio
PandasObject.calc_information_ratio = calc_information_ratio
PandasObject.calc_prob_mom = calc_prob_mom
PandasObject.calc_risk_return = calc_risk_return_ratio
PandasObject.calc_risk_return_ratio = calc_risk_return_ratio
PandasObject.calc_erc_weights = calc_erc_weights
PandasObject.calc_inv_vol_weights = calc_inv_vol_weights
PandasObject.calc_mean_var_weights = calc_mean_var_weights
PandasObject.calc_clusters = calc_clusters
PandasObject.calc_ftca = calc_ftca
PandasObject.calc_stats = calc_stats
PandasObject.plot_heatmap = plot_heatmap
PandasObject.plot_corr_heatmap = plot_corr_heatmap
PandasObject.rollapply = rollapply
PandasObject.winsorize = winsorize
PandasObject.rescale = rescale
PandasObject.calc_sortino = calc_sortino_ratio
PandasObject.calc_sortino_ratio = calc_sortino_ratio
PandasObject.calc_calmar = calc_calmar_ratio
PandasObject.calc_calmar_ratio = calc_calmar_ratio
PandasObject.calc_sharpe = calc_sharpe
PandasObject.calc_sharpe_ratio = calc_sharpe
PandasObject.to_excess_returns = to_excess_returns
PandasObject.to_ulcer_index = to_ulcer_index
PandasObject.to_ulcer_performance_index = to_ulcer_performance_index