# 导入相关库 import os import pandas as pd import torch # 导入PyTorch库 import torch.nn as nn # 导入神经网络模块 import tushare as ts # 导入tushare库,用于获取股票数据 import torch.optim as optim # 导入优化器模块 from tqdm import tqdm # 导入tqdm库,用于显示进度条 import matplotlib.pyplot as plt # 导入matplotlib库,用于绘制图表 from copy import deepcopy as copy # 导入deepcopy函数,用于深拷贝对象 from torch.utils.data import DataLoader, TensorDataset # 导入DataLoader和TensorDataset类,用于加载数据 # 获取数据 class GetData: def __init__(self, stock_id, save_path): """ 初始化方法 :param stock_id: 股票id :param save_path: 数据保存路径 """ self.min_value = None self.max_value = None self.stock_id = stock_id self.save_path = save_path self.data = None def getData(self): """ 获取数据 数据处理并保存 :return: None """ """ # 获取股票数据 self.data = ts.get_hist_data(self.stock_id).iloc[::-1] # 选取特定列作为数据 self.data = self.data[["open", "close", "high", "low", "volume"]] # 计算数据列的最大值和最小值 self.max_value = self.data['volume'].max() self.min_value = self.data['volume'].min() # 归一化处理 self.data = self.data.apply(lambda x: (x - min(x)) / (max(x) - min(x))) # 保存数据 self.data.to_csv(self.save_path) return self.data """ # 本地数据data.csv由于是归一化后的数据,所以最大值和最小值并不准确,所以运行结果会有误差,重在体验整个项目的逻辑即可 columns = ['open', 'close', 'high', 'low', 'volume'] self.data = pd.read_csv(self.save_path, names=columns, header=0) # 计算数据列的最大值和最小值 self.max_value = self.data['volume'].max() self.min_value = self.data['volume'].min() return self.data def process_data(self, n): """ 处理数据 :param n: 滑动窗口大小 :return: 训练集的特征、测试集的特征、训练集的标签、测试集的标签 """ if self.data is None: self.getData() # 提取特征和标签数据 """ iloc 是 Pandas 库中用于按位置索引选取数据的方法 """ feature = [ self.data.iloc[i: i + n].values.tolist() for i in range(len(self.data) - n + 2) if i + n < len(self.data) ] label = [ self.data.close.values[i + n] for i in range(len(self.data) - n + 2) if i + n < len(self.data) ] # 划分训练集和数据集 train_x = feature[:500] test_x = feature[500:] train_y = label[:500] test_y = label[500:] return train_x, test_x, train_y, test_y # 搭建LSTM模型: 单层单向LSTM网络+全连接层输出 class Model(nn.Module): def __init__(self, n): # 初始化方法 super(Model, self).__init__() # 调用父类的初始化方法 # 定义LSTM层 输入大小为n, 隐藏层大小为256,批次优先为True self.lstm_layer = nn.LSTM(input_size=n, hidden_size=256, batch_first=True) # 定义全连接层 输入特征数为256, 输出特征数为1 有偏差 self.linear_layer = nn.Linear(in_features=256, out_features=1, bias=True) # 向前传播方法 def forward(self, x): """ x: 输入数据(通常是时间序列的特征) lstm_output: LSTM 层的输出序列 hidden_state: LSTM 的隐藏状态(用于传递长期记忆) cell_state: LSTM 的细胞状态(仅在 LSTM 中存在) final_output: 经过全连接层后的最终输出 """ # LSTM 层的前向传播,得到输出和隐藏状态 lstm_output, (hidden_state, cell_state) = self.lstm_layer(x) # 获取隐藏状态的维度:batch_size, num_layers, hidden_size batch_size, num_layers, hidden_size = hidden_state.shape # 将隐藏状态输入全连接层,需要先展平为二维 final_output = self.linear_layer(hidden_state.view(batch_size * num_layers, hidden_size)) return final_output # 训练模型 def train_model(epoch, train_dataloader, test_dataloader, optimizer, early_stop, model): """ 训练模型的函数 :param model: 模型 :param early_stop: 提前停止的轮数 :param optimizer: 优化器 :param epoch: 训练轮次 :param train_dataloader: 训练数据加载器 :param test_dataloader: 测试数据加载器 :return: """ best_model = None # 用于保存最佳模型 train_loss = 0 # 训练损失 test_loss = 0 # 测试损失 best_loss = 100 # 最佳损失 epoch_cnt = 0 # 训练轮次计数器 for i in range(epoch): total_train_loss = 0 # 训练总损失 total_train_num = 0 # 训练总样本数 total_test_loss = 0 # 测试总损失 total_test_num = 0 # 测试总样本数 for x, y in tqdm(train_dataloader, desc=f"Epoch:{i} | Train Loss:{train_loss} | Test Loss:{test_loss}"): x_num = len(x) # 当前批次样本数 p = model(x) # 模型预测 ✅ 使用 model(x),而不是 Model(x) loss = loss_func(p, y) # 计算损失 optimizer.zero_grad() # 清空梯度 loss.backward() # 反向传播 optimizer.step() # 更新参数 total_train_loss += loss.item() # 训练损失累加 total_train_num += x_num # 训练样本数累加 # 计算训练损失 train_loss = total_train_loss / total_train_num for x, y in test_dataloader: x_num = len(x) # 当前批次样本数 p = model(x) # 模型预测 ✅ 使用 model(x),而不是 Model(x) loss = loss_func(p, y) # 计算损失 optimizer.zero_grad() # 清空梯度 loss.backward() # 反向传播 optimizer.step() # 更新参数 total_test_loss += loss.item() # 测试损失累加 total_test_num += x_num # 测试样本数累加 test_loss = total_test_loss / total_test_num # 如果当前测试损失小于最佳损失,则更新最佳模型和轮次计数器 否则 轮次计数器加1 if test_loss < best_loss: best_loss = test_loss best_model = copy(model) # ✅ 使用 copy(model),而不是 copy(Model torch.save(best_model.state_dict(), './best_model.pth') epoch_cnt = 0 else: epoch_cnt += 1 if epoch_cnt > early_stop: break def test_model(test_dataloader): """ 测试模型,并返回预测值、真实标签和测试损失 :param test_dataloader: 测试数据加载器 :return: pred,label,test_loss """ pred = [] # 预测值列表 label = [] # 真实标签列表 model_f = Model(5) # 创建模型对象 model_f.load_state_dict(torch.load('./best_model.pth')) # 加载最佳模型 model_f.eval() # 设置模型为评估模式 total_test_loss = 0 total_test_num = 0 for x, y in test_dataloader: x_num = len(x) p = model_f(x) # ✅ 使用 model_f(x) loss = loss_func(p, y) total_test_loss += loss.item() total_test_num += x_num # 将预测值和真实标签添加到列表中 pred.extend(p.data.squeeze(1).tolist()) label.extend(y.data.tolist()) # 获取预测值和真实标签 test_loss = total_test_loss / total_test_num return pred, label, test_loss def plot_img(data, pred): """ 绘制真实值与预测值对比图 :param data: 真实标签列表 :param pred: 模型预测值列表 :return: """ # 设置支持中文的字体 plt.rcParams['font.sans-serif'] = ['SimHei'] plt.rcParams['axes.unicode_minus'] = False # 解决负号显示问题 plt.figure(figsize=(14, 8)) # 绘制真实值曲线 plt.plot(range(len(data)), data, color='blue', label='真实值(收盘价)', linewidth=2) # 绘制预测值曲线 plt.plot(range(len(pred)), pred, color='green', label='预测值(模型输出)', linestyle='--', linewidth=2) # 添加预测区间(每5个点绘制一个3天的预测区间) for i in range(0, len(pred) - 3, 5): price = [data[i] + pred[j] - pred[i] for j in range(i, i + 3)] plt.plot(range(i, i + 3), price, color='red', alpha=0.6, linestyle=':', linewidth=1.5) # 设置标题和标签 plt.title('股票价格预测结果对比', fontsize=20) plt.xlabel('时间步(天数)', fontsize=16) plt.ylabel('股票收盘价(亿)', fontsize=16) # 设置刻度字体 plt.xticks(fontproperties='Times New Roman', size=14) plt.yticks(fontproperties='Times New Roman', size=14) # 显示图例 plt.legend(loc='upper left', fontsize=14) # 显示网格 plt.grid(True, linestyle='--', alpha=0.5) # 展示图形 plt.tight_layout() plt.show() if __name__ == '__main__': # 超参数 days_num = 5 # 天数 epoch = 100 # 训练轮次 fea = 5 # 特征数量 batch_size = 20 # 批次大小 early_stop = 5 # 提前停止轮次 # 创建模型对象 model = Model(fea) # 创建数据加载器 gd = GetData(stock_id='601398', save_path='./data.csv') train_x, test_x, train_y, test_y = gd.process_data(days_num) # 将数据转换为张量 train_x = torch.tensor(train_x).float() test_x = torch.tensor(test_x).float() train_y = torch.tensor(train_y).float() test_y = torch.tensor(test_y).float() # 构建训练数据集和测试数据集 train_data = TensorDataset(train_x, train_y) train_dataloader = DataLoader(train_data, batch_size=batch_size) test_data = TensorDataset(test_x, test_y) test_dataloader = DataLoader(test_data, batch_size=batch_size) # 创建损失函数和优化器 loss_func = nn.MSELoss() optimizer = optim.Adam(model.parameters(), lr=0.001) # 训练模型 train_model(epoch, train_dataloader, test_dataloader, optimizer, early_stop, model) # 只有模型存在时才进行测试 if os.path.exists('./best_model.pth'): pred, label, test_loss = test_model(test_dataloader) else: print("模型文件不存在,请先完成训练并确保模型已保存。") # 将预测值和真实标签转换为真实价格 pred = [ele * (gd.max_value - gd.min_value) + gd.min_value for ele in pred] data = [ele * (gd.max_value - gd.min_value) + gd.min_value for ele in label] # 绘制图像 plot_img(data, pred) print(f"模型损失:{test_loss}")