這一部分的API可以參考官網(wǎng):https://www.backtrader.com/docu/strategy/
Backtrader的策略中有四個常用的函數(shù):
next()
都會觸發(fā)next()
都會觸發(fā)其中:notify_order(order)
與notify_trade(trade)
是需要觸發(fā)訂單和交易觸發(fā)才可以執(zhí)行,可以跟蹤訂單和交易情況
剩余的函數(shù)執(zhí)行順序是:next()
->notify_cashvalue(cash, value)
->notify_fund(cash, value, fundvalue, shares)
,可以做每日報告
from datetime import datetime import backtrader from loguru import logger import matplotlib.pyplot as plt from utils import get_k_data class MyStrategy1(backtrader.Strategy): # 策略 def __init__(self): # 初始化交易指令、買賣價格和手續(xù)費 self.close_price = self.datas[0].close # 這里加一個數(shù)據(jù)引用,方便后續(xù)操作 self.sma = backtrader.indicators.SimpleMovingAverage(self.datas[0], period=5) # 借用這個策略,計算5日的均線 def notify_order(self, order): # 當訂單狀態(tài)變化時觸發(fā) # 通知訂單狀態(tài) if order.status in [order.Submitted, order.Accepted]: # 接受訂單交易,正常情況 return if order.status in [order.Completed]: if order.isbuy(): logger.debug('已買入, 購入金額 {:.2f}, 費用 {:.2f} ,手續(xù)費{:.2f}'.format( order.executed.price, order.executed.value, order.executed.comm)) elif order.issell(): logger.debug('已賣出, 賣出金額 {:.2f}, 費用 {:.2f} ,手續(xù)費{:.2f}'.format( order.executed.price, order.executed.value, order.executed.comm)) elif order.status in [order.Canceled, order.Margin, order.Rejected]: logger.debug('訂單取消、保證金不足、金額不足拒絕交易') def notify_trade(self, trade): if not trade.isclosed: return logger.debug('交易利潤, 毛利潤 {:.2}, 凈利潤 {:.2}'.format(trade.pnl, trade.pnlcomm)) def notify_cashvalue(self, cash, value): # 現(xiàn)金,總價值 logger.info('notify_cashvalue報告 —— 今天是:{},當前現(xiàn)金:{},總價值:{}', self.datetime.date(), cash, value) def notify_fund(self, cash, value, fundvalue, shares): # 當前現(xiàn)金、現(xiàn)有總價值、基金價值和股份份額 logger.info('notify_fund報告 —— 今天是:{},現(xiàn)金:{},總價值:{}', self.datetime.date(), cash, value) def next(self): # 固定的函數(shù),框架執(zhí)行過程中會不斷循環(huán)next(),過一個K線,執(zhí)行一次next() # 此時調(diào)用 self.datas[0]即可查看當天的數(shù)據(jù) # 執(zhí)行買入條件判斷:當天收盤價格突破5日均線 if self.close_price[0] > self.sma[0]: # 執(zhí)行買入 logger.debug('buy 500 in {}, 預期購入金額 {}, 剩余可用資金 {}', self.datetime.date(), self.data.close[0], self.broker.getcash()) self.buy(size=500, price=self.data.close[0]) # 執(zhí)行賣出條件已有持倉,且收盤價格跌破5日均線 if self.position: if self.close_price[0] < self.sma[0]: # 執(zhí)行賣出 logger.debug('sell in {}, 預期賣出金額 {}, 剩余可用資金 {}', self.datetime.date(), self.data.close[0], self.broker.getcash()) self.sell(size=500, price=self.data.close[0]) if __name__ == '__main__': # 獲取數(shù)據(jù) start_time = datetime(2015, 1, 1) end_time = datetime(2021, 1, 1) dataframe = get_k_data('600519', begin=start_time, end=end_time) # =============== 為系統(tǒng)注入數(shù)據(jù) ================= # 加載數(shù)據(jù) data = backtrader.feeds.PandasData(dataname=dataframe, fromdate=start_time, todate=end_time) # 初始化cerebro回測系統(tǒng) cerebral_system = backtrader.Cerebro() # Cerebro引擎在后臺創(chuàng)建了broker(經(jīng)紀人)實例,系統(tǒng)默認每個broker的初始資金量為10000 # 將數(shù)據(jù)傳入回測系統(tǒng) cerebral_system.adddata(data) # 導入數(shù)據(jù),在策略中使用 self.datas 來獲取數(shù)據(jù)源 # 將交易策略加載到回測系統(tǒng)中 cerebral_system.addstrategy(MyStrategy1) # =============== 系統(tǒng)設置 ================== # 設置啟動資金為 100000 start_cash = 1000000 cerebral_system.broker.setcash(start_cash) # 設置手續(xù)費 萬2.5 cerebral_system.broker.setcommission(commission=0.00025) logger.debug('初始資金: {} 回測期間:from {} to {}'.format(start_cash, start_time, end_time)) # 運行回測系統(tǒng) cerebral_system.run() # 獲取回測結束后的總資金 portvalue = cerebral_system.broker.getvalue() pnl = portvalue - start_cash # 打印結果 logger.debug('凈收益: {}', pnl) logger.debug('總資金: {}', portvalue) cerebral_system.plot(style='candlestick') plt.show()
其中utils
中的get_k_data
函數(shù)如下:
import efinance import pandas as pd from datetime import datetime def get_k_data(stock_code, begin: datetime, end: datetime) -> pd.DataFrame: ''' 根據(jù)efinance工具包獲取股票數(shù)據(jù) :param stock_code:股票代碼 :param begin: 開始日期 :param end: 結束日期 :return: ''' # stock_code = '600519' # 股票代碼,茅臺 k_dataframe: pd.DataFrame = efinance.stock.get_quote_history( stock_code, beg=begin.strftime('%Y%m%d'), end=end.strftime('%Y%m%d')) k_dataframe = k_dataframe.iloc[:, :9] k_dataframe.columns = ['name', 'code', 'date', 'open', 'close', 'high', 'low', 'volume', 'turnover'] k_dataframe.index = pd.to_datetime(k_dataframe.date) k_dataframe.drop(['name', 'code', 'date'], axis=1, inplace=True) return k_dataframe