10. RF (Random Forest; ランダムフォレスト)#

import numpy as np 
import pandas as pd
import scipy
from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor
from sklearn.datasets import load_wine
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from copy import copy
from sklearn.ensemble import RandomForestClassifier
try:
  from palmerpenguins import load_penguins
except:
  !pip install palmerpenguins
  from palmerpenguins import load_penguins


SEED = 2023_02_15

10.1. データの準備#

print("-------Original DataFrame-------------------")
df = load_penguins()
print(df.shape)
display(df.head())
#display(df.info())

print("-------Preprocessed DataFrame-------------------")
df = df.dropna()
labelencoder = LabelEncoder()
df.island = labelencoder.fit_transform(df.island)
df.sex = labelencoder.fit_transform(df.sex)
df.species = labelencoder.fit_transform(df.species)
print(df.shape)
display(df.head())

X = df.drop("species", axis=1).to_numpy()
y = df["species"].to_numpy()
X_train,X_test,y_train,y_test=train_test_split(X,y,test_size=0.5,)#stratify=y)
-------Original DataFrame-------------------
(344, 8)
species island bill_length_mm bill_depth_mm flipper_length_mm body_mass_g sex year
0 Adelie Torgersen 39.1 18.7 181.0 3750.0 male 2007
1 Adelie Torgersen 39.5 17.4 186.0 3800.0 female 2007
2 Adelie Torgersen 40.3 18.0 195.0 3250.0 female 2007
3 Adelie Torgersen NaN NaN NaN NaN NaN 2007
4 Adelie Torgersen 36.7 19.3 193.0 3450.0 female 2007
-------Preprocessed DataFrame-------------------
(333, 8)
species island bill_length_mm bill_depth_mm flipper_length_mm body_mass_g sex year
0 0 2 39.1 18.7 181.0 3750.0 1 2007
1 0 2 39.5 17.4 186.0 3800.0 0 2007
2 0 2 40.3 18.0 195.0 3250.0 0 2007
4 0 2 36.7 19.3 193.0 3450.0 0 2007
5 0 2 39.3 20.6 190.0 3650.0 1 2007

10.2. scikit-learnを使った決定木の実験#

dtc = DecisionTreeClassifier(max_depth=None,random_state=SEED)
dtc.fit(X_train,y_train)
train_acc=dtc.score(X_train,y_train)
test_acc=dtc.score(X_test,y_test)
print(f"{train_acc=}\n{test_acc=}")
train_acc=1.0
test_acc=0.9880239520958084

10.3. scikit-learnを使ったRandom Forestの実験#

rfc = RandomForestClassifier(
    n_estimators=100, # 弱学習器を何個作るか
    criterion="gini", # 損失関数をginiやentropyから指定
    max_depth=None, # 弱学習器として使った決定木の深さ上限
    max_features="sqrt", # ブートストラップサンプルの特徴数。sqrtはsqrt(n_features)
    n_jobs=-1, # 何個並列で計算するか。-1は使えるコアを全て使う。
    random_state=SEED, # 擬似乱数のSEED
)

rfc.fit(X_train,y_train)
RandomForestClassifier(n_jobs=-1, random_state=20230215)
In a Jupyter environment, please rerun this cell to show the HTML representation or trust the notebook.
On GitHub, the HTML representation is unable to render, please try loading this page with nbviewer.org.
rfc.predict(X_test)
array([0, 2, 0, 0, 0, 2, 0, 2, 1, 2, 2, 0, 0, 2, 0, 2, 2, 0, 1, 2, 1, 2,
       0, 1, 0, 2, 0, 1, 2, 0, 0, 2, 2, 1, 2, 2, 0, 2, 2, 0, 2, 0, 0, 2,
       2, 2, 1, 2, 0, 0, 0, 0, 2, 1, 0, 0, 1, 0, 2, 0, 2, 2, 2, 0, 0, 0,
       2, 2, 1, 0, 2, 0, 0, 2, 0, 2, 0, 2, 0, 0, 2, 0, 1, 0, 0, 2, 0, 1,
       0, 2, 2, 2, 1, 2, 0, 0, 0, 2, 2, 0, 0, 0, 2, 2, 0, 2, 2, 2, 0, 1,
       0, 1, 0, 1, 2, 1, 2, 2, 2, 0, 2, 0, 1, 1, 0, 1, 2, 2, 2, 1, 0, 0,
       2, 1, 0, 0, 1, 1, 2, 2, 1, 0, 0, 2, 0, 1, 1, 0, 0, 2, 0, 2, 0, 1,
       1, 0, 2, 2, 2, 1, 2, 2, 1, 2, 1, 1, 0])
rfc.score(X_test,y_test)
0.9820359281437125

10.4. RandomForestClassifierのシンプルな実装例#

NumPyを使ってRandom Forest Classifierを実装しましょう。ただし、sklearnのDecisionTreeClassifierを使います。

import numpy as np 
import scipy
from sklearn.tree import DecisionTreeClassifier
from copy import copy

# 最頻値を求める
def mode(Mat: np.ndarray, axis: int = None) -> np.ndarray:
    return scipy.stats.mode(Mat, axis=axis, keepdims=False).mode

def get_bootstrap_sample_indices(rng:np.random._generator.Generator, X:np.ndarray, bootstrap_sample_size:int)->np.ndarray:
    """ブートストラップサンプルを一つ作る

    Args:
        rng (np.random._generator.Generator): 擬似乱数生成器
        X (np.ndarray): 二次元配列
        bootstrap_sample_size (int): サンプルサイズ

    Returns:
        np.ndarray: サンプルのindexを持った一次元配列
    """
    return rng.integers(low=0, high=X.shape[0],size=bootstrap_sample_size,)


class MyRandomForestClassifier:
    def __init__(self,
                 bootstrap_sample_size: int,
                 max_features: int = None,
                 n_estimators: int = 100,
                 rng: np.random._generator.Generator = np.random.default_rng(
                     np.random.randint(2**20)),
                 **estimator_params,
                 ):
        self.n_estimators = n_estimators
        self.bootstrap_sample_size = bootstrap_sample_size
        self.max_features = max_features
        self.rng = rng
        self.estimator_params = estimator_params
        self.estimators_ = []
        self.selected_features_ = []
        self.is_fitted = False

    def fit(self, X, y):
        # ブートストラップサンプルを作成
        for _x, _y in self.get_bootstrap_sample(X, y):
            # 弱識別器の訓練を行う
            _estimator = DecisionTreeClassifier(**self.estimator_params,
                                         random_state=self.rng.integers(0, 2**20),)
            _estimator.fit(_x, _y)
            # 学習済み弱識別器をリストに保存
            self.estimators_.append(_estimator)
        self.is_fitted = True
        return self

    def get_bootstrap_sample(self, X: np.ndarray, y: np.ndarray):
        """ブートストラップサンプルを作成し、データとラベルのペアを一つ一つ返すメソッド
        """
        if self.is_fitted:
            print("warning! 2回目以降のfitです。bootstrap sampleの作り方が初期化されます。")

        for _ in range(self.n_estimators):
            _sample_data_indices = get_bootstrap_sample_indices(self.rng,X,self.bootstrap_sample_size)
            
            # ランダムに特徴を選択する
            _feature_indices = np.arange(X.shape[1])
            if self.max_features is not None:
                self.rng.shuffle(_feature_indices)
                _feature_indices = _feature_indices[:self.max_features]
            self.selected_features_.append(_feature_indices)

            # ブートストラップサンプルを切り出す
            X_sample = X[_sample_data_indices][:, _feature_indices]
            y_sample = y[_sample_data_indices]
            yield X_sample, y_sample

    def predict(self, X):
        assert self.is_fitted, "このメソッドは訓練後に利用してください。"
        _pred_labels = []

        for _index in range(len(self.estimators_)):
            # _index番目の弱識別器を使ってXのラベルを推論する
            _estimator = self.estimators_[_index]
            _feature_indices = self.selected_features_[_index]
            _pred_labels.append(_estimator.predict(X[:, _feature_indices]))
        _pred_labels = np.vstack(_pred_labels)

        # 多数決で予測値を決定する(_pred_labelsの各列の最頻値を返す)
        pred_labels = mode(_pred_labels, axis=0)
        return pred_labels

    def score(self, X, y):
        "正答率を計算する"
        assert self.is_fitted, "このメソッドは訓練後に利用してください。"
        _pred_labels = self.predict(X,)
        return (_pred_labels == y).sum()/y.size
rf = MyRandomForestClassifier(
    bootstrap_sample_size=int(X_train.shape[0]*0.9),
    max_features = int(X_train.shape[1]*0.8),
    n_estimators = 100,
    rng = np.random.default_rng(SEED),
    max_depth=None,
    )
rf.fit(X_train,y_train)

train_acc=rf.score(X_train,y_train)
test_acc=rf.score(X_test,y_test)
print(f"{train_acc=}\n{test_acc=}")
train_acc=1.0
test_acc=0.9940119760479041

10.5. RandomForestRegressorのシンプルな実装例#

NumPyを使ってRandom Forest Regressorを実装しましょう。ただし、sklearnのDecisionTreeRegressorを使います。

class MyRandomForestRegressor:
    def __init__(self,
                 bootstrap_sample_size: int,
                 max_features: int = None,
                 n_estimators: int = 100,
                 rng: np.random._generator.Generator = np.random.default_rng(
                     np.random.randint(2**20)),
                 **estimator_params,
                 ):
        self.n_estimators = n_estimators
        self.bootstrap_sample_size = bootstrap_sample_size
        self.max_features = max_features
        self.rng = rng
        self.estimator_params = estimator_params
        self.estimators_ = []
        self.selected_features_ = []
        self.is_fitted = False

    def fit(self, X, y):
        # ブートストラップサンプルを作成
        for _x, _y in self.get_bootstrap_sample(X, y):
            # 弱識別器の訓練を行う
            _estimator = DecisionTreeRegressor(**self.estimator_params,
                                         random_state=self.rng.integers(0, 2**20),)
            _estimator.fit(_x, _y)
            # 学習済み弱識別器をリストに保存
            self.estimators_.append(_estimator)
        self.is_fitted = True
        return self

    def get_bootstrap_sample(self, X: np.ndarray, y: np.ndarray):
        """ブートストラップサンプルを作成し、データとラベルのペアを一つ一つ返すメソッド
        """
        if self.is_fitted:
            print("warning! 2回目以降のfitです。bootstrap sampleの作り方が初期化されます。")

        for _ in range(self.n_estimators):
            _sample_data_indices = get_bootstrap_sample_indices(self.rng,X,self.bootstrap_sample_size)
            
            # ランダムに特徴を選択する
            _feature_indices = np.arange(X.shape[1])
            if self.max_features is not None:
                self.rng.shuffle(_feature_indices)
                _feature_indices = _feature_indices[:self.max_features]
            self.selected_features_.append(_feature_indices)

            # ブートストラップサンプルを切り出す
            X_sample = X[_sample_data_indices][:, _feature_indices]
            y_sample = y[_sample_data_indices]
            yield X_sample, y_sample

    def predict(self, X):
        assert self.is_fitted, "このメソッドは訓練後に利用してください。"
        _pred_labels = []

        for _index in range(len(self.estimators_)):
            # _index番目の弱識別器を使ってXのラベルを推論する
            _estimator = self.estimators_[_index]
            _feature_indices = self.selected_features_[_index]
            _pred_labels.append(_estimator.predict(X[:, _feature_indices]))
        _pred_labels = np.vstack(_pred_labels)

        # 平均で予測値を決定する(_pred_labelsの各列の平均を返す)
        pred_labels = np.mean(_pred_labels, axis=0)
        return pred_labels

    def score(self, X, y):
        "正答率を計算する"
        assert self.is_fitted, "このメソッドは訓練後に利用してください。"
        _pred_labels = self.predict(X,)
        return (_pred_labels == y).sum()/y.size