在 pytorch 中,我们一般通过继承 torch.nn.Module
类来构建模型,将子模块在构造函数 __init__
中初始化,并在前向传播函数 forward
但是在 keras 中,事情并没有那么简单。诚然,一般的教程都会以简单的模型来开始;但是我们并不满足于简单的模型,所以会介绍到更复杂的 API。先说结论(太长不看):
简单模块用 Sequential
稍复杂网络结构用 functional API 构建网络
若非必要,不建议 使用继承类的方式构建网络
注意:以下基于 keras 2.X API 探讨,部分内容可能过于老旧
如果是能简单到能串糖葫芦的模型,那么直接用 Sequential
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import kerasfrom keras import layersmodel = keras.Sequential( [ layers.Dense(2 , activation="relu" , name="layer1" ), layers.Dense(3 , activation="relu" , name="layer2" ), layers.Dense(4 , name="layer3" ), ] ) x = ops.ones((3 , 3 )) y = model(x)
对比 pytorch 其实也有对应的傻瓜式 模型构建 API
但是这样的模型构建只能应用在串糖葫芦这种 special case 上,可能作为简单教学和简单应用是没问题的,涉及到多路的网络结构就需要更加复杂的 API 来实现
复杂一点:functional API 涉及到多路网络架构的话,我们就需要显式地定义网络的计算图了。好在 keras 提供了比较方便的 functional API
1 2 3 4 5 6 7 8 9 10 11 12 import kerasfrom keras import layers, modelsinputs = keras.Input(shape=(784 ,)) x1 = layers.Dense(128 , activation="relu" )(inputs) x2 = layers.Dense(128 , activation="tanh" )(inputs) x = layers.Concatenate()([x1, x2]) x = layers.Dense(64 , activation="relu" )(x) x = layers.Dense(10 , activation=None )(x) y = layers.Softmax()(x) model = models.Model(inputs=inputs, outputs=y) model.summary()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 Model: "model" __________________________________________________________________________________________________ Layer (type) Output Shape Param # Connected to ================================================================================================== input_1 (InputLayer) [(None, 784)] 0 [] dense (Dense) (None, 128) 100480 ['input_1[0][0]'] dense_1 (Dense) (None, 128) 100480 ['input_1[0][0]'] concatenate (Concatenate) (None, 256) 0 ['dense[0][0]', 'dense_1[0][0]'] dense_2 (Dense) (None, 64) 16448 ['concatenate[0][0]'] dense_3 (Dense) (None, 10) 650 ['dense_2[0][0]'] softmax (Softmax) (None, 10) 0 ['dense_3[0][0]'] ================================================================================================== Total params: 218,058 Trainable params: 218,058 Non-trainable params: 0 __________________________________________________________________________________________________
Functional API 的大概用法就是:
通过实例化 keras.models.Model
依据这个特性,我们可以很方便地构建 ResNet 的那种短路连接。而且,我们可以先把 block 构造成一个 Model,然后复用之:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import kerasfrom keras import layers, modelsdef get_multi_block (input_dim ): inputs = keras.Input(shape=(input_dim,)) x1 = layers.Dense(32 , activation="relu" )(inputs) x2 = layers.Dense(32 , activation="tanh" )(inputs) y = layers.Concatenate()([x1, x2]) model = models.Model(inputs=inputs, outputs=y) return model inputs = keras.Input(shape=(200 ,)) x = get_multi_block(200 )(inputs) x = get_multi_block(64 )(x) x = get_multi_block(64 )(x) x = layers.Dense(10 , activation=None )(x) y = layers.Softmax()(x) model = models.Model(inputs=inputs, outputs=y) model.summary()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 Model: "model_3" _________________________________________________________________ Layer (type) Output Shape Param # ================================================================= input_1 (InputLayer) [(None, 200)] 0 model (Functional) (None, 64) 12864 model_1 (Functional) (None, 64) 4160 model_2 (Functional) (None, 64) 4160 dense_6 (Dense) (None, 10) 650 softmax (Softmax) (None, 10) 0 ================================================================= Total params: 21,834 Trainable params: 21,834 Non-trainable params: 0 _________________________________________________________________
当然一旦层和模块套多了,用 model_1
等 keras 自定义的取名方式可能会让模块含义难以理解,我们可以用模块构造时的 name
1 x = layers.Dense(10 , activation=None , name='my_logit_layer' )(x)
另外一个用处是在多模态里面,例如我们有一个 1D 的数据和 2D 的数据,我们需要将 1D 的数据和 2D 的数据分开处理:
1D 的数据用 LSTM 提取特征
2D 的数据用卷积提取特征
之后用线性层输出一个分类概率 logit
用 keras 的 Functional API 来实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import kerasfrom keras import layers, modelsinput_1d = keras.Input(shape=(None , 64 )) x_1d = layers.LSTM(32 )(input_1d) x_1d = layers.Flatten()(x_1d) input_2d = keras.Input(shape=(28 , 28 , 1 )) x_2d = layers.Conv2D(32 , (3 , 3 ), activation='relu' )(input_2d) x_2d = layers.Conv2D(32 , (3 , 3 ), activation='relu' )(x_2d) x_2d = layers.Flatten()(x_2d) x = layers.Concatenate()([x_1d, x_2d]) y = layers.Dense(1 , activation='sigmoid' )(x) model = models.Model(inputs=[input_1d, input_2d], outputs=y) model.summary()
如果我们觉得用 list 来处理输入的话语义上可能不清晰,还可以把输入做成 dict 的形式,之后调用模型也得用约定的 dict 形式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 import kerasfrom keras import layers, modelsinput_1d = keras.Input(shape=(None , 64 ), name='text' ) x_1d = layers.LSTM(32 )(input_1d) x_1d = layers.Flatten()(x_1d) input_2d = keras.Input(shape=(28 , 28 , 1 ), name='image' ) x_2d = layers.Conv2D(32 , (3 , 3 ), activation='relu' )(input_2d) x_2d = layers.Conv2D(32 , (3 , 3 ), activation='relu' )(x_2d) x_2d = layers.Flatten()(x_2d) x = layers.Concatenate()([x_1d, x_2d]) y = layers.Dense(1 , activation='sigmoid' )(x) model = models.Model(inputs={ "text" : input_1d, "image" : input_2d }, outputs=y) model.summary()
继承类 因为哥们一开始也是从 pytorch 接触到炼丹的,一开始想要自定义模型的话,自然也会先去尝试 follow,谁知这里面还是挺坑的。说在前面:这个实现方法 非必要不建议用 !
一开始想到把模型弄成一个类的方式可以维护很多和模型绑定的属性(例如某些数据的维度、某些需要调整是否可训练的模块等),所以就这样做了,但是嘛 emmmm,基本的能用是能用,但是进阶的功能就缺胳膊少腿了,尤其是继承 keras.models.Model
首先不同于 pytorch 模型和模块都继承自 torch.nn.Module
类,keras 将层归于 keras.layers.Layer
类,将模型归于 keras.models.Model
自定义层:继承 keras.layers.Layer
类 首先从 官方文档 开始,我们先从构建一个最简单的线性层开始玩起:
1 2 3 4 5 6 7 8 9 10 11 12 class Linear (keras.layers.Layer): def __init__ (self, units=32 , input_dim=32 ): super ().__init__() self.w = self.add_weight( shape=(input_dim, units), initializer="random_normal" , trainable=True , ) self.b = self.add_weight(shape=(units,), initializer="zeros" , trainable=True ) def call (self, inputs ): return tf.matmul(inputs, self.w) + self.b
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 import tensorflow as tfimport kerasfrom keras import layers, modelsclass Linear (layers.Layer): def __init__ (self, units=32 , input_dim=32 ): super ().__init__() self.w = self.add_weight( shape=(input_dim, units), initializer="random_normal" , trainable=True , ) self.b = self.add_weight(shape=(units,), initializer="zeros" , trainable=True ) def call (self, inputs ): return tf.matmul(inputs, self.w) + self.b inputs = keras.Input(shape=(100 ,)) x = Linear(32 , 100 )(inputs) x = layers.ReLU()(x) x = Linear(10 , 32 )(x) y = layers.Softmax()(x) model = models.Model(inputs=inputs, outputs=y) model.summary()
如果想实现更多的规范,建议参考 keras 官方的 keras.layers.Dense
实现 ,里面包含很多的 corner case,以及更完备的设置
方法在基类 keras.layers.Layer
中被重写,包含是否为 eager execution、判断输入是否兼容等。我们只需实现在 call
如何实现那种在训练阶段和测试阶段行为不同的模块,譬如 batch normalization, dropout?
看到 keras.layers.Dense
类的 官方实现 ,会发现权重的初始化被弄到了 build
构造函数里面只有初始化 initializer 的操作
方法接受 input_shape
在 build
方法的最后,还需要设置一个 self.built=True
还实现了一个 compute_output_shape
最后还有一个 get_config
对于层的序列化和反序列化,参考 官方文档 中关于 get_config
和 build_from_config
主要是 build
方法的实现,让层在被使用 functional API 等方式构建的时候有了形状推理的功能。
Pytorch 用 model.train()
和 model.eval()
来控制模型是否在训练还是在测试,但是 keras 模型是通过在 __call__
方法中输入 training
1 2 y_pred = model(X, training=False ) y_pred = model(X, training=True )
所以,在前向传播,也就是 call
方法中,我们也要加入 training
参数在不同取值下的逻辑,参考一下 keras.layer.Dropout
的 官方实现 :
根据 training 的取值来,具体利用 control_flow_util.smart_cond
如果 training=True
,则返回应用 dropout 之后的结果
如果 training=False
这里就是坑的一点,也是这里要指出的一个事实:加入 training
参数的 call
方法可能导致 内存泄漏 !
以下是一个 NoisyNet 的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 import tensorflow as tfimport kerasfrom keras import layers, modelsfrom keras import backend as Kimport mathclass NoisyLinear (layers.Layer): def __init__ (self, out_features, sigma_init=0.5 , **kwargs ): self.out_features = out_features self.sigma_init = sigma_init super (NoisyLinear, self).__init__(**kwargs) def build (self, input_shape ): in_features = input_shape[-1 ] mu_range = 1 / math.sqrt(in_features) self.weight_mu = self.add_weight(name='weight_mu' , shape=(self.out_features, in_features), initializer=tf.random_uniform_initializer( minval=-mu_range, maxval=mu_range), trainable=True ) self.bias_mu = self.add_weight(name='bias_mu' , shape=(self.out_features,), initializer=tf.random_uniform_initializer( minval=-mu_range, maxval=mu_range), trainable=True ) self.weight_sigma = self.add_weight(name='weight_sigma' , shape=(self.out_features, in_features), initializer=tf.constant_initializer( self.sigma_init / math.sqrt(in_features)), trainable=True ) self.bias_sigma = self.add_weight(name='bias_sigma' , shape=(self.out_features,), initializer=tf.constant_initializer( self.sigma_init / math.sqrt(self.out_features)), trainable=True ) super (NoisyLinear, self).build(input_shape) def call (self, x, training=False ): if training: weight_epsilon = self._scale_noise( (self.out_features, x.shape[-1 ])) bias_epsilon = self._scale_noise((self.out_features, )) weight = self.weight_mu + self.weight_sigma * weight_epsilon bias = self.bias_mu + self.bias_sigma * bias_epsilon else : weight = self.weight_mu bias = self.bias_mu output = K.dot(x, K.transpose(weight)) + bias return output def _scale_noise (self, shape ): epsilon = K.random_normal(shape=shape) epsilon = K.sign(epsilon) * K.sqrt(K.abs (epsilon)) return epsilon def compute_output_shape (self, input_shape ): return input_shape[0 ], self.out_features inputs = keras.Input(shape=(100 ,)) x = layers.Dense(32 )(inputs) x = layers.ReLU()(x) x = NoisyLinear(10 )(x) y = layers.Softmax()(x) model = models.Model(inputs=inputs, outputs=y) model.summary()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 import numpy as npimport psutilimport osX = np.random.rand(20000 , 100 ).astype(np.float32) y = tf.keras.utils.to_categorical(np.random.randint(10 , size=(20000 ,)), num_classes=10 ).astype(np.float32) loss_fn = tf.keras.losses.CategoricalCrossentropy() optimizer = tf.keras.optimizers.Adam() class MemoryMonitor (tf.keras.callbacks.Callback): def on_epoch_begin (self, epoch, logs=None ): process = psutil.Process(os.getpid()) print (f"Epoch {epoch} start - Memory usage: {process.memory_info().rss / 1024 ** 2 :.2 f} MB" ) def on_epoch_end (self, epoch, logs=None ): process = psutil.Process(os.getpid()) print (f"Epoch {epoch} end - Memory usage: {process.memory_info().rss / 1024 ** 2 :.2 f} MB" ) batch_size = 100 dataset = tf.data.Dataset.from_tensor_slices((X, y)).batch(batch_size) epochs = 200 memory_monitor = MemoryMonitor() for epoch in range (epochs): memory_monitor.on_epoch_begin(epoch) for step, (x_batch, y_batch) in enumerate (dataset): with tf.GradientTape() as tape: predictions = model(x_batch, training=True ) loss = loss_fn(y_batch, predictions) gradients = tape.gradient(loss, model.trainable_variables) optimizer.apply_gradients(zip (gradients, model.trainable_variables)) memory_monitor.on_epoch_end(epoch)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 Epoch 0 start - Memory usage: 1231.91 MB Epoch 0 end - Memory usage: 2318.18 MB Epoch 1 start - Memory usage: 2318.18 MB Epoch 1 end - Memory usage: 2350.60 MB Epoch 2 start - Memory usage: 2350.60 MB Epoch 2 end - Memory usage: 2382.94 MB Epoch 3 start - Memory usage: 2382.94 MB Epoch 3 end - Memory usage: 2415.52 MB Epoch 4 start - Memory usage: 2415.52 MB Epoch 4 end - Memory usage: 2448.46 MB Epoch 5 start - Memory usage: 2448.46 MB Epoch 5 end - Memory usage: 2479.93 MB ... Epoch 48 start - Memory usage: 3759.94 MB Epoch 48 end - Memory usage: 3792.23 MB Epoch 49 start - Memory usage: 3792.23 MB Epoch 49 end - Memory usage: 3823.97 MB Epoch 50 start - Memory usage: 3823.97 MB Epoch 50 end - Memory usage: 3870.03 MB
可以看到每个 epoch 之后是有不少的增加的,这只是模拟数据,现实数据肯定只会更多,而且训练的 epoch 数都会非常多,挂着稍微久一点,显存就直接笋干爆炸了。
所有涉及新张量运算的函数应当用 @tf.function
针对这个例子,我们将 NoisyLinear
类的 _scale_noise
方法加上 @tf.function
1 2 3 4 5 6 class NoisyLinear (layers.Layer): @tf.function def _scale_noise (self, shape ): epsilon = K.random_normal(shape=shape) epsilon = K.sign(epsilon) * K.sqrt(K.abs (epsilon)) return epsilon
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 Epoch 0 start - Memory usage: 1260.84 MB Epoch 0 end - Memory usage: 2314.94 MB Epoch 1 start - Memory usage: 2314.94 MB Epoch 1 end - Memory usage: 2316.06 MB Epoch 2 start - Memory usage: 2316.06 MB Epoch 2 end - Memory usage: 2316.62 MB Epoch 3 start - Memory usage: 2316.62 MB Epoch 3 end - Memory usage: 2317.14 MB Epoch 4 start - Memory usage: 2317.14 MB Epoch 4 end - Memory usage: 2315.14 MB Epoch 5 start - Memory usage: 2315.14 MB Epoch 5 end - Memory usage: 2309.54 MB ... Epoch 48 start - Memory usage: 2214.17 MB Epoch 48 end - Memory usage: 2214.18 MB Epoch 49 start - Memory usage: 2214.18 MB Epoch 49 end - Memory usage: 2214.28 MB Epoch 50 start - Memory usage: 2214.28 MB Epoch 50 end - Memory usage: 2214.66 MB
自定义模型:继承 keras.models.Model
类 一个层可能就是单个或多个输入、单个输出的结构,但是模型的输入和输出都可以是多个的
多个输入、单个输出的层:如 keras.layers.Concatenate
总之,如果我们想仿照 pytorch 继承 nn.Module
的方法,来继承这个 Model 类的话,也不是不行。继承 Model 类让我们能够给模型增加新的属性和方法,能够更好地定制化模型。
事实上,官方也提供了关于这个方法的 文档 ,我们来拷打一下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import kerasfrom keras import layersclass MyModel (keras.Model): def __init__ (self, num_classes=10 ): super ().__init__() self.block_1 = layers.Dense(128 , activation='relu' ) self.block_2 = layers.Dense(64 , activation='relu' ) self.classifier = layers.Dense(num_classes, activation=None ) def call (self, inputs ): x = self.block_1(inputs) x = self.block_2(x) return self.classifier(x) model = MyModel() model.summary()
1 ValueError: This model has not yet been built. Build the model first by calling `build()` or by calling the model on a batch of data.
因为根据已知的信息,并不知道输入的 shape 该长啥样。所以我们就试试 calling the model on a batch of data
1 2 3 4 5 import numpy as npmodel = MyModel() dummy_X = np.random.randn(1 , 32 ) model(dummy_X) model.summary()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 Model: "my_model" _________________________________________________________________ Layer (type) Output Shape Param # ================================================================= dense (Dense) multiple 4224 dense_1 (Dense) multiple 8256 dense_2 (Dense) multiple 650 ================================================================= Total params: 13,130 Trainable params: 13,130 Non-trainable params: 0 _________________________________________________________________
会发现有一点:Output Shape 这一列全是展示的 multiple,而非准确的形状。
1 2 3 4 import numpy as npmodel = MyModel() model.build(input_shape=(None , 32 )) model.summary()
但是 model.summary()
输出的结果还是同上。最后找了一圈,发现要 自己实现一下 build
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 import kerasfrom keras import layersimport tensorflow as tfclass MyModel (keras.Model): def __init__ (self, num_classes=10 ): super ().__init__() self.block_1 = layers.Dense(128 , activation='relu' ) self.block_2 = layers.Dense(64 , activation='relu' ) self.classifier = layers.Dense(num_classes, activation=None ) def build (self, input_shape ): super ().build(input_shape) x = tf.keras.Input(shape=input_shape[1 :]) self.call(x) def call (self, inputs ): x = self.block_1(inputs) x = self.block_2(x) return self.classifier(x)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 Model: "my_model" _________________________________________________________________ Layer (type ) Output Shape Param ================================================================= dense (Dense) (None , 128 ) 4224 dense_1 (Dense) (None , 64 ) 8256 dense_2 (Dense) (None , 10 ) 650 ================================================================= Total params: 13 ,130 Trainable params: 13 ,130 Non-trainable params: 0 _________________________________________________________________
网络结构的可视化 keras 自带的画图 API 首先,我们可以通过 keras 自带的方法导出 PNG 格式的模型结构图,用法详见 文档 :
1 2 3 4 5 6 tf.keras.utils.plot_model(model, to_file='model_structure.png' , show_shapes=True , show_layer_names=True , expand_nested=True , show_layer_activations=True )
1 2 pip install pydot graphviz sudo apt install graphviz
对于上面的自定义 Linear
层,以及用 functional API 构建的模型能画出这样的图:
但是对于继承 keras.models.Model
tensorboard 此外,我们也可以在 tensorboard 中可视化模型:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 log_dir = './tensorboard' summary_writer = tf.summary.create_file_writer(log_dir) tf.summary.trace_on() dummy_input = tf.zeros((1 , 100 )) @tf.function def forward_pass (input_data ): return model(input_data, training=True ) _ = forward_pass(dummy_input) with summary_writer.as_default(): tf.summary.trace_export(name="model_structure" , step=0 )
注意:上面的前向传播 必须 要用带有 @tf.function
装饰器的函数封一层,否则 tensorboard 就不能记录上网络结构
双击 model 点开之:
同样地,对于继承 keras.models.Model
类的模型,似乎也能通过 tensorboard 进行可视化:
双击 model 点开之:
再双击 dense 点开之:
可以看到我们在构建模型时所选用的 ReLU 激活函数
总结 如果不是有特别需要(例如合作、接前人维护的屎山)的话,尽量别碰 tensorflow,因为这会让你的生活变得不幸 。
如果需要自定义模型的话,也会比较推荐用 fuctional API 去构建。如果实在是有定制化模型的需要(如定制模型的不同前向传播路径、绑定特定参数为属性、绑定特定函数为方法等 OOP 操作实现),建议自定义一个类,类中包含一个由 functional API 构建的模型(作为属性),在这个自定义类中维护其他的属性及方法。
顺带一提:keras 现在都走到 3.X 版本了,后端也支持 pytorch 了(虽然某些 API 可能调整了)。如果是单纯想尝试 keras 的话,这边也建议 keras 3.X + pytorch 起手了(