首页 > Python资料 博客日记
Python 实现 LSTM 时间序列预测(转载)
2024-10-12 20:00:06Python资料围观64次
这篇文章介绍了如何用 Python 实现 LSTM 时间序列预测(转载),分享给大家做个参考。收藏 Python 资料网,收获更多编程知识。
导入相关库
In [55]:
import pandas as pd import numpy as np import os import sys import time import logging import warnings from logging.handlers import RotatingFileHandler import matplotlib.pyplot as plt from sklearn.model_selection import train_test_split import torch from torch.nn import Module, LSTM, Linear from torch.utils.data import DataLoader, TensorDataset
数据集模块
In [56]:
class Data:
    """Load the feature table, normalise it and cut it into LSTM samples.

    Attributes set by ``__init__``:
        config: the configuration object passed in.
        data, data_column_name: raw values and column names from ``read_data``.
        data_num: number of rows in ``data``.
        train_num: ``int(data_num * config.train_data_rate)``.
        mean, std: per-column statistics computed over all rows of ``data``.
        norm_data: z-score normalised copy of ``data``.
        start_num_in_test: number of leading test rows dropped by
            ``get_test_data`` because they do not fill a whole window.
    """

    def __init__(self, config):
        self.config = config
        self.data, self.data_column_name = self.read_data()

        self.data_num = self.data.shape[0]
        self.train_num = int(self.data_num * self.config.train_data_rate)

        self.mean = np.mean(self.data, axis=0)
        self.std = np.std(self.data, axis=0)
        # A constant column has std == 0; dividing by it would give NaN/inf and
        # poison every downstream loss. Only the divisor is patched: self.std
        # keeps the true value because it is reused for de-normalisation.
        safe_std = np.where(self.std == 0, 1.0, self.std)
        self.norm_data = (self.data - self.mean) / safe_std  # normalise, remove units

        # Leading test rows are discarded when they do not fill a time_step.
        self.start_num_in_test = 0

    def read_data(self):
        """Read the configured feature columns from ``config.train_data_path``.

        In debug mode only the first ``config.debug_num`` rows are read.

        Returns:
            A ``(values, column_names)`` tuple.
        """
        read_kwargs = {"usecols": self.config.feature_columns}
        if self.config.debug_mode:
            read_kwargs["nrows"] = self.config.debug_num
        init_data = pd.read_csv(self.config.train_data_path, **read_kwargs)
        return init_data.values, init_data.columns.tolist()

    def get_train_and_valid_data(self):
        """Build training windows and split them into train and validation sets.

        Labels are the label columns shifted ``config.predict_day`` rows ahead
        of the features.

        Returns:
            ``(train_x, valid_x, train_y, valid_y)`` as produced by
            ``train_test_split``.
        """
        time_step = self.config.time_step
        predict_day = self.config.predict_day

        feature_data = self.norm_data[:self.train_num]
        # Rows shifted forward by predict_day serve as the labels.
        label_data = self.norm_data[predict_day: predict_day + self.train_num,
                                    self.config.label_in_feature_index]

        if not self.config.do_continue_train:
            # Non-continuous mode: consecutive samples are offset by one row,
            # e.g. rows 1-20, rows 2-21, ...
            window_starts = range(self.train_num - time_step)
        else:
            # Continuous mode: consecutive samples are offset by time_step rows,
            # e.g. rows 1-20, 21-40, ... to the end, then rows 2-21, 22-41, ...
            window_starts = [
                offset + i * time_step
                for offset in range(time_step)
                for i in range((self.train_num - offset) // time_step)
            ]

        train_x = np.array([feature_data[s:s + time_step] for s in window_starts])
        train_y = np.array([label_data[s:s + time_step] for s in window_starts])

        # Split into training and validation sets (optionally shuffled).
        train_x, valid_x, train_y, valid_y = train_test_split(
            train_x, train_y,
            test_size=self.config.valid_data_rate,
            random_state=self.config.random_seed,
            shuffle=self.config.shuffle_train_data)
        return train_x, valid_x, train_y, valid_y

    def get_test_data(self, return_label_data=False):
        """Cut the rows after ``train_num`` into non-overlapping test windows.

        Also updates ``self.start_num_in_test`` with the number of leading
        rows that were dropped.

        Args:
            return_label_data: when true, also return the normalised label
                columns for the kept test rows.

        Returns:
            ``test_x``, or ``(test_x, label_data)`` when ``return_label_data``
            is true.

        Raises:
            ValueError: if there are no rows left for the test set.
        """
        feature_data = self.norm_data[self.train_num:]
        test_rows = feature_data.shape[0]
        if test_rows == 0:
            # Previously this surfaced as an opaque ZeroDivisionError below.
            raise ValueError("No rows are available for the test set")

        # Guard against time_step being larger than the test set.
        sample_interval = min(test_rows, self.config.time_step)
        # These leading rows do not fill a whole sample_interval.
        self.start_num_in_test = test_rows % sample_interval
        window_count = test_rows // sample_interval

        # Test samples do not overlap: rows 1-20, 21-40, ... to the end.
        first = self.start_num_in_test
        test_x = [
            feature_data[first + i * sample_interval: first + (i + 1) * sample_interval]
            for i in range(window_count)
        ]

        if return_label_data:
            # In real use the test set has no label data.
            label_data = self.norm_data[self.train_num + self.start_num_in_test:,
                                        self.config.label_in_feature_index]
            return np.array(test_x), label_data
        return np.array(test_x)
建立LSTM时间序列预测模型
In [57]:
class Net(Module):
    """PyTorch forecasting model: a stacked LSTM followed by a linear output layer."""

    def __init__(self, config):
        """Build the layers from ``config``.

        Reads ``input_size``, ``hidden_size``, ``lstm_layers``,
        ``dropout_rate`` and ``output_size`` from ``config``.
        """
        super().__init__()
        lstm_options = dict(
            input_size=config.input_size,
            hidden_size=config.hidden_size,
            num_layers=config.lstm_layers,
            batch_first=True,
            dropout=config.dropout_rate,
        )
        self.lstm = LSTM(**lstm_options)
        self.linear = Linear(in_features=config.hidden_size,
                             out_features=config.output_size)

    def forward(self, x, hidden=None):
        """Run the LSTM and project every time step through the linear layer.

        Args:
            x: input batch; the LSTM is built with ``batch_first=True``.
            hidden: optional LSTM state carried over from a previous call.

        Returns:
            ``(linear_out, hidden)``: the per-step predictions and the LSTM
            state after this batch.
        """
        sequence_features, next_hidden = self.lstm(x, hidden)
        return self.linear(sequence_features), next_hidden
模型训练模块
In [58]:
def train(config, logger, train_and_valid_data):
    """Train ``Net`` with Adam/MSE, early stopping and best-checkpoint saving.

    Args:
        config: configuration object (batch size, learning rate, epochs,
            patience, checkpoint path, visualisation switches, ...).
        logger: logger that receives per-epoch progress messages.
        train_and_valid_data: sequence unpacked as
            ``(train_X, train_Y, valid_X, valid_Y)`` of NumPy arrays.

    The weights with the lowest validation loss are written to
    ``config.model_save_path + config.model_name``.
    """
    if config.do_train_visualized:
        import visdom
        vis = visdom.Visdom(env='model_pytorch')

    train_X, train_Y, valid_X, valid_Y = train_and_valid_data
    train_X, train_Y = torch.from_numpy(train_X).float(), torch.from_numpy(train_Y).float()
    train_loader = DataLoader(TensorDataset(train_X, train_Y), batch_size=config.batch_size)

    valid_X, valid_Y = torch.from_numpy(valid_X).float(), torch.from_numpy(valid_Y).float()
    valid_loader = DataLoader(TensorDataset(valid_X, valid_Y), batch_size=config.batch_size)

    device = torch.device("cuda:0" if config.use_cuda and torch.cuda.is_available() else "cpu")
    model = Net(config).to(device)
    checkpoint_path = config.model_save_path + config.model_name
    if config.add_train:
        # map_location lets a checkpoint saved on GPU be resumed on a CPU-only
        # machine (and vice versa) instead of failing while deserialising.
        model.load_state_dict(torch.load(checkpoint_path, map_location=device))
    optimizer = torch.optim.Adam(model.parameters(), lr=config.learning_rate)
    criterion = torch.nn.MSELoss()

    valid_loss_min = float("inf")
    bad_epoch = 0
    global_step = 0
    for epoch in range(config.epoch):
        logger.info("Epoch {}/{}".format(epoch, config.epoch))
        model.train()
        train_loss_array = []
        hidden_train = None
        for _train_X, _train_Y in train_loader:
            _train_X, _train_Y = _train_X.to(device), _train_Y.to(device)
            optimizer.zero_grad()
            pred_Y, hidden_train = model(_train_X, hidden_train)

            if not config.do_continue_train:
                hidden_train = None
            else:
                h_0, c_0 = hidden_train
                h_0.detach_(), c_0.detach_()  # drop gradient history of the carried state
                hidden_train = (h_0, c_0)
            loss = criterion(pred_Y, _train_Y)  # compute the loss
            loss.backward()                     # back-propagate
            optimizer.step()                    # update the parameters
            train_loss_array.append(loss.item())
            global_step += 1
            if config.do_train_visualized and global_step % 100 == 0:
                vis.line(X=np.array([global_step]), Y=np.array([loss.item()]), win='Train_Loss',
                         update='append' if global_step > 0 else None, name='Train',
                         opts=dict(showlegend=True))

        # Early stopping: when the validation loss has not improved for
        # config.patience consecutive epochs, stop to avoid over-fitting.
        model.eval()
        valid_loss_array = []
        hidden_valid = None
        # Validation needs no gradients; skipping graph construction saves
        # memory and time without changing the loss values.
        with torch.no_grad():
            for _valid_X, _valid_Y in valid_loader:
                _valid_X, _valid_Y = _valid_X.to(device), _valid_Y.to(device)
                pred_Y, hidden_valid = model(_valid_X, hidden_valid)
                if not config.do_continue_train:
                    hidden_valid = None
                loss = criterion(pred_Y, _valid_Y)
                valid_loss_array.append(loss.item())

        train_loss_cur = np.mean(train_loss_array)
        valid_loss_cur = np.mean(valid_loss_array)
        logger.info("The train loss is {:.6f}. ".format(train_loss_cur) +
                    "The valid loss is {:.6f}.".format(valid_loss_cur))
        if config.do_train_visualized:
            vis.line(X=np.array([epoch]), Y=np.array([train_loss_cur]), win='Epoch_Loss',
                     update='append' if epoch > 0 else None, name='Train',
                     opts=dict(showlegend=True))
            vis.line(X=np.array([epoch]), Y=np.array([valid_loss_cur]), win='Epoch_Loss',
                     update='append' if epoch > 0 else None, name='Eval',
                     opts=dict(showlegend=True))

        if valid_loss_cur < valid_loss_min:
            valid_loss_min = valid_loss_cur
            bad_epoch = 0
            torch.save(model.state_dict(), checkpoint_path)
        else:
            bad_epoch += 1
            # Stop once the validation loss has failed to improve for
            # `patience` consecutive epochs.
            if bad_epoch >= config.patience:
                logger.info(" The training stops early in epoch {}".format(epoch))
                break
模型预测模块
In [59]:
def predict(config, test_X):
    """Run the saved model over ``test_X`` and return the stacked predictions.

    Args:
        config: configuration object (device switch, checkpoint path, model
            hyper-parameters).
        test_X: NumPy array of test windows; each window is fed as one batch.

    Returns:
        NumPy array with the predictions of all windows concatenated along
        the first axis (an empty array when ``test_X`` holds no windows).
    """
    # Wrap the test data.
    test_X = torch.from_numpy(test_X).float()
    test_loader = DataLoader(TensorDataset(test_X), batch_size=1)

    # Load the model. map_location lets a checkpoint saved on GPU be used on a
    # CPU-only machine instead of failing while deserialising.
    device = torch.device("cuda:0" if config.use_cuda and torch.cuda.is_available() else "cpu")
    model = Net(config).to(device)
    model.load_state_dict(torch.load(config.model_save_path + config.model_name,
                                     map_location=device))

    # Prediction loop. The LSTM state is carried from one window to the next.
    model.eval()
    window_predictions = []
    hidden_predict = None
    with torch.no_grad():  # inference only: no autograd graph is needed
        for _data in test_loader:
            data_X = _data[0].to(device)
            pred_X, hidden_predict = model(data_X, hidden_predict)
            window_predictions.append(torch.squeeze(pred_X, dim=0))

    if not window_predictions:
        # Same result the original produced for an empty loader.
        return torch.Tensor().numpy()

    # Concatenate once instead of re-allocating the result on every step,
    # then move to CPU (if needed) and convert to NumPy.
    return torch.cat(window_predictions, dim=0).cpu().numpy()
项目配置模块
In [60]:
class Config:
    """Project configuration, held as class attributes.

    Evaluating the class body also creates the checkpoint and figure
    directories, and (in training mode with logging or visualisation enabled)
    a timestamped log directory.
    """

    # Data parameters
    feature_columns = list(range(1, 15))
    label_columns = [14]
    # Position of every label column inside feature_columns.
    label_in_feature_index = list(map(feature_columns.index, label_columns))

    predict_day = 5  # how many days ahead to predict

    # Network parameters
    input_size = len(feature_columns)
    output_size = len(label_columns)

    hidden_size = 64
    lstm_layers = 4
    dropout_rate = 0.2
    time_step = 10

    # Training parameters
    do_train = False
    do_predict = not do_train
    add_train = False
    shuffle_train_data = True
    use_cuda = True

    train_data_rate = 0.95
    valid_data_rate = 0.2

    batch_size = 256
    learning_rate = 0.001
    epoch = 3000
    patience = 800
    random_seed = 42

    do_continue_train = False
    continue_flag = ""
    if do_continue_train:
        shuffle_train_data = False
        batch_size = 1
        continue_flag = "continue_"

    if do_predict:
        train_data_rate = 0

    # Run mode
    debug_mode = False
    debug_num = 500

    # Framework parameters
    used_frame = "pytorch"
    model_name = "model_" + continue_flag + "pytorch.pth"

    # Path parameters
    train_data_path = "Data.csv"
    model_save_path = "./checkpoint/"
    figure_save_path = "./figure/"
    log_save_path = "./log/"
    do_log_print_to_screen = True
    do_log_save_to_file = True
    do_figure_save = True
    do_train_visualized = False

    # exist_ok=True replaces the racy "check, then create" pattern and keeps
    # a re-evaluated class body (e.g. a re-run notebook cell within the same
    # second) from failing on the timestamped log directory.
    os.makedirs(model_save_path, exist_ok=True)
    os.makedirs(figure_save_path, exist_ok=True)
    if do_train and (do_log_save_to_file or do_train_visualized):
        cur_time = time.strftime("%Y_%m_%d_%H_%M_%S", time.localtime())
        log_save_path = log_save_path + cur_time + "/"
        os.makedirs(log_save_path, exist_ok=True)
log日志记录模块
In [61]:
def load_logger(config):
    """Configure the root logger for console and/or file output.

    Args:
        config: configuration object; reads ``do_log_print_to_screen``,
            ``do_log_save_to_file`` and ``log_save_path``.

    Returns:
        The root logger, with level DEBUG and INFO-level handlers attached.
        When file logging is enabled, all public attributes of ``config`` are
        also written to the log.
    """
    logger = logging.getLogger()
    logger.setLevel(level=logging.DEBUG)

    # Remove handlers installed by an earlier call, so re-running (for
    # example a notebook cell) does not print every message several times.
    # Handlers added by anyone else are left alone.
    for stale_handler in list(logger.handlers):
        if getattr(stale_handler, "_installed_by_load_logger", False):
            logger.removeHandler(stale_handler)
            stale_handler.close()

    # StreamHandler
    if config.do_log_print_to_screen:
        stream_handler = logging.StreamHandler(sys.stdout)
        stream_handler.setLevel(level=logging.INFO)
        formatter = logging.Formatter(datefmt='%Y/%m/%d %H:%M:%S',
                                      fmt='[ %(asctime)s ] %(message)s')
        stream_handler.setFormatter(formatter)
        stream_handler._installed_by_load_logger = True
        logger.addHandler(stream_handler)

    # FileHandler
    if config.do_log_save_to_file:
        log_file = config.log_save_path + "out.log"
        # The log directory is not guaranteed to exist (for instance in
        # predict-only runs); create it instead of failing on open.
        log_dir = os.path.dirname(log_file)
        if log_dir:
            os.makedirs(log_dir, exist_ok=True)
        file_handler = RotatingFileHandler(log_file, maxBytes=1024000,
                                           backupCount=5, encoding='utf-8')
        file_handler.setLevel(level=logging.INFO)
        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        file_handler.setFormatter(formatter)
        file_handler._installed_by_load_logger = True
        logger.addHandler(file_handler)

        # Record the configuration in the log as well, one entry per line.
        config_dict = {}
        for key in dir(config):
            if not key.startswith("_"):
                config_dict[key] = getattr(config, key)
        config_str = str(config_dict)
        config_list = config_str[1:-1].split(", '")
        config_save_str = "\nConfig:\n" + "\n'".join(config_list)
        logger.info(config_save_str)

    return logger
绘图模块
In [62]:
def draw(config: Config, origin_data: Data, logger, predict_norm_data: np.ndarray):
    """De-normalise the predictions, log the error and plot them against the truth.

    Args:
        config: configuration object (label columns, ``predict_day``, figure
            saving options).
        origin_data: the ``Data`` instance the test set came from; supplies the
            raw values, column names and normalisation statistics.
        logger: logger that receives the error and the latest predictions.
        predict_norm_data: normalised predictions, one row per kept test row.

    Raises:
        AssertionError: if the label rows and prediction rows differ in count.
    """
    label_index = config.label_in_feature_index
    test_start = origin_data.train_num + origin_data.start_num_in_test

    label_data = origin_data.data[test_start:, label_index]
    # Undo the z-score normalisation for the label columns.
    predict_data = predict_norm_data * origin_data.std[label_index] + \
        origin_data.mean[label_index]
    assert label_data.shape[0] == predict_data.shape[0], "The element number in origin and predicted data is different"

    label_name = [origin_data.data_column_name[i] for i in label_index]
    label_column_num = len(config.label_columns)

    # Labels and predictions are offset by config.predict_day rows. An explicit
    # end index is used because `[:-0]` would be empty and `[-0:]` would be the
    # whole array when predict_day is 0.
    overlap_end = max(predict_data.shape[0] - config.predict_day, 0)
    loss = np.mean((label_data[config.predict_day:] - predict_data[:overlap_end]) ** 2, axis=0)
    loss_norm = loss / (origin_data.std[label_index] ** 2)
    logger.info("The mean squared error of stock {} is ".format(label_name) + str(loss_norm))

    label_X = range(origin_data.data_num - origin_data.train_num - origin_data.start_num_in_test)
    predict_X = [x + config.predict_day for x in label_X]

    for i in range(label_column_num):
        plt.figure(i + 1)
        plt.plot(label_X, label_data[:, i], label='真实值', color='red')
        plt.plot(predict_X, predict_data[:, i], label='预测值', color='blue')
        plt.title("{}预测图".format(label_name[i]), fontname="SimHei")
        plt.legend(loc="upper left")
        # The last predict_day predictions have no ground truth yet.
        logger.info("The predicted stock {} for the next {} day(s) is: ".format(label_name[i], config.predict_day) +
                    str(np.squeeze(predict_data[overlap_end:, i])))
        if config.do_figure_save:
            plt.savefig(config.figure_save_path + "{}predict_{}.png".format(config.continue_flag, label_name[i]))

    plt.show()
主文件
In [ ]:
if __name__ == "__main__":
    # Script entry point: configure plotting and logging, then train and/or
    # predict according to the switches in Config.
    warnings.filterwarnings("ignore")

    # The 'seaborn' style sheet was renamed to 'seaborn-v0_8' in newer
    # Matplotlib releases and the old name was later removed; pick whichever
    # name this installation provides instead of failing at start-up.
    for style_name in ("seaborn", "seaborn-v0_8"):
        if style_name in plt.style.available:
            plt.style.use(style_name)
            break
    plt.rcParams['font.sans-serif'] = 'Microsoft Yahei'

    config = Config()
    logger = load_logger(config)
    try:
        np.random.seed(config.random_seed)
        data_gainer = Data(config)

        if config.do_train:
            train_X, valid_X, train_Y, valid_Y = data_gainer.get_train_and_valid_data()
            train(config, logger, [train_X, train_Y, valid_X, valid_Y])

        if config.do_predict:
            test_X, test_Y = data_gainer.get_test_data(return_label_data=True)
            pred_result = predict(config, test_X)
            draw(config, data_gainer, logger, pred_result)
    except Exception:
        # Top-level boundary: record the full traceback instead of crashing.
        logger.error("Run Error", exc_info=True)
版权声明:本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:jacktools123@163.com进行投诉反馈,一经查实,立即删除!
标签:
相关文章
最新发布
- 光流法结合深度学习神经网络的原理及应用(完整代码都有Python opencv)
- Python 图像处理进阶:特征提取与图像分类
- 大数据可视化分析-基于python的电影数据分析及可视化系统_9532dr50
- 【Python】入门(运算、输出、数据类型)
- 【Python】第一弹---解锁编程新世界:深入理解计算机基础与Python入门指南
- 华为OD机试E卷 --第k个排列 --24年OD统一考试(Java & JS & Python & C & C++)
- Python已安装包在import时报错未找到的解决方法
- 【Python】自动化神器PyAutoGUI —告别手动操作,一键模拟鼠标键盘,玩转微信及各种软件自动化
- PyCharm连接SQL Server(详细教程)
- Python编程练习题及解析(49题)
点击排行
- 版本匹配指南:Numpy版本和Python版本的对应关系
- 版本匹配指南:PyTorch版本、torchvision 版本和Python版本的对应关系
- Python 可视化 web 神器:streamlit、Gradio、dash、nicegui;低代码 Python Web 框架:PyWebIO
- 相关性分析——Pearson相关系数+热力图(附data和Python完整代码)
- Anaconda版本和Python版本对应关系(持续更新...)
- Python与PyTorch的版本对应
- Windows上安装 Python 环境并配置环境变量 (超详细教程)
- Python pyinstaller打包exe最完整教程