计算机系统应用教程网站

网站首页 > 技术文章 正文

如何将卷积神经网络应用在一维时间序列数据上?

btikc 2024-09-12 11:50:09 技术文章 9 ℃ 0 评论

这篇是ICLR上用TCN来做一般的时间序列分析的论文,在Rebuttal之后的分数为888,算得上是时间序列领域相关的论文中最高分那一档了。本文提出了一个ModernTCN的模型,实现起来也很简单,所以我后面附上了模型的代码实现。

论文链接:https://link.zhihu.com/?target=https%3A//openreview.net/forum%3Fid%3DvpJMJerXHU

Key Point

Motivation

作者发现,在时间序列领域,最近基于TCN/CNN的模型效果没有基于Transformer或MLP的模型效果好,而一些现代的CNN比如ConvNeXt、SLaK的性能都超过了Vision Transformer。因此,作者想探究卷积是不是可以在时间序列分析领域获得更好的性能。为此,有两点可以改善TCN模型的地方。

首先是要提升感受力。在CV领域,现代卷积都有着很大的卷积核。作者发现在时间序列领域差不多,可以看下图:

SCINet和MICN是两个基于TCN的预测模型,它们的感受野都很小。作者发现ModernTCN中采用大的卷积核所对应的感受野要大很多。

其次是充分利用卷积可以捕获跨变量依赖性,也就是多变量时间序列中变量之间的关系。在PatchTST等最近的时间序列预测文章中,很多方法采用了通道独立策略,这种策略直接将多变量序列预测中变量之间关系忽略了,反而取得了更好的效果。作者认为,变量之间关系仍然重要,但是要精心设计模型结构来捕获。

从CV中汲取灵感(现代卷积结构)

在CV中,很多人发现Transformer之所以成功,可能是因为架构比较好。比如下图左侧,self-attention负责token之间的混合,FFN负责通道之间的混合,两者分离开。同样的,把混合token的结构替换为深度分离卷积(depth-wise 卷积,DWConv),把FFN换为完全等价的ConvFFN(由两个point-wise Conv加GeLU激活组成)。不熟悉depth-wise卷积的可以去了解一下,它其实就是对每个通道采用独立的核,这样就不会混合通道,只会混合token,大卷积核来获取大感受野也是在这里用的

然而,作者发现采用上图(b)的结构构建的模型效果也不是特别好,这是因为这个现代卷积结构中并没有专门为时间序列设计的一些特殊的东西,一个重要的就是如何建模跨变量依赖性。注意,在这里要区分通道和变量之间的关系。变量是指多变量序列中每个变量,通道是指每个变量映射到的隐空间维度(而PatchTST中提到的通道独立则是变量之间独立,这个不要混淆)。ConvFFN可以建模通道间关系,但无法建模变量间关系。

适用于时间序列的改动(变量间建模)

首先,在embedding的过程中,cv一般是直接混合RGB变量。而在时间序列中,这种方式不适用,因为一个简单的embedding显然无法充分建模变量间关系。如果在embedding时就已经把变量混合了起来,那后续对变量间的建模则是混乱的。因此,作者提出了变量无关embedding,也是用了分patch的方法,对每个变量独立分patch进行embedding。具体在代码实现上,作者是采用有stride的卷积,在这里我给出了代码实现,先介绍下代码相关的注释:

# B:batch size
# M:多变量序列的变量数
# L:过去序列的长度
# T: 预测序列的长度
# N: 分Patch后Patch的个数
# D:每个变量的通道数
# P:kernel size of embedding layer
# S:stride of embedding layer

Embedding模块先将 B\times M\times L 的输入unsqueeze,新增一个通道维,然后pad之后(方便整除)应用有stride的1D卷积来进行patch embedding,如下:

class Embedding(nn.Module):
    def __init__(self, P=8, S=4, D=2048):
        super(Embedding, self).__init__()
        self.P = P
        self.S = S
        self.conv = nn.Conv1d(
            in_channels=1, 
            out_channels=D, 
            kernel_size=P, 
            stride=S
            )

    def forward(self, x):
        # x: [B, M, L]
        B = x.shape[0]
        x = x.unsqueeze(2)  # [B, M, L] -> [B, M, 1, L]
        x = rearrange(x, 'b m r l -> (b m) r l')  # [B, M, 1, L] -> [B*M, 1, L]
        x_pad = F.pad(
            x,
            pad=(0, self.P-self.S),
            mode='replicate'
            )  # [B*M, 1, L] -> [B*M, 1, L+P-S]
        
        x_emb = self.conv(x_pad)  # [B*M, 1, L+P-S] -> [B*M, D, N]
        x_emb = rearrange(x_emb, '(b m) d n -> b m d n', b=B)  # [B*M, D, N] -> [B, M, D, N]

        return x_emb  # x_emb: [B, M, D, N]

在Embedding之后,作者用一些堆叠的Block来进行建模。每个Block如下图:

上图中DWconv用来建模时间关系,第一个ConvFFN用来建模通道关系,第二个ConvFFN用来建模变量关系。下面介绍具体的实现,注意看上图中shape在每一个模块的前后变化。

首先,希望用DWConv来建模时间上的关系,但又不希望它参与到通道间和变量间的建模上。因此,作者将M和D这两个表示变量和通道的维度reshape在一起,再进行深度可分离卷积。

其次,希望独立建模通道和变量。因此,作者采用了两个组卷积,其中一个组卷积的Group数为M(表示每D个通道构成一个组,因此用来建模通道间关系),另一个组卷积的Group数为D(表示每M个变量构成一个组,因此用来建模变量间关系)。注意,两个组卷积之间存在着reshape和permute操作,这是为了正确的分组,最后会再reshape和permute回去。

最后,整体再用一个残差连接,即可得到最终的ModernTCN block。ModernTCN block的代码实现在最后,堆叠多个block即可得到ModernTCN模型。

综上所述,作者将时间上、通道上、变量上的三种关系解耦建模,用三种组卷积来巧妙地进行实现(深度可分离卷积其实也是组数等于深度数的组卷积),既简单又有效

代码实现

注意,我这里实现的模型是用于时间序列预测任务的,在backbone的基础上加了个预测头,具体的结构在论文附录图5。

import torch
import torch.nn as nn
import torch.nn.functional as F
from einops import rearrange

# B:batch size
# M:多变量序列的变量数
# L:过去序列的长度
# T: 预测序列的长度
# N: 分Patch后Patch的个数
# D:每个变量的通道数
# P:kernel size of embedding layer
# S:stride of embedding layer

class Embedding(nn.Module):
    def __init__(self, P=8, S=4, D=2048):
        super(Embedding, self).__init__()
        self.P = P
        self.S = S
        self.conv = nn.Conv1d(
            in_channels=1, 
            out_channels=D, 
            kernel_size=P, 
            stride=S
            )

    def forward(self, x):
        # x: [B, M, L]
        B = x.shape[0]
        x = x.unsqueeze(2)  # [B, M, L] -> [B, M, 1, L]
        x = rearrange(x, 'b m r l -> (b m) r l')  # [B, M, 1, L] -> [B*M, 1, L]
        x_pad = F.pad(
            x,
            pad=(0, self.P-self.S),
            mode='replicate'
            )  # [B*M, 1, L] -> [B*M, 1, L+P-S]
        
        x_emb = self.conv(x_pad)  # [B*M, 1, L+P-S] -> [B*M, D, N]
        x_emb = rearrange(x_emb, '(b m) d n -> b m d n', b=B)  # [B*M, D, N] -> [B, M, D, N]

        return x_emb  # x_emb: [B, M, D, N]


class ConvFFN(nn.Module):
    def __init__(self, M, D, r, one=True):  # one is True: ConvFFN1, one is False: ConvFFN2
        super(ConvFFN, self).__init__()
        groups_num = M if one else D
        self.pw_con1 = nn.Conv1d(
            in_channels=M*D, 
            out_channels=r*M*D, 
            kernel_size=1,
            groups=groups_num
            )
        self.pw_con2 = nn.Conv1d(
            in_channels=r*M*D, 
            out_channels=M*D, 
            kernel_size=1,
            groups=groups_num
            )

    def forward(self, x):
        # x: [B, M*D, N]
        x = self.pw_con2(F.gelu(self.pw_con1(x)))
        return x  # x: [B, M*D, N]


class ModernTCNBlock(nn.Module):
    def __init__(self, M, D, kernel_size, r):
        super(ModernTCNBlock, self).__init__()
        # 深度分离卷积负责捕获时域关系
        self.dw_conv = nn.Conv1d(
            in_channels=M*D, 
            out_channels=M*D, 
            kernel_size=kernel_size,
            groups=M*D,
            padding='same'
            )  
        self.bn = nn.BatchNorm1d(M*D)
        self.conv_ffn1 = ConvFFN(M, D, r, one=True)
        self.conv_ffn2 = ConvFFN(M, D, r, one=False)

    def forward(self, x_emb):
        # x_emb: [B, M, D, N]
        D = x_emb.shape[-2]
        x = rearrange(x_emb, 'b m d n -> b (m d) n')          # [B, M, D, N] -> [B, M*D, N]
        x = self.dw_conv(x)                                   # [B, M*D, N] -> [B, M*D, N]
        x = self.bn(x)                                        # [B, M*D, N] -> [B, M*D, N]
        x = self.conv_ffn1(x)                                 # [B, M*D, N] -> [B, M*D, N]

        x = rearrange(x, 'b (m d) n -> b m d n', d=D)         # [B, M*D, N] -> [B, M, D, N]
        x = x.permute(0,2,1,3)                                # [B, M, D, N] -> [B, D, M, N]
        x = rearrange(x, 'b d m n -> b (d m) n')              # [B, D, M, N] -> [B, D*M, N]

        x = self.conv_ffn2(x)                                 # [B, D*M, N] -> [B, D*M, N]

        x = rearrange(x, 'b (d m) n -> b d m n', d=D)         # [B, D*M, N] -> [B, D, M, N]
        x = x.permute(0,2,1,3)                                # [B, D, M, N] -> [B, M, D, N]

        out = x + x_emb

        return out  # out: [B, M, D, N]


class ModernTCN(nn.Module):
    def __init__(self, M, L, T, D=2048, P=8, S=4, kernel_size=51, r=1, num_layers=2):
        super(ModernTCN, self).__init__()
        # 深度分离卷积负责捕获时域关系
        self.num_layers = num_layers
        N = L // S
        self.embed_layer = Embedding(P, S, D)
        self.backbone = nn.ModuleList([ModernTCNBlock(M, D, kernel_size, r) for _ in range(num_layers)])
        self.head = nn.Linear(D*N, T)

    def forward(self, x):
        # x: [B, M, L]
        x_emb = self.embed_layer(x)  # [B, M, L] -> [B, M, D, N]

        for i in range(self.num_layers):
            x_emb = self.backbone[i](x_emb)  # [B, M, D, N] -> [B, M, D, N]

        # Flatten
        z = rearrange(x_emb, 'b m d n -> b m (d n)')  # [B, M, D, N] -> [B, M, D*N]
        pred = self.head(z)  # [B, M, D*N] -> [B, M, T]

        return pred  # out: [B, M, T]
    

past_series = torch.rand(2, 4, 96)
model = ModernTCN(4, 96, 192)
pred_series = model(past_series)
print(pred_series.shape)
# torch.Size([2, 4, 192])

Comments

附录很长,里面的消融实验很充分,效果也很好,想法很合理,实现起来也很简单,估计能中oral。不过感觉在那几个时间序列预测任务上的数据集都快刷爆了,性能快到瓶颈了,感觉之后很难再有大的效果提升了。


正好有个二分类的例子,即利用智能手表采集人的2种心率振动信号, 并简单地用一维卷积神经网络模型进行分类

首先对信号进行降噪处理,导入相关模块

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.signal import butter, lfilter
from scipy.signal import freqz

设置绘图相关参数

plt.rcParams['figure.dpi'] = 250 
plt.rcParams['font.sans-serif'] = ['Arial']
plt.rcParams['font.family']='sans-serif'
plt.rcParams['axes.unicode_minus'] = False
plt.rcParams['figure.figsize'] = (16.0, 4.0)

构造butterworth带通滤波器函数

def butter_bandpass(lowcut, highcut, fs, order=5):
    nyq = 0.5 * fs
    low = lowcut / nyq
    high = highcut / nyq
    b, a = butter(order, [low, high], btype='band')
    return b, a

def butter_bandpass_filter(data, lowcut, highcut, fs, order=4):
    b, a = butter_bandpass(lowcut, highcut, fs, order=order)
    y = lfilter(b, a, data)
    return y

导入数据

df = pd.read_csv('fossil1.csv')

滤波相关参数

#采样频率
fs = 50.0
#低通截止频率
lowcut = 10
#高通截止频率
highcut = 24.8

信号滤波

t = np.linspace(1, len(df['x']),len(df['x']))
x = df['x']
y = df['y']
z = df['z']
plt.plot(t, x, label='Noisy signal')
#使用butterworth带通滤波器进行滤波
x_filtered = butter_bandpass_filter(x, lowcut, highcut, fs, order=4)
y_filtered = butter_bandpass_filter(y, lowcut, highcut, fs, order=4)
z_filtered = butter_bandpass_filter(z, lowcut, highcut, fs, order=4)
plt.plot(t, x_filtered, label='Filtered signal')
plt.grid(True)
plt.axis('tight')
plt.xlabel('samples')
plt.legend(loc='upper right')
plt.show()
x_new = x_filtered.reshape(10000,1)
y_new = y_filtered.reshape(10000,1)
z_new = z_filtered.reshape(10000,1)
model = df['model'].values.reshape(10000,1)
label = df['label'].values.reshape(10000,1)
filtered = np.hstack((x_new,y_new,z_new,label,model))
df_filtered = pd.DataFrame(filtered)
#保存处理后的数据
df_filtered.to_csv('fossil1_filtered.csv',index=0,header = ['x','y','z','label','model'])

数据预处理完成后,开始进行1D-CNN分类

导入相关模块

import numpy as np
import pandas as pd
import keras
import itertools
import matplotlib.pyplot as plt
import tensorflow as tf
from scipy import stats
import keras_metrics as km
from keras.models import Model
from keras.models import load_model
from keras import backend, layers, models, utils
from keras.layers import Conv1D,MaxPooling1D,Dense,Dropout,Flatten,GlobalAveragePooling1D
from keras.models import Sequential
from keras.utils import np_utils
from keras.layers import Reshape
from keras.layers import Dense, Activation
from keras.optimizers import RMSprop
from sklearn import preprocessing
from keras.wrappers.scikit_learn import KerasClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import StratifiedKFold
from sklearn.model_selection import KFold
from sklearn.metrics import confusion_matrix
from sklearn import preprocessing
import warnings
warnings.filterwarnings("ignore")


Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表