10. RF (Random Forest; ランダムフォレスト)

import numpy as np 
import pandas as pd
import scipy
from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor
from sklearn.datasets import load_wine
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from copy import copy
from sklearn.ensemble import RandomForestClassifier

# Import the penguins loader, installing the package first if it is missing.
try:
    from palmerpenguins import load_penguins
except ImportError:
    import subprocess
    import sys

    # Plain-Python equivalent of the notebook shell escape
    # `!pip install palmerpenguins`; uses the running interpreter's pip.
    subprocess.check_call(
        [sys.executable, "-m", "pip", "install", "palmerpenguins"]
    )
    from palmerpenguins import load_penguins

# Seed passed as random_state / default_rng seed in the experiments below.
# The underscores are digit separators only; the value is 20230215.
SEED = 2023_02_15

10.1. データの準備

print("-------Original DataFrame-------------------")
df = load_penguins()

print("-------Preprocessed DataFrame-------------------")
# Remove every row that has a missing value.
df = df.dropna()

# Replace the three string-valued columns with integer codes. A single encoder
# is refit for each column in turn, so after this statement it stays fitted on
# "species" (the last column encoded).
labelencoder = LabelEncoder()
df = df.assign(
    island=labelencoder.fit_transform(df["island"]),
    sex=labelencoder.fit_transform(df["sex"]),
    species=labelencoder.fit_transform(df["species"]),
)

# Target vector: the encoded species; feature matrix: every other column.
y = df["species"].to_numpy()
X = df.drop(columns="species").to_numpy()
-------Original DataFrame-------------------
(344, 8)
species island bill_length_mm bill_depth_mm flipper_length_mm body_mass_g sex year
0 Adelie Torgersen 39.1 18.7 181.0 3750.0 male 2007
1 Adelie Torgersen 39.5 17.4 186.0 3800.0 female 2007
2 Adelie Torgersen 40.3 18.0 195.0 3250.0 female 2007
3 Adelie Torgersen NaN NaN NaN NaN NaN 2007
4 Adelie Torgersen 36.7 19.3 193.0 3450.0 female 2007
-------Preprocessed DataFrame-------------------
(333, 8)
species island bill_length_mm bill_depth_mm flipper_length_mm body_mass_g sex year
0 0 2 39.1 18.7 181.0 3750.0 1 2007
1 0 2 39.5 17.4 186.0 3800.0 0 2007
2 0 2 40.3 18.0 195.0 3250.0 0 2007
4 0 2 36.7 19.3 193.0 3450.0 0 2007
5 0 2 39.3 20.6 190.0 3650.0 1 2007

10.2. scikit-learnを使った決定木の実験

# Single decision tree with no explicit depth limit (max_depth=None), seeded
# with SEED.
dtc = DecisionTreeClassifier(max_depth=None,random_state=SEED)

10.3. scikit-learnを使ったRandom Forestの実験

# scikit-learn random forest used as the reference implementation.
rfc = RandomForestClassifier(
    n_estimators=100,  # number of weak learners (trees) to build
    criterion="gini",  # split-quality criterion, e.g. "gini" or "entropy"
    max_depth=None,  # depth limit of each tree; None means no limit
    max_features="sqrt",  # features considered at each split; "sqrt" = sqrt(n_features)
    n_jobs=-1,  # number of parallel jobs; -1 uses every available core
    random_state=SEED,  # seed for the pseudo-random number generator
)

RandomForestClassifier(n_jobs=-1, random_state=20230215)
array([0, 2, 0, 0, 0, 2, 0, 2, 1, 2, 2, 0, 0, 2, 0, 2, 2, 0, 1, 2, 1, 2,
       0, 1, 0, 2, 0, 1, 2, 0, 0, 2, 2, 1, 2, 2, 0, 2, 2, 0, 2, 0, 0, 2,
       2, 2, 1, 2, 0, 0, 0, 0, 2, 1, 0, 0, 1, 0, 2, 0, 2, 2, 2, 0, 0, 0,
       2, 2, 1, 0, 2, 0, 0, 2, 0, 2, 0, 2, 0, 0, 2, 0, 1, 0, 0, 2, 0, 1,
       0, 2, 2, 2, 1, 2, 0, 0, 0, 2, 2, 0, 0, 0, 2, 2, 0, 2, 2, 2, 0, 1,
       0, 1, 0, 1, 2, 1, 2, 2, 2, 0, 2, 0, 1, 1, 0, 1, 2, 2, 2, 1, 0, 0,
       2, 1, 0, 0, 1, 1, 2, 2, 1, 0, 0, 2, 0, 1, 1, 0, 0, 2, 0, 2, 0, 1,
       1, 0, 2, 2, 2, 1, 2, 2, 1, 2, 1, 1, 0])

10.4. RandomForestClassifierのシンプルな実装例

NumPyを使ってRandom Forest Classifierを実装しましょう。ただし、sklearnのDecisionTreeClassifierを使います。

import numpy as np 
import scipy
from sklearn.tree import DecisionTreeClassifier
from copy import copy

def mode(Mat: np.ndarray, axis: int = None) -> np.ndarray:
    """Return the most frequent value(s) of ``Mat``.

    Args:
        Mat: Array whose most frequent values are wanted.
        axis: Axis along which to count; forwarded unchanged to
            ``scipy.stats.mode`` (``None`` by default).

    Returns:
        The ``mode`` field of the SciPy result, computed with
        ``keepdims=False``.
    """
    mode_result = scipy.stats.mode(Mat, axis=axis, keepdims=False)
    return mode_result.mode

def get_bootstrap_sample_indices(rng: np.random.Generator, X: np.ndarray, bootstrap_sample_size: int) -> np.ndarray:
    """Draw the row indices of one bootstrap sample.

    Indices are drawn independently, so the same row may appear more than
    once (sampling with replacement).

    Args:
        rng: Pseudo-random number generator used for the draw.
        X: Two-dimensional array; only its number of rows is used.
        bootstrap_sample_size: Number of indices to draw.

    Returns:
        One-dimensional array of ``bootstrap_sample_size`` integer indices,
        each in ``[0, X.shape[0])``.
    """
    return rng.integers(low=0, high=X.shape[0], size=bootstrap_sample_size)

class MyRandomForestClassifier:
    """Minimal random-forest classifier built from scikit-learn decision trees.

    Every tree is trained on its own bootstrap sample of the rows and, when
    ``max_features`` is given, on its own random subset of the columns.
    Predictions are the per-sample majority vote over all trees.

    Attributes:
        estimators_: Fitted trees, in training order.
        selected_features_: Column indices used by the tree at the same
            position in ``estimators_``.
        is_fitted: True once ``fit`` has completed.
    """

    def __init__(self,
                 bootstrap_sample_size: int,
                 max_features: int = None,
                 n_estimators: int = 100,
                 rng: np.random.Generator = None,
                 estimator_params: dict = None):
        """Store the hyper-parameters; no training happens here.

        Args:
            bootstrap_sample_size: Number of rows drawn (with replacement)
                for each tree.
            max_features: Number of columns each tree is trained on;
                ``None`` uses every column.
            n_estimators: Number of trees to train.
            rng: Pseudo-random number generator. ``None`` creates a fresh
                generator per instance, so instances never share state
                through a default argument.
            estimator_params: Extra keyword arguments forwarded to every
                ``DecisionTreeClassifier``; ``None`` means no extras.
        """
        self.n_estimators = n_estimators
        self.bootstrap_sample_size = bootstrap_sample_size
        self.max_features = max_features
        self.rng = np.random.default_rng() if rng is None else rng
        # Copy so later edits to the caller's dict do not leak in.
        self.estimator_params = (
            {} if estimator_params is None else dict(estimator_params)
        )
        self.estimators_ = []
        self.selected_features_ = []
        self.is_fitted = False

    def fit(self, X, y):
        """Train ``n_estimators`` trees and return ``self``.

        Args:
            X: Two-dimensional feature array.
            y: One-dimensional label array aligned with the rows of ``X``.
        """
        # Discard trees and feature subsets left over from an earlier fit so
        # predict() never mixes two training runs.
        self.estimators_ = []
        self.selected_features_ = []

        # One bootstrap sample per weak learner.
        for _x, _y in self.get_bootstrap_sample(X, y):
            _estimator = DecisionTreeClassifier(
                **self.estimator_params,
                random_state=int(self.rng.integers(0, 2**20)),
            )
            _estimator.fit(_x, _y)
            # Keep the trained weak learner for prediction.
            self.estimators_.append(_estimator)
        self.is_fitted = True
        return self

    def get_bootstrap_sample(self, X: np.ndarray, y: np.ndarray):
        """Yield one ``(X_sample, y_sample)`` pair per tree.

        The column indices chosen for each pair are appended to
        ``selected_features_`` just before the pair is yielded, so that
        ``predict`` can apply the same columns to the matching tree.
        """
        if self.is_fitted:
            print("warning! 2回目以降のfitです。bootstrap sampleの作り方が初期化されます。")

        n_features = X.shape[1]
        for _ in range(self.n_estimators):
            _sample_data_indices = get_bootstrap_sample_indices(self.rng,X,self.bootstrap_sample_size)
            # Choose the feature subset at random: shuffle all column
            # indices and keep the first max_features of them.
            if self.max_features is None:
                _feature_indices = np.arange(n_features)
            else:
                _feature_indices = self.rng.permutation(n_features)[:self.max_features]
            self.selected_features_.append(_feature_indices)

            # Cut the bootstrap sample out of the data.
            X_sample = X[_sample_data_indices][:, _feature_indices]
            y_sample = y[_sample_data_indices]
            yield X_sample, y_sample

    def predict(self, X):
        """Return the majority-vote label for every row of ``X``."""
        assert self.is_fitted, "このメソッドは訓練後に利用してください。"

        # Each tree only sees the columns it was trained on.
        _pred_labels = np.vstack([
            _estimator.predict(X[:, _feature_indices])
            for _estimator, _feature_indices
            in zip(self.estimators_, self.selected_features_)
        ])

        # Majority vote: most frequent label in each column of the
        # (n_estimators, n_samples) prediction matrix.
        return mode(_pred_labels, axis=0)

    def score(self, X, y):
        """Return the accuracy (fraction of exact label matches) on ``X``, ``y``."""
        assert self.is_fitted, "このメソッドは訓練後に利用してください。"
        _pred_labels = self.predict(X)
        return (_pred_labels == y).sum() / y.size
rf = MyRandomForestClassifier(
    # NOTE(review): bootstrap_sample_size is a required argument that was
    # missing from this call; one draw per training row is assumed here.
    # Confirm the intended value.
    bootstrap_sample_size=X_train.shape[0],
    max_features=int(X_train.shape[1] * 0.8),  # 80% of the columns per tree
    n_estimators=100,
    rng=np.random.default_rng(SEED),
)


10.5. RandomForestRegressorのシンプルな実装例

NumPyを使ってRandom Forest Regressorを実装しましょう。ただし、sklearnのDecisionTreeRegressorを使います。

class MyRandomForestRegressor:
    """Minimal random-forest regressor built from scikit-learn decision trees.

    Every tree is trained on its own bootstrap sample of the rows and, when
    ``max_features`` is given, on its own random subset of the columns.
    Predictions are the per-sample mean over all trees.

    Attributes:
        estimators_: Fitted trees, in training order.
        selected_features_: Column indices used by the tree at the same
            position in ``estimators_``.
        is_fitted: True once ``fit`` has completed.
    """

    def __init__(self,
                 bootstrap_sample_size: int,
                 max_features: int = None,
                 n_estimators: int = 100,
                 rng: np.random.Generator = None,
                 estimator_params: dict = None):
        """Store the hyper-parameters; no training happens here.

        Args:
            bootstrap_sample_size: Number of rows drawn (with replacement)
                for each tree.
            max_features: Number of columns each tree is trained on;
                ``None`` uses every column.
            n_estimators: Number of trees to train.
            rng: Pseudo-random number generator. ``None`` creates a fresh
                generator per instance, so instances never share state
                through a default argument.
            estimator_params: Extra keyword arguments forwarded to every
                ``DecisionTreeRegressor``; ``None`` means no extras.
        """
        self.n_estimators = n_estimators
        self.bootstrap_sample_size = bootstrap_sample_size
        self.max_features = max_features
        self.rng = np.random.default_rng() if rng is None else rng
        # Copy so later edits to the caller's dict do not leak in.
        self.estimator_params = (
            {} if estimator_params is None else dict(estimator_params)
        )
        self.estimators_ = []
        self.selected_features_ = []
        self.is_fitted = False

    def fit(self, X, y):
        """Train ``n_estimators`` trees and return ``self``.

        Args:
            X: Two-dimensional feature array.
            y: One-dimensional target array aligned with the rows of ``X``.
        """
        # Discard trees and feature subsets left over from an earlier fit so
        # predict() never mixes two training runs.
        self.estimators_ = []
        self.selected_features_ = []

        # One bootstrap sample per weak learner.
        for _x, _y in self.get_bootstrap_sample(X, y):
            _estimator = DecisionTreeRegressor(
                **self.estimator_params,
                random_state=int(self.rng.integers(0, 2**20)),
            )
            _estimator.fit(_x, _y)
            # Keep the trained weak learner for prediction.
            self.estimators_.append(_estimator)
        self.is_fitted = True
        return self

    def get_bootstrap_sample(self, X: np.ndarray, y: np.ndarray):
        """Yield one ``(X_sample, y_sample)`` pair per tree.

        The column indices chosen for each pair are appended to
        ``selected_features_`` just before the pair is yielded, so that
        ``predict`` can apply the same columns to the matching tree.
        """
        if self.is_fitted:
            print("warning! 2回目以降のfitです。bootstrap sampleの作り方が初期化されます。")

        n_features = X.shape[1]
        for _ in range(self.n_estimators):
            _sample_data_indices = get_bootstrap_sample_indices(self.rng,X,self.bootstrap_sample_size)
            # Choose the feature subset at random: shuffle all column
            # indices and keep the first max_features of them.
            if self.max_features is None:
                _feature_indices = np.arange(n_features)
            else:
                _feature_indices = self.rng.permutation(n_features)[:self.max_features]
            self.selected_features_.append(_feature_indices)

            # Cut the bootstrap sample out of the data.
            X_sample = X[_sample_data_indices][:, _feature_indices]
            y_sample = y[_sample_data_indices]
            yield X_sample, y_sample

    def predict(self, X):
        """Return the mean of the trees' predictions for every row of ``X``."""
        assert self.is_fitted, "このメソッドは訓練後に利用してください。"

        # Each tree only sees the columns it was trained on.
        _pred_values = np.vstack([
            _estimator.predict(X[:, _feature_indices])
            for _estimator, _feature_indices
            in zip(self.estimators_, self.selected_features_)
        ])

        # Average each column of the (n_estimators, n_samples) matrix.
        return np.mean(_pred_values, axis=0)

    def score(self, X, y):
        """Return the coefficient of determination (R^2) on ``X``, ``y``.

        Exact-equality accuracy is meaningless for continuous targets, so
        the score is ``1 - SS_res / SS_tot``. When ``y`` is constant
        (``SS_tot == 0``) the result is 1.0 for a perfect prediction and
        0.0 otherwise, avoiding a division by zero.
        """
        assert self.is_fitted, "このメソッドは訓練後に利用してください。"
        _targets = np.asarray(y, dtype=float)
        _residual = np.sum((_targets - self.predict(X)) ** 2)
        _total = np.sum((_targets - np.mean(_targets)) ** 2)
        if _total == 0:
            return 1.0 if _residual == 0 else 0.0
        return 1.0 - _residual / _total