BigTrader 回调函数常用代码
由jliang创建,最终由jliang 被浏览 18 用户
bigtrader.run
- bigtrader 版本要求,最新版本: import bigtrader
- BigTrader可视化模块: TODO
def run(
*,
market: Market = Market.CN_STOCK,
frequency: Frequency = Frequency.DAILY,
instruments: list[str] | None = None,
start_date: str = "",
end_date: str = "",
data: Optional["pd.DataFrame"] = None,
capital_base: float = 1.0e6,
initialize: Callable[["IContext"], None] | None = None,
before_trading_start: Callable[["IContext", "IBarData"], None] | None = None,
handle_data: Callable[["IContext", "IBarData"], None] | None = None,
handle_trade: Callable[["IContext", "ITradeData"], None] | None = None,
handle_order: Callable[["IContext", "IOrderData"], None] | None = None,
handle_tick: Callable[["IContext", "ITickData"], None] | None = None,
handle_l2order: Callable[["IContext", "IL2TradeData"], None] | None = None,
handle_l2trade: Callable[["IContext", "IL2OrderData"], None] | None = None,
after_trading: Callable[["IContext", "IBarData"], None] | None = None,
benchmark: Literal[
"000300.SH", "000905.SH", "000852.SH", "000903.SH", "000001.SH", "000016.SH", "000688.SH", "399001.SZ", "399006.SZ", "399330.SZ", "899050.BJ"
] = "000300.SH",
options_data: Any | None = None,
before_start_days: int = 0,
volume_limit: float = 1,
order_price_field_buy: OrderPriceField = "open",
order_price_field_sell: OrderPriceField = "open",
user_data: UserDataFeed | dict | None = None,
engine: Literal["py", "cpp", "vt"] | None = None,
logger: structlog.BoundLogger = None,
backtest_only: bool = False,
_runner: Runner | None = None,
) -> Performance:
"""Executes a backtest or trading session with the bigtrader engine.
This is the main function to initiate a backtest, paper trading, or live trading session on the BigQuant platform.
It orchestrates the entire process from data loading to engine execution and performance evaluation.
Args:
market (Market): The market to trade in. Defaults to `Market.CN_STOCK`.
frequency (Frequency): The frequency of data bars (e.g., daily, minute). Defaults to `Frequency.DAILY`.
instruments (Optional[List[str]]): List of instruments to trade. If None, uses all instruments in the data. Defaults to None.
start_date (str): Start date for backtest/trading in "YYYY-MM-DD" format. Defaults to "".
end_date (str): End date for backtest/trading in "YYYY-MM-DD" format. Defaults to "".
data (Optional[pd.DataFrame]): User-provided data for signals/factors. Defaults to None.
capital_base (float): Initial capital for the account. Defaults to 1.0e6.
initialize (Optional[Callable[[IContext], None]]): User-defined initialization function. Defaults to None.
initialize (Optional[Callable[[IContext], None]]): User-defined initialization function. The initialization function is executed once each time the program starts. In backtest mode: executed once at the start of the backtest; in daily simulation trading: executed once at the start of each day's simulation trading; in live trading mode: executed once at startup. It is recommended to perform factor/signal calculations here, such as using `dai.query` / pandas DataFrame, etc., for batch loading and vectorized computations, which is generally 10-100 times faster than handling data in `handle_data`. For example, `context.data = dai.query("SELECT date, instrument, close / m_lag(close, 1) AS return_0 FROM cn_stock_bar1d ORDER BY date, return_0 DESC", filters={"date": [pd.to_datetime(context.start_date) + pd.Timedelta(days=10), context.end_date]}).df()`. Defaults to None.
before_trading_start (Optional[Callable[[IContext, IBarData], None]]): Function called before each trading day starts. Defaults to None.
handle_data (Optional[Callable[[IContext, IBarData], None]]): Main function called for each data bar, where trading logic is implemented. Defaults to None.
handle_trade (Optional[Callable[[IContext, ITradeData], None]]): Function called when a trade is executed. Defaults to None.
handle_order (Optional[Callable[[IContext, IOrderData], None]]): Function called when an order status updates. Defaults to None.
handle_tick (Optional[Callable[[IContext, ITickData], None]]): Function called for each tick data update. Defaults to None.
handle_l2order (Optional[Callable[[IContext, IL2TradeData], None]]): Function called for each Level-2 order book update. Defaults to None.
handle_l2trade (Optional[Callable[[IContext, IL2OrderData], None]]): Function called for each Level-2 trade update. Defaults to None.
after_trading (Optional[Callable[[IContext, IBarData], None]]): Function called after each trading day ends. Defaults to None.
benchmark (Literal): Benchmark index for performance evaluation. Defaults to "000300.SH".
options_data (Optional[Any]): Placeholder for options data. Defaults to None.
before_start_days (int): Days to pre-load historical data before start_date. Defaults to 0.
volume_limit (float): Volume limit factor for orders. Defaults to 1.
order_price_field_buy (OrderPriceField): Price field to use for buy orders. Defaults to "open".
order_price_field_sell (OrderPriceField): Price field to use for sell orders. Defaults to "close".
user_data (Optional[Union[UserDataFeed, dict]]): User-defined data feed. Defaults to None.
engine (Optional[Literal]): Execution engine to use ("py", "cpp", "vt"). Defaults to None (auto-select).
logger (Optional[structlog.BoundLogger]): Logger instance for logging. Defaults to None (default logger).
backtest_only (bool): Flag for backtesting only mode. Defaults to False.
Returns:
Performance: A `Performance` object containing the results of the backtest or trading session, including
raw performance data, account performances, market and frequency information, and logger.
"""
简单策略例子
from bigquant import bigtrader
def initialize(context: bigtrader.IContext):
context.set_commission(bigtrader.PerOrder(buy_cost=0.0003, sell_cost=0.0013, min_cost=5))
def handle_data(context: bigtrader.IContext, data: bigtrader.IBarData):
context.order_target_percent("000001.SZ", 1)
performance = bigtrader.run(
market=bigtrader.Market.CN_STOCK,
frequency=bigtrader.Frequency.DAILY,
instruments=["000001.SZ"],
start_date="2025-01-01",
end_date="2025-03-07",
capital_base=1000000,
initialize=initialize,
handle_data=handle_data,
)
performance.render()
initialize 初始化函数
def initialize(context: bigtrader.IContext):
# 股票:设置手续费
context.set_commission(bigtrader.PerOrder(buy_cost=0.0003, sell_cost=0.0013, min_cost=5))
# 根据机器学习预测的 score 来调仓(输入数据已经按 score 有序):持有前10只,权重 1.0 / log2(排序位置 + 4)
context.data = (context.data.groupby('date', group_keys=False)
.head(10)
.reset_index(drop=True))
context.data['weight'] = (context.data.groupby('date')['score']
.rank(method='first', ascending=False)
.pipe(lambda x: 1 / np.log2(x + 4))
.groupby(context.data['date'])
.transform(lambda x: x/x.sum()))
# 调仓周期:5个交易日
context.data = bigtrader.TradingDaysRebalance(5, context=context).select_rebalance_data(df)
handle_data K线处理函数
def handle_data(context: bigtrader.IContext, data: bigtrader.IBarData):
# 买入到百分比 1==100%
context.order_target_percent("000001.SZ", 1)
def handle_data(context: bigtrader.IContext, data: bigtrader.IBarData):
# bigtrader内置函数: 根据 context.data["weight"] 调整仓位
return bigtrader.HandleDataLib.handle_data_weight_based(context, data, show_progress="%Y-%m")
def handle_data(context: bigtrader.IContext, data: bigtrader.IBarData):
# bigtrader内置函数: 根据 context.data["signal"] 触发信号,持有 context.data["weight"] 仓位
return bigtrader.HandleDataLib.handle_data_signal_based(
context,
data,
# 最大持有天数
max_hold_days=5,
# 止盈止损
take_profit=0.2,
stop_loss=0.1,
# 每天仓库量上限
max_open_weights_per_day=1,
show_progress="%Y-%m")
\
bigtrader.HandleDataLib源代码
# bigtrader.HandleDataLib
import typing
from .rebalanceperiod import RebalancePeriod
if typing.TYPE_CHECKING:
from .common import IBarData, IContext
class HandleDataLib:
"""常用的 handle_data 函数库,提供可配置的交易场景实现"""
@staticmethod
def handle_data_weight_based(
context: "IContext",
data: "IBarData",
show_progress: str = None,
) -> None:
"""Rebalance portfolio based on weights defined in context.data(date, instrument, [weight]).
Usage:
In initialize function:
1. Calculate data or factors to weights, save to context.data
2. [Optional] Rebalance period: context.data = bigtrader.TradingDaysRebalance(5, context=context).select_rebalance_data(context.data)
3. [Optional] Market timing/position weight: Calculate overall position weight based on market conditions, multiply to individual stock weights
Rebalancing logic:
Rebalance according to weight column in context.data. Skip non-rebalancing days.
Parameters
----------
context : IContext
The strategy context object that contains data and portfolio information
data : IBarData
Current bar data containing market information
show_progress: str, optional
Show progress: e.g. "%Y-%m"
Returns:
-------
None
Notes:
-----
- If context.data is not defined, the function returns without any action
- If context.rebalance_period is defined, only rebalances on signal dates
- 'weight' column is preferred over 'position' column for portfolio allocation
- If neither weight nor position is specified, equal weight allocation is used
Source code
-----
[bigquant github](https://github.com/BigQuant/bigquant)
"""
if show_progress is not None:
if not hasattr(context, "last_progress"):
context.last_progress = None
current_progress = data.current_dt.strftime(show_progress)
if context.last_progress != current_progress:
context.logger.info(f"Processing {current_progress}...")
context.last_progress = current_progress
if not hasattr(context, "data") or context.data is None:
return
# 检查是否为调仓日
if (
hasattr(context, "rebalance_period")
and context.rebalance_period is not None
and isinstance(context.rebalance_period, RebalancePeriod)
and not context.rebalance_period.is_signal_date()
):
return
df_today = context.data[context.data["date"] == data.current_dt.strftime("%Y-%m-%d")]
if len(df_today) == 1 and df_today["instrument"].iloc[0] is None:
# 非调仓日
return
# 卖出不再持有的股票
for instrument in set(context.get_positions()) - set(df_today["instrument"]):
context.order_target_percent(instrument, 0)
# 买入或调整目标持仓
for _, row in df_today.iterrows():
instrument = row["instrument"]
if "weight" in row:
weight = float(row["weight"])
elif "position" in row:
# @deprecated, use 'weight' instead
weight = float(row["position"])
else:
# if weight is not set, use equal weight
weight = 1 / len(df_today)
context.order_target_percent(instrument, weight)
@staticmethod
def handle_data_signal_based( # noqa: C901
context: "IContext",
data: "IBarData",
max_hold_days: int = None,
take_profit: float = None,
stop_loss: float = None,
max_open_weights_per_day: float = None,
show_progress: str = None,
) -> None:
"""Rebalance portfolio based on signals in context.data.
Assumes context.data contains columns: date, instrument, signal, weight
where signal indicates buy(1), hold(0), or sell(-1) signals.
Usage:
In initialize function:
1. Calculate trading signals, save to context.data
Rebalancing logic:
1. Sell stocks with signal = -1
2. Buy stocks with signal = 1 (if not already held)
3. Keep current positions for stocks with signal = 0
Parameters
----------
context : IContext
The strategy context object that contains data and portfolio information
data : IBarData
Current bar data containing market information
max_hold_days : int, optional
Maximum number of days to hold a position before selling
take_profit : float, optional
Percentage gain at which to take profits (e.g., 0.1 for 10%)
stop_loss : float, optional
Percentage loss at which to cut losses (e.g., 0.05 for 5%)
show_progress: str, optional
Show progress: e.g. "%Y-%m"
Returns:
-------
None
Raises:
------
Exception
If the "signal" column is not found in context.data
Notes:
-----
- If context.data is not defined, the function returns without any action
- If context.rebalance_period is defined, only rebalances on signal dates
- Position management includes max holding period, profit taking, and stop loss
Source code
-----
[bigquant github](https://github.com/BigQuant/bigquant)
"""
if show_progress is not None:
if not hasattr(context, "last_progress"):
context.last_progress = None
current_progress = data.current_dt.strftime(show_progress)
if context.last_progress != current_progress:
context.logger.info(f"Processing {current_progress}...")
context.last_progress = current_progress
if not hasattr(context, "data") or context.data is None:
return
# 检查是否为调仓日
if hasattr(context, "rebalance_period") and context.rebalance_period is not None and not context.rebalance_period.is_signal_date():
return
df_today = context.data[context.data["date"] == data.current_dt.strftime("%Y-%m-%d")]
if "signal" not in df_today.columns:
raise Exception("not found signal column in context.data")
if "weight" not in df_today.columns:
raise Exception("not found weight column in context.data")
if len(df_today) == 0 or (len(df_today) == 1 and df_today["instrument"].iloc[0] is None):
return
# 处理持仓到期/止盈/止损
if max_hold_days is not None:
for _, position in context.get_positions().items():
# 检查是否到期
if position.hold_days >= max_hold_days:
context.record_log("INFO", f"{position.instrument} has been held for {position.hold_days} days, executing position close")
context.order_target_percent(position.instrument, 0)
if take_profit is not None:
for _, position in context.get_positions().items():
# 检查是否止盈
profit_ratio = position.last_price / position.cost_price - 1
if profit_ratio >= take_profit:
context.record_log("INFO", f"{position.instrument} profit {profit_ratio:.2%}, triggering take profit")
context.order_target_percent(position.instrument, 0)
if stop_loss is not None:
for _, position in context.get_positions().items():
# 检查是否止损
loss_ratio = position.last_price / position.cost_price - 1
if loss_ratio <= -stop_loss:
context.record_log("INFO", f"{position.instrument} loss {-loss_ratio:.2%}, triggering stop loss")
context.order_target_percent(position.instrument, 0)
# 处理平仓信号
sell_instruments = df_today[df_today["signal"] == -1]["instrument"].tolist()
for instrument in sell_instruments:
if instrument in context.get_positions():
context.order_target_percent(instrument, 0)
# 处理开仓信号 (weight 为正表示开多,为负表示开空)
buy_df = df_today[df_today["signal"] == 1]
open_weights = 0.0
for _, row in buy_df.iterrows():
weight = float(row["weight"])
context.order_target_percent(row["instrument"], weight)
if max_open_weights_per_day is not None:
open_weights += abs(weight)
if open_weights >= max_open_weights_per_day:
break
\