"""
A collection of Algos used to create Strategy logic.
"""
from __future__ import division
import abc
import random
import re
import numpy as np
import pandas as pd
import sklearn.covariance
import bt
from bt.core import Algo, AlgoStack, SecurityBase, is_zero
def run_always(f):
"""
Run always decorator to be used with Algo
to ensure stack runs the decorated Algo
on each pass, regardless of failures in the stack.
"""
f.run_always = True
return f
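# Example (illustrative sketch, not part of the library API): a function-style
# Algo decorated with run_always, so an AlgoStack keeps calling it on every pass
# even when an earlier algo in the stack has returned False.
@run_always
def _example_print_date_always(target):
    print(target.now)
    return True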
class PrintDate(Algo):
"""
This Algo simply prints the current date.
Can be useful for debugging purposes.
"""
def __call__(self, target):
print(target.now)
return True
class PrintTempData(Algo):
"""
This Algo prints the temp data.
Useful for debugging.
Args:
* fmt_string (str): A string that will later be formatted with the
target's temp dict. Therefore, you should provide
what you want to examine within curly braces ( { } )
"""
def __init__(self, fmt_string=None):
super(PrintTempData, self).__init__()
self.fmt_string = fmt_string
def __call__(self, target):
if self.fmt_string:
print(self.fmt_string.format(**target.temp))
else:
print(target.temp)
return True
class PrintInfo(Algo):
"""
Prints out info associated with the target strategy. Useful for debugging
purposes.
Args:
* fmt_string (str): A string that will later be formatted with the
target object's __dict__ attribute. Therefore, you should provide
what you want to examine within curly braces ( { } )
Ex:
PrintInfo('Strategy {name} : {now}')
This will print out the name and the date (now) on each call.
Basically, you provide a string that will be formatted with target.__dict__
"""
def __init__(self, fmt_string="{name} {now}"):
super(PrintInfo, self).__init__()
self.fmt_string = fmt_string
def __call__(self, target):
print(self.fmt_string.format(**target.__dict__))
return True
class Debug(Algo):
"""
Utility Algo that calls pdb.set_trace when triggered.
In the debug session, 'target' is available and can be examined through the
StrategyBase interface.
"""
def __call__(self, target):
import pdb
pdb.set_trace()
return True
class RunOnce(Algo):
"""
Returns True on first run then returns False.
As the name says, the algo only runs once. Useful in situations
where we want to run the logic once (buy and hold, for example); see the
sketch after this class.
"""
def __init__(self):
super(RunOnce, self).__init__()
self.has_run = False
def __call__(self, target):
# if it hasn't run then we will
# run it and set flag
if not self.has_run:
self.has_run = True
return True
# return false to stop future execution
return False
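# Example sketch (assumes a daily price DataFrame named 'prices'): the classic
# buy-and-hold backtest mentioned above, wired with RunOnce.
def _example_buy_and_hold(prices):
    strategy = bt.Strategy(
        "buy_and_hold",
        [RunOnce(), SelectAll(), WeighEqually(), Rebalance()],
    )
    return bt.Backtest(strategy, prices)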
class RunPeriod(Algo):
def __init__(
self, run_on_first_date=True, run_on_end_of_period=False, run_on_last_date=False
):
super(RunPeriod, self).__init__()
self._run_on_first_date = run_on_first_date
self._run_on_end_of_period = run_on_end_of_period
self._run_on_last_date = run_on_last_date
def __call__(self, target):
# get last date
now = target.now
# if none nothing to do - return false
if now is None:
return False
# not a known date in our universe
if now not in target.data.index:
return False
# get index of the current date
index = target.data.index.get_loc(target.now)
result = False
# index 0 is a date added by the Backtest Constructor
if index == 0:
return False
# first date
if index == 1:
if self._run_on_first_date:
result = True
# last date
elif index == (len(target.data.index) - 1):
if self._run_on_last_date:
result = True
else:
# create pandas.Timestamp for useful .week,.quarter properties
now = pd.Timestamp(now)
index_offset = -1
if self._run_on_end_of_period:
index_offset = 1
date_to_compare = target.data.index[index + index_offset]
date_to_compare = pd.Timestamp(date_to_compare)
result = self.compare_dates(now, date_to_compare)
return result
@abc.abstractmethod
def compare_dates(self, now, date_to_compare):
raise (NotImplementedError("RunPeriod Algo is an abstract class!"))
class RunDaily(RunPeriod):
"""
Returns True on day change.
Args:
* run_on_first_date (bool): determines if it runs the first time the algo is called
* run_on_end_of_period (bool): determines if it should run at the end of the period
or the beginning
* run_on_last_date (bool): determines if it runs on the last time the algo is called
Returns True if the target.now's day has changed
compared to the last (or next if run_on_end_of_period) date, otherwise returns False.
Useful for daily rebalancing strategies.
"""
def compare_dates(self, now, date_to_compare):
if now.date() != date_to_compare.date():
return True
return False
class RunWeekly(RunPeriod):
"""
Returns True on week change.
Args:
* run_on_first_date (bool): determines if it runs the first time the algo is called
* run_on_end_of_period (bool): determines if it should run at the end of the period
or the beginning
* run_on_last_date (bool): determines if it runs on the last time the algo is called
Returns True if the target.now's week has changed
relative to the last (or next) date, otherwise returns False. Useful for
weekly rebalancing strategies.
"""
def compare_dates(self, now, date_to_compare):
if now.year != date_to_compare.year or now.week != date_to_compare.week:
return True
return False
class RunMonthly(RunPeriod):
"""
Returns True on month change.
Args:
* run_on_first_date (bool): determines if it runs the first time the algo is called
* run_on_end_of_period (bool): determines if it should run at the end of the period
or the beginning
* run_on_last_date (bool): determines if it runs on the last time the algo is called
Returns True if the target.now's month has changed
relative to the last (or next) date, otherwise returns False. Useful for
monthly rebalancing strategies.
"""
def compare_dates(self, now, date_to_compare):
if now.year != date_to_compare.year or now.month != date_to_compare.month:
return True
return False
class RunQuarterly(RunPeriod):
"""
Returns True on quarter change.
Args:
* run_on_first_date (bool): determines if it runs the first time the algo is called
* run_on_end_of_period (bool): determines if it should run at the end of the period
or the beginning
* run_on_last_date (bool): determines if it runs on the last time the algo is called
Returns True if the target.now's quarter has changed
relative to the last (or next) date, otherwise returns False. Useful for
quarterly rebalancing strategies.
"""
def compare_dates(self, now, date_to_compare):
if now.year != date_to_compare.year or now.quarter != date_to_compare.quarter:
return True
return False
class RunYearly(RunPeriod):
"""
Returns True on year change.
Args:
* run_on_first_date (bool): determines if it runs the first time the algo is called
* run_on_end_of_period (bool): determines if it should run at the end of the period
or the beginning
* run_on_last_date (bool): determines if it runs on the last time the algo is called
Returns True if the target.now's year has changed
relative to the last (or next) date, otherwise returns False. Useful for
yearly rebalancing strategies.
"""
def compare_dates(self, now, date_to_compare):
if now.year != date_to_compare.year:
return True
return False
class RunOnDate(Algo):
"""
Returns True on a specific set of dates.
Args:
* dates (list): List of dates to run Algo on.
"""
def __init__(self, *dates):
"""
Args:
* dates (*args): A list of dates. Dates will be parsed
by pandas.to_datetime so pass anything that it can
parse. Typically, you will pass a string 'yyyy-mm-dd'.
"""
super(RunOnDate, self).__init__()
# parse dates and save
self.dates = [pd.to_datetime(d) for d in dates]
def __call__(self, target):
return target.now in self.dates
class RunAfterDate(Algo):
"""
Returns True after a date has passed
Args:
* date: Date after which to start trading
Note:
This is useful for algos that rely on trailing averages where you
don't want to start trading until some amount of data has been built up
"""
def __init__(self, date):
"""
Args:
* date: Date after which to start trading
"""
super(RunAfterDate, self).__init__()
# parse dates and save
self.date = pd.to_datetime(date)
def __call__(self, target):
return target.now > self.date
class RunAfterDays(Algo):
"""
Returns True after a specific number of 'warmup' trading days have passed
Args:
* days (int): Number of trading days to wait before starting
Note:
This is useful for algos that rely on trailing averages where you
don't want to start trading until some amount of data has been built up
"""
def __init__(self, days):
"""
Args:
* days (int): Number of trading days to wait before starting
"""
super(RunAfterDays, self).__init__()
self.days = days
def __call__(self, target):
if self.days > 0:
self.days -= 1
return False
return True
class RunIfOutOfBounds(Algo):
"""
This algo returns true if any of the target weights deviate by an amount greater
than tolerance. For example, it will be run if the tolerance is set to 0.5 and
a security grows from a target weight of 0.2 to greater than 0.3.
A strategy where rebalancing is performed quarterly or whenever any
security's weight deviates by more than 20% could be implemented by:
Or([runQuarterlyAlgo,runIfOutOfBoundsAlgo(0.2)])
Args:
* tolerance (float): Allowed deviation of each security weight.
Requires:
* Weights
"""
def __init__(self, tolerance):
self.tolerance = float(tolerance)
super(RunIfOutOfBounds, self).__init__()
def __call__(self, target):
if "weights" not in target.temp:
return True
targets = target.temp["weights"]
for cname in target.children:
if cname in targets:
c = target.children[cname]
deviation = abs((c.weight - targets[cname]) / targets[cname])
if deviation > self.tolerance:
return True
if "cash" in target.temp:
cash_deviation = abs(
(target.capital - targets.value) / targets.value - target.temp["cash"]
)
if cash_deviation > self.tolerance:
return True
return False
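# Example sketch of the pattern from the docstring above: rebalance quarterly,
# or sooner if any security drifts more than 20% from its target weight.
def _example_quarterly_or_drift():
    return Or([RunQuarterly(), RunIfOutOfBounds(0.2)])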
class RunEveryNPeriods(Algo):
"""
This algo runs every n periods.
Args:
* n (int): Run each n periods
* offset (int): Applies to the first run. If 0, this algo will run the
first time it is called.
This Algo can be useful for the following type of strategy:
Each month, select the top 5 performers. Hold them for 3 months.
You could then create 3 strategies with different offsets and create a
master strategy that would allocate equal amounts of capital to each.
"""
def __init__(self, n, offset=0):
super(RunEveryNPeriods, self).__init__()
self.n = n
self.offset = offset
self.idx = n - offset - 1
self.lcall = 0
def __call__(self, target):
# ignore multiple calls on same period
if self.lcall == target.now:
return False
else:
self.lcall = target.now
# run when idx == (n-1)
if self.idx == (self.n - 1):
self.idx = 0
return True
else:
self.idx += 1
return False
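# Example sketch of the staggered ("tranche") setup described above: three copies
# of the same trigger running every 3 periods with different offsets, intended to
# be embedded in three sub-strategies of a master strategy.
def _example_tranche_triggers(n=3):
    return [RunEveryNPeriods(n, offset=i) for i in range(n)]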
class SelectAll(Algo):
"""
Sets temp['selected'] with all securities (based on universe).
Selects all the securities and saves them in temp['selected'].
By default, SelectAll does not include securities that have no
data (nan) on current date or those whose price is zero or negative.
Args:
* include_no_data (bool): Include securities that do not have data?
* include_negative (bool): Include securities that have negative
or zero prices?
Sets:
* selected
"""
def __init__(self, include_no_data=False, include_negative=False):
super(SelectAll, self).__init__()
self.include_no_data = include_no_data
self.include_negative = include_negative
def __call__(self, target):
if self.include_no_data:
target.temp["selected"] = target.universe.columns
else:
universe = target.universe.loc[target.now].dropna()
if self.include_negative:
target.temp["selected"] = list(universe.index)
else:
target.temp["selected"] = list(universe[universe > 0].index)
return True
class SelectThese(Algo):
"""
Sets temp['selected'] with a set list of tickers.
Args:
* ticker (list): List of tickers to select.
* include_no_data (bool): Include securities that do not have data?
* include_negative (bool): Include securities that have negative
or zero prices?
Sets:
* selected
"""
def __init__(self, tickers, include_no_data=False, include_negative=False):
super(SelectThese, self).__init__()
self.tickers = tickers
self.include_no_data = include_no_data
self.include_negative = include_negative
def __call__(self, target):
if self.include_no_data:
target.temp["selected"] = self.tickers
else:
universe = target.universe.loc[target.now, self.tickers].dropna()
if self.include_negative:
target.temp["selected"] = list(universe.index)
else:
target.temp["selected"] = list(universe[universe > 0].index)
return True
class SelectHasData(Algo):
"""
Sets temp['selected'] based on all items in universe that meet
data requirements.
This is a more advanced version of SelectAll. Useful for selecting
tickers that need a certain amount of data for future algos to run
properly.
For example, if we need the items with 3 months of data or more,
we could use this Algo with a lookback period of 3 months.
When providing a lookback period, it is also wise to provide a min_count.
This is basically the number of data points needed within the lookback
period for a series to be considered valid. For example, in our 3 month
lookback above, we might set min_count to around 57: a typical trading
month has roughly 20 trading days, so three months give about 60, and
allowing for a few holidays leaves 57 or 58. It's really up to you.
If you don't specify min_count, min_count will default to ffn's
get_num_days_required.
Args:
* lookback (DateOffset): A DateOffset that determines the lookback
period.
* min_count (int): Minimum number of days required for a series to be
considered valid. If not provided, ffn's get_num_days_required is
used to estimate the number of points required.
* include_no_data (bool): Include securities that do not have data?
* include_negative (bool): Include securities that have negative
or zero prices?
Sets:
* selected
"""
def __init__(
self,
lookback=pd.DateOffset(months=3),
min_count=None,
include_no_data=False,
include_negative=False,
):
super(SelectHasData, self).__init__()
self.lookback = lookback
if min_count is None:
min_count = bt.ffn.get_num_days_required(lookback)
self.min_count = min_count
self.include_no_data = include_no_data
self.include_negative = include_negative
def __call__(self, target):
if "selected" in target.temp:
selected = target.temp["selected"]
else:
selected = target.universe.columns
filt = target.universe.loc[target.now - self.lookback :, selected]
cnt = filt.count()
cnt = cnt[cnt >= self.min_count]
if not self.include_no_data:
cnt = cnt[~target.universe.loc[target.now, selected].isnull()]
if not self.include_negative:
cnt = cnt[target.universe.loc[target.now, selected] > 0]
target.temp["selected"] = list(cnt.index)
return True
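# Example sketch: require roughly six months of history with at least 115
# observed prices in that window (the exact min_count is a judgment call).
def _example_needs_history():
    return SelectHasData(lookback=pd.DateOffset(months=6), min_count=115)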
class SelectN(Algo):
"""
Sets temp['selected'] based on ranking temp['stat'].
Selects the top or bottom N items based on temp['stat'].
This is usually some kind of metric that will be computed in a
previous Algo and will be used for ranking purposes. Can select
top or bottom N based on sort_descending parameter.
Args:
* n (int): select top n items.
* sort_descending (bool): Should the stat be sorted in descending order
before selecting the first n items?
* all_or_none (bool): If true, only populates temp['selected'] if we
have n items. If we have less than n, then temp['selected'] = [].
* filter_selected (bool): If True, will only select from the existing
'selected' list.
Sets:
* selected
Requires:
* stat
"""
def __init__(
self, n, sort_descending=True, all_or_none=False, filter_selected=False
):
super(SelectN, self).__init__()
if n < 0:
raise ValueError("n cannot be negative")
self.n = n
self.ascending = not sort_descending
self.all_or_none = all_or_none
self.filter_selected = filter_selected
def __call__(self, target):
stat = target.temp["stat"].dropna()
if self.filter_selected and "selected" in target.temp:
stat = stat.loc[stat.index.intersection(target.temp["selected"])]
stat.sort_values(ascending=self.ascending, inplace=True)
# handle percent n
keep_n = self.n
if self.n < 1:
keep_n = int(self.n * len(stat))
sel = list(stat[:keep_n].index)
if self.all_or_none and len(sel) < keep_n:
sel = []
target.temp["selected"] = sel
return True
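# Example sketch: rank by 6-month total return and keep the top 10 names. Note
# that StatTotalReturn (defined below) populates temp['stat'] for SelectN.
def _example_top10_by_return():
    return AlgoStack(
        SelectAll(),
        StatTotalReturn(lookback=pd.DateOffset(months=6)),
        SelectN(10),
    )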
class SelectMomentum(AlgoStack):
"""
Sets temp['selected'] based on a simple momentum filter.
Selects the top n securities based on the total return over
a given lookback period. This is just a wrapper around an
AlgoStack with two algos: StatTotalReturn and SelectN.
Note that SelectAll() or similar should be called before
SelectMomentum(), as StatTotalReturn uses values of temp['selected']
Args:
* n (int): select first N elements
* lookback (DateOffset): lookback period for total return
calculation
* lag (DateOffset): Lag interval for total return calculation
* sort_descending (bool): Sort descending (highest return is best)
* all_or_none (bool): If true, only populates temp['selected'] if we
have n items. If we have less than n, then temp['selected'] = [].
Sets:
* selected
Requires:
* selected
"""
def __init__(
self,
n,
lookback=pd.DateOffset(months=3),
lag=pd.DateOffset(days=0),
sort_descending=True,
all_or_none=False,
):
super(SelectMomentum, self).__init__(
StatTotalReturn(lookback=lookback, lag=lag),
SelectN(n=n, sort_descending=sort_descending, all_or_none=all_or_none),
)
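# Example sketch (assumes a daily price DataFrame named 'prices'): a monthly
# top-5 momentum strategy built from the algos above.
def _example_momentum_strategy(prices):
    strategy = bt.Strategy(
        "momentum",
        [
            RunMonthly(),
            SelectAll(),
            SelectMomentum(5, lookback=pd.DateOffset(months=6)),
            WeighEqually(),
            Rebalance(),
        ],
    )
    return bt.Backtest(strategy, prices)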
class SelectWhere(Algo):
"""
Selects securities based on an indicator DataFrame.
Selects securities where the value is True on the current date
(target.now) only if current date is present in signal DataFrame.
For example, this could be the result of a pandas boolean comparison such
as data > 100.
Args:
* signal (str|DataFrame): Boolean DataFrame containing selection logic.
If a string is passed, frame is accessed using target.get_data
This is the preferred way of using the algo.
* include_no_data (bool): Include securities that do not have data?
* include_negative (bool): Include securities that have negative
or zero prices?
Sets:
* selected
"""
def __init__(self, signal, include_no_data=False, include_negative=False):
super(SelectWhere, self).__init__()
if isinstance(signal, pd.DataFrame):
self.signal_name = None
self.signal = signal
else:
self.signal_name = signal
self.signal = None
self.include_no_data = include_no_data
self.include_negative = include_negative
def __call__(self, target):
# get signal Series at target.now
if self.signal_name is None:
signal = self.signal
else:
signal = target.get_data(self.signal_name)
if target.now in signal.index:
sig = signal.loc[target.now]
# get tickers where True
# selected = sig.index[sig]
selected = sig[sig == True].index # noqa: E712
# save as list
if not self.include_no_data:
universe = target.universe.loc[target.now, list(selected)].dropna()
if self.include_negative:
selected = list(universe.index)
else:
selected = list(universe[universe > 0].index)
target.temp["selected"] = list(selected)
return True
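# Example sketch: select securities trading above their 200-day moving average.
# 'prices' is an assumed daily price DataFrame; the boolean frame is passed in
# directly here, though passing a data name via additional_data is preferred.
def _example_above_sma(prices):
    signal = prices > prices.rolling(200).mean()
    return SelectWhere(signal)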
class SelectRandomly(AlgoStack):
"""
Sets temp['selected'] based on a random subset of
the items currently in temp['selected'].
Selects n random elements from the list stored in temp['selected'].
This is useful for benchmarking against a strategy where we believe
the selection algorithm is adding value.
For example, if we are testing a momentum strategy and we want to see if
selecting securities based on momentum is better than just selecting
securities randomly, we could use this Algo to create a random Strategy
used for random benchmarking.
Note:
Another selection algorithm should be used prior to this Algo to
populate temp['selected']. This will typically be SelectAll.
Args:
* n (int): Select N elements randomly.
* include_no_data (bool): Include securities that do not have data?
* include_negative (bool): Include securities that have negative
or zero prices?
Sets:
* selected
Requires:
* selected
"""
def __init__(self, n=None, include_no_data=False, include_negative=False):
super(SelectRandomly, self).__init__()
self.n = n
self.include_no_data = include_no_data
self.include_negative = include_negative
def __call__(self, target):
if "selected" in target.temp:
sel = target.temp["selected"]
else:
sel = list(target.universe.columns)
if not self.include_no_data:
universe = target.universe.loc[target.now, sel].dropna()
if self.include_negative:
sel = list(universe.index)
else:
sel = list(universe[universe > 0].index)
if self.n is not None:
n = self.n if self.n < len(sel) else len(sel)
sel = random.sample(sel, int(n))
target.temp["selected"] = sel
return True
class SelectRegex(Algo):
"""
Sets temp['selected'] based on a regex on their names.
Useful when working with a large universe of different kinds of securities
Args:
* regex (str): regular expression on the name
Sets:
* selected
Requires:
* selected
"""
def __init__(self, regex):
super(SelectRegex, self).__init__()
self.regex = re.compile(regex)
def __call__(self, target):
selected = target.temp["selected"]
selected = [s for s in selected if self.regex.search(s)]
target.temp["selected"] = selected
return True
class ResolveOnTheRun(Algo):
"""
Looks at securities set in temp['selected'] and searches for names that
match the names of "aliases" for on-the-run securities in the provided
data. Then replaces the alias with the name of the underlying security
appropriate for the given date, and sets it back on temp['selected']
Args:
* on_the_run (str): Name of a Data frame with
- columns set to "on the run" ticker names
- index set to the timeline for the backtest
- values are the actual security name to use for the given date
* include_no_data (bool): Include securities that do not have data?
* include_negative (bool): Include securities that have negative
or zero prices?
Requires:
* selected
Sets:
* selected
"""
def __init__(self, on_the_run, include_no_data=False, include_negative=False):
super(ResolveOnTheRun, self).__init__()
self.on_the_run = on_the_run
self.include_no_data = include_no_data
self.include_negative = include_negative
def __call__(self, target):
# Resolve real tickers based on OTR
on_the_run = target.get_data(self.on_the_run)
selected = target.temp["selected"]
aliases = [s for s in selected if s in on_the_run.columns]
resolved = on_the_run.loc[target.now, aliases].tolist()
if not self.include_no_data:
universe = target.universe.loc[target.now, resolved].dropna()
if self.include_negative:
resolved = list(universe.index)
else:
resolved = list(universe[universe > 0].index)
target.temp["selected"] = resolved + [
s for s in selected if s not in on_the_run.columns
]
return True
class SetStat(Algo):
"""
Sets temp['stat'] for use by downstream algos (such as SelectN).
Args:
* stat (str|DataFrame): A dataframe of the same dimension as target.universe
If a string is passed, frame is accessed using target.get_data
This is the preferred way of using the algo.
Sets:
* stat
"""
def __init__(self, stat):
if isinstance(stat, pd.DataFrame):
self.stat_name = None
self.stat = stat
else:
self.stat_name = stat
self.stat = None
def __call__(self, target):
if self.stat_name is None:
stat = self.stat
else:
stat = target.get_data(self.stat_name)
target.temp["stat"] = stat.loc[target.now]
return True
class StatTotalReturn(Algo):
"""
Sets temp['stat'] with total returns over a given period.
Sets the 'stat' based on the total return of each element in
temp['selected'] over a given lookback period. The total return
is determined by ffn's calc_total_return.
Args:
* lookback (DateOffset): lookback period.
* lag (DateOffset): Lag interval. Total return is calculated in
the interval [now - lookback - lag, now - lag]
Sets:
* stat
Requires:
* selected
"""
def __init__(self, lookback=pd.DateOffset(months=3), lag=pd.DateOffset(days=0)):
super(StatTotalReturn, self).__init__()
self.lookback = lookback
self.lag = lag
def __call__(self, target):
selected = target.temp["selected"]
t0 = target.now - self.lag
prc = target.universe.loc[t0 - self.lookback : t0, selected]
target.temp["stat"] = prc.calc_total_return()
return True
class WeighEqually(Algo):
"""
Sets temp['weights'] by calculating equal weights for all items in
selected.
Equal weight Algo. Sets the 'weights' to 1/n for each item in 'selected'.
Sets:
* weights
Requires:
* selected
"""
def __init__(self):
super(WeighEqually, self).__init__()
def __call__(self, target):
selected = target.temp["selected"]
n = len(selected)
if n == 0:
target.temp["weights"] = {}
else:
w = 1.0 / n
target.temp["weights"] = {x: w for x in selected}
return True
class WeighSpecified(Algo):
"""
Sets temp['weights'] based on a provided dict of ticker:weights.
Sets the weights based on pre-specified targets.
Args:
* weights (dict): target weights -> ticker: weight
Sets:
* weights
"""
def __init__(self, **weights):
super(WeighSpecified, self).__init__()
self.weights = weights
def __call__(self, target):
# added copy to make sure these are not overwritten
target.temp["weights"] = self.weights.copy()
return True
class ScaleWeights(Algo):
"""
Sets temp['weights'] based on a scaled version of itself.
Useful for going short, or scaling up/down when using
:class:`FixedIncomeStrategy <bt.core.FixedIncomeStrategy>`.
Args:
* scale (float): the scaling factor
Sets:
* weights
Requires:
* weights
"""
def __init__(self, scale):
super(ScaleWeights, self).__init__()
self.scale = scale
def __call__(self, target):
target.temp["weights"] = {
k: self.scale * w for k, w in target.temp["weights"].items()
}
return True
class WeighTarget(Algo):
"""
Sets target weights based on a target weight DataFrame.
If the target weight DataFrame is of the same dimension
as the target.universe, the portfolio will effectively be rebalanced on
each period. For example, if we have daily data and the target DataFrame
is of the same shape, we will have daily rebalancing.
However, if we provide a target weight dataframe that has only month end
dates, then rebalancing only occurs monthly.
Basically, if a weight is provided on a given date, the target weights are
set and the algo moves on (presumably to a Rebalance algo). If not, no
target weights are set.
Args:
* weights (str|DataFrame): DataFrame containing the target weights
If a string is passed, frame is accessed using target.get_data
This is the preferred way of using the algo.
Sets:
* weights
"""
def __init__(self, weights):
super(WeighTarget, self).__init__()
if isinstance(weights, pd.DataFrame):
self.weights_name = None
self.weights = weights
else:
self.weights_name = weights
self.weights = None
def __call__(self, target):
# get current target weights
if self.weights_name is None:
weights = self.weights
else:
weights = target.get_data(self.weights_name)
if target.now in weights.index:
w = weights.loc[target.now]
# dropna and save
target.temp["weights"] = w.dropna()
return True
else:
return False
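# Example sketch: rebalance to a fixed 60/40 mix at month end only, by passing
# a weights frame indexed on month-end dates. Column names are assumptions.
def _example_monthly_60_40(prices):
    month_ends = prices.resample("M").last().index
    weights = pd.DataFrame({"SPY": 0.6, "AGG": 0.4}, index=month_ends)
    return AlgoStack(WeighTarget(weights), Rebalance())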
class WeighInvVol(Algo):
"""
Sets temp['weights'] based on the inverse volatility Algo.
Sets the target weights based on ffn's calc_inv_vol_weights. This
is a commonly used technique for risk parity portfolios. The least
volatile elements receive the highest weight under this scheme. Weights
are proportional to the inverse of their volatility.
Args:
* lookback (DateOffset): lookback period for estimating volatility
Sets:
* weights
Requires:
* selected
"""
def __init__(self, lookback=pd.DateOffset(months=3), lag=pd.DateOffset(days=0)):
super(WeighInvVol, self).__init__()
self.lookback = lookback
self.lag = lag
def __call__(self, target):
selected = target.temp["selected"]
if len(selected) == 0:
target.temp["weights"] = {}
return True
if len(selected) == 1:
target.temp["weights"] = {selected[0]: 1.0}
return True
t0 = target.now - self.lag
prc = target.universe.loc[t0 - self.lookback : t0, selected]
tw = bt.ffn.calc_inv_vol_weights(prc.to_returns().dropna())
target.temp["weights"] = tw.dropna()
return True
class WeighERC(Algo):
"""
Sets temp['weights'] based on equal risk contribution algorithm.
Sets the target weights based on ffn's calc_erc_weights. This
is an extension of the inverse volatility risk parity portfolio in
which the correlation of asset returns is incorporated into the
calculation of risk contribution of each asset.
The resulting portfolio is similar to a minimum variance portfolio
subject to a diversification constraint on the weights of its components
and its volatility is located between those of the minimum variance and
equally-weighted portfolios (Maillard 2008).
See:
https://en.wikipedia.org/wiki/Risk_parity
Args:
* lookback (DateOffset): lookback period for estimating covariance
* initial_weights (list): Starting asset weights [default inverse vol].
* risk_weights (list): Risk target weights [default equal weight].
* covar_method (str): method used to estimate the covariance. See ffn's
calc_erc_weights for more details. (default ledoit-wolf).
* risk_parity_method (str): Risk parity estimation method. see ffn's
calc_erc_weights for more details. (default ccd).
* maximum_iterations (int): Maximum iterations in iterative solutions
(default 100).
* tolerance (float): Tolerance level in iterative solutions (default 1E-8).
Sets:
* weights
Requires:
* selected
"""
def __init__(
self,
lookback=pd.DateOffset(months=3),
initial_weights=None,
risk_weights=None,
covar_method="ledoit-wolf",
risk_parity_method="ccd",
maximum_iterations=100,
tolerance=1e-8,
lag=pd.DateOffset(days=0),
):
super(WeighERC, self).__init__()
self.lookback = lookback
self.initial_weights = initial_weights
self.risk_weights = risk_weights
self.covar_method = covar_method
self.risk_parity_method = risk_parity_method
self.maximum_iterations = maximum_iterations
self.tolerance = tolerance
self.lag = lag
def __call__(self, target):
selected = target.temp["selected"]
if len(selected) == 0:
target.temp["weights"] = {}
return True
if len(selected) == 1:
target.temp["weights"] = {selected[0]: 1.0}
return True
t0 = target.now - self.lag
prc = target.universe.loc[t0 - self.lookback : t0, selected]
tw = bt.ffn.calc_erc_weights(
prc.to_returns().dropna(),
initial_weights=self.initial_weights,
risk_weights=self.risk_weights,
covar_method=self.covar_method,
risk_parity_method=self.risk_parity_method,
maximum_iterations=self.maximum_iterations,
tolerance=self.tolerance,
)
target.temp["weights"] = tw.dropna()
return True
class WeighMeanVar(Algo):
"""
Sets temp['weights'] based on mean-variance optimization.
Sets the target weights based on ffn's calc_mean_var_weights. This is a
Python implementation of Markowitz's mean-variance optimization.
See:
http://en.wikipedia.org/wiki/Modern_portfolio_theory#The_efficient_frontier_with_no_risk-free_asset
Args:
* lookback (DateOffset): lookback period for estimating volatility
* bounds ((min, max)): tuple specifying the min and max weights for
each asset in the optimization.
* covar_method (str): method used to estimate the covariance. See ffn's
calc_mean_var_weights for more details.
* rf (float): risk-free rate used in optimization.
Sets:
* weights
Requires:
* selected
"""
def __init__(
self,
lookback=pd.DateOffset(months=3),
bounds=(0.0, 1.0),
covar_method="ledoit-wolf",
rf=0.0,
lag=pd.DateOffset(days=0),
):
super(WeighMeanVar, self).__init__()
self.lookback = lookback
self.lag = lag
self.bounds = bounds
self.covar_method = covar_method
self.rf = rf
def __call__(self, target):
selected = target.temp["selected"]
if len(selected) == 0:
target.temp["weights"] = {}
return True
if len(selected) == 1:
target.temp["weights"] = {selected[0]: 1.0}
return True
t0 = target.now - self.lag
prc = target.universe.loc[t0 - self.lookback : t0, selected]
tw = bt.ffn.calc_mean_var_weights(
prc.to_returns().dropna(),
weight_bounds=self.bounds,
covar_method=self.covar_method,
rf=self.rf,
)
target.temp["weights"] = tw.dropna()
return True
class WeighRandomly(Algo):
"""
Sets temp['weights'] based on a random weight vector.
Sets random target weights for each security in 'selected'.
This is useful for benchmarking against a strategy where we believe
the weighing algorithm is adding value.
For example, if we are testing a low-vol strategy and we want to see if
our weighing strategy is better than just weighing
securities randomly, we could use this Algo to create a random Strategy
used for random benchmarking.
This is an Algo wrapper around ffn's random_weights function.
Args:
* bounds ((low, high)): Tuple including low and high bounds for each
security
* weight_sum (float): What should the weights sum up to?
Sets:
* weights
Requires:
* selected
"""
def __init__(self, bounds=(0.0, 1.0), weight_sum=1):
super(WeighRandomly, self).__init__()
self.bounds = bounds
self.weight_sum = weight_sum
def __call__(self, target):
sel = target.temp["selected"]
n = len(sel)
w = {}
try:
rw = bt.ffn.random_weights(n, self.bounds, self.weight_sum)
w = dict(zip(sel, rw))
except ValueError:
pass
target.temp["weights"] = w
return True
class LimitDeltas(Algo):
"""
Modifies temp['weights'] based on weight delta limits.
Basically, this can be used if we want to restrict how much a security's
target weight can change from day to day. Useful when we want to be more
conservative about how much we could actually trade on a given day without
affecting the market.
For example, if we have a strategy that is currently long 100% one
security, and the weighing Algo sets the new weight to 0%, but we
use this Algo with a limit of 0.1, the new target weight will
be 90% instead of 0%.
Args:
* limit (float, dict): Weight delta limit. If float, this will be a
global limit for all securities. If dict, you may specify by-ticker
limit.
Sets:
* weights
Requires:
* weights
"""
def __init__(self, limit=0.1):
super(LimitDeltas, self).__init__()
self.limit = limit
# determine if global or specific
self.global_limit = True
if isinstance(limit, dict):
self.global_limit = False
def __call__(self, target):
tw = target.temp["weights"]
all_keys = set(list(target.children.keys()) + list(tw.keys()))
for k in all_keys:
tgt = tw[k] if k in tw else 0.0
cur = target.children[k].weight if k in target.children else 0.0
delta = tgt - cur
# check if we need to limit
if self.global_limit:
if abs(delta) > self.limit:
tw[k] = cur + (self.limit * np.sign(delta))
else:
# make sure we have a limit defined in case of limit dict
if k in self.limit:
lmt = self.limit[k]
if abs(delta) > lmt:
tw[k] = cur + (lmt * np.sign(delta))
return True
class LimitWeights(Algo):
"""
Modifies temp['weights'] based on weight limits.
This is an Algo wrapper around ffn's limit_weights. The purpose of this
Algo is to limit the weight of any one specific asset. For example, some
Algos will set some rather extreme weights that may not be acceptable.
Therefore, we can use this Algo to limit the extreme weights. The excess
weight is then redistributed to the other assets, proportionally to
their current weights.
See ffn's limit_weights for more information.
Args:
* limit (float): Weight limit.
Sets:
* weights
Requires:
* weights
"""
def __init__(self, limit=0.1):
super(LimitWeights, self).__init__()
self.limit = limit
def __call__(self, target):
if "weights" not in target.temp:
return True
tw = target.temp["weights"]
if len(tw) == 0:
return True
# if the limit < equal weight then set weights to 0
if self.limit < 1.0 / len(tw):
tw = {}
else:
tw = bt.ffn.limit_weights(tw, self.limit)
target.temp["weights"] = tw
return True
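# Example sketch: inverse-vol weights with any single position capped at 25%;
# the excess weight is redistributed pro-rata by ffn's limit_weights.
def _example_capped_inv_vol():
    return AlgoStack(SelectAll(), WeighInvVol(), LimitWeights(0.25), Rebalance())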
class TargetVol(Algo):
"""
Updates temp['weights'] based on the target annualized volatility desired.
Args:
* target_volatility: annualized volatility to target
* lookback (DateOffset): lookback period for estimating volatility
* lag (DateOffset): amount of time to wait to calculate the covariance
* covar_method: method of calculating volatility
* annualization_factor: number of periods to annualize by.
It is assumed that target volatility is already annualized by this factor.
Updates:
* weights
Requires:
* temp['weights']
"""
def __init__(
self,
target_volatility,
lookback=pd.DateOffset(months=3),
lag=pd.DateOffset(days=0),
covar_method="standard",
annualization_factor=252,
):
super(TargetVol, self).__init__()
self.target_volatility = target_volatility
self.lookback = lookback
self.lag = lag
self.covar_method = covar_method
self.annualization_factor = annualization_factor
def __call__(self, target):
current_weights = target.temp["weights"]
selected = current_weights.keys()
# if there were no weights already set then skip
if len(selected) == 0:
return True
t0 = target.now - self.lag
prc = target.universe.loc[t0 - self.lookback : t0, selected]
returns = bt.ffn.to_returns(prc)
# calc covariance matrix
if self.covar_method == "ledoit-wolf":
# ledoit_wolf returns (shrunk covariance, shrinkage); keep labels for .columns/.values
covar = pd.DataFrame(sklearn.covariance.ledoit_wolf(returns)[0], index=returns.columns, columns=returns.columns)
elif self.covar_method == "standard":
covar = returns.cov()
else:
raise NotImplementedError("covar_method not implemented")
weights = pd.Series(
[current_weights[x] for x in covar.columns], index=covar.columns
)
vol = np.sqrt(
np.matmul(weights.values.T, np.matmul(covar.values, weights.values))
* self.annualization_factor
)
if isinstance(self.target_volatility, (float, int)):
self.target_volatility = {
k: self.target_volatility for k in target.temp["weights"].keys()
}
for k in target.temp["weights"].keys():
if k in self.target_volatility.keys():
target.temp["weights"][k] = (
target.temp["weights"][k] * self.target_volatility[k] / vol
)
return True
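# Example sketch: scale inverse-vol weights so the portfolio targets roughly
# 10% annualized volatility, estimated over a 6-month lookback.
def _example_target_vol():
    return AlgoStack(
        SelectAll(),
        WeighInvVol(lookback=pd.DateOffset(months=6)),
        TargetVol(0.1, lookback=pd.DateOffset(months=6)),
        Rebalance(),
    )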
class PTE_Rebalance(Algo):
"""
Triggers a rebalance when PTE from static weights is past a level.
Args:
* PTE_volatility_cap: annualized volatility to target
* target_weights: dataframe of weights that needs to have the same index as the price dataframe
* lookback (DateOffset): lookback period for estimating volatility
* lag (DateOffset): amount of time to wait to calculate the covariance
* covar_method: method of calculating volatility
* annualization_factor: number of periods to annualize by.
It is assumed that target volatility is already annualized by this factor.
"""
def __init__(
self,
PTE_volatility_cap,
target_weights,
lookback=pd.DateOffset(months=3),
lag=pd.DateOffset(days=0),
covar_method="standard",
annualization_factor=252,
):
super(PTE_Rebalance, self).__init__()
self.PTE_volatility_cap = PTE_volatility_cap
self.target_weights = target_weights
self.lookback = lookback
self.lag = lag
self.covar_method = covar_method
self.annualization_factor = annualization_factor
def __call__(self, target):
if target.now is None:
return False
if target.positions.shape == (0, 0):
return True
positions = target.positions.loc[target.now]
if positions is None:
return True
prices = target.universe.loc[target.now, positions.index]
if prices is None:
return True
current_weights = positions * prices / target.value
target_weights = self.target_weights.loc[target.now, :]
cols = list(current_weights.index.copy())
for c in target_weights.keys():
if c not in cols:
cols.append(c)
weights = pd.Series(np.zeros(len(cols)), index=cols)
for c in cols:
if c in current_weights:
weights[c] = current_weights[c]
if c in target_weights:
weights[c] -= target_weights[c]
t0 = target.now - self.lag
prc = target.universe.loc[t0 - self.lookback : t0, cols]
returns = bt.ffn.to_returns(prc)
# calc covariance matrix
if self.covar_method == "ledoit-wolf":
# ledoit_wolf returns (shrunk covariance, shrinkage); keep labels for .values
covar = pd.DataFrame(sklearn.covariance.ledoit_wolf(returns)[0], index=returns.columns, columns=returns.columns)
elif self.covar_method == "standard":
covar = returns.cov()
else:
raise NotImplementedError("covar_method not implemented")
PTE_vol = np.sqrt(
np.matmul(weights.values.T, np.matmul(covar.values, weights.values))
* self.annualization_factor
)
if pd.isnull(PTE_vol):
return False
# trigger a rebalance only if PTE vol exceeds the cap
return PTE_vol > self.PTE_volatility_cap
class CapitalFlow(Algo):
"""
Used to model capital flows. Flows can either be inflows or outflows.
This Algo can be used to model capital flows. For example, a pension
fund might have inflows every month or year due to contributions. This
Algo will affect the capital of the target node without affecting returns
for the node.
Since this is modeled as an adjustment, the capital will remain in the
strategy until a re-allocation/rebalance is made.
Args:
* amount (float): Amount of adjustment
"""
def __init__(self, amount):
"""
CapitalFlow constructor.
Args:
* amount (float): Amount to adjust by
"""
super(CapitalFlow, self).__init__()
self.amount = float(amount)
def __call__(self, target):
target.adjust(self.amount)
return True
class CloseDead(Algo):
"""
Closes all positions for which prices are equal to zero (we assume
that these stocks are dead) and removes them from temp['weights'] if
they enter it by any chance.
To be called before Rebalance().
In a normal workflow it is not needed, as those securities will not
be selected by SelectAll(include_no_data=False) or similar method, and
Rebalance() closes positions that are not in temp['weights'] anyway.
However, in cases where include_no_data=False cannot be used, or where a
modified weighting method is used, CloseDead() helps avoid errors.
Requires:
* weights
"""
def __init__(self):
super(CloseDead, self).__init__()
def __call__(self, target):
if "weights" not in target.temp:
return True
targets = target.temp["weights"]
for c in target.children:
if target.universe[c].loc[target.now] <= 0:
target.close(c)
if c in targets:
del targets[c]
return True
class SetNotional(Algo):
"""
Sets the notional_value to use as the base for rebalancing for
:class:`FixedIncomeStrategy <bt.core.FixedIncomeStrategy>` targets
Args:
* notional_value (str): Name of a pd.Series object containing the
target notional values of the strategy over time.
Sets:
* notional_value
"""
def __init__(self, notional_value):
self.notional_value = notional_value
super(SetNotional, self).__init__()
def __call__(self, target):
notional_value = target.get_data(self.notional_value)
if target.now in notional_value.index:
target.temp["notional_value"] = notional_value.loc[target.now]
return True
else:
return False
class Rebalance(Algo):
"""
Rebalances capital based on temp['weights']
Rebalances capital based on temp['weights']. Also closes
positions if open but not in target_weights. This is typically
the last Algo called once the target weights have been set.
Requires:
* weights
* cash (optional): You can set a 'cash' value on temp. This should be a
number between 0-1 and determines the amount of cash to set aside.
For example, if cash=0.3, the strategy will allocate 70% of its
value to the provided weights, and the remaining 30% will be kept
in cash. If this value is not provided (default), the full value
of the strategy is allocated to securities.
* notional_value (optional): Required only for fixed_income targets. This is the base
value of total notional that will apply to the weights.
"""
def __init__(self):
super(Rebalance, self).__init__()
def __call__(self, target):
if "weights" not in target.temp:
return True
targets = target.temp["weights"]
# save value because it will change after each call to allocate
# use it as base in rebalance calls
# call it before de-allocation so that notional_value is correct
if target.fixed_income:
if "notional_value" in target.temp:
base = target.temp["notional_value"]
else:
base = target.notional_value
else:
base = target.value
# de-allocate children that are not in targets and have non-zero value
# (open positions)
for cname in target.children:
# if this child is in our targets, we don't want to close it out
if cname in targets:
continue
# get child and value
c = target.children[cname]
if target.fixed_income:
v = c.notional_value
else:
v = c.value
# if non-zero and non-null, we need to close it out
if v != 0.0 and not np.isnan(v):
target.close(cname, update=False)
# If cash is set (it should be a value between 0-1 representing the
# proportion of cash to keep), calculate the new 'base'
if "cash" in target.temp and not target.fixed_income:
base = base * (1 - target.temp["cash"])
# Turn off updating while we rebalance each child
for item in targets.items():
target.rebalance(item[1], child=item[0], base=base, update=False)
# Now update
target.root.update(target.now)
return True
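# Example sketch: keep 10% of strategy value in cash by setting temp['cash']
# before Rebalance runs. A plain function placed earlier in the stack is a
# valid Algo for this purpose.
def _example_hold_ten_percent_cash(target):
    target.temp["cash"] = 0.10
    return True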
class RebalanceOverTime(Algo):
"""
Similar to Rebalance but rebalances to target
weight over n periods.
Rebalances towards a target weight over a n periods. Splits up the weight
delta over n periods.
This can be useful if we want to make more conservative rebalancing
assumptions. Some strategies can produce large swings in allocations. It
might not be reasonable to assume that this rebalancing can occur at the
end of one specific period. Therefore, this algo can be used to simulate
rebalancing over n periods.
This has typically been used in monthly strategies where we want to spread
out the rebalancing over 5 or 10 days.
Note:
This Algo will require the run_always wrapper in the above case. For
example, the RunMonthly will return True on the first day, and
RebalanceOverTime will be 'armed'. However, RunMonthly will return
False for the remaining days of the month. Therefore, we must specify that we
want to always run this algo.
Args:
* n (int): number of periods over which rebalancing takes place.
Requires:
* weights
"""
def __init__(self, n=10):
super(RebalanceOverTime, self).__init__()
self.n = float(n)
self._rb = Rebalance()
self._weights = None
self._days_left = None
def __call__(self, target):
# new weights specified - update rebalance data
if "weights" in target.temp:
self._weights = target.temp["weights"]
self._days_left = self.n
# if _weights are not None, we have some work to do
if self._weights is not None:
tgt = {}
# scale delta relative to # of periods left and set that as the new
# target
for cname in self._weights.keys():
curr = (
target.children[cname].weight if cname in target.children else 0.0
)
dlt = (self._weights[cname] - curr) / self._days_left
tgt[cname] = curr + dlt
# mock weights and call real Rebalance
target.temp["weights"] = tgt
self._rb(target)
# dec _days_left. If 0, set to None & set _weights to None
self._days_left -= 1
if self._days_left == 0:
self._days_left = None
self._weights = None
return True
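# Example sketch of the monthly case described above: the signal fires monthly,
# but the resulting trades are spread over 5 days, so the rebalancer is wrapped
# with run_always to keep it running on days when RunMonthly returns False.
def _example_gradual_rebalance_algos():
    return [
        RunMonthly(),
        SelectAll(),
        WeighEqually(),
        run_always(RebalanceOverTime(n=5)),
    ]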
class Require(Algo):
"""
Flow control Algo.
This algo returns the value of a predicate
on a temp entry. Useful for controlling
flow.
For example, we might want to make sure we have some items selected.
We could pass a lambda function that checks the len of 'selected':
pred=lambda x: len(x) > 0
item='selected'
Args:
* pred (Algo): Function that returns a Bool given the strategy. This
is the definition of an Algo. However, this is typically used
with a simple lambda function.
* item (str): An item within temp.
* if_none (bool): Result if the required item is not in temp or if its
value is None
"""
def __init__(self, pred, item, if_none=False):
super(Require, self).__init__()
self.item = item
self.pred = pred
self.if_none = if_none
def __call__(self, target):
if self.item not in target.temp:
return self.if_none
item = target.temp[self.item]
if item is None:
return self.if_none
return self.pred(item)
class Not(Algo):
"""
Flow control Algo
It is useful for "inverting" other flow control algos,
For example Not( RunAfterDate(...) ), Not( RunAfterDays(...) ), etc
Args:
* algo (Algo): The algo to run and whose return value is inverted
"""
def __init__(self, algo):
super(Not, self).__init__()
self._algo = algo
def __call__(self, target):
return not self._algo(target)
class Or(Algo):
"""
Flow control Algo
It is useful for combining multiple signals into one signal.
For example, we might want two different rebalance signals to work together:
runOnDateAlgo = bt.algos.RunOnDate(pdf.index[0]) # where pdf.index[0] is the first date in our time series
runMonthlyAlgo = bt.algos.RunMonthly()
orAlgo = Or([runMonthlyAlgo,runOnDateAlgo])
orAlgo will return True on the first date of the time series or whenever the month changes
Args:
* list_of_algos: Iterable list of algos.
Runs each algo and returns True if any of them returns True. Note that
all algos in the list are evaluated (there is no short-circuiting).
"""
def __init__(self, list_of_algos):
super(Or, self).__init__()
self._list_of_algos = list_of_algos
return
def __call__(self, target):
res = False
for algo in self._list_of_algos:
tempRes = algo(target)
res = res | tempRes
return res
class SelectTypes(Algo):
"""
Sets temp['selected'] based on node type.
If temp['selected'] is already set, it will filter the existing
selection.
Args:
* include_types (list): Types of nodes to include
* exclude_types (list): Types of nodes to exclude
Sets:
* selected
"""
def __init__(self, include_types=(bt.core.Node,), exclude_types=()):
super(SelectTypes, self).__init__()
self.include_types = include_types
self.exclude_types = exclude_types or (type(None),)
def __call__(self, target):
selected = [
sec_name
for sec_name, sec in target.children.items()
if isinstance(sec, self.include_types)
and not isinstance(sec, self.exclude_types)
]
if "selected" in target.temp:
selected = [s for s in selected if s in target.temp["selected"]]
target.temp["selected"] = selected
return True
class ClosePositionsAfterDates(Algo):
"""
Close positions on securities after a given date.
This can be used to make sure positions on matured/redeemed securities are
closed. It can also be used as part of a strategy, e.g. to make sure
the strategy doesn't hold any securities with time to maturity less than a year
Note that, if placed after a RunPeriod algo in the stack, the actual
closing of positions will occur after the provided date. For this to work,
the "price" of the security (even if matured) must exist up until that date.
Alternatively, run this with the @run_always decorator to close the positions
immediately.
Also note that this algo does not operate using temp['weights'] and Rebalance.
This is so that hedges (which are excluded from that workflow) will also be
closed as necessary.
Args:
* close_dates (str): the name of a dataframe indexed by security name, with columns
"date": the date after which we want to close the position ASAP
Sets:
* target.perm['closed'] : to keep track of which securities have already closed
"""
def __init__(self, close_dates):
super(ClosePositionsAfterDates, self).__init__()
self.close_dates = close_dates
def __call__(self, target):
if "closed" not in target.perm:
target.perm["closed"] = set()
close_dates = target.get_data(self.close_dates)["date"]
# Find securities that are candidate for closing
sec_names = [
sec_name
for sec_name, sec in target.children.items()
if isinstance(sec, SecurityBase)
and sec_name in close_dates.index
and sec_name not in target.perm["closed"]
]
# Check whether closed
is_closed = close_dates.loc[sec_names] <= target.now
# Close position
for sec_name in is_closed[is_closed].index:
target.close(sec_name, update=False)
target.perm["closed"].add(sec_name)
# Now update
target.root.update(target.now)
return True
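# Example sketch of the expected close_dates schema: a frame indexed by security
# name with a single "date" column. It would be supplied to the Backtest via
# additional_data (e.g. additional_data={"maturity": frame}) and referenced as
# ClosePositionsAfterDates("maturity"). Security names here are placeholders.
def _example_close_dates_frame():
    return pd.DataFrame(
        {"date": pd.to_datetime(["2021-06-30", "2023-06-30"])},
        index=["bond_a", "bond_b"],
    )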
class RollPositionsAfterDates(Algo):
"""
Roll securities based on the provided map.
This can be used for any securities which have "On-The-Run" and "Off-The-Run"
versions (treasury bonds, index swaps, etc).
Also note that this algo does not operate using temp['weights'] and Rebalance.
This is so that hedges (which are excluded from that workflow) will also be
rolled as necessary.
Args:
* roll_data (str): the name of a dataframe indexed by security name, with columns
- "date": the first date at which the roll can occur
- "target": the security name we are rolling into
- "factor": the conversion factor. One unit of the original security
rolls into "factor" units of the new one.
Sets:
* target.perm['rolled'] : to keep track of which securities have already rolled
"""
def __init__(self, roll_data):
super(RollPositionsAfterDates, self).__init__()
self.roll_data = roll_data
def __call__(self, target):
if "rolled" not in target.perm:
target.perm["rolled"] = set()
roll_data = target.get_data(self.roll_data)
transactions = {}
# Find securities that are candidate for roll
sec_names = [
sec_name
for sec_name, sec in target.children.items()
if isinstance(sec, SecurityBase)
and sec_name in roll_data.index
and sec_name not in target.perm["rolled"]
]
# Calculate new transaction and close old position
for sec_name, sec_fields in roll_data.loc[sec_names].iterrows():
if sec_fields["date"] <= target.now:
target.perm["rolled"].add(sec_name)
new_quantity = sec_fields["factor"] * target[sec_name].position
new_sec = sec_fields["target"]
if new_sec in transactions:
transactions[new_sec] += new_quantity
else:
transactions[new_sec] = new_quantity
target.close(sec_name, update=False)
# Do all the new transactions at the end, to do any necessary aggregations first
for new_sec, quantity in transactions.items():
target.transact(quantity, new_sec, update=False)
# Now update
target.root.update(target.now)
return True
class SelectActive(Algo):
"""
Sets temp['selected'] based on filtering temp['selected'] to exclude
those securities that have been closed or rolled after a certain date
using ClosePositionsAfterDates or RollPositionsAfterDates. This makes sure
not to select them again for weighting (even if they have prices).
Requires:
* selected
* perm['closed'] or perm['rolled']
Sets:
* selected
"""
def __call__(self, target):
selected = target.temp["selected"]
rolled = target.perm.get("rolled", set())
closed = target.perm.get("closed", set())
selected = [s for s in selected if s not in set.union(rolled, closed)]
target.temp["selected"] = selected
return True
class ReplayTransactions(Algo):
"""
Replay a list of transactions that were executed.
This is useful for taking a blotter of actual trades that occurred,
and measuring performance against hypothetical strategies.
In particular, one can replay the outputs of backtest.Result.get_transactions
Note that this allows the timestamps and prices of the reported transactions
to be completely arbitrary, so while the strategy may track performance
on a daily basis, it will accurately account for the actual PNL of
the trades based on where they actually traded, and the bidoffer_paid
attribute on the strategy will capture the "slippage" as measured
against the daily prices.
Args:
* transactions (str): name of a MultiIndex dataframe with format
Date, Security | quantity, price.
Note this schema follows the output of backtest.Result.get_transactions
"""
def __init__(self, transactions):
super(ReplayTransactions, self).__init__()
self.transactions = transactions
def __call__(self, target):
timeline = target.data.index
index = timeline.get_loc(target.now)
end = target.now
if index == 0:
start = pd.Timestamp.min
else:
start = timeline[index - 1]
# Get the transactions since the last update
all_transactions = target.get_data(self.transactions)
timestamps = all_transactions.index.get_level_values("Date")
transactions = all_transactions[(timestamps > start) & (timestamps <= end)]
for (_, security), transaction in transactions.iterrows():
c = target[security]
c.transact(
transaction["quantity"], price=transaction["price"], update=False
)
# Now update
target.root.update(target.now)
return True
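# Example sketch: replay the blotter from a previous backtest result against a
# fresh strategy. 'res' is an assumed bt.backtest.Result; its get_transactions()
# output already has the (Date, Security) | quantity, price schema noted above.
def _example_replay(prices, res):
    trades = res.get_transactions()
    strategy = bt.Strategy("replay", [ReplayTransactions("trades")])
    return bt.Backtest(strategy, prices, additional_data={"trades": trades})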
class SimulateRFQTransactions(Algo):
"""
An algo that simulates the outcomes from RFQs (Request for Quote)
using a "model" that determines which ones becomes transactions and at what price
those transactions happen. This can be used from the perspective of the sender of the
RFQ or the receiver.
Args:
* rfqs (str): name of a dataframe with columns
Date, Security | quantity, *additional columns as required by model
* model (object): a function/callable object with arguments
- rfqs : data frame of rfqs to respond to
- target : the strategy object, for access to position and value data
and which returns a set of transactions, a MultiIndex DataFrame with:
Date, Security | quantity, price
"""
def __init__(self, rfqs, model):
super(SimulateRFQTransactions, self).__init__()
self.rfqs = rfqs
self.model = model
def __call__(self, target):
timeline = target.data.index
index = timeline.get_loc(target.now)
end = target.now
if index == 0:
start = pd.Timestamp.min
else:
start = timeline[index - 1]
# Get the RFQs since the last update
all_rfqs = target.get_data(self.rfqs)
timestamps = all_rfqs.index.get_level_values("Date")
rfqs = all_rfqs[(timestamps > start) & (timestamps <= end)]
# Turn the RFQs into transactions
transactions = self.model(rfqs, target)
for (_, security), transaction in transactions.iterrows():
c = target[security]
c.transact(
transaction["quantity"], price=transaction["price"], update=False
)
# Now update
target.root.update(target.now)
return True
def _get_unit_risk(security, data, index=None):
try:
unit_risks = data[security]
unit_risk = unit_risks.values[index]
except Exception:
# No risk data, assume zero
unit_risk = 0.0
return unit_risk
class UpdateRisk(Algo):
"""
Tracks a risk measure on all nodes of the strategy. To use this algo, the
``additional_data`` argument on :class:`Backtest <bt.backtest.Backtest>` must
have a "unit_risk" key. The value should be a dictionary, keyed
by risk measure, of DataFrames with a column per security that is sensitive to that measure.
Args:
* measure (str): the name of the risk measure (IR01, PVBP, IsIndustrials, etc).
The name must coincide with the keys of the dictionary passed to additional_data as the
"unit_risk" argument.
* history (int): The level of depth in the tree at which to track the time series of risk numbers.
i.e. 0=no tracking, 1=first level only, etc. More levels is more expensive.
Modifies:
* The "risk" attribute on the target and all its children
* If history > 0, the "risks" attribute on the target and its children down to that depth
"""
def __init__(self, measure, history=0):
super(UpdateRisk, self).__init__(name="UpdateRisk>%s" % measure)
self.measure = measure
self.history = history
def _setup_risk(self, target, set_history):
"""Setup risk attributes on the node in question"""
target.risk = {}
if set_history:
target.risks = pd.DataFrame(index=target.data.index)
def _setup_measure(self, target, set_history):
"""Setup a risk measure within the risk attributes on the node in question"""
target.risk[self.measure] = np.NaN
if set_history:
target.risks[self.measure] = np.NaN
def _set_risk_recursive(self, target, depth, unit_risk_frame):
set_history = depth < self.history
# General setup of risk on nodes
if not hasattr(target, "risk"):
self._setup_risk(target, set_history)
if self.measure not in target.risk:
self._setup_measure(target, set_history)
if isinstance(target, bt.core.SecurityBase):
# Use target.root.now as non-traded securities may not have been updated yet
# and there is no need to update them here as we only use position
index = unit_risk_frame.index.get_loc(target.root.now)
unit_risk = _get_unit_risk(target.name, unit_risk_frame, index)
if is_zero(target.position):
risk = 0.0
else:
risk = unit_risk * target.position * target.multiplier
else:
risk = 0.0
for child in target.children.values():
self._set_risk_recursive(child, depth + 1, unit_risk_frame)
risk += child.risk[self.measure]
target.risk[self.measure] = risk
if depth < self.history:
target.risks.loc[target.now, self.measure] = risk
def __call__(self, target):
unit_risk_frame = target.get_data("unit_risk")[self.measure]
self._set_risk_recursive(target, 0, unit_risk_frame)
return True
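# Example sketch: track an assumed "IR01" measure. 'ir01' is a DataFrame on the
# same timeline as prices with one column per sensitive security, supplied via
# the Backtest's additional_data as {"unit_risk": {"IR01": ir01}}.
def _example_risk_tracking(prices, ir01):
    strategy = bt.Strategy(
        "tracked",
        [SelectAll(), WeighEqually(), Rebalance(), UpdateRisk("IR01", history=1)],
    )
    return bt.Backtest(strategy, prices, additional_data={"unit_risk": {"IR01": ir01}})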
class PrintRisk(Algo):
"""
This Algo prints the risk data.
Args:
* fmt_string (str): A string that will later be formatted with the
target object's risk attributes. Therefore, you should provide
what you want to examine within curly braces ( { } )
If not provided, will print the entire dictionary with no formatting.
"""
def __init__(self, fmt_string=""):
super(PrintRisk, self).__init__()
self.fmt_string = fmt_string
def __call__(self, target):
if hasattr(target, "risk"):
if self.fmt_string:
print(self.fmt_string.format(**target.risk))
else:
print(target.risk)
return True
class HedgeRisks(Algo):
"""
Hedges risk measures with selected instruments.
Make sure that the UpdateRisk algo has been called beforehand.
Args:
* measures (list): the names of the risk measures to hedge
* pseudo (bool): whether to use the pseudo-inverse to compute
the inverse Jacobian. If False, will fail if the number
of selected instruments is not equal to the number of
measures, or if the Jacobian is singular
* strategy (StrategyBase): If provided, will hedge the risk
from this strategy in addition to the risk from target.
This is to allow separate tracking of hedged and unhedged
performance. Note that the provided strategy must occur earlier than
'target' in a depth-first traversal of the children of the root,
otherwise hedging will occur before its positions are
updated.
* throw_nan (bool): Whether to throw on nan hedge notionals, rather
than simply not hedging.
Requires:
* selected
"""
def __init__(self, measures, pseudo=False, strategy=None, throw_nan=True):
super(HedgeRisks, self).__init__()
if len(measures) == 0:
raise ValueError("Must pass in at least one measure to hedge")
self.measures = measures
self.pseudo = pseudo
self.strategy = strategy
self.throw_nan = throw_nan
def _get_target_risk(self, target, measure):
if not hasattr(target, "risk"):
raise ValueError("risk not set up on target %s" % target.name)
if measure not in target.risk:
raise ValueError("measure %s not set on target %s" % (measure, target.name))
return target.risk[measure]
def __call__(self, target):
securities = target.temp["selected"]
# Get target risk
target_risk = np.array(
[self._get_target_risk(target, m) for m in self.measures]
)
if self.strategy is not None:
# Add the target risk of the strategy to the risk of the target
# (which contains existing hedges)
target_risk += np.array(
[self._get_target_risk(self.strategy, m) for m in self.measures]
)
# Turn target_risk into a column array
target_risk = target_risk.reshape(len(self.measures), 1)
# Get hedge risk as a Jacobian matrix
data = []
for m in self.measures:
d = target.get_data("unit_risk").get(m)
if d is None:
raise ValueError(
"unit_risk for %s not present in temp on %s"
% (m, target.name)
)
i = d.index.get_loc(target.now)
data.append((i, d))
hedge_risk = np.array(
[[_get_unit_risk(s, d, i) for (i, d) in data] for s in securities]
)
# Get hedge ratios
if self.pseudo:
inv = np.linalg.pinv(hedge_risk).T
else:
inv = np.linalg.inv(hedge_risk).T
notionals = np.matmul(inv, -target_risk).flatten()
# Hedge
for notional, security in zip(notionals, securities):
if np.isnan(notional) and self.throw_nan:
raise ValueError("%s has nan hedge notional" % security)
target.transact(notional, security)
return True
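# Example sketch: after risk is updated, hedge an assumed "IR01" measure with a
# single, explicitly selected hedge instrument ('hedge_bond' is a placeholder).
def _example_hedge_stack():
    return AlgoStack(
        UpdateRisk("IR01"),
        SelectThese(["hedge_bond"]),
        HedgeRisks(["IR01"]),
    )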