
等级: 新手上路
- 注册:
- 2025-10-20
- 曾用名:
|

楼主 |
发表于 2025-10-20 14:07
|
显示全部楼层
1、条件A,条件B 条件C,A:=(3*CLOSE+LOW+OPEN+HIGH)/6;
B=MA(A,8);
C=MA(CLOSE, 120);
2、买入的股票为自选里面股票
3、每次买入资金为总资金5%
4、总持仓资金小于80%
5、当股价大于C,且A大于B且股价与A相差小于1%时买入
6、买入的股票持仓亏损超过3%全部卖掉或者买入后该股票30分钟图A小于B时合部卖掉或者买入股票后盈利超过15%卖掉该股的50%,盈利超过30%再卖掉50%
# 金字塔量化交易系统 - 自定义交易指标
# 适用于金字塔Python策略
import pandas as pd
import numpy as np
from PythonApi import *
def init(context):
"""
初始化函数
"""
# 全局变量设置
context.initial_capital = context.total_cash() # 初始资金
context.watch_list = [] # 自选股票池
context.position_size_ratio = 0.05 # 每次买入资金比例5%
context.max_position_ratio = 0.8 # 最大持仓比例80%
context.stop_loss_ratio = 0.03 # 止损比例3%
context.profit_sell_ratio1 = 0.15 # 盈利卖出比例15%
context.profit_sell_ratio2 = 0.30 # 盈利卖出比例30%
# 持仓状态记录
context.position_states = {}
# 设置定时器,每日检查交易条件
scheduler.run_daily(check_trading_conditions, time_rule="09:30:00")
log.info("交易系统初始化完成")
def handle_bar(context):
"""
K线回调函数
"""
# 可以在此添加实时监控逻辑
pass
def calculate_MID(data):
"""
计算MID指标
MID = (3*CLOSE + LOW + OPEN + HIGH) / 6
"""
return (3 * data['close'] + data['low'] + data['open'] + data['high']) / 6
def calculate_A(data, period=20):
"""
计算指标A - 加权移动平均
"""
mid = calculate_MID(data)
# 计算加权移动平均
weights = np.arange(period, 0, -1) # 20,19,18,...,1
total_weight = np.sum(weights)
# 创建MID的历史数据数组
mid_history = []
for i in range(period):
if i < len(mid):
mid_history.append(mid.iloc[-(i+1)])
else:
mid_history.append(mid.iloc[-1])
# 计算加权平均
weighted_sum = 0
for i, weight in enumerate(weights):
weighted_sum += mid_history[i] * weight
return weighted_sum / total_weight
def calculate_B(data, period=8):
"""
计算指标B - 强势界的8日移动平均
注意:这里假设强势界就是指标A
"""
A_values = []
for i in range(period):
if i < len(data):
slice_data = data.iloc[-(i+1):]
A_val = calculate_A(slice_data)
A_values.append(A_val)
else:
A_values.append(calculate_A(data))
return np.mean(A_values)
def calculate_C(data, period=60):
"""
计算指标C - 60日移动平均
"""
if len(data) < period:
return data['close'].mean()
return data['close'].rolling(window=period).mean().iloc[-1]
def check_trading_conditions(context):
"""
检查交易条件的主函数
"""
# 更新自选股列表(示例,实际需要从金字塔获取或设置)
if not context.watch_list:
# 这里可以设置默认自选股或从金字塔板块获取
context.watch_list = ["SZ000001", "SH600036", "SZ000858"]
log.info("设置自选股: {}".format(context.watch_list))
# 检查总持仓比例
total_position_value = context.portfolio_value - context.total_cash()
total_assets = context.portfolio_value
if total_position_value >= total_assets * context.max_position_ratio:
log.info("持仓已达上限{}%,暂停买入".format(context.max_position_ratio * 100))
return
# 遍历自选股检查买入条件
for symbol in context.watch_list:
try:
# 获取历史数据(日线)
hist_data = context.get_history(symbol, 100, "1d")
if hist_data is None or len(hist_data) < 60:
continue
# 计算指标
A_value = calculate_A(hist_data)
B_value = calculate_B(hist_data)
C_value = calculate_C(hist_data)
current_price = hist_data['close'].iloc[-1]
# 记录指标值(可选,用于调试)
log.debug("{} - A:{:.3f}, B:{:.3f}, C:{:.3f}, 价格:{:.3f}".format(
symbol, A_value, B_value, C_value, current_price))
# 检查买入条件
buy_condition1 = current_price > C_value # 股价大于C
buy_condition2 = A_value > B_value # A大于B
buy_condition3 = abs(current_price - A_value) / A_value < 0.01 # 股价与A相差小于1%
if buy_condition1 and buy_condition2 and buy_condition3:
# 执行买入
execute_buy(context, symbol, current_price, A_value, B_value, C_value)
# 检查持仓股票的卖出条件
if symbol in context.positions and context.positions[symbol].amount > 0:
check_sell_conditions(context, symbol, current_price, A_value, B_value)
except Exception as e:
log.error("处理股票{}时出错: {}".format(symbol, str(e)))
continue
def execute_buy(context, symbol, price, A_value, B_value, C_value):
"""
执行买入操作
"""
# 计算买入资金量
buy_amount = context.portfolio_value * context.position_size_ratio
shares = int(buy_amount / price)
if shares == 0:
log.warning("{} 计算股数为0,跳过买入".format(symbol))
return
# 执行买入
order_id = buy_open(symbol, "Limit", price, shares)
if order_id:
# 记录买入信息
if symbol not in context.position_states:
context.position_states[symbol] = {}
context.position_states[symbol].update({
'buy_price': price,
'buy_time': context.current_dt,
'A_value': A_value,
'B_value': B_value,
'C_value': C_value,
'first_profit_sold': False,
'second_profit_sold': False
})
log.info("买入信号触发: {},价格:{:.3f},数量:{}".format(symbol, price, shares))
log.info("指标状态 - A:{:.3f}, B:{:.3f}, C:{:.3f}".format(A_value, B_value, C_value))
else:
log.error("{} 买入下单失败".format(symbol))
def check_sell_conditions(context, symbol, current_price, A_daily, B_daily):
"""
检查卖出条件
"""
if symbol not in context.position_states:
return
position_info = context.position_states[symbol]
position = context.positions[symbol]
buy_price = position_info['buy_price']
# 计算盈亏比例
profit_ratio = (current_price - buy_price) / buy_price
# 条件1: 持仓亏损超过3%全部卖掉
if profit_ratio <= -context.stop_loss_ratio:
sell_close(symbol, "Market", position.amount)
log.info("止损卖出: {},亏损比例:{:.2%}".format(symbol, profit_ratio))
# 清理状态记录
if symbol in context.position_states:
del context.position_states[symbol]
return
try:
# 条件2: 30分钟图A小于B时全部卖掉
# 获取30分钟数据计算指标
hist_30min = context.get_history(symbol, 50, "30m")
if hist_30min is not None and len(hist_30min) >= 20:
A_30min = calculate_A(hist_30min)
B_30min = calculate_B(hist_30min)
if A_30min < B_30min:
sell_close(symbol, "Market", position.amount)
log.info("30分钟指标触发卖出: {},A_30min:{:.3f}, B_30min:{:.3f}".format(
symbol, A_30min, B_30min))
# 清理状态记录
if symbol in context.position_states:
del context.position_states[symbol]
return
except Exception as e:
log.warning("获取30分钟数据失败: {}".format(str(e)))
# 条件3: 盈利超过15%卖掉50%
if profit_ratio >= context.profit_sell_ratio1 and not position_info.get('first_profit_sold', False):
sell_shares = position.amount // 2
if sell_shares > 0:
sell_close(symbol, "Market", sell_shares)
context.position_states[symbol]['first_profit_sold'] = True
log.info("盈利15%卖出50%: {},盈利比例:{:.2%}".format(symbol, profit_ratio))
# 条件4: 盈利超过30%再卖掉50%
if profit_ratio >= context.profit_sell_ratio2 and position_info.get('first_profit_sold', False) and not position_info.get('second_profit_sold', False):
remaining_shares = position.amount
sell_shares = remaining_shares // 2
if sell_shares > 0:
sell_close(symbol, "Market", sell_shares)
context.position_states[symbol]['second_profit_sold'] = True
log.info("盈利30%卖出剩余50%: {},盈利比例:{:.2%}".format(symbol, profit_ratio))
def before_trading(context):
"""
盘前处理
"""
log.info("开始新交易日")
log.info("当前总资产: {:.2f}, 可用资金: {:.2f}".format(
context.portfolio_value, context.total_cash()))
def after_trading(context):
"""
盘后处理
"""
# 输出当日交易总结
total_position_value = context.portfolio_value - context.total_cash()
position_ratio = total_position_value / context.portfolio_value if context.portfolio_value > 0 else 0
log.info("交易日结束")
log.info("总资产: {:.2f}, 持仓比例: {:.2%}".format(
context.portfolio_value, position_ratio))
# 输出持仓状态
if context.positions:
log.info("当前持仓:")
for symbol, position in context.positions.items():
if symbol in context.position_states:
buy_price = context.position_states[symbol]['buy_price']
current_price = position.current_price
profit_ratio = (current_price - buy_price) / buy_price
log.info(" {}: 数量{}, 成本{:.3f}, 现价{:.3f}, 盈亏{:.2%}".format(
symbol, position.amount, buy_price, current_price, profit_ratio))
# 辅助函数:设置自选股
def set_watch_list(context, stock_list):
"""
设置自选股票池
"""
context.watch_list = stock_list
log.info("更新自选股列表: {}".format(stock_list))
# 辅助函数:更新参数
def update_parameters(context, position_ratio=None, stop_loss=None, profit1=None, profit2=None):
"""
更新交易参数
"""
if position_ratio is not None:
context.position_size_ratio = position_ratio
if stop_loss is not None:
context.stop_loss_ratio = stop_loss
if profit1 is not None:
context.profit_sell_ratio1 = profit1
if profit2 is not None:
context.profit_sell_ratio2 = profit2
log.info("参数更新 - 买入比例:{:.1%}, 止损:{:.1%}, 止盈1:{:.1%}, 止盈2:{:.1%}".format(
context.position_size_ratio, context.stop_loss_ratio,
context.profit_sell_ratio1, context.profit_sell_ratio2))
|
|