Decision Tree.

决策树(decision tree)是一种对训练数据集(X,y)(X,y)进行划分的树形结构算法。决策树既可以看作一个if-then规则的集合,其中的规则互斥且完备;又可以看作描述训练数据集的条件概率分布P(y|X)P(y|X)

一棵完整的决策树是由结点(node)和有向边(directed edge)组成的。结点包括内部(internal)结点和叶(leaf)结点:其中内部结点对输入数据的某个特征维度进行条件判断,叶结点作为决策树的某一路输出。有向边用于把输入数据划分到不同的分支(branch)。

决策树的基本算法包含了三个选择:

决策树可以表示成递归形式:

G(x)=Nn=1[b(x)=n]Gn(x)G(x)=Nn=1[b(x)=n]Gn(x)

其中NN表示每一个结点的分支数,根据条件进入不同的子树;b(x)b(x)是分支函数(branching tree),用来决定数据前往哪个分支。

决策树算法递归地根据分支条件进行特征选择,然后把训练数据集中的数据划分到不同的分支中,每一个数据最终会落入一个叶结点中。当给定一个测试数据时,判断其所属的叶结点,并输出对应叶结点的输出值:对于分类取该结点中出现最多的数据类别;对于回归取该结点中数据标签的均值。

决策树有以下优点

决策树有以下缺点

根据设置的分支个数和分支条件不同,决策树算法体现为不同的形式:

决策树 分支个数 分支条件
ID3 多叉树 信息增益
C4.5 多叉树 信息增益比
CART 二叉树 (分类): 基尼指数
(回归): 均方误差

1. ID3

ID3是一种以信息增益作为分支条件的多叉决策树。

(1) 信息增益

信息增益(information gain)定义为给定特征AA时,数据集DD的不确定性减少的程度(也即为互信息):

g(D,A)=H(D)H(D|A)g(D,A)=H(D)H(D|A)

其中H(D)H(D)表示数据集DD的经验熵(empirical entropy),用于衡量数据集DD的标签的不确定性。在计算时首先统计每个标签cc的频数,然后计算标签频率分布的熵:

H(D)=Cc=1|D(y=c)||D|log|D(y=c)||D|H(D)=Cc=1|D(y=c)||D|log|D(y=c)||D|

H(D|A)H(D|A)表示特征AA对数据集DD的经验条件(conditional)熵,用于衡量已知特征AA的条件下数据集DD的标签的不确定性。在计算时首先构造特征AA的每种取值情况下的经验熵,然后按照特征取值的频率进行加权:

H(D|A)=Kk=1|Dk||D|H(Dk)=Kk=1|Dk||D|Cc=1|Dk(y=c)||Dk|log|Dk(y=c)||Dk|

(2) ID3的算法流程

给定训练数据集D、数据的特征集A和阈值ϵ

  1. 如果数据集D中所有样本均属于同一类c0,则决策树T为单结点树,并将c0作为该结点的类标记,返回T
  2. 如果特征集A=Φ,则T为单结点树,并将D中出现次数最多的类cmax作为该结点的类标记,返回T
  3. 计算特征集A中每一个特征的信息增益,选择信息增益最大的特征Amax
  4. 如果Amax的信息增益小于阈值ϵ,则T为单结点树,并将该结点中出现次数最多的类cmax作为该结点的类标记,返回T
  5. 对于Amax的每一种可能取值ak,按照Amax=akD分割成若干非空子集Dk,将Dk中出现次数最多的类ck作为该类标记构造子结点;
  6. 对第k个子结点,以Dk为训练集,以AAmax为特征集,递归地调用1-5,得到并返回子树Tk

(3) 实现ID3

import math
import operator

class DisicionTree():
    def __init__(self):
        self.myTree = {}
       
    # 递归构建决策树
    def fit(self, dataSet, feature_labels):
        # 取出类别标签
        classList = [example[-1] for example in dataSet]            
        # 如果类别完全相同则停止继续划分
        if classList.count(classList[0]) == len(classList):           
            return classList[0]
        # 遍历完所有特征时返回出现次数最多的类标签
        if len(dataSet[0]) == 1 or len(feature_labels) == 0:                
            return self.majorityCnt(classList)
        # 获取最优特征维度    
        bestFeat = self.chooseBestFeatureToSplit(dataSet)                
        # 得到最优特征标签
        bestFeatLabel = feature_labels[bestFeat]
        # 根据最优特征的标签生成树
        self.myTree.update({bestFeatLabel:{}})
        # 删除已经使用特征标签
        del(feature_labels[bestFeat])
        # 得到训练集中所有最优特征维度的所有属性值
        featValues = [example[bestFeat] for example in dataSet]       
        uniqueVals = set(featValues)
        # 遍历特征,创建决策树
        for value in uniqueVals:
            subLabels = feature_labels[:]
            self.myTree[bestFeatLabel][value] = self.fit(
                dataSet = self.splitDataSet(dataSet, bestFeat, value),
                feature_labels = subLabels,
                )
        return self.myTree

    # 返回classList中出现次数最多的元素
    def majorityCnt(self, classList):
        classCount = {}
        keys = set(classList)
        for key in keys:
            classCount[key] = classList.count(key)
        #根据字典的值降序排序
        sortedClassCount = sorted(classCount.items(), 
                                  key = operator.itemgetter(1), 
                                  reverse = True)  
        return sortedClassCount[0][0]   

    # 根据信息增益选择特征
    def chooseBestFeatureToSplit(self, dataSet):
        numFeatures = len(dataSet[0]) - 1     # 特征数量
        baseEntropy = self.calcShannonEnt(dataSet) # 计算数据集的经验熵
        bestInfoGain = 0.0                    # 信息增益
        bestFeature = -1                      # 最优特征的索引值
        for i in range(numFeatures): 
            #获取所有数据样本的第i个特征
            featList = [example[i] for example in dataSet]
            uniqueVals = set(featList)        # 第i个特征的所有取值
            newEntropy = 0.0                  # 经验条件熵
            #计算第i个特征的信息增益
            for value in uniqueVals:
                # 按照第i个特征拆分数据集
                subDataSet = self.splitDataSet(dataSet, i, value)
                prob = len(subDataSet) / float(len(dataSet))
                newEntropy += prob * self.calcShannonEnt(subDataSet)
            infoGain = baseEntropy - newEntropy
            # 寻找信息增益最大的特征
            if (infoGain > bestInfoGain):
                bestInfoGain = infoGain
                bestFeature = i
        return bestFeature

    # 计算经验熵(香农熵)
    def calcShannonEnt(self, dataSet):
        # 返回数据集的样本数
        numEntires = len(dataSet)       
        # 收集所有目标标签 (最后一个维度)
        labels= [featVec[-1] for featVec in dataSet]     
        # 去重、获取标签种类
        keys = set(labels)
        shannonEnt = 0.0
        for key in keys:
           # 计算每种标签出现的次数
           prob = float(labels.count(key)) / numEntires 
           shannonEnt -= prob * math.log(prob, 2) 
        return shannonEnt

    # 数据集分割
    def splitDataSet(self, dataSet, axis, value):        
        retDataSet = []
        for featVec in dataSet:
            if featVec[axis] == value:
                # 去除数据的第i个特征
                reducedFeatVec = featVec[:axis]
                reducedFeatVec.extend(featVec[axis+1:])
                retDataSet.append(reducedFeatVec)
        return retDataSet

    # 进行新数据的分类
    def predict(self, inputTree, feature_labels, testVec):
        # 获取决策树结点
        firstStr = next(iter(inputTree))
        # 获取子树
        secondDict = inputTree[firstStr]
        featIndex = feature_labels.index(firstStr)
        # 把数据划分到不同的分支                                   
        for key in secondDict.keys():
            if testVec[featIndex] == key:
                # 如果对应子树
                if type(secondDict[key]).__name__ == 'dict':
                    classLabel = self.predict(
                        inputTree = secondDict[key],
                        testVec = testVec
                        )
                # 如果对应叶结点
                else: classLabel = secondDict[key]
        return classLabel


# 创建数据集
def createDataSet():
    dataSet = [[0, 0, 0, 0, 'no'],                        #数据集
            [0, 0, 0, 1, 'no'],
            [0, 1, 0, 1, 'yes'],
            [0, 1, 1, 0, 'yes'],
            [0, 0, 0, 0, 'no'],
            [1, 0, 0, 0, 'no'],
            [1, 0, 0, 1, 'no'],
            [1, 1, 1, 1, 'yes'],
            [1, 0, 1, 2, 'yes'],
            [1, 0, 1, 2, 'yes'],
            [2, 0, 1, 2, 'yes'],
            [2, 0, 1, 1, 'yes'],
            [2, 1, 0, 1, 'yes'],
            [2, 1, 0, 2, 'yes'],
            [2, 0, 0, 0, 'no']]
    labels = ['年龄', '有工作', '有房子', '信贷情况']        #特征标签
    return dataSet, labels                             #返回数据集和分类属性

if __name__ == '__main__':
    # 获取数据集
    dataSet, labels = createDataSet()
    feature_labels = labels[:]
    dt = DisicionTree()
    myTree = dt.fit(dataSet, labels)
    print(myTree) # {'有房子': {0: {...}, 1: 'yes'}, '有工作': {0: 'no', 1: 'yes'}}
    
    # 测试
    testVec = [0,1,1,2] 
    result = dt.predict(myTree, feature_labels, testVec)
    print(result) # yes

2. C4.5

C4.5模型与ID3模型类似,主要区别在于使用信息增益比作为分支条件。

数据集D在特征A上的信息增益比(information gain radio)定义为信息增益与条件熵的比值:

gR(D,A)=g(D,A)H(D)

3. CART

CART (classification and regression tree)是一种二叉树形式的决策树算法,既可以用于分类也可以用于回归。CART算法简单,容易实现;具有具有良好的可解释性;并且可以处理特征缺失的情况。对于每一个内部节点,CART使用决策桩算法(decision stump)

(1) 决策桩 Decision Stump

决策桩算法可以看做一维的感知机,即通过一定的准则将数据按照某一个特征维度分成两份:若数据的第i个特征不超过θ,则前往右边的子树;否则前往左边的子树:

b(x)=[xiθ]+1

一个好的决策桩对数据进行划分后,应使每一组子数据集内的不纯度最小(即设定合适的分支条件)。根据设置的分支条件不同,CART可以分别应用于回归和分类问题。

(2) 回归CART:最小二乘回归树 least squares regression tree

最小二乘回归树把输入空间划分为M个互不相交的子区域R1,,RM,计算每个子区域Rm上的输出值Cm,从而构造回归树模型:

f(x)=Mm=1CmI(xRm)

最小二乘回归树根据平方误差最小化准则选择最优划分特征:

minxiRm(yif(xi))2

具体地,对输入数据x选择一个切分特征d及其对应的切分点p,将空间递归地划分为两个子空间:

R1(d,p)={x|xdp},R2(d,p)={x|xd>p}

并将每个子空间的输出值设定为子空间内所有数据的平均标签值:

C1=1|R1(d,p)|xR1(d,p)y,C2=1|R2(d,p)|xR2(d,p)y

最优切分特征d及最优切分点p的选择通过求解:

mind,p[minC1xR1(d,p)(yC1)2+minC2xR2(d,p)(yC2)2]

在实践中首先遍历特征维度d;对于每个特征维度,遍历所有可能的切分点p;直至找到使得上式最小的(d,p)值。

⚪ 回归CART的例子

已知如图所示的训练数据,试用平方误差损失准则生成一个二叉回归树。

该数据集只有一个切分变量x,分别选择p=1,,10为切分点,把数据分成左右两份。对于每个切分点分别计算两个子集的平均输出C1,C2,并进一步计算平方误差:

从结果可得p=5时平方误差最小,因此选择第一个最优切分点为p1=5,划分成两个子数据集R1=1,2,3,4,5R2=6,7,8,9,10,两个子数据集的输出分别为ˆc1=5.06,ˆc2=8.18

递归地执行上述过程,即可构造二叉回归树:

(3) 分类CART

对于分类问题,CART使用基尼指数(Gini index)作为分支条件。基尼指数越大,表明数据集的不确定性越大。

若分类问题共有K个类别,且某一样本属于第k个类别的概率为pk,则该样本的基尼指数为:

Gini(p)=Kk=1pk(1pk)=1Kk=1p2k

对于二分类问题(K=2),若记某一类样本所占总样本的比例为μ,另一类所占比例为1μ,则基尼指数为:

Gini(μ)=2μ(1μ)

若样本集D共有N个样本,则该样本集的基尼指数为:

Gini(D)=1Kk=1(Nn=1[yn=k]N)2

若样本集D根据特征A可以进行划分D=(D1,D2),则在特征A下样本集的基尼指数为:

Gini(D,A)=|D1|NGini(D1)+|D2|NGini(D2)

在构造分类CART时,根据数据集D的每一个特征A的每一个取值情况把D划分成两部分(D1,D2),计算此时的基尼指数Gini(D,A)。遍历所有特征及其所有可能的切分点,选择基尼指数最小的作为最优特征及最优切分点,把数据集划分成两部分;递归地执行上述操作,直至满足停止条件。

(4) 处理缺失值 Handle missing features

CART可以处理预测时缺失特征的情况。一种常用的方法就是代理分支(surrogate branch),即寻找与每个特征相似的替代特征。确定是相似特征的做法是在决策树训练的时候,如果存在一个特征与当前特征切分数据的方式和结果是类似的,则表明二者是相似的,就把该替代的特征也存储下来。当预测时遇到原特征缺失的情况,就用替代特征进行分支判断和选择。

(5) 实现CART

下面以回归CART为例,给出实现过程:

⚪ 回归CART from scratch

import numpy as np

class RegressionTree():
    def __init__(self):
        self.myTree = {}
        
    # 构建tree
    def fit(self, dataSet):
        feat, val = self.chooseBestSplit(dataSet)
        if feat == None: return val  # 满足停止条件时返回叶结点值
        # 切分后赋值
        self.myTree['spInd'] = feat
        self.myTree['spVal'] = val
        # 切分后的左右子树
        lSet, rSet = self.binSplitDataSet(dataSet, feat, val)
        self.myTree['left'] = self.fit(lSet)
        self.myTree['right'] = self.fit(rSet)
        return self.myTree

    # 二元切分
    def chooseBestSplit(self, dataSet, ops=(0, 1)):
        # 切分特征的参数阈值,用户初始设置好
        tolS = ops[0]  # 允许的误差下降值
        tolN = ops[1]  # 切分的最小样本数
        # 若所有特征值都相同,停止切分
        if len(set(dataSet[:, -1].T.tolist())) == 1:  # 标签值查重
            return None, np.mean(dataSet[:, -1])  # 使用标签均值生成叶结点
        m, n = np.shape(dataSet)
        # 计算数据集的平方误差(均方误差*总样本数)
        S = np.var(dataSet[:, -1]) * m
        bestS = inf
        bestIndex = 0
        bestValue = 0
        # 遍历数据的每个属性特征
        for featIndex in range(n - 1):
            # 遍历每个特征里不同的特征值
            for splitVal in set((dataSet[:, featIndex].T.tolist())):
                # 对每个特征进行二元切分
                subset1, subset2 = self.binSplitDataSet(dataSet, featIndex, splitVal)
                if (subset1.shape[0] < tolN) or (subset2.shape[0] < tolN): continue
                S1 = np.var(subset1[:, -1]) * subset1.shape[0]
                S2 = np.var(subset2[:, -1]) * subset2.shape[0]
                newS = S1+S2
                # 更新为误差最小的特征
                if newS < bestS:
                    bestIndex = featIndex
                    bestValue = splitVal
                    bestS = newS
        # 如果切分后误差效果下降不大,则取消切分,直接创建叶结点
        if (S - bestS) < tolS:
            return None, np.mean(dataSet[:, -1])
        subset1, subset2 = self.binSplitDataSet(dataSet, bestIndex, bestValue)
        # 判断切分后子集大小,小于最小允许样本数停止切分
        if (subset1.shape[0] < tolN) or (subset2.shape[0] < tolN):
            return None, np.mean(dataSet[:, -1])
        return bestIndex, bestValue  # 返回特征编号和用于切分的特征值

    # 切分数据集为两个子集
    def binSplitDataSet(self, dataSet, feature, value):
        subset1 = dataSet[nonzero(dataSet[:, feature] > value)[0], :]
        subset2 = dataSet[nonzero(dataSet[:, feature] <= value)[0], :]
        return subset1, subset2

    
if __name__ == "__main__":
    x = np.array(list(range(1, 11))).reshape(-1, 1)
    y = np.array([4.50, 4.75, 4.91, 5.34, 5.80, 7.05, 7.90, 8.23, 8.70, 9.00]).reshape(-1, 1)
    myDat = np.concatenate([x,y], 1)
    myTree = RegressionTree()
    print(myTree.fit(myDat))
    """
    {'spInd': 0, 'spVal': 2.0, 'left': {...}, 'right': {...}}
    """

⚪ 回归CART from sklearn

使用sklearn库可以便捷地实现二叉回归树:

import numpy as np
import matplotlib.pyplot as plt
from sklearn.tree import DecisionTreeRegressor

# Data set
x = np.array(list(range(1, 11))).reshape(-1, 1) # [n, d]
y = np.array([4.50, 4.75, 4.91, 5.34, 5.80, 7.05, 7.90, 8.23, 8.70, 9.00]).ravel() # [n,]

# Fit regression model
model = DecisionTreeRegressor(max_depth=3)
model.fit(x, y)

# Predict
X_test = np.arange(0.0, 10.0, 0.01)[:, np.newaxis] # [m, d]
y_test = model.predict(X_test)

# Plot the results
plt.figure()
plt.scatter(x, y, s=20, edgecolor="black",
            c="darkorange", label="data")
plt.plot(X_test, y_test, color="cornflowerblue",
         label="max_depth=3", linewidth=2)
plt.xlabel("data")
plt.ylabel("target")
plt.legend()
plt.show()

4. 决策树的剪枝

直接生成的决策树被称为fully-grown tree,最终会在观测样本集上实现零误差,并且较深的节点分配到的数据量逐渐减少,容易过拟合;因此需要对其进行剪枝(pruning)。剪枝是指从已生成的树上裁掉子树或叶结点,并将其对应的根结点或父结点作为新的叶结点。

决策树的剪枝有预剪枝后剪枝两种形式:

下面介绍决策树的后剪枝过程。定义决策树T的损失函数:

Cα(T)=|T|t=1NtHt(T)+α|T|=|T|t=1Cc=1NtclogNtcNt+α|T|

其中|T|是叶结点的个数,用于衡量决策树的模型复杂度;Nt是第t个叶结点对应的样本数量;Ht(T)是第t个叶结点的经验熵,

给定决策树T和参数α,则决策树的单步剪枝过程为:

  1. 计算每个叶结点的经验熵;
  2. 递归地从叶结点向上回缩。记回缩后的树为T,计算损失函数Cα(T),Cα(T)。如果Cα(T)Cα(T),则对该叶结点进行剪枝,并将其父结点作为新的叶结点。
  3. 递归地调用2,直至满足结束条件。

上述过程是逐结点地进行剪枝,也可以直接对子树进行剪枝。记某内部结点t,以t为单结点树的损失函数为Cα(t)=C(t)+α,以t为根结点的子树Tt的损失函数为Cα(Tt)=C(Tt)+α|Tt|。若两个损失函数一致Cα(t)=Cα(Tt),则该子树没有实质贡献,此时有:

α=C(t)C(Tt)|Tt|1

因此也可以自上而下地遍历决策树的内部结点t,并计算该结点的C(t)C(Tt)|Tt|1值;若该值等于α,则对以t为根结点的子树Tt进行剪枝。

对于固定的α,存在最优子树Tα。特别地,当α+时最优子树为单结点树;当α=0时最优子树为完整的决策树。若α从小到大变化,0<α0<α1<<αn<+,,则产生一个最优子树序列{T0,T1,,Tn};进一步采用交叉验证法可以从中选取最优子树。