Realtime Multi-Person 2D Pose Estimation using Part Affinity Fields.
The authors propose OpenPose, a multi-person 2D pose estimation method. OpenPose is a bottom-up approach: it first detects all body keypoints in the image (e.g. wrists, elbows, shoulders), then matches and connects the detected keypoints to assemble the poses of all people. To match keypoints, OpenPose introduces Part Affinity Fields (PAFs), which associate detected body parts with the individuals in the image.
OpenPose uses a convolutional neural network to predict part confidence maps and part affinity fields from the input image, then assembles the keypoints into full-body poses for every person in the image via bipartite matching.
1. Part Confidence Map
A part confidence map (PCM) is a heatmap of body keypoints that encodes where each keypoint is located. With $J$ predefined body keypoints, the PCM has $J+1$ channels, the last one serving as background. The background channel adds supervision signal and helps the network learn.
The peak at each keypoint location is created with a Gaussian kernel. First, an individual confidence map is generated for each person $k$:
\[\mathbf{S}^*_{j,k}(\mathbf{p}) = \exp\left( -\frac{||\mathbf{p}-\mathbf{x}_{j,k}||_2^2}{\sigma^2} \right)\]where \(\mathbf{x}_{j,k}\) is the ground-truth position of joint $j$ of person $k$, and $\sigma$ controls the spread of the peak.
The overall part confidence map then aggregates the individual maps with a max operator:
\[\mathbf{S}^*_{j}(\mathbf{p}) = \max_k \mathbf{S}^*_{j,k}(\mathbf{p})\]Taking the maximum rather than the average keeps nearby peaks distinct, as shown in the figure below. At test time, the network predicts confidence maps, and candidate body part locations are obtained by non-maximum suppression.
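The construction above can be sketched in NumPy (array shapes and the value of `sigma` are illustrative assumptions, not the reference implementation):

```python
import numpy as np

def confidence_maps(keypoints, H, W, sigma=7.0):
    """keypoints: array [K, J, 2] of (x, y) positions for K people, J joints.
    Returns S* of shape [J, H, W], aggregated over people with max."""
    ys, xs = np.mgrid[0:H, 0:W]
    S = np.zeros((keypoints.shape[1], H, W))
    for k in range(keypoints.shape[0]):          # each person
        for j in range(keypoints.shape[1]):      # each joint
            x, y = keypoints[k, j]
            g = np.exp(-((xs - x) ** 2 + (ys - y) ** 2) / sigma ** 2)
            S[j] = np.maximum(S[j], g)           # max, not mean: nearby peaks stay distinct
    return S
```

With two people, each map keeps a separate peak of height 1 at both joint positions, instead of a single blurred bump that averaging would produce.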
2. Part Affinity Field
A part affinity field (PAF) is a 2D vector field encoding the position and orientation of a limb's support region. OpenPose manually pairs the joints ($19$ joint pairs in total) and generates one PAF per pair; each PAF corresponds to one limb of the body (including the virtual ear-to-shoulder limbs). Since a PAF stores 2D vectors, the PAF output has $2\times 19=38$ channels. For every pixel in the region belonging to a particular limb, the 2D vector encodes the direction from one part of the limb to the other.
When constructing the PAF for joint pair $j_1,j_2$: if a point $\mathbf{p}$ lies on limb $c=j_1\to j_2$, the PAF value is the unit vector pointing from $j_1$ to $j_2$; at all other points, the PAF value is the zero vector.
\[\mathbf{L}^*_{c,k}(\mathbf{p}) = \begin{cases} \mathbf{v} = \frac{\mathbf{x}_{j_2,k}-\mathbf{x}_{j_1,k}}{||\mathbf{x}_{j_2,k}-\mathbf{x}_{j_1,k}||_2} , & \mathbf{p} \in c \\ \mathbf{0}, & \text{otherwise} \end{cases}\]The set of points on limb $c$ is a rectangle between the two joints: its length is the distance $l_{c,k}$ between the joints and its width is a parameter $2\sigma_l$, so a point belongs to the limb iff:
\[0 \leq \mathbf{v} \cdot (\mathbf{p}-\mathbf{x}_{j_1,k}) \leq l_{c,k} \quad \text{and} \quad |\mathbf{v}_{\perp} \cdot (\mathbf{p}-\mathbf{x}_{j_1,k}) | \leq \sigma_l\]The final PAF for limb $c$ averages the PAFs of the $K$ people:
\[\mathbf{L}^*_{c}(\mathbf{p}) = \frac{1}{n_c(\mathbf{p})} \sum_k \mathbf{L}^*_{c,k}(\mathbf{p})\]where \(n_c(\mathbf{p})\) is the number of nonzero vectors at point \(\mathbf{p}\) across people.
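A minimal sketch of the ground-truth PAF for a single limb of one person, following the rectangle criterion above (array shapes and `sigma_l` are illustrative; multi-person aggregation would then average the nonzero vectors as in the last equation):

```python
import numpy as np

def paf_for_limb(x1, x2, H, W, sigma_l=5.0):
    """Ground-truth PAF for one limb of one person.
    x1, x2: (x, y) of the two joints (assumed distinct). Returns L* of shape [2, H, W]."""
    v = np.asarray(x2, float) - np.asarray(x1, float)
    length = np.linalg.norm(v)
    v = v / length                          # unit vector along the limb
    v_perp = np.array([-v[1], v[0]])        # unit normal to the limb axis
    ys, xs = np.mgrid[0:H, 0:W]
    d = np.stack([xs - x1[0], ys - x1[1]], axis=0)    # p - x_{j1}
    along = v[0] * d[0] + v[1] * d[1]                 # v . (p - x_{j1})
    across = np.abs(v_perp[0] * d[0] + v_perp[1] * d[1])
    on_limb = (along >= 0) & (along <= length) & (across <= sigma_l)
    L = np.zeros((2, H, W))
    L[0][on_limb] = v[0]
    L[1][on_limb] = v[1]
    return L
```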
3. Part Association
Given the part confidence maps (PCM) and the part affinity fields (PAF), part association is performed by computing a line integral over the corresponding PAF along the segment that connects a pair of candidate joint locations.
For two candidate joint locations \(\mathbf{d}_{j_1},\mathbf{d}_{j_2}\), the PAF of limb $c=j_1\to j_2$ is integrated along the connecting segment to measure the confidence of the association:
\[E_{j_1j_2} = \int_{u=0}^{u=1} \mathbf{L}_{c}((1-u)\mathbf{d}_{j_1}+u\mathbf{d}_{j_2}) \cdot \frac{\mathbf{d}_{j_2}-\mathbf{d}_{j_1}}{||\mathbf{d}_{j_2}-\mathbf{d}_{j_1}||_2} du\]Since the exact line integral is expensive to compute, in practice it is approximated by sampling and summing uniformly spaced values of $u$.
The score $E_{j_1j_2}$ computed above measures the confidence that a keypoint $j_1$ is connected to a keypoint $j_2$. Since the connection order of the joints is known, finding the optimal association between the candidate sets $D_{j_1},D_{j_2}$ can be formulated and solved as a bipartite matching problem:
\[\begin{aligned} \max _{\mathcal{Z}_c}\quad &E_c=\max _{\mathcal{Z}_c} \sum_{m \in \mathcal{D}_{j_1}} \sum_{n \in \mathcal{D}_{j_2}} E_{m n} \cdot z_{j_1 j_2}^{m n} \\ \text { s.t. } \quad &\forall m \in \mathcal{D}_{j_1}, \sum_{n \in \mathcal{D}_{j_2}} z_{j_1 j_2}^{m n} \leq 1 \\ &\forall n \in \mathcal{D}_{j_2}, \sum_{m \in \mathcal{D}_{j_1}} z_{j_1 j_2}^{m n} \leq 1 \end{aligned}\]The reference implementation below solves this greedily: candidate connections are sorted by score, and a connection is accepted only if neither of its endpoints is already used.
# heatmap_avg: part confidence maps [H, W, 18+1]
# paf_avg : part affinity fields [H, W, 19x2]
all_peaks = []  # keypoint candidates that survive non-maximum suppression
peak_counter = 0
# non-maximum suppression on the candidate keypoints
from scipy.ndimage import gaussian_filter

for part in range(18):
    map_ori = heatmap_avg[:, :, part]
    one_heatmap = gaussian_filter(map_ori, sigma=3)  # smooth to stabilize the peaks
map_left = np.zeros(one_heatmap.shape)
map_left[1:, :] = one_heatmap[:-1, :]
map_right = np.zeros(one_heatmap.shape)
map_right[:-1, :] = one_heatmap[1:, :]
map_up = np.zeros(one_heatmap.shape)
map_up[:, 1:] = one_heatmap[:, :-1]
map_down = np.zeros(one_heatmap.shape)
map_down[:, :-1] = one_heatmap[:, 1:]
peaks_binary = np.logical_and.reduce(
(one_heatmap >= map_left, one_heatmap >= map_right, one_heatmap >= map_up, one_heatmap >= map_down, one_heatmap > thre1))
    peaks = list(zip(np.nonzero(peaks_binary)[1], np.nonzero(peaks_binary)[0]))  # reversed to (x, y): np.nonzero returns (row, col)
peaks_with_score = [x + (map_ori[x[1], x[0]],) for x in peaks]
peak_id = range(peak_counter, peak_counter + len(peaks))
peaks_with_score_and_id = [peaks_with_score[i] + (peak_id[i],) for i in range(len(peak_id))]
all_peaks.append(peaks_with_score_and_id)
peak_counter += len(peaks)
# limbSeq: pairs of keypoint indices (1-based) that define the limbs, in the order connections are searched
limbSeq = [[2, 3], [2, 6], [3, 4], [4, 5], [6, 7], [7, 8], [2, 9], [9, 10], \
[10, 11], [2, 12], [12, 13], [13, 14], [2, 1], [1, 15], [15, 17], \
[1, 16], [16, 18], [3, 17], [6, 18]]
# mapIdx: for each limb in limbSeq, the pair of PAF channels (x, y) it corresponds to
mapIdx = [[31, 32], [39, 40], [33, 34], [35, 36], [41, 42], [43, 44], [19, 20], [21, 22], \
[23, 24], [25, 26], [27, 28], [29, 30], [47, 48], [49, 50], [53, 54], [51, 52], \
[55, 56], [37, 38], [45, 46]]
connection_all = []  # accepted connections for each limb type
special_k = []       # limb types for which one endpoint has no candidates
mid_num = 10         # number of points sampled along each candidate limb
# part association: score each candidate connection by sampling the PAF
for k in range(len(mapIdx)):
score_mid = paf_avg[:, :, [x - 19 for x in mapIdx[k]]]
candA = all_peaks[limbSeq[k][0] - 1]
candB = all_peaks[limbSeq[k][1] - 1]
nA = len(candA)
nB = len(candB)
indexA, indexB = limbSeq[k]
if (nA != 0 and nB != 0):
connection_candidate = []
for i in range(nA):
for j in range(nB):
vec = np.subtract(candB[j][:2], candA[i][:2])
norm = math.sqrt(vec[0] * vec[0] + vec[1] * vec[1])
norm = max(0.001, norm)
vec = np.divide(vec, norm)
startend = list(zip(np.linspace(candA[i][0], candB[j][0], num=mid_num), \
np.linspace(candA[i][1], candB[j][1], num=mid_num)))
vec_x = np.array([score_mid[int(round(startend[I][1])), int(round(startend[I][0])), 0] \
for I in range(len(startend))])
vec_y = np.array([score_mid[int(round(startend[I][1])), int(round(startend[I][0])), 1] \
for I in range(len(startend))])
score_midpts = np.multiply(vec_x, vec[0]) + np.multiply(vec_y, vec[1])
score_with_dist_prior = sum(score_midpts) / len(score_midpts) + min(
0.5 * oriImg.shape[0] / norm - 1, 0)
criterion1 = len(np.nonzero(score_midpts > thre2)[0]) > 0.8 * len(score_midpts)
criterion2 = score_with_dist_prior > 0
if criterion1 and criterion2:
connection_candidate.append(
[i, j, score_with_dist_prior, score_with_dist_prior + candA[i][2] + candB[j][2]])
connection_candidate = sorted(connection_candidate, key=lambda x: x[2], reverse=True)
connection = np.zeros((0, 5))
for c in range(len(connection_candidate)):
i, j, s = connection_candidate[c][0:3]
if (i not in connection[:, 3] and j not in connection[:, 4]):
connection = np.vstack([connection, [candA[i][3], candB[j][3], s, i, j]])
if (len(connection) >= min(nA, nB)):
break
connection_all.append(connection)
else:
special_k.append(k)
connection_all.append([])
# subset: n*20 array, 0-17 is the index in candidate, 18 is the total score, 19 is the total parts
# candidate: x, y, score, id
subset = -1 * np.ones((0, 20))
candidate = np.array([item for sublist in all_peaks for item in sublist])
# parse the association results into per-person keypoint sets
for k in range(len(mapIdx)):
if k not in special_k:
partAs = connection_all[k][:, 0]
partBs = connection_all[k][:, 1]
indexA, indexB = np.array(limbSeq[k]) - 1
        for i in range(len(connection_all[k])):
found = 0
subset_idx = [-1, -1]
            for j in range(len(subset)):
if subset[j][indexA] == partAs[i] or subset[j][indexB] == partBs[i]:
subset_idx[found] = j
found += 1
if found == 1:
j = subset_idx[0]
if subset[j][indexB] != partBs[i]:
subset[j][indexB] = partBs[i]
subset[j][-1] += 1
subset[j][-2] += candidate[partBs[i].astype(int), 2] + connection_all[k][i][2]
elif found == 2: # if found 2 and disjoint, merge them
j1, j2 = subset_idx
membership = ((subset[j1] >= 0).astype(int) + (subset[j2] >= 0).astype(int))[:-2]
if len(np.nonzero(membership == 2)[0]) == 0: # merge
subset[j1][:-2] += (subset[j2][:-2] + 1)
subset[j1][-2:] += subset[j2][-2:]
subset[j1][-2] += connection_all[k][i][2]
subset = np.delete(subset, j2, 0)
else: # as like found == 1
subset[j1][indexB] = partBs[i]
subset[j1][-1] += 1
subset[j1][-2] += candidate[partBs[i].astype(int), 2] + connection_all[k][i][2]
elif not found and k < 17: # if find no partA in the subset, create a new subset
row = -1 * np.ones(20)
row[indexA] = partAs[i]
row[indexB] = partBs[i]
row[-1] = 2
row[-2] = sum(candidate[connection_all[k][i, :2].astype(int), 2]) + connection_all[k][i][2]
subset = np.vstack([subset, row])
# delete subset rows with too few detected parts or a low average score
deleteIdx = []
for i in range(len(subset)):
if subset[i][-1] < 4 or subset[i][-2] / subset[i][-1] < 0.4:
deleteIdx.append(i)
subset = np.delete(subset, deleteIdx, axis=0)
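Given the `subset` and `candidate` arrays produced above, per-person keypoint coordinates can be read out as follows (`parse_people` is a hypothetical helper, not part of the reference code; `-1` entries mark joints that were not detected):

```python
import numpy as np

def parse_people(subset, candidate, n_joints=18):
    """Map each subset row (one person) to {joint_index: (x, y, score)}."""
    people = []
    for person in subset:
        joints = {}
        for j in range(n_joints):
            idx = int(person[j])          # index into candidate, or -1 if missing
            if idx >= 0:
                x, y, score = candidate[idx][:3]
                joints[j] = (float(x), float(y), float(score))
        people.append(joints)
    return people
```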
4. Network Structure
OpenPose first extracts a feature map $F$ with a VGG-based backbone. The main body of the network is divided into a part confidence map (PCM) prediction branch and a part affinity field (PAF) prediction branch, which run in parallel within each stage (in the code below, the L1 branch predicts the 38-channel PAFs and the L2 branch predicts the 19-channel confidence maps).
def make_layers(block, no_relu_layers):
layers = []
for layer_name, v in block.items():
if 'pool' in layer_name:
layer = nn.MaxPool2d(kernel_size=v[0], stride=v[1],
padding=v[2])
layers.append((layer_name, layer))
else:
conv2d = nn.Conv2d(in_channels=v[0], out_channels=v[1],
kernel_size=v[2], stride=v[3],
padding=v[4])
layers.append((layer_name, conv2d))
if layer_name not in no_relu_layers:
layers.append(('relu_'+layer_name, nn.ReLU(inplace=True)))
return nn.Sequential(OrderedDict(layers))
class bodypose_model(nn.Module):
def __init__(self):
super(bodypose_model, self).__init__()
# these layers have no relu layer
        no_relu_layers = ['conv5_5_CPM_L1', 'conv5_5_CPM_L2', 'Mconv7_stage2_L1',\
                          'Mconv7_stage2_L2', 'Mconv7_stage3_L1', 'Mconv7_stage3_L2',\
                          'Mconv7_stage4_L1', 'Mconv7_stage4_L2', 'Mconv7_stage5_L1',\
                          'Mconv7_stage5_L2', 'Mconv7_stage6_L1', 'Mconv7_stage6_L2']
self.blocks = {}
block0 = OrderedDict([
('conv1_1', [3, 64, 3, 1, 1]),
('conv1_2', [64, 64, 3, 1, 1]),
('pool1_stage1', [2, 2, 0]),
('conv2_1', [64, 128, 3, 1, 1]),
('conv2_2', [128, 128, 3, 1, 1]),
('pool2_stage1', [2, 2, 0]),
('conv3_1', [128, 256, 3, 1, 1]),
('conv3_2', [256, 256, 3, 1, 1]),
('conv3_3', [256, 256, 3, 1, 1]),
('conv3_4', [256, 256, 3, 1, 1]),
('pool3_stage1', [2, 2, 0]),
('conv4_1', [256, 512, 3, 1, 1]),
('conv4_2', [512, 512, 3, 1, 1]),
('conv4_3_CPM', [512, 256, 3, 1, 1]),
('conv4_4_CPM', [256, 128, 3, 1, 1])
])
self.model0 = make_layers(block0, no_relu_layers)
# Stage 1
block1_1 = OrderedDict([
('conv5_1_CPM_L1', [128, 128, 3, 1, 1]),
('conv5_2_CPM_L1', [128, 128, 3, 1, 1]),
('conv5_3_CPM_L1', [128, 128, 3, 1, 1]),
('conv5_4_CPM_L1', [128, 512, 1, 1, 0]),
('conv5_5_CPM_L1', [512, 38, 1, 1, 0])
])
block1_2 = OrderedDict([
('conv5_1_CPM_L2', [128, 128, 3, 1, 1]),
('conv5_2_CPM_L2', [128, 128, 3, 1, 1]),
('conv5_3_CPM_L2', [128, 128, 3, 1, 1]),
('conv5_4_CPM_L2', [128, 512, 1, 1, 0]),
('conv5_5_CPM_L2', [512, 19, 1, 1, 0])
])
self.blocks['block1_1'] = block1_1
self.blocks['block1_2'] = block1_2
# Stages 2 - 6
for i in range(2, 7):
self.blocks['block%d_1' % i] = OrderedDict([
('Mconv1_stage%d_L1' % i, [185, 128, 7, 1, 3]),
('Mconv2_stage%d_L1' % i, [128, 128, 7, 1, 3]),
('Mconv3_stage%d_L1' % i, [128, 128, 7, 1, 3]),
('Mconv4_stage%d_L1' % i, [128, 128, 7, 1, 3]),
('Mconv5_stage%d_L1' % i, [128, 128, 7, 1, 3]),
('Mconv6_stage%d_L1' % i, [128, 128, 1, 1, 0]),
('Mconv7_stage%d_L1' % i, [128, 38, 1, 1, 0])
])
self.blocks['block%d_2' % i] = OrderedDict([
('Mconv1_stage%d_L2' % i, [185, 128, 7, 1, 3]),
('Mconv2_stage%d_L2' % i, [128, 128, 7, 1, 3]),
('Mconv3_stage%d_L2' % i, [128, 128, 7, 1, 3]),
('Mconv4_stage%d_L2' % i, [128, 128, 7, 1, 3]),
('Mconv5_stage%d_L2' % i, [128, 128, 7, 1, 3]),
('Mconv6_stage%d_L2' % i, [128, 128, 1, 1, 0]),
('Mconv7_stage%d_L2' % i, [128, 19, 1, 1, 0])
])
for k in self.blocks.keys():
self.blocks[k] = make_layers(self.blocks[k], no_relu_layers)
def forward(self, x):
out1 = self.model0(x)
out1_1 = self.blocks['block1_1'](out1)
out1_2 = self.blocks['block1_2'](out1)
out2 = torch.cat([out1_1, out1_2, out1], 1)
out2_1 = self.blocks['block2_1'](out2)
out2_2 = self.blocks['block2_2'](out2)
out3 = torch.cat([out2_1, out2_2, out1], 1)
out3_1 = self.blocks['block3_1'](out3)
out3_2 = self.blocks['block3_2'](out3)
out4 = torch.cat([out3_1, out3_2, out1], 1)
out4_1 = self.blocks['block4_1'](out4)
out4_2 = self.blocks['block4_2'](out4)
out5 = torch.cat([out4_1, out4_2, out1], 1)
out5_1 = self.blocks['block5_1'](out5)
out5_2 = self.blocks['block5_2'](out5)
out6 = torch.cat([out5_1, out5_2, out1], 1)
out6_1 = self.blocks['block6_1'](out6)
out6_2 = self.blocks['block6_2'](out6)
return out6_1, out6_2
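A quick bookkeeping check of the numbers appearing in the code above: the 185 input channels of stages 2-6 come from concatenating the two branch outputs with the feature map $F$, and the three 2x2 max-pools give an output stride of 8 (the 368 input size is a common choice, not something the code fixes):

```python
F_channels = 128                 # VGG feature map F (conv4_4_CPM output)
paf_channels = 38                # 19 limbs x 2, the L1 branch output
pcm_channels = 19                # 18 joints + background, the L2 branch output
stage_in = paf_channels + pcm_channels + F_channels
print(stage_in)                  # in_channels of Mconv1 in stages 2-6

in_size = 368                    # an illustrative input resolution
out_size = in_size // 2 ** 3     # three 2x2 max-pools -> output stride 8
print(out_size)                  # spatial size of the predicted maps
```

So for a 368x368 input, the model returns a [1, 38, 46, 46] PAF tensor and a [1, 19, 46, 46] confidence map tensor.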
In addition, intermediate supervision is applied to the output of every stage. For each stage $t$, a loss is defined on its predictions:
\[\begin{aligned} f_{\mathbf{S}}^t & =\sum_{j=1}^J \sum_{\mathbf{p}} \mathbf{W}(\mathbf{p}) \cdot\left\|\mathbf{S}_j^t(\mathbf{p})-\mathbf{S}_j^*(\mathbf{p})\right\|_2^2 \\ f_{\mathbf{L}}^t & =\sum_{c=1}^C \sum_{\mathbf{p}} \mathbf{W}(\mathbf{p}) \cdot\left\|\mathbf{L}_c^t(\mathbf{p})-\mathbf{L}_c^*(\mathbf{p})\right\|_2^2 \end{aligned}\]where $\mathbf{W}$ is a binary mask with $\mathbf{W}(\mathbf{p})=0$ when the annotation is missing at point $\mathbf{p}$.
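The per-stage loss can be sketched as a masked sum of squared errors (the tensor layout is an assumption; the total training loss sums $f_{\mathbf{S}}^t + f_{\mathbf{L}}^t$ over all stages):

```python
import torch

def stage_loss(S_pred, S_gt, L_pred, L_gt, W):
    """Masked L2 losses for one stage.
    S_*: [B, J, H, W] confidence maps; L_*: [B, 2C, H, W] PAFs;
    W: [B, 1, H, W] binary mask, 0 where annotations are missing."""
    f_S = (W * (S_pred - S_gt) ** 2).sum()   # confidence map loss
    f_L = (W * (L_pred - L_gt) ** 2).sum()   # PAF loss
    return f_S + f_L
```

Masking with $\mathbf{W}$ prevents penalizing the network for people that are present in the image but unlabeled in the dataset.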