# ModelTrainingPython/TVS_DL/TVS_LSTM.py
import torch
import torch.nn as nn
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt
# Hyperparameters
SEQ_LENGTH = 10    # length of the input time-series window
PRE_LENGTH = 1     # length of the prediction window
BATCH_SIZE = 4096
EPOCHS = 2000
HIDDEN_SIZE = 64
HIDDEN_LAYER = 3   # number of stacked LSTM layers
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Data preprocessing
def process_data(file_path):
    # Read the Excel sheet "sample1", skipping the header row (values may use scientific notation)
    df = pd.read_excel(file_path, header=None, skiprows=1, sheet_name="sample1")
    # Extract input/output columns
    inputs = df.iloc[:, 1:9].values.astype(np.float32)      # columns 2-9 (8 input features)
    outputs = df.iloc[:, 10:19].values.astype(np.float32)   # columns 11-19 (9 output features)
    print("Before standardization:")
    ori = inputs  # keep the raw inputs for the comparison plot below
    print(inputs)
    # Standardize the data
    input_scaler = StandardScaler()
    output_scaler = StandardScaler()
    inputs = input_scaler.fit_transform(inputs)
    outputs = output_scaler.fit_transform(outputs)
    #
    # input_scaler = MinMaxScaler(feature_range=(-1, 1))
    # output_scaler = MinMaxScaler(feature_range=(-1, 1))
    # inputs = input_scaler.fit_transform(inputs)
    # outputs = output_scaler.fit_transform(outputs)
    print("After standardization:")
    print(inputs)
    # Comparison plot: raw vs. standardized input distributions
    plt.figure(figsize=(12, 8))
    colors = plt.cm.tab10(np.arange(8))  # 8 distinct colors, one per input feature
    # Distribution of the raw input features
    plt.subplot(2, 2, 1)
    plt.hist(ori, bins=30, alpha=0.7, color=colors, label='Original')
    plt.title('Input Features (Original)')
    plt.xlabel('Value')
    # Distribution after StandardScaler
    plt.subplot(2, 2, 2)
    plt.hist(inputs, bins=30, alpha=0.7, color=colors, label='StandardScaler')
    plt.title('Input Features (StandardScaler)')
    plt.xlabel('Standardized Value')
    plt.legend()
    plt.tight_layout()
    plt.show()
    return inputs, outputs, input_scaler, output_scaler
# Build sliding-window sequences: look_back past steps as input, pred_step future steps as target
def create_sequences(inputs, outputs, look_back=8, pred_step=1):
    X, y = [], []
    for i in range(len(inputs) - look_back - pred_step):
        X.append(inputs[i:i + look_back])
        y.append(outputs[(i + look_back):(i + look_back + pred_step)])
    return torch.FloatTensor(np.array(X)), torch.FloatTensor(np.array(y))
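# Illustrative shape example (an added note, assuming the 8-input/9-output layout used below):
#   X_demo, y_demo = create_sequences(np.zeros((100, 8), np.float32),
#                                     np.zeros((100, 9), np.float32),
#                                     look_back=3, pred_step=1)
#   # X_demo.shape -> (96, 3, 8), y_demo.shape -> (96, 1, 9)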
# Custom Dataset wrapper around the sequence tensors
class TimeSeriesDataset(Dataset):
    def __init__(self, X, y):
        self.X = X
        self.y = y

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]
# Loss history, recorded once per epoch during training
train_losses = []
test_losses = []
# LSTM model: stacked LSTM followed by a linear head on the last time step
class LSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, num_lay):
        super().__init__()
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            batch_first=True,
            num_layers=num_lay,       # number of stacked LSTM layers
            # bidirectional=True,     # optionally use a bidirectional LSTM
            # dropout=0.2             # optional regularization between layers
        )
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        out, (h_n, c_n) = self.lstm(x)
        out = self.fc(out[:, -1, :])  # use only the last time step's hidden state
        return out
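# Shape reference (added comment): with batch_first=True, x is (batch, seq_len, input_size),
# the LSTM output is (batch, seq_len, hidden_size), and the linear head maps the last time
# step to (batch, output_size).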
# Main workflow
if __name__ == "__main__":
    # Load and standardize the data
    inputs, outputs, input_scaler, output_scaler = process_data(r"D:\liyong\文档\项目文档\中汽TVS\机器学习\降阶模型数据.xlsx")
    print(inputs[:5])   # first five standardized input rows
    print(outputs[:5])  # first five standardized output rows
    X, y = create_sequences(inputs, outputs, SEQ_LENGTH, PRE_LENGTH)
    # Train/test split (80/20, in time order)
    split = int(0.8 * len(X))
    train_dataset = TimeSeriesDataset(X[:split], y[:split])
    test_dataset = TimeSeriesDataset(X[split:], y[split:])
    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)
    # Model initialization
    model = LSTMModel(
        input_size=X.shape[-1],    # number of input features (8 columns extracted above)
        hidden_size=HIDDEN_SIZE,
        output_size=y.shape[-1],   # number of output features (9 columns extracted above)
        num_lay=HIDDEN_LAYER
    ).to(DEVICE)
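    # Optional sanity check (an addition, not part of the original script): push one dummy
    # batch through the model to confirm the input/output shapes line up before training.
    with torch.no_grad():
        _dummy = torch.zeros(2, SEQ_LENGTH, X.shape[-1], device=DEVICE)
        print("Sanity-check output shape:", model(_dummy).shape)  # expected: (2, y.shape[-1])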
    # Training configuration
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)
    for epoch in range(EPOCHS):
        model.train()
        epoch_train_loss = 0
        for batch_X, batch_y in train_loader:
            batch_X, batch_y = batch_X.to(DEVICE), batch_y.to(DEVICE)
            outputs = model(batch_X)
            # batch_y is (batch, PRE_LENGTH, features); with PRE_LENGTH == 1 drop the step axis
            loss = criterion(outputs, batch_y.squeeze(1))
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            epoch_train_loss += loss.item() * batch_X.size(0)
        # Record the average training loss for this epoch
        avg_train_loss = epoch_train_loss / len(train_loader.dataset)
        train_losses.append(avg_train_loss)
        # Evaluate on the held-out test set
        model.eval()
        test_loss = 0
        with torch.no_grad():
            for batch_X, batch_y in test_loader:
                batch_X, batch_y = batch_X.to(DEVICE), batch_y.to(DEVICE)
                preds = model(batch_X)
                test_loss += criterion(preds, batch_y.squeeze(1)).item() * batch_X.size(0)
        avg_test_loss = test_loss / len(test_loader.dataset)
        test_losses.append(avg_test_loss)
        # Report losses periodically
        if epoch % 10 == 0:
            print(
                f"Epoch {epoch} | Train Loss: {avg_train_loss:.4f} | Test Loss: {avg_test_loss:.4f} | loss ratio: {avg_train_loss / avg_test_loss:.2f}:1")
torch.save(model,"D:\liyong\lstm.pth")
    # Plot the loss curves
    plt.figure(figsize=(10, 5))
    plt.plot(train_losses, label='Train Loss', color='blue', alpha=0.7)
    plt.plot(test_losses, label='Test Loss', color='red', alpha=0.7)
    plt.title("LSTM Training Curve (train vs test)")
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.legend()
    plt.grid(True, linestyle='--', alpha=0.5)
    # plt.savefig('training_loss_curve.png', dpi=300)  # optionally save a high-resolution figure
    plt.show()
    # Example prediction on the first sample
    with torch.no_grad():
        sample_input = X[0:1].to(DEVICE)  # take the first sample
        prediction = model(sample_input)
    print("Sample Prediction:", prediction)
    print("Real Value:", y[0:1])