众所周知,keras 数据集的实现主要依赖于两派:

  • 一派为 keras 提供的数据集相关接口,其中包括已经封装好的几个 toy dataset,还有一些 预处理函数
    • 虽然预处理函数被标成 deprecated,但是貌似函数功能没变,只是被挪了个地方,应该都是进到 keras.utils 里面了
  • 另一派为显式实现 tf.data pipeline

官方推荐用 tf.data 去构建 TensorFlow 输入流水线,更多参考 官方教程

先给一个太长不看的结论:

  • 如果图省事+不需要定制需求的话就用 keras 自带的接口就好
  • 实在想折腾的话建议优先关注 mini-batch 的 batch_size 设置
  • 然后注意向量化运算的利用

问题描述

工地上涉及到的数据集是比较客制化的时序数据集,可以描述为下面的场景

  • 输入 Xy,以及下标序列 index
  • X 的 shape 为 (num_samples, num_features), y 的 shape 为 num_samples
  • 设置时间序列长度为 sequence_length
  • 对于每个 index 中的元素 i,抽出 (X[i-sequence_length+1:i+1], y[i]) 作为一笔数据
  • 收集 batch_size 笔这样的数据作为一个 batch 的数据,送到模型里面进行训练和推理

特别地,当 index 取为全集的时候(每个能抽的数据都利用去训练,也即 index=[sequence_length-1, l, ..., num_samples]),该功能可以用 keras.utils.timeseries_dataset_from_array 来实现,参考:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import os
os.environ['VISIBLE_CUDA_DEVICES'] = '-1'

import numpy as np
from keras.utils import timeseries_dataset_from_array


num_samples = 10000
num_features = 10
sequence_length = 20
batch_size = 3

X = np.arange(num_samples * num_features).reshape(num_samples, num_features)
y = np.arange(num_samples)

dataset = timeseries_dataset_from_array(X,
y,
sequence_length,
batch_size=batch_size)

Xi, yi = dataset.take(1).as_numpy_iterator().next()
print(f'{Xi = }')
print()
print(f'{yi = }')

观察上述输出,发现要使用该 API,还需在 y 的前面加上 l-1 个 padding 才能匹配上我们上面描述的那个需求

但是由于我们加了对数据集上的采样,不能直接调用这玩意儿

一开始的做法

简易实现

首先想到的(能被搜出来的)就是先实现一个吐出单笔数据的数据集,然后利用 tf.data.Dataset.from_generator 将一个 generator 封装为 tf.data.Dataset,然后在这基础上再去加 batchprefetchcache 等操作构造 mini-batch

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import os
os.environ['VISIBLE_CUDA_DEVICES'] = '-1'

import numpy as np
import tensorflow as tf


num_samples = 10000
num_features = 10
sequence_length = 20
batch_size = 3

X = np.arange(num_samples * num_features).reshape(num_samples, num_features)
y = np.arange(num_samples)

def get_sample_generator(X: np.ndarray, y: np.ndarray, index: np.ndarray):
def sample_generator():
for i in index:
yield X[i-sequence_length+1:i+1], y[i]
return sample_generator

output_signature = (
tf.TensorSpec(shape=(sequence_length, num_features), dtype=tf.float32),
tf.TensorSpec(shape=(), dtype=tf.float32)
)
g = get_sample_generator(X, y, np.arange(sequence_length-1, len(X)))
dataset = tf.data.Dataset.from_generator(g, output_signature=output_signature)
dataset = dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE)

Xi, yi = dataset.take(1).as_numpy_iterator().next()
print(f'{Xi = }')
print()
print(f'{yi = }')

一开始就是走这个简易实现,不过马上发现问题:训练第 1 个 epoch 用时过久,似乎能通过加 cache 来“绕过”后续 epoch 训练的问题。猜测瓶颈出在数据集吐数据上。

性能分析方法

一个手段是弄一个简单的模型去跑模型训练,利用 tensorflow profiler 查看前面 step 的 profile 结果,可以清晰地看到看到当 batch_size=10000 时,绝大多数时间都耗在了输入生成的阶段

还有一个手段是利用 line_profiler 分析,去尝试单步迭代数据集,看看主要耗时都在哪一行,绝对时长多少,并且针对热点去调整实现

譬如上面简易实现的 profiler 结果,如下:

简易实现

暴力实现

简易实现取数据慢的核心原因是因为抽样和窗口化操作需要每次都老老实实去取才行。为了绕过这个在线生成的过程,一个很自然的方法就是先预先(离线)生成形状为 (num_index, sequence_length, num_features) 的全量数据,然后利用 tf.data.Dataset.from_tensor_slices 来生成迭代数据集。这样做的好处就是 __getitem__ 或者 __iter__ 的时候(理论上)不存在在线计算的瓶颈;而且离线生成的时候也可以用到一些向量化操作的 trick 来批量化取下标

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import os
os.environ['VISIBLE_CUDA_DEVICES'] = '-1'

import numpy as np
import tensorflow as tf


num_samples = 10000
num_features = 10
sequence_length = 20
batch_size = 3

X = np.arange(num_samples * num_features).reshape(num_samples, num_features)
y = np.arange(num_samples)

def get_full_dataset(X: np.ndarray, y: np.ndarray, index: np.ndarray):
num_index = len(index)
X_full = np.zeros((num_index, sequence_length, num_features), dtype=np.float32)
for i in range(sequence_length):
current_index = index + i - sequence_length + 1
X_full[:, i, :] = X[current_index, :]
y_full = y[index]
return X_full, y_full

index = np.arange(sequence_length-1, len(X))
X_full, y_full = get_full_dataset(X, y, index)
dataset = tf.data.Dataset.from_tensor_slices((X_full, y_full))
dataset = dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE)

Xi, yi = dataset.take(1).as_numpy_iterator().next()
print(f'{Xi = }')
print()
print(f'{yi = }')

但是这有一个问题:工地上的 dataframe 往往会比较大,一个 dataframe 勉强能被内存装下,但是 sequence_length 个 dataframe 是装不下的。如果用到虚拟内存的话,那么整个过程又会变得奇慢无比。所以这个实现对于大数据集来说是不现实的。

改进

魔改 keras.utils.timeseries_dataset_from_array

首先我们假设 keras.utils.timeseries_dataset_from_array 的实现是要更加好的,然后进到 具体实现 里面,把

1
start_positions = np.arange(0, num_seqs, sequence_stride, dtype=index_dtype)

替换成我们定制的下标就好了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import os
os.environ['VISIBLE_CUDA_DEVICES'] = '-1'

import numpy as np
import tensorflow as tf

def sequences_from_indices(array, indices_ds: tf.data.Dataset):
dataset = tf.data.Dataset.from_tensors(array).repeat()
dataset = tf.data.Dataset.zip((dataset, indices_ds)).map(
lambda steps, inds: tf.gather(steps, inds),
num_parallel_calls=tf.data.AUTOTUNE,
)
return dataset

def get_sequence_dataset(X: np.ndarray,
Y: np.ndarray,
index: np.ndarray,
sequence_length: int,
batch_size: int,
shuffle: bool = False):

if len(index) == 0:
return tf.data.Dataset.range(0)
# Always generate the same content
positions_ds = tf.data.Dataset.from_tensors(index).repeat()
# For each initial window position, generates indices of the window elements

indices = tf.data.Dataset.zip(
(tf.data.Dataset.range(len(index)), positions_ds)
)

indices_X = indices.map(
lambda i, positions: tf.range(
positions[i] - sequence_length + 1,
positions[i] + 1,
),
num_parallel_calls=tf.data.AUTOTUNE,
)

indices_Y = indices.map(
lambda i, positions: positions[i],
num_parallel_calls=tf.data.AUTOTUNE,
)

dataset_X = sequences_from_indices(X, indices_X)
dataset_Y = sequences_from_indices(Y, indices_Y)

dataset = tf.data.Dataset.zip((dataset_X, dataset_Y))
dataset = dataset.prefetch(tf.data.AUTOTUNE)
if batch_size is not None:
if shuffle:
# Shuffle locally at each iteration
dataset = dataset.shuffle(buffer_size=batch_size * 8)
dataset = dataset.batch(
batch_size
)
else:
if shuffle:
dataset = dataset.shuffle(buffer_size=1024)
return dataset


num_samples = 10000
num_features = 10
sequence_length = 20
batch_size = 3

X = np.arange(num_samples * num_features).reshape(num_samples, num_features)
y = np.arange(num_samples)

index = np.arange(sequence_length-1, len(X))

dataset = get_sequence_dataset(X,
y,
index,
sequence_length,
batch_size=batch_size)

Xi, yi = dataset.take(1).as_numpy_iterator().next()
print(f'{Xi = }')
print()
print(f'{yi = }')

这个实现的思路是先在 index 的时候转换成 tf.data 对应的下标迭代 pipeline,然后再根据下标去取数据。

比起一开始的简易实现,这个实现迭代的时候会更快一些(可能是更早被 tf.data 接管了);但是这个实现还是有一个问题:没有利用到 可以并行取数据 的特性。也就是说,取 index[2] 可以跟 index[1] index[0] 的时候同时取的,甚至可以先于 index[1] index[0] 取的,因为这些取数据的过程 互不干扰。但是,目前使用的 tf.data 取数据的过程似乎都是串行的,譬如 tf.data.Dataset.from_generator 这个从 generator 来生成数据的,就是假设只能通过一个 next 方法来获取下一批数据。之后的优化应该重点考虑接触这个限制,能够让取数据的过程并行化。

keras.utils.Sequence 实现

之后的实现是继承 keras.util.Sequence 这个类。注意这个类的数据迭代是以 batch 为计量单位的,也就是每次取一个 batch 的数据。需要重写以下方法:

  • __len__ 这个数据集一共有多少个 batch
  • __getitem__ 根据 batch_id,获取对应的批量数据
  • on_epoch_end 在每个 epoch 执行完之后调用,一般是用于负责数据的 shuffle

在实现的过程中,注意几点:

  • 还是尽可能把 __getitem__ 中取数据的逻辑变得简单,越简单越好
  • 预处理的操作可以提前做,譬如放在构造函数里面完成
  • 尽可能利用 numpy 中的向量化运算,譬如根据下标取元素的时候,可以直接根据下标取出所有元素;还有如下特性:
1
2
3
4
5
6
7
8
9
A = np.arange(6).reshape(2, 3)
# array([[0, 1, 2],
# [3, 4, 5]])
B = A[:, [[0, 1], [1, 2]]]
# array([[[0, 1],
# [1, 2]],
#
# [[3, 4],
# [4, 5]]])

也就是说,我们可以预先生成一个二维的 index(下面的 self.index_sequence),然后利用这个特性去进行索引取出所需要的元素。之所以这样预取,是因为 self.index_sequence 的形状是 (num_index, sequence_length),跟 X 的量级差不多,所以内存还能装得下(如果装不下的话恐怕又只能在线生成了)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import os
os.environ['VISIBLE_CUDA_DEVICES'] = '-1'

import numpy as np
from tensorflow import keras


class TSDataset(keras.utils.Sequence):
def __init__(
self,
X: np.ndarray,
y: np.ndarray,
index: np.ndarray,
sequence_length: int,
batch_size: int,
shuffle: bool = False,
drop_last: bool = True,
) -> None:
super().__init__()
self.X = X.astype(np.float32)
self.y = y.astype(np.float64)
self.index = index
self.sequence_length = sequence_length
self.batch_size = batch_size
self.shuffle = shuffle
self.num_index = len(self.index)

# drop_last: train set vs test set
self.steps_per_epoch = self.num_index // batch_size
if drop_last or self.num_index % batch_size == 0:
self.num_index_in_last_step = 0
else:
self.steps_per_epoch += 1
# append fake data to the end
self.index += self.index[
: batch_size - self.num_index % batch_size
]
self.num_index_in_last_step = self.num_index % batch_size

self.index = np.array(self.index)
self._get_index_sequence()

def _get_index_sequence(self):
num_maintaining_index = len(self.index)
self.index_sequence = np.zeros(
(num_maintaining_index, self.sequence_length), dtype=np.int32
)
for i in range(self.sequence_length):
self.index_sequence[:, i] = self.index + i + 1 - self.sequence_length

def __len__(self):
return self.steps_per_epoch

def __getitem__(self, index):
current_batch_id = self.index[
self.batch_size * index : self.batch_size * (index + 1)
]

all_indices = self.index_sequence[
self.batch_size * index : self.batch_size * (index + 1), :
]
X_batch = self.X[all_indices, ...]
y_batch = self.y[current_batch_id]

return X_batch, y_batch

def on_epoch_end(self):
if self.shuffle:
num_maintaining_index = len(self.index)
shuffle_mapping = list(range(num_maintaining_index))
np.random.shuffle(shuffle_mapping)
self.index = self.index[shuffle_mapping]
self.index_sequence = self.index_sequence[shuffle_mapping, :]

num_indexs = 10000
num_features = 10
sequence_length = 20
batch_size = 3

X = np.arange(num_indexs * num_features).reshape(num_indexs, num_features)
y = np.arange(num_indexs)

index = np.arange(sequence_length-1, len(X))

dataset = TSDataset(X,
y,
index,
sequence_length,
batch_size=batch_size)

Xi, yi = dataset[0]
print(f'{Xi = }')
print()
print(f'{yi = }')

然后在 model.fitmodel.evaluate 的时候,需要加上 workers=6 以及 max_queue_size=20 参数来并行取数据(这两个数实测下来往更高了调好像就占不到便宜了)

当然具体改进有没有用,以及模型跟占比如何的话,还得结合 tensorflow profiler 来分析:

keras.utils.Sequence 实现

对比简易实现,改进已经很 OK 了,但是还有进一步改进的余地

进阶改进

下面介绍我试了下来,比较有用的改进。

改进 1:调小 batch_size

做到这里之后,工友的一次不经意的尝试揭开了最关键的谜底:之前的配置文件中 batch_size 被设置成了 10000,实际上设置成 2000 之后马上输入的生成时间就会减少很多,WTF……

所以最关键的实际上就是把 batch_size 调小就行了。我们在做 CV 的时候往往会被灌输一个理念,就是 batch_size 尽可能调大到显存炸了为止。这是针对 batch_size <= 128 的情形(一般 CV 最多就这样设 batch_size 顶天了);但是,当 batch_size 大到一定程度的时候,处理 batch 数据这个环节就会成为拖油瓶。所以最简单的解决策略就是调小 batch_sizebatch_size 调小之后的 profiler 长这样,生成数据的时长占比一下子就不见了:

调小 batch_size 之后

改进 2:利用 tf.data.Dataset.interleave

用法详见 文档

因为上面的基于 generator 的所有实现,无论何种,tensorflow 底层仅会调用 tf_data_iterator_get_next 来获取数据

profile 中可以看到

而且 profiler 的性能提示中说到可以考虑利用 tf.data.Dataset.interleave 这个 API。原理猜想大概就是把以前的单个 generator 变成若干并行的 generator,然后取数据的时候就轮流用子 generator 来取数据,如下图:

interleave 工作原理猜想

所以,如果我们不变 tensorflow 的这个 tf_data_iterator_get_next 底层调用,只是将若干个 dataset 以如下的方式“组装”成某个大 dataset,对外(箭头的右边)还是通过 tf_data_iterator_get_next 这个函数来获取下一个 batch 的数据,但是对内可以将这个获取下一个 batch 的操作 “指派” 到具体的子数据集上,结合子数据集又可以通过 prefetch 来提前获取后 N 个迭代的内容,那么就相当于将取数据这个操作“平摊”到子数据集上,也就实现了数据集并行。

举个例子:假设单个数据集获取下一个 batch 需要耗时 400ms,如果 “组装” 的并行度为 10 的话,那么理想状态下,“组装” 后的数据集获取下一个 batch 仅需要耗时 40ms

代码实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import os
os.environ['VISIBLE_CUDA_DEVICES'] = '-1'

import numpy as np
import tensorflow as tf
from tensorflow import keras


class TSDataset(keras.utils.Sequence):
def __init__(
self,
X: np.ndarray,
y: np.ndarray,
index: np.ndarray,
sequence_length: int,
batch_size: int,
shuffle: bool = False,
drop_last: bool = True,
) -> None:
super().__init__()
self.X = X.astype(np.float32)
self.y = y.astype(np.float64)
self.index = index
self.sequence_length = sequence_length
self.batch_size = batch_size
self.shuffle = shuffle
self.num_index = len(self.index)

# drop_last: train set vs test set
self.steps_per_epoch = self.num_index // batch_size
if drop_last or self.num_index % batch_size == 0:
self.num_index_in_last_step = 0
else:
self.steps_per_epoch += 1
# append fake data to the end
self.index += self.index[
: batch_size - self.num_index % batch_size
]
self.num_index_in_last_step = self.num_index % batch_size

self.index = np.array(self.index)
self._get_index_sequence()

def _get_index_sequence(self):
num_maintaining_index = len(self.index)
self.index_sequence = np.zeros(
(num_maintaining_index, self.sequence_length), dtype=np.int32
)
for i in range(self.sequence_length):
self.index_sequence[:, i] = self.index + i + 1 - self.sequence_length

def __len__(self):
return self.steps_per_epoch

def __getitem__(self, index):
current_batch_id = self.index[
self.batch_size * index : self.batch_size * (index + 1)
]

all_indices = self.index_sequence[
self.batch_size * index : self.batch_size * (index + 1), :
]
X_batch = self.X[all_indices, ...]
y_batch = self.y[current_batch_id]

return X_batch, y_batch

def on_epoch_end(self):
if self.shuffle:
num_maintaining_index = len(self.index)
shuffle_mapping = list(range(num_maintaining_index))
np.random.shuffle(shuffle_mapping)
self.index = self.index[shuffle_mapping]
self.index_sequence = self.index_sequence[shuffle_mapping, :]

num_indexs = 10000
num_features = 10
sequence_length = 20
batch_size = 3

X = np.arange(num_indexs * num_features).reshape(num_indexs, num_features)
y = np.arange(num_indexs)

index = np.arange(sequence_length-1, len(X))

seq = TSDataset(X,
y,
index,
sequence_length,
batch_size=batch_size)

output_signature = (
tf.TensorSpec(
shape=(batch_size, sequence_length, num_features),
dtype=tf.float32,
),
tf.TensorSpec(shape=(batch_size,), dtype=tf.float32),
)

NUM_PARALLELS = 16

def to_generator(seq: tf.keras.utils.Sequence, generator_id: int):
def wrapped_callable():
for i in range(generator_id, len(seq), NUM_PARALLELS):
yield seq[i]

return wrapped_callable

dataset = (
tf.data.Dataset.from_tensor_slices(
[
tf.data.Dataset.from_generator(
to_generator(seq, i), output_signature=output_signature
).prefetch(4)
for i in range(NUM_PARALLELS)
]
)
.interleave(
lambda x: x, # 直接返回数据集
cycle_length=NUM_PARALLELS, # 并行读取的数据集数量
block_length=1, # 从每个数据集中连续读取的批次数量
num_parallel_calls=NUM_PARALLELS, # 自动调整并行度 tf.data.experimental.AUTOTUNE
)
.prefetch(NUM_PARALLELS)
)
dataset = dataset.apply(tf.data.experimental.assert_cardinality(len(seq)))

Xi, yi = dataset.take(1).as_numpy_iterator().next()
print(f'{Xi = }')
print()
print(f'{yi = }')

batch_size=10000 的 profiler 如下所示:

interleave + 简易实现

interleave + keras.utils.Sequence 实现

可以看到输入耗时有所缓解,但还是有点长(感觉不如调小 batch_size 改进来得更直接)而且有几个坑:

  1. 有如上图所示的尖刺:如果并行度 * 模型计算一个 batch 的时间 < 吐出一个 batch 数据的时间,那么还是会造成周期性的阻塞;
  2. 针对上面一点,一个很自然的想法是增大 tf.data.Dataset.interleave 的并行度;但是实验发现并行度并不能无脑增大,一个原因是下面会说到的 tensorflow 底层的 tf_data_private_threadpool 限制;另一方面是还会带来下面的坑
  3. 在理想的情况下,每个子数据集都像“流水线”般地生成数据;但是实际情况会有一些特殊:在取前 NUM_PARALLELS 个数据的时候,会发生阻塞,因为一开始没启动子数据集的 prefetch。感觉这个设定还是跟 tensorflow 的 API 假设有关,而且更杯具的是在每个 epoch 的开头,这个耗时的启动过程总是会重新来过。如下图所示:

启动造成了延时

改进 3:手动维护多进程队列

在测试 keras.utils.Sequence 的时候我关注到了一点:在 trace viewer 中,下面有一个 tf_data_private_threadpool,猜想应该是 tensorflow 底层维护的、实际负责干活的线程池:

trace viewer

按理来说,使用 keras.utils.Sequence 之后,将 workers 往大了调之后,应该 tf_data_private_threadpool 的并行数也该是多多益善的,但是在明显可以往大了再调调效率还有得提高(生成输入的时长占比较高)的背景下,这个并行数就没!动!了!(workers=20 之后这个条目数量还是 2 个的样子);类似地,在使用 tf.data.Dataset.interleave 之后,并不是一味调高 NUM_PARALLELS 就能增大实际 tf_data_private_threadpool 的并行数。而且,这种并行数的限制肯定不是因为设备的计算资源不够用才导致的。

这里提出一个猜想就是 可能 tensorflow 内部对于并行度有约束。虽然搜索了若干对并行度的设置:

但是尝试过后发现没什么改变。然后又去翻源码,翻到了个 ,难道说 inter_op_parallelism_threads 已经被钦定了?我不理解

所以最后就冒出一个想法:既然你 tensorflow 对于数据获取这一方面做得这么烂,那我就自己维护多进程取数据的过程。

  • 把 prefetch 分成 NUM_PARALLELS 个槽,每个槽装 1 个 batch 的数据;
  • 有多个写者(生成数据),具体来说每个写者对一个槽里面写 batch 数据;
  • 有一个读者(读数据),循环读处理好的 batch 数据;
  • 对于每个槽,维护状态,NEW 表示当前槽的数据是最新的,USED 表示当前槽的数据是还没准备好的
  • 对每个槽配置一个锁:无论是读数据还是写数据,都需要先上锁,对数据操作,操作完才能更新对应槽的状态
  • 对象一创建就拉起 NUM_PARALLELS 个写者写数据

因为 python GIL 的限制,所以应该被做成多进程的模式;因为数据比较大,所以数据共享采用共享内存。

让 deepseek 糊了一个实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
import multiprocessing as mp
from multiprocessing.shared_memory import SharedMemory
import numpy as np
import time

# 状态常量
USED = 0
NEW = 1

class ParallelTSDataset:
def __init__(
self,
X: np.ndarray,
y: np.ndarray,
index: np.ndarray,
sequence_length: int,
batch_size: int,
shuffle: bool = False,
drop_last: bool = True,
num_parallels: int = 4
):
# 原始参数保持
self.X = X.astype(np.float32)
self.y = y.astype(np.float32)
self.original_index = index.copy()
self.sequence_length = sequence_length
self.batch_size = batch_size
self.shuffle = shuffle
self.drop_last = drop_last
self.num_parallels = num_parallels

# 共享内存管理
self.manager = mp.Manager()
self.slots = self.manager.list()

# 计算数据形状
self.x_shape = (batch_size, sequence_length, X.shape[1])
self.y_shape = (batch_size,)
self.x_dtype = np.dtype(np.float32)
self.y_dtype = np.dtype(np.float32)

# 初始化共享槽
for _ in range(num_parallels):
x_shm = SharedMemory(create=True, size=np.prod(self.x_shape)*self.x_dtype.itemsize)
y_shm = SharedMemory(create=True, size=np.prod(self.y_shape)*self.y_dtype.itemsize)

self.slots.append({
'x_shm': x_shm.name,
'y_shm': y_shm.name,
'state': self.manager.Value('i', USED),
'lock': self.manager.Lock(),
'ready': self.manager.Event()
})
x_shm.close()
y_shm.close()

# 初始化索引相关
self._reset_indexes()

# 启动写进程
self.writers = []
for slot_idx in range(num_parallels):
p = mp.Process(target=self._writer_worker, args=(slot_idx,), daemon=True)
p.start()
self.writers.append(p)

self.reader_idx = 0

def _reset_indexes(self):
"""初始化/重置索引系统"""
self.index = self.original_index.copy()
self.num_index = len(self.index)

# 计算 epoch 步数
self.steps_per_epoch = self.num_index // self.batch_size
if not (self.num_index % self.batch_size == 0 or self.drop_last):
self.steps_per_epoch += 1

# 生成索引序列
self.index_sequence = np.zeros(
(self.num_index, self.sequence_length), dtype=np.int32
)
for i in range(self.sequence_length):
self.index_sequence[:, i] = self.index + i + 1 - self.sequence_length

def _writer_worker(self, slot_idx):
"""写进程工作函数"""
slot = self.slots[slot_idx]
x_shm = SharedMemory(name=slot['x_shm'])
y_shm = SharedMemory(name=slot['y_shm'])

x_arr = np.ndarray(self.x_shape, self.x_dtype, x_shm.buf)
y_arr = np.ndarray(self.y_shape, self.y_dtype, y_shm.buf)
batch_offset = slot_idx * self.batch_size

try:
while True:
with slot['lock']:
if slot['state'].value == USED:
# 填充 X 数据
x_data = self.X[self.index_sequence[batch_offset:batch_offset+self.batch_size]]
x_arr[:] = x_data

# 填充 y 数据
y_data = self.y[self.index[batch_offset:batch_offset+self.batch_size]]
y_arr[:] = y_data

slot['state'].value = NEW
slot['ready'].set()
batch_offset += self.num_parallels * self.batch_size

time.sleep(0.01) # 防止 CPU 空转
finally:
x_shm.close()
y_shm.close()

def __len__(self):
return self.steps_per_epoch

def __getitem__(self, index):
"""实现预取逻辑"""
while True:
slot = self.slots[self.reader_idx]
self.reader_idx = (self.reader_idx + 1) % self.num_parallels

if slot['ready'].is_set():
with slot['lock']:
if slot['state'].value == NEW:
# 从共享内存读取
x_shm = SharedMemory(name=slot['x_shm'])
y_shm = SharedMemory(name=slot['y_shm'])

try:
X_batch = np.copy(np.ndarray(
self.x_shape,
self.x_dtype,
x_shm.buf
))
y_batch = np.copy(np.ndarray(
self.y_shape,
self.y_dtype,
y_shm.buf
))
finally:
x_shm.close()
y_shm.close()

slot['state'].value = USED
slot['ready'].clear()
return X_batch, y_batch
else:
time.sleep(0.01)

def on_epoch_end(self):
"""epoch结束时打乱数据"""
if self.shuffle:
shuffle_indices = np.random.permutation(len(self.index))
self.index = self.index[shuffle_indices]
self.index_sequence = self.index_sequence[shuffle_indices]

def cleanup(self):
"""清理资源"""
for p in self.writers:
p.terminate()
for slot in self.slots:
try:
x_shm = SharedMemory(name=slot['x_shm'])
x_shm.close()
x_shm.unlink()
except:
pass
try:
y_shm = SharedMemory(name=slot['y_shm'])
y_shm.close()
y_shm.unlink()
except:
pass

# 测试代码保持不变
num_indexs = 20
num_features = 10
sequence_length = 5
batch_size = 3
NUM_PARALLELS = 4

X = np.arange(num_indexs * num_features).reshape(num_indexs, num_features)
y = np.arange(num_indexs)

index = np.arange(sequence_length-1, len(X))

dataset = ParallelTSDataset(
X,
y,
index,
sequence_length,
batch_size=batch_size,
num_parallels=NUM_PARALLELS
)

try:
Xi, yi = dataset[0]
print(f'{Xi = }')
print()
print(f'{yi = }')
finally:
dataset.cleanup()

注意:在 linux 系统中可以使用多个子进程加载数据,而在 windows 系统中不能这样做。如果要在 windows 系统中使用的话,创建子进程的代码必须要在 if __name__ == '__main__': 下面才行

此时注意:因为已经使用了一个循环队列来控制数据的读写,所以数据集的生成相当于是一个 iterator,只关注 next 了,__getitem__ 方法的 index 参数没有真正被使用,而是根据调用次数来绝对的 __getitem__ 的返回值,所以理论上也可以向上暴露 generator。

但是有一个问题始终是存在的,就是调用 model.fit 的时候,每个 epoch 都会重新创建一个数据集对象,prefetch 之类的占便宜手法不能跨 epoch 应用(这点很烦,这样就解释了为什么 from_generator 是传入一个 callable,它内部每个 epoch 会调用 __call__ 方法)。这点目前没找到解决方法,不过似乎再往下钻研性价比过低了,就此打住,告一段落。