
等级: 新手上路
- 注册:
- 2025-6-19
- 曾用名:
|

楼主 |
发表于 2025-8-26 23:02
|
显示全部楼层
我现在均线的问题一直解决不了,怎样让数据每天都自动下载,自动存储,以供获取到足够的数据去计算均线,比如我做5分钟周期白银连续。
以下截图设置帮我看一下对不对,这样是不是可以每天自动下载数据。
以下是我均线的代码,帮我看一下对不对,搞了太久,不知道哪里出问题了,总是不对。
def update_ma_values(context: Any, bars: np.ndarray, prev_index: int):
"""更新均线值(使用简单移动平均,与金字塔前台一致)"""
max_ma = max(context.ma_periods)
if bars is None or len(bars) < max_ma:
log_error(f"数据不足计算均线: {len(bars) if bars else 0}/{max_ma}")
return
# 解析收盘价 - 使用所有可用K线(包括夜盘数据)
close_prices = []
bar_times = []
for i in range(len(bars)):
bar_data = parse_bar_data(bars, i)
close_prices.append(bar_data['close'])
# 记录K线时间用于调试
bar_time = get_bar_time_period(bars, i)
bar_times.append(bar_time)
# 确保有足够的数据计算均线
if len(close_prices) < max_ma:
log_error(f"数据不足计算均线: {len(close_prices)}/{max_ma}")
return
# 计算均线值(简单移动平均)
for period in context.ma_periods:
if len(close_prices) >= period:
# 使用前period根K线的收盘价计算均线
ma_value = np.mean(close_prices[-period:])
context.ma_values[period] = ma_value
# 调试信息:输出计算均线使用的K线时间段
if context.first_run and period in [5, 10]:
used_bars = bar_times[-period:]
log_info(f"MA{period}计算使用的K线时间段: {used_bars}")
else:
log_error(f"数据不足计算MA{period},需要{period}根,现有{len(close_prices)}根")
def get_kline_data(context: Any, bar_count: int = None, include_now: bool = False) -> np.ndarray:
"""获取K线数据"""
if bar_count is None:
bar_count = max(context.ma_periods) + 20 # 默认多取一些
try:
# 关键修改:使用include_now参数控制是否包含当前K线
bars = history_bars(
context.symbol,
bar_count,
context.period,
['datetime', 'open', 'high', 'low', 'close'],
skip_suspended=True,
include_now=include_now, # 控制是否包含当前K线
adjusted_price=False
)
if bars is None or len(bars) == 0:
log_error("获取K线数据返回空")
return np.array([])
return bars
except Exception as e:
log_error(f"获取K线数据异常: {str(e)}")
return np.array([])
def parse_bar_data(bars: np.ndarray, index: int) -> dict:
"""解析K线数据,返回包含OHLC的字典"""
bar_data = {}
if hasattr(bars, 'dtype') and bars.dtype.names: # 结构化数组
# 金字塔返回的结构化数组,字段名可能是英文
field_names = bars.dtype.names
# 查找正确的字段名
open_field = None
high_field = None
low_field = None
close_field = None
for name in field_names:
if 'open' in name.lower() or '开盘' in name.lower():
open_field = name
elif 'high' in name.lower() or '最高' in name.lower():
high_field = name
elif 'low' in name.lower() or '最低' in name.lower():
low_field = name
elif 'close' in name.lower() or '收盘' in name.lower():
close_field = name
# 使用找到的字段名获取数据
bar_data['open'] = bars[open_field][index] if open_field else bars[index][1]
bar_data['high'] = bars[high_field][index] if high_field else bars[index][2]
bar_data['low'] = bars[low_field][index] if low_field else bars[index][3]
bar_data['close'] = bars[close_field][index] if close_field else bars[index][4]
else: # 普通数组
# 金字塔返回的普通数组,顺序通常是: [datetime, open, high, low, close, ...]
bar_data['open'] = bars[index][1] # 索引1为开盘价
bar_data['high'] = bars[index][2] # 索引2为最高价
bar_data['low'] = bars[index][3] # 索引3为最低价
bar_data['close'] = bars[index][4] # 索引4为收盘价
return bar_data
def get_bar_time_period(bars: np.ndarray, index: int) -> str:
"""获取K线的时间段(正确处理时区)"""
try:
# 获取K线的时间戳
if hasattr(bars, 'dtype') and bars.dtype.names and 'datetime' in bars.dtype.names:
bar_time = bars['datetime'][index]
else:
bar_time = bars[index][0]
# 金字塔的datetime格式通常是YYYYMMDDHHMMSS
time_str = str(int(bar_time))
if len(time_str) < 12:
time_str = time_str.zfill(12)
# 提取日期和时间部分
date_str = time_str[:8] # YYYYMMDD
hour = int(time_str[8:10])
minute = int(time_str[10:12])
# 创建datetime对象
bar_datetime = datetime.datetime(
int(date_str[:4]), int(date_str[4:6]), int(date_str[6:8]),
hour, minute
)
# 根据数据源时区转换为北京时间
beijing_time = convert_bar_time_to_beijing(bar_datetime)
# 计算时间段
start_time = beijing_time.time()
end_time_obj = beijing_time + datetime.timedelta(minutes=5)
end_time = end_time_obj.time()
return f"{start_time.strftime('%H:%M')}-{end_time.strftime('%H:%M')}"
except Exception as e:
log_error(f"获取K线时间段错误: {str(e)}")
return "未知时间段" |
本帖子中包含更多资源
您需要 登录 才可以下载或查看,没有帐号?
x
|