190 lines
6.3 KiB
Python
190 lines
6.3 KiB
Python
|
|
import torch
|
|||
|
|
import torch.nn as nn
|
|||
|
|
import pandas as pd
|
|||
|
|
import numpy as np
|
|||
|
|
from sklearn.preprocessing import StandardScaler, MinMaxScaler
|
|||
|
|
from torch.utils.data import Dataset, DataLoader
|
|||
|
|
import matplotlib.pyplot as plt
|
|||
|
|
|
|||
|
|
# Hyperparameters and runtime configuration
DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

SEQ_LENGTH = 10  # length of the input time-series window
PRE_LENGTH = 1  # length of the predicted time-series window

HIDDEN_SIZE = 64
HIDDEN_LAYER = 3  # hidden layers
BATCH_SIZE = 4096
EPOCHS = 2000
|
|||
|
|
|
|||
|
|
|
|||
|
|
# 数据预处理
|
|||
|
|
def process_data(file_path):
    """Load the Excel sheet, standardize inputs/outputs and plot the effect.

    Reads sheet "sample1" of ``file_path`` without a header, skipping the
    first row, standardizes the selected input and output columns with
    separate ``StandardScaler`` instances, prints the inputs before and after
    scaling and shows side-by-side histograms of both.

    Args:
        file_path: Path of the Excel workbook to read.

    Returns:
        Tuple ``(inputs, outputs, input_scaler, output_scaler)``: the
        standardized input and output arrays and the fitted scalers (needed
        to map predictions back to the original units).
    """
    df = pd.read_excel(file_path, header=None, skiprows=1, sheet_name="sample1")

    # Column positions 1..8 (2nd-9th columns) are the inputs.
    inputs = df.iloc[:, 1:9].values.astype(np.float32)
    # Column positions 10..18 (11th-19th columns) are the outputs.
    # NOTE(review): this slice selects up to 9 columns; confirm it matches
    # the number of output features the model is built for.
    outputs = df.iloc[:, 10:19].values.astype(np.float32)

    print("标准化之前:")
    # Keep the unscaled values for the "before" histogram; fit_transform
    # below returns a new array by default, so this reference stays unscaled.
    ori = inputs
    print(inputs)

    # NOTE(review): both scalers are fitted on the full data set, i.e. before
    # the caller splits it into train and test parts - confirm this is
    # acceptable, otherwise fit on the training rows only.
    input_scaler = StandardScaler()
    output_scaler = StandardScaler()
    inputs = input_scaler.fit_transform(inputs)
    outputs = output_scaler.fit_transform(outputs)

    print("标准化之后:")
    print(inputs)

    # Before/after comparison, one colour per input column.
    plt.figure(figsize=(12, 8))
    colors = plt.cm.tab10(np.arange(inputs.shape[1]))

    plt.subplot(2, 2, 1)
    plt.hist(ori, bins=30, alpha=0.7, color=colors, label='Original')
    plt.title('Image Channel (Original)')
    plt.xlabel('Pixel Value')
    plt.legend()

    # Second panel: the original reused panel 1 here, which drew both
    # histograms on the same axes and overwrote the first title.
    plt.subplot(2, 2, 2)
    plt.hist(inputs, bins=30, alpha=0.7, color=colors, label='StandardScaler')
    plt.title('StandardScaler Comparison')
    plt.xlabel('StandardScaler Value')
    plt.legend()

    plt.tight_layout()
    plt.show()

    return inputs, outputs, input_scaler, output_scaler
|
|||
|
|
|
|||
|
|
|
|||
|
|
# 创建序列数据集 look_back:依据时间序列 pred_step:推测时间序列
|
|||
|
|
def create_sequences(inputs, outputs, look_back=8, pred_step=1):
    """Slice two aligned series into sliding (history, target) windows.

    Window ``i`` pairs ``inputs[i : i + look_back]`` with the immediately
    following ``outputs[i + look_back : i + look_back + pred_step]``.

    Args:
        inputs: Sequence of input rows, indexed by time step.
        outputs: Sequence of output rows aligned with ``inputs``.
        look_back: Number of past time steps in each input window.
        pred_step: Number of future time steps in each target window.

    Returns:
        Tuple ``(X, y)`` of float tensors holding the stacked input windows
        and the stacked target windows.
    """
    # The last valid start index is len - look_back - pred_step, so the count
    # of windows needs the "+ 1" (the original range dropped the final window).
    n_windows = len(inputs) - look_back - pred_step + 1
    X = [inputs[i:i + look_back] for i in range(n_windows)]
    y = [
        outputs[i + look_back:i + look_back + pred_step]
        for i in range(n_windows)
    ]
    return torch.FloatTensor(np.array(X)), torch.FloatTensor(np.array(y))
|
|||
|
|
|
|||
|
|
|
|||
|
|
# 自定义Dataset
|
|||
|
|
class TimeSeriesDataset(Dataset):
    """Map-style dataset that pairs each input window with its target."""

    def __init__(self, X, y):
        """Store the windows and targets.

        Args:
            X: Indexable collection of input windows.
            y: Indexable collection of targets, one per entry of ``X``.

        Raises:
            ValueError: If ``X`` and ``y`` have different lengths.
        """
        # Fail early: a length mismatch would otherwise only show up as an
        # IndexError somewhere in the middle of an epoch.
        if len(X) != len(y):
            raise ValueError(
                f"X and y must have the same length, got {len(X)} and {len(y)}"
            )
        self.X = X
        self.y = y

    def __len__(self):
        """Return the number of (window, target) pairs."""
        return len(self.X)

    def __getitem__(self, idx):
        """Return the ``(window, target)`` pair at position ``idx``."""
        return self.X[idx], self.y[idx]
|
|||
|
|
# Loss history lists, initialised before the training loop runs;
# the loop appends one average loss per epoch to each.
train_losses, test_losses = [], []
|
|||
|
|
|
|||
|
|
# LSTM模型
|
|||
|
|
class LSTMModel(nn.Module):
    """Stacked LSTM followed by a linear read-out of the last time step."""

    def __init__(self, input_size, hidden_size, output_size, num_lay):
        """Build the network.

        Args:
            input_size: Number of features per time step.
            hidden_size: Width of the LSTM hidden state.
            output_size: Number of values produced per sequence.
            num_lay: Number of stacked LSTM layers.
        """
        super().__init__()
        # Unidirectional, no dropout (both options are left at their defaults).
        self.lstm = nn.LSTM(
            input_size,
            hidden_size,
            num_layers=num_lay,
            batch_first=True,
        )
        self.fc = nn.Linear(in_features=hidden_size, out_features=output_size)

    def forward(self, x):
        """Run the LSTM over ``x`` and project its final time step."""
        sequence_out, _ = self.lstm(x)
        # batch_first=True, so axis 1 is time: keep only the last step.
        last_step = sequence_out[:, -1, :]
        return self.fc(last_step)
|
|||
|
|
|
|||
|
|
|
|||
|
|
# 主流程
|
|||
|
|
if __name__ == "__main__":
    # Load the sheet and standardize inputs/outputs. The path is a raw string
    # so the Windows backslashes are never interpreted as escape sequences.
    inputs, outputs, input_scaler, output_scaler = process_data(
        r"D:\liyong\文档\项目文档\中汽TVS\机器学习\降阶模型数据.xlsx"
    )

    print(inputs[:5])  # first five standardized input rows
    print(outputs[:5])  # first five standardized output rows
    X, y = create_sequences(inputs, outputs, SEQ_LENGTH, PRE_LENGTH)
    if len(X) < 2:
        raise SystemExit("Not enough rows to build both a train and a test window.")

    # The model returns (batch, features) while the targets are
    # (batch, PRE_LENGTH, features). Left as-is, nn.MSELoss broadcasts the two
    # and averages over every prediction/target pair in the batch, so flatten
    # each target window to (batch, PRE_LENGTH * features) instead.
    y = y.reshape(len(y), -1)

    # Chronological 80/20 split (no shuffling across the boundary).
    split = int(0.8 * len(X))
    train_dataset = TimeSeriesDataset(X[:split], y[:split])
    test_dataset = TimeSeriesDataset(X[split:], y[split:])

    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

    # Layer sizes follow the data instead of being hard-coded, so the model
    # always matches the columns process_data actually selected.
    model = LSTMModel(
        input_size=X.shape[-1],  # input features per time step
        hidden_size=HIDDEN_SIZE,
        output_size=y.shape[-1],  # flattened target size
        num_lay=HIDDEN_LAYER,
    ).to(DEVICE)

    # Training setup
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)

    for epoch in range(EPOCHS):
        model.train()
        epoch_train_loss = 0.0

        for batch_X, batch_y in train_loader:
            batch_X, batch_y = batch_X.to(DEVICE), batch_y.to(DEVICE)
            # Named `preds` so the standardized `outputs` array is not shadowed.
            preds = model(batch_X)
            loss = criterion(preds, batch_y)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # Weight by batch size so the epoch average is per sample.
            epoch_train_loss += loss.item() * batch_X.size(0)

        avg_train_loss = epoch_train_loss / len(train_loader.dataset)
        train_losses.append(avg_train_loss)

        # Evaluate on the held-out windows.
        model.eval()
        test_loss = 0.0
        with torch.no_grad():
            for batch_X, batch_y in test_loader:
                batch_X, batch_y = batch_X.to(DEVICE), batch_y.to(DEVICE)
                preds = model(batch_X)
                test_loss += criterion(preds, batch_y).item() * batch_X.size(0)

        avg_test_loss = test_loss / len(test_loader.dataset)
        test_losses.append(avg_test_loss)

        # Report the losses and their ratio every 10 epochs.
        if epoch % 10 == 0:
            ratio = avg_train_loss / avg_test_loss if avg_test_loss else float("inf")
            print(
                f"Epoch {epoch} | Train Loss: {avg_train_loss:.4f} | Test Loss: {avg_test_loss:.4f} | 损失比: {ratio:.2f}:1")

    # NOTE(review): the original indentation was lost, so it is unclear whether
    # saving and plotting ran inside the epoch loop; here they run once after
    # training. torch.save(model, ...) pickles the whole module, so loading it
    # requires the LSTMModel class to be importable.
    torch.save(model, r"D:\liyong\lstm.pth")

    # Loss curves
    plt.figure(figsize=(10, 5))
    plt.plot(train_losses, label='Train Loss', color='blue', alpha=0.7)
    plt.plot(test_losses, label='Test Loss', color='red', alpha=0.7)
    plt.title("LSTM TrainLine (train vs test)")
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.legend()
    plt.grid(True, linestyle='--', alpha=0.5)
    plt.show()

    # Example prediction on the first window (values are in standardized units).
    model.eval()
    sample_input = X[0:1].to(DEVICE)
    with torch.no_grad():  # inference only, no autograd graph needed
        prediction = model(sample_input)

    print("Sample Prediction:", prediction)
    print("Real Value:", y[0:1])
|