import torch
import torch.nn as nn
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt

# Hyper-parameters
SEQ_LENGTH = 10  # length of the input time window (time steps)
PRE_LENGTH = 1  # number of future time steps to predict
BATCH_SIZE = 4096
EPOCHS = 2000
HIDDEN_SIZE = 64
HIDDEN_LAYER = 3  # number of stacked LSTM layers
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")


# Data preprocessing
def process_data(file_path):
    """Load the Excel sheet, standardize it and plot before/after histograms.

    Sheet ``"sample1"`` of *file_path* is read without a header row and with
    the first row skipped. Columns 2-9 (1-based) are used as inputs and
    columns 11-19 as outputs; each group is standardized with its own
    ``StandardScaler``. The raw and standardized inputs are printed and shown
    as histograms; ``plt.show()`` is called before returning.

    NOTE: both scalers are fitted on the complete data set, i.e. before any
    train/test split performed by the caller.

    Args:
        file_path: Path of the Excel workbook to read.

    Returns:
        Tuple ``(inputs, outputs, input_scaler, output_scaler)`` holding the
        standardized arrays and the fitted scalers.
    """
    df = pd.read_excel(
        file_path, header=None, skiprows=1, sheet_name="sample1"
    )

    # Slice the input / output columns out of the sheet.
    raw_inputs = df.iloc[:, 1:9].values.astype(np.float32)  # columns 2-9
    raw_outputs = df.iloc[:, 10:19].values.astype(np.float32)  # columns 11-19

    print("标准化之前:")
    print(raw_inputs)

    # Standardize inputs and outputs independently.
    # Alternative that was tried: MinMaxScaler(feature_range=(-1, 1)) for both.
    input_scaler = StandardScaler()
    output_scaler = StandardScaler()
    inputs = input_scaler.fit_transform(raw_inputs)
    outputs = output_scaler.fit_transform(raw_outputs)

    print("标准化之后:")
    print(inputs)

    # Comparison plot: one colour per input column.
    plt.figure(figsize=(12, 8))
    colors = plt.cm.tab10(np.arange(raw_inputs.shape[1]))

    # Left panel: distribution of the raw inputs.
    plt.subplot(2, 2, 1)
    plt.hist(raw_inputs, bins=30, alpha=0.7, color=colors, label='Original')
    plt.title('Image Channel (Original)')
    plt.xlabel('Pixel Value')
    plt.legend()

    # Right panel: distribution after standardization. This uses its own
    # subplot slot; re-selecting slot 1 would draw on top of the raw-data
    # histogram and replace its title and axis label.
    plt.subplot(2, 2, 2)
    plt.hist(inputs, bins=30, alpha=0.7, color=colors, label='StandardScaler')
    plt.title('StandardScaler Comparison')
    plt.xlabel('StandardScaler Value')
    plt.legend()

    plt.tight_layout()
    plt.show()

    return inputs, outputs, input_scaler, output_scaler


# Build the sliding-window data set.
def create_sequences(inputs, outputs, look_back=8, pred_step=1):
    """Cut aligned input/target windows out of two time series.

    Sample ``i`` pairs ``inputs[i:i + look_back]`` with the ``pred_step`` rows
    of ``outputs`` that immediately follow that window.

    Args:
        inputs: Array-like of shape ``(time, input_features)``.
        outputs: Array-like of shape ``(time, output_features)``.
        look_back: Number of past time steps in each input window.
        pred_step: Number of future time steps in each target.

    Returns:
        Tuple ``(X, y)`` of float32 tensors with shapes
        ``(n, look_back, input_features)`` and
        ``(n, pred_step, output_features)``. ``n`` is 0 when the series is too
        short to hold a single window.
    """
    inputs = np.asarray(inputs, dtype=np.float32)
    outputs = np.asarray(outputs, dtype=np.float32)

    # The last usable start index is ``length - look_back - pred_step``, hence
    # the ``+ 1``; without it the final window is silently dropped.
    length = min(len(inputs), len(outputs))
    n_windows = max(length - look_back - pred_step + 1, 0)

    if n_windows == 0:
        # Keep the trailing dimensions so callers can still inspect shapes.
        empty_X = torch.empty((0, look_back) + inputs.shape[1:])
        empty_y = torch.empty((0, pred_step) + outputs.shape[1:])
        return empty_X, empty_y

    X = np.stack([inputs[i:i + look_back] for i in range(n_windows)])
    y = np.stack(
        [
            outputs[i + look_back:i + look_back + pred_step]
            for i in range(n_windows)
        ]
    )
    return (
        torch.as_tensor(X, dtype=torch.float32),
        torch.as_tensor(y, dtype=torch.float32),
    )


# Custom Dataset
class TimeSeriesDataset(Dataset):
    """Map-style dataset that pairs each input window with its target."""

    def __init__(self, X, y):
        """Store the window tensor ``X`` and the target tensor ``y``."""
        self.X = X
        self.y = y

    def __len__(self):
        """Return the number of samples (length of ``X``)."""
        return len(self.X)

    def __getitem__(self, idx):
        """Return the ``(window, target)`` pair at position ``idx``."""
        return self.X[idx], self.y[idx]


# Per-epoch loss history, appended to by the training loop.
train_losses = []
test_losses = []


# LSTM model
class LSTMModel(nn.Module):
    """Stacked LSTM followed by a linear layer on the last time step."""

    def __init__(self, input_size, hidden_size, output_size, num_lay):
        """Build the network.

        Args:
            input_size: Number of features per time step.
            hidden_size: Width of the LSTM hidden state.
            output_size: Number of values predicted per sample.
            num_lay: Number of stacked LSTM layers.
        """
        super().__init__()
        # Options that were tried and left disabled: bidirectional=True and
        # dropout=0.2. A bidirectional LSTM would double the feature size fed
        # into ``self.fc``.
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            batch_first=True,
            num_layers=num_lay,
        )
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Map ``(batch, seq_len, input_size)`` to ``(batch, output_size)``."""
        out, _ = self.lstm(x)
        # Only the output of the final time step feeds the regression head.
        return self.fc(out[:, -1, :])


# Raw strings keep the Windows backslashes literal; without the ``r`` prefix
# sequences such as ``\l`` are invalid escapes that newer Python versions warn
# about. The resulting path text is unchanged.
DATA_FILE = r"D:\liyong\文档\项目文档\中汽TVS\机器学习\降阶模型数据.xlsx"
MODEL_FILE = r"D:\liyong\lstm.pth"


def _train_one_epoch(model, loader, criterion, optimizer, device):
    """Run one optimization pass over *loader*; return the mean sample loss."""
    model.train()
    running = 0.0
    for batch_X, batch_y in loader:
        batch_X, batch_y = batch_X.to(device), batch_y.to(device)
        preds = model(batch_X)
        loss = criterion(preds, batch_y)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        # Weight by batch size so a smaller last batch does not skew the mean.
        running += loss.item() * batch_X.size(0)
    return running / len(loader.dataset)


def _evaluate(model, loader, criterion, device):
    """Return the mean sample loss over *loader* (NaN if it has no samples)."""
    model.eval()
    running = 0.0
    with torch.no_grad():
        for batch_X, batch_y in loader:
            batch_X, batch_y = batch_X.to(device), batch_y.to(device)
            preds = model(batch_X)
            running += criterion(preds, batch_y).item() * batch_X.size(0)
    n_samples = len(loader.dataset)
    return running / n_samples if n_samples else float("nan")


def main():
    """Load the data, train the LSTM, save it and plot the loss curves."""
    # Data preparation (standardization happens inside process_data).
    inputs, outputs, input_scaler, output_scaler = process_data(DATA_FILE)
    print(inputs[:5])  # first five input rows
    print(outputs[:5])  # first five output rows

    X, y = create_sequences(inputs, outputs, SEQ_LENGTH, PRE_LENGTH)

    # The model emits one flat vector per sample, so flatten the targets from
    # (n, pred_step, features) to (n, pred_step * features). Leaving the extra
    # axis in place makes MSELoss broadcast the two tensors against each other
    # (or fail on a size mismatch) instead of comparing sample with sample.
    y = y.reshape(len(y), -1)

    # Chronological 80/20 split.
    split = int(0.8 * len(X))
    if split == 0:
        raise ValueError(
            "Not enough samples to build a training set: "
            f"got {len(X)} window(s)."
        )
    train_dataset = TimeSeriesDataset(X[:split], y[:split])
    test_dataset = TimeSeriesDataset(X[split:], y[split:])
    train_loader = DataLoader(
        train_dataset, batch_size=BATCH_SIZE, shuffle=True
    )
    test_loader = DataLoader(
        test_dataset, batch_size=BATCH_SIZE, shuffle=False
    )

    # Layer sizes follow the data instead of being hard-coded.
    model = LSTMModel(
        input_size=X.shape[-1],
        hidden_size=HIDDEN_SIZE,
        output_size=y.shape[-1],
        num_lay=HIDDEN_LAYER,
    ).to(DEVICE)

    # Training configuration
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)

    for epoch in range(EPOCHS):
        avg_train_loss = _train_one_epoch(
            model, train_loader, criterion, optimizer, DEVICE
        )
        train_losses.append(avg_train_loss)

        avg_test_loss = _evaluate(model, test_loader, criterion, DEVICE)
        test_losses.append(avg_test_loss)

        # Report progress every ten epochs.
        if epoch % 10 == 0:
            # Guard the ratio against a zero test loss.
            ratio = (
                avg_train_loss / avg_test_loss
                if avg_test_loss
                else float("nan")
            )
            print(
                f"Epoch {epoch} | Train Loss: {avg_train_loss:.4f} | Test Loss: {avg_test_loss:.4f} | 损失比: {ratio:.2f}:1")

    # NOTE(review): this pickles the whole module object, so loading it needs
    # the LSTMModel class to be importable; saving model.state_dict() is the
    # more portable option if the consumers of this file can be changed.
    torch.save(model, MODEL_FILE)

    # Loss curves
    plt.figure(figsize=(10, 5))
    plt.plot(train_losses, label='Train Loss', color='blue', alpha=0.7)
    plt.plot(test_losses, label='Test Loss', color='red', alpha=0.7)
    plt.title("LSTM TrainLine (train vs test)")
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.legend()
    plt.grid(True, linestyle='--', alpha=0.5)
    # To keep a high-resolution copy, call
    # plt.savefig('training_loss_curve.png', dpi=300) before plt.show().
    plt.show()

    # Example prediction on the first sample (values are in standardized units).
    model.eval()
    with torch.no_grad():
        prediction = model(X[0:1].to(DEVICE))
    print("Sample Prediction:", prediction)
    print("Real Value:", y[0:1])


# Main flow
if __name__ == "__main__":
    main()