3.3. k-nnを例に学ぶ:初めての機械学習クラス実装#

k-NNクラスを一から実装するのは少し大変です。このノートでは、いきなりk-NNをクラスとして実装することが難しいなぁ…と思っている人のために、一歩一歩k-NNの実装を行っていきます。

さて、k-NNを行うためには、

  1. 初期化: どんな条件のk-NNを行うのかを決める。

  2. 訓練: データとラベルを保存する

  3. 予測: 保存されたデータとラベルを使って未知データのクラスを予測する

の3ステップが必要でした。(これは、scikit-learnの機械学習モデルクラスの実装に倣った設計になっています。)

そこで、ここでは初期化、訓練、予測の3ステップに実装を分けて考えていきます。

# このノートで使うパッケージをimportしておきます。
import numpy as np
import pandas as pd 
import plotly.express as px 

from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
print("--- パッケージバージョンの確認 ---")
print("NumPy:",np.__version__,)
print("Pandas:",pd.__version__)
import plotly; print("Plotly:",plotly.__version__)
import sklearn; print("Scikit-Learn:", sklearn.__version__)
--- パッケージバージョンの確認 ---
NumPy: 1.23.3
Pandas: 1.5.0
Plotly: 5.13.1
Scikit-Learn: 1.1.2
# デモで使うデータセットを読み込んでおきます。
iris_dataset = load_iris()

# 教師データとテストデータに分割します。
X_train, X_test, y_train, y_test = train_test_split(
    iris_dataset.data, iris_dataset.target, # 分割したい配列をここに列挙します。今回はデータとラベルです。
    test_size=0.3, # データ全体に対するテストデータの割合を指定します。
    #stratify=iris_dataset.target, # クラス毎に偏りが出ないような分割をします。
    )
# データを標準化しておきます。(このセルを実行しなくても構いません)
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

3.3.1. 初期化の実装#

このノートの最終目的はk-NNをクラスとして実装することです。ですがまずはクラスのことを忘れて訓練と予測をそれぞれ関数として実装します。
関数の中ではできるだけ、「関数の引数」と「関数の中で定義した変数」のみが利用可能であることを意識して下さい。(グローバル変数を関数内から参照することは極力避けてください)

さて、まずはk-NNの初期化ステップです。ここでは、機械学習の訓練をする前に、どのような設定でこの機械学習モデルを構築するのかを定義します。

関数として必要な機能を実装していくので、クラスは必要ないのですが、あくまでも「辞書型っぽいなにか」として利用するために、クラスを用意しておきます。

class knnに必要な情報をまとめたデータ構造:
    def __init__(self):
        ...

さて、k-NNを利用するときに、最低限確認したい情報は

  • kを何にするのか

だけです。
つまりこの下で用意する「knnに必要な情報をまとめたデータ構造」は、メンバ変数としてk(近傍の何個の点を見るのか」のみを持っていることになりそうです。 これを上のクラスに追加します。

class knnに必要な情報をまとめたデータ構造:
    def __init__(self, k:int):
        self.k = k

こうして作った「knnに必要な情報をまとめたデータ構造」というクラスは、今はただkの値を持つだけのデータ型です。
クラスは設計書とも言われるようなもので、この設計書をもとに実際にものを作ることで実態を得ます。この「設計書(クラス)を元に実際に作られたもの」のことをインスタンス(instance)と呼びます。

では実際に、コードセルで上記のコードを実行してみます。ついでにインスタンスを生成しておきましょう。

class knnに必要な情報をまとめたデータ構造:
    def __init__(self, k:int):
        self.k = k

data_container = knnに必要な情報をまとめたデータ構造(1) # ここではkを適当に1とします。
print(data_container)
print(data_container.k)
<__main__.knnに必要な情報をまとめたデータ構造 object at 0x140b419f0>
1

data_containerはkの値を持っているので、data_container.kと書くことでkの値にアクセスできます。また、data_container.hoge = huga とすることで、そのインスタンスに新しいフィールドと値を追加することもできます。

これ以降に実装する関数では、このdata_containerを引数として渡すことにします。

3.3.2. 訓練関数の実装#

さて次に、データコンテナと訓練データと訓練ラベルを受け取って、訓練を行う関数を用意します。
ただし、knnが訓練ステップで行うのは、訓練というよりはデータペアをすべて保存するだけの「暗記」です。通常の関数は状態を持てないので、データコンテナとして渡されたdata_containerに新しいフィールドと値を追加することにします。

# 「データコンテナ」という引数には、必ず「knnに必要な情報をまとめたデータ構造」クラスの
# インスタンス(つまりdata_container)を渡して下さい。
def 訓練(データコンテナ, 教師データ, 教師ラベル):
    データコンテナ.教師データ = 教師データ
    データコンテナ.教師ラベル = 教師ラベル
    return データコンテナ 

ここで定義した関数は、引数として受け取ったデータをデータコンテナ(第一引数)の新しいメンバ変数として追加して、更新した第一引数のオブジェクトをそのまま返すだけの仕事をします。

data_container = 訓練(data_container, X_train, y_train)

# 訓練を行ったあとのdata_containerの中身を確認してみましょう。
print(data_container.k)
print(data_container.教師ラベル)
1
[2 1 0 0 2 1 0 2 2 1 1 2 1 0 1 0 2 0 2 2 1 1 0 0 1 0 0 2 1 1 1 0 0 1 0 2 1
 2 0 0 0 2 1 2 1 2 0 2 1 1 0 0 1 0 1 1 2 1 0 1 2 1 2 2 1 2 2 1 0 1 0 2 0 0
 0 1 1 0 0 2 1 2 0 1 0 1 0 2 0 2 1 1 0 2 2 1 2 2 2 0 2 2 2 0 1]

3.3.3. 予測関数の実装#

k-nnでは未知データと教師データすべてのとの距離を計算して、距離の近い順番にソートします。その後に教師データのラベルがそれぞれ何個有るのかを調べて、最も多いラベルを未知データのラベルとして採用するのでした。

3.3.3.1. numpyの便利な関数の紹介#

実装を見る前に、コード中で出てくる難しそうな関数を紹介しておきます。

3.3.3.1.1. np.argsort#

背の順にデータをソートした後に、ソートした要素が入った配列を返すのがnp.sortです。これに対して、ソートした後にそれぞれの要素の元の配列での順番が入った配列を返すのがnp.argsortです。

# np.argsortとは
身長 = [180,150,140,160]
print("np.sort:",np.sort(身長))
print("np.argsort:", np.argsort(身長))
np.sort: [140 150 160 180]
np.argsort: [2 1 3 0]

上の説明でわからなかったという人は、下のコードを読んでイメージを掴んで下さい。

def argsort(arr):
    """argsort
    実際にこのように実装されているわけではありませんが、上の説明でわからなかった場合はこのコードを読んでみてください。
    """
    # 背の順に並び替え
    sorted = np.sort(arr)
    
    # 元の配列arrでのindex(前から何番目の要素か)を取得
    argsorted_list = []
    for val in sorted:
        index = arr.index(val)
        argsorted_list.append(index)
    
    argsorted = np.array(argsorted_list)
    return argsorted

np.argsortは自分で実装するとちょっと面倒くさい処理なので、NumPyの関数を使うことをお勧めします。

3.3.3.1.2. np.bincount#

配列内にある数字がそれぞれ何個あったのかを数えてくれる関数(最大値と同じ要素数のベクトルが返ってくる)

成績 = [4,2,1,3,3,3,5]
np.bincount(成績)
array([0, 1, 1, 3, 1, 1])

3.3.3.2. 距離を計算する関数#

ではまず、距離を計算する関数を実装しましょう。

def 距離関数(データ点, 教師データ全部):
    距離 = ((データ点 - 教師データ全部) ** 2).sum(axis=1)
    return 距離

3.3.4. 与えられたデータが属するクラスを予測する関数#

距離関数を使って、与えられたデータがどのクラスに属するのかを予測する関数を作ります。

def 予測(データコンテナ, クラスを予測したいデータ):
    予測したクラスラベル = [] # リストとして初期化
    
    for (ループの回数, x) in enumerate(クラスを予測したいデータ):
        
        # データ点xと教師データすべてとの距離を計算し、distance_vectorに格納する。
        # distance_vectorの要素数は教師データの数と同じになっているはず。
        distance_vector = 距離関数(x, データコンテナ.教師データ)
        
        # これを小さい順にソートして、データの番号を変数に保存しておく
        sorted_indexes = np.argsort(distance_vector)
        
        # 先頭からk個だけ取り出して、あとは捨てる
        ご近所さん = sorted_indexes[:データコンテナ.k]
        
        # k個のご近所さんの教師ラベルを変数に保存しておく
        ご近所さんのラベル = データコンテナ.教師ラベル[ご近所さん]
        
        # ご近所さんの中で一番多いラベルを見つける
        近所で一番人気のラベル = np.bincount(ご近所さんのラベル).argmax()
        
        # xのご近所で一番多いのがこのクラスなら、きっとxもこのクラスなんだろうな…
        予測したクラスラベル.append(近所で一番人気のラベル)
        
        # あとはこれを「クラスを予測したいデータ」すべてに対して行えばすべての予測ができる。
        
    return np.array(予測したクラスラベル) # 返す時はnumpyの配列としておく(おそらくy_trainもそうだったでしょ?)

if __name__ == "__main__":
    pred_labels = 予測(data_container, X_test)
    # 正答率
    正答率 = (pred_labels == y_test).sum() / len(X_test)
    print(正答率)
0.9111111111111111

これでk-NNが完成です。

3.3.5. リファクタリング#

例えば以下のように実行した場合、上で作ったプログラムはエラーを起こします。

data_container = knnに必要な情報をまとめたデータ構造(1) # ここではkを適当に1とします。
# data_container = 訓練(data_container, X_train, y_train)
pred_labels = 予測(data_container, X_test)
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
Cell In[12], line 3
      1 data_container = knnに必要な情報をまとめたデータ構造(1) # ここではkを適当に1とします。
      2 # data_container = 訓練(data_container, X_train, y_train)
----> 3 pred_labels = 予測(data_container, X_test)

Cell In[11], line 8, in 予測(データコンテナ, クラスを予測したいデータ)
      2 予測したクラスラベル = [] # リストとして初期化
      4 for (ループの回数, x) in enumerate(クラスを予測したいデータ):
      5     
      6     # データ点xと教師データすべてとの距離を計算し、distance_vectorに格納する。
      7     # distance_vectorの要素数は教師データの数と同じになっているはず。
----> 8     distance_vector = 距離関数(x, データコンテナ.教師データ)
     10     # これを小さい順にソートして、データの番号を変数に保存しておく
     11     sorted_indexes = np.argsort(distance_vector)

AttributeError: 'knnに必要な情報をまとめたデータ構�' object has no attribute '教師データ'

訓練をしていないのに予測を走らせるとエラーが出るようです。そのために訓練済みかどうかを判別できるフラグもデータコンテナのフィールドとして与えておけば良さそうです。
ここまでで実装したコード⇓

## ここまでで実装したコード:
class knnに必要な情報をまとめたデータ構造:
    def __init__(self, k:int):
        self.k = k
        
def 訓練(データコンテナ, 教師データ, 教師ラベル):
    データコンテナ.教師データ = 教師データ
    データコンテナ.教師ラベル = 教師ラベル
    return データコンテナ 

def 距離を計算する関数(データ点, 教師データ全部):
    距離 = ((データ点 - 教師データ全部) ** 2).sum(axis=1)
    return 距離

def 予測(データコンテナ, クラスを予測したいデータ):
    予測したクラスラベル = []
    
    for (ループの回数, x) in enumerate(クラスを予測したいデータ):
        distance_vector = 距離を計算する関数(x, データコンテナ.教師データ)
        sorted_indexes = np.argsort(distance_vector)
        ご近所さん = sorted_indexes[:データコンテナ.k]
        ご近所さんのラベル = データコンテナ.教師ラベル[ご近所さん]
        近所で一番人気のラベル = np.bincount(ご近所さんのラベル).argmax()
        予測したクラスラベル.append(近所で一番人気のラベル)
    return np.array(予測したクラスラベル)

訓練済みフラグを追加して、予測関数で訓練済みかを確認するように変更したコード⇓

# 訓練済みフラグを追加して、予測関数で訓練済みかを確認するように変更したコード:
class knnに必要な情報をまとめたデータ構造:
    def __init__(self, k:int):
        self.k = k
        self.is_fitted = False 
        
def 訓練(データコンテナ, 教師データ, 教師ラベル):
    データコンテナ.教師データ = 教師データ
    データコンテナ.教師ラベル = 教師ラベル
    データコンテナ.is_fitted = True
    return データコンテナ 

def 距離を計算する関数(データ点, 教師データ全部):
    距離 = ((データ点 - 教師データ全部) ** 2).sum(axis=1)
    return 距離

def 予測(データコンテナ, クラスを予測したいデータ):
    assert データコンテナ.is_fitted, "先に訓練してから予測して下さい"
    予測したクラスラベル = []
    
    for (ループの回数, x) in enumerate(クラスを予測したいデータ):
        distance_vector = 距離を計算する関数(x, データコンテナ.教師データ)
        sorted_indexes = np.argsort(distance_vector)
        ご近所さん = sorted_indexes[:データコンテナ.k]
        ご近所さんのラベル = データコンテナ.教師ラベル[ご近所さん]
        近所で一番人気のラベル = np.bincount(ご近所さんのラベル).argmax()
        予測したクラスラベル.append(近所で一番人気のラベル)
    return np.array(予測したクラスラベル)
# 訓練しないで予測しようとした場合

data_container = knnに必要な情報をまとめたデータ構造(3)
pred_labels = 予測(data_container, X_test)
---------------------------------------------------------------------------
AssertionError                            Traceback (most recent call last)
Cell In [15], line 4
      1 # 訓練しないで予測しようとした場合
      3 data_container = knnに必要な情報をまとめたデータ構造(3)
----> 4 pred_labels = 予測(data_container, X_test)

Cell In [14], line 18, in 予測(データコンテナ, クラスを予測したいデータ)
     17 def 予測(データコンテナ, クラスを予測したいデータ):
---> 18     assert データコンテナ.is_fitted, "先に訓練してから予測して下さい"
     19     予測したクラスラベル = []
     21     for (ループの回数, x) in enumerate(クラスを予測したいデータ):

AssertionError: 先に訓練してから予測して下さい
# 訓練後に予測をしようとした場合

data_container = knnに必要な情報をまとめたデータ構造(3)
data_container = 訓練(data_container, X_train,y_train)
pred_labels = 予測(data_container, X_test)
print((pred_labels == y_test).sum() / len(X_test)) # 正答率
0.9555555555555556

これ以外にも、「教師データとテストデータの特徴数が異なっている」のようなパッと想像ができるエラーについては、わかりやすいエラーメッセージと一緒にエラーを起こすことは大切です。assertを使う以外にもエラーの起こし方はあるので、調べてみてください。

また、このように安全なコードを書くためには、アルゴリズムの本質とは関係のないコードを書く必要がたくさん出てきます。本質的ではないコードを書きたくない場合は、有名なOSSのリポジトリを除いてみてください。sklearn.base.BaseEstimatorのような「機械学習モデルの実装に便利な親クラス」が公開されていることがあります。

3.3.5.1. 変数名などを英語に直していく#

さて、このコードでも十分動くのですが、変数名や関数名は英語で書くのが普通です。とりあえず、いくつか英語に直してみます。 また、この際に「必要情報」としていた部分を、全てselfに置き換えておきます。

… 教師データとそのラベルをX_train, y_train. (training dataなのでtrain)。テストデータとそのラベルをX_test, y_testとするのでした。これについてもここで直しておきます。

class knnに必要な情報をまとめたデータ構造:
    def __init__(self, k:int):
        self.k = k
        self.is_fitted = False 
        
def fit(self, X_train, y_train):
    self._X = X_train
    self._y = y_train
    self.is_fitted = True
    return self 

def compute_distance(a, b):
    distance = ((a-b) ** 2).sum(axis=1)
    return distance

def predict(self, X_test):
    assert self.is_fitted, "先に訓練してから予測して下さい"
    pred_labels = []
    
    for (loop_counter, x) in enumerate(X_test):
        distance_vector = compute_distance(x, self._X)
        sorted_indexes = np.argsort(distance_vector)
        neighbors = sorted_indexes[:self.k]
        neighbors_label = self._y[neighbors]
        popular_label = np.bincount(neighbors_label).argmax()
        pred_labels.append(popular_label)
    return np.array(pred_labels)


if __name__ == "__main__":
    data_container = knnに必要な情報をまとめたデータ構造(3)
    data_container = fit(data_container, X_train,y_train)
    pred_labels = predict(data_container, X_test)

    # 正答率
    (pred_labels == y_test).sum() / len(X_test)

よく見るプログラムらしくなってきましたね。

3.3.5.2. 関数からクラスへ#

さて、ここで、fit, predictは「 knnに必要な情報をまとめたデータ構造」のインスタンスを第一引数に取る関数でした。それ以外のデータ構造を渡しても、おそらくエラーが出てしまいそうです。
このような「あるデータ構造専用の関数」のことをメソッドと呼び、classの中で定義する事ができます。

class KNearestNeighborsClassifier:
    def __init__(self, k:int):
        self.k = k
        self.is_fitted = False 
        
    def fit(self, X_train, y_train):
        self._X = X_train
        self._y = y_train
        self.is_fitted = True
        return self 

    def predict(self, X_test):
        assert self.is_fitted, "先に訓練してから予測して下さい"
        pred_labels = []

        for (loop_counter, x) in enumerate(X_test):
            distance_vector = self.compute_distance(x, self._X)
            sorted_indexes = np.argsort(distance_vector)
            neighbors = sorted_indexes[:self.k]
            neighbors_label = self._y[neighbors]
            popular_label = np.bincount(neighbors_label).argmax()
            pred_labels.append(popular_label)
        return np.array(pred_labels)
    
    def compute_distance(self, a, b):
        distance = ((a - b) ** 2).sum(axis=1)
        return distance

if __name__ == "__main__":
    model = KNearestNeighborsClassifier(3)
    model.fit(X_train,y_train)
    pred_labels = model.predict(X_test)

    # 正答率
    acc = (pred_labels == y_test).sum() / len(X_test)
    print(acc)
0.9555555555555556

この書き方をすることで、インスタンス.メソッド(self以外の引数)の形でメソッドの実行が可能です。また、それぞれのメソッドは、別のメソッドからself.メソッド名で呼び出すことができます。fitメソッドからpredictを呼び出す時は self.predict とすればいいのです。selfはインスタンス自体を示しているので、これはインスタンス.メソッドの形になっています。

逆に、通常のメソッドの中で、クラス名.メソッドとするとエラーになります。(ただクラス名を書くだけだとインスタンスになっていませんよね。)

model = KNearestNeighborsClassifier(3)
model.fit(X_train,y_train)
pred_labels = model.predict(X_test)

# 正答率
(pred_labels == y_test).sum() / len(X_test)
0.9555555555555556

これで基本的なクラスの実装は完了しました。

3.3.6. 型アノテーション#

Pythonは動的型付け言語といって、プログラムを書くときに明示的に値の型を書かなくても実行時に型を判別してくれる言語です。そのおかげでC言語のようにint charのような型を指定せずにプログラムを書くことができています。

しかしながら、ある程度長いプログラムを書くときや複数人で開発をするときには、型が明示されていた方がわかりやすいと感じる人が多いようです。そこでPythonにも変数の型を指定する型アノテーションが導入されました。

# 最初に宣言だけして後から代入
a: int
a = 1

# 型の宣言と代入を同時に行う
b: int = 2

これをtype hintと呼ぶこともあります。これが特に用いられるのは関数を定義する際です。

先ほどの例にあったcompute_distanceメソッドでは、aとbが何型なのか、どんな型の値が返ってくるのかがわかりませんでした。

    def compute_distance(self, a, b):
        distance = ((a - b) ** 2).sum(axis=1)
        return distance

これにtype hintをつけることで、以下のようになります。

    def compute_distance(self, a:np.ndarray[np.float64], b:np.ndarray[np.float64])->np.ndarray[np.float64]:
        distance = ((a - b) ** 2).sum(axis=1)
        return distance

これで最低限、aとbがNumPyの配列で、返り値として同じ型のオブジェクトを返すことがわかるようになりました。

本来、このtype hintは期待していない値が入力されてもエラーなどは出してくれません。しかし、mypyを使ったデバッグやIDEの補完機能などでは利用されます。綺麗なコードを書きたい場合は極力type hintをつけることを忘れないようにするべきでしょう。

3.3.7. Docstrings#

ここでは詳しく説明をしませんが、Pythonにはクラスや関数の説明文を書くためのdocstirngという機能があります。関数の場合では、関数名の定義をした次の行に文字列を書いておけば、これがdocstringとして扱われます。多くの場合は改行が必要になるので、「”””」で囲んで関数の説明を書くことになります。docstringの書き方にはNumPyスタイルやGoogleスタイルなどの有名な書式があります。外部公開しないコードであれば、とりあえず関数の説明を書いておくだけでOKです。

def add(a:float, b:float=0)->float:
    """aとbの和を求める

    Args:
        a (float): 足す数
        b (float, optional): 足される数. Defaults to 0.

    Returns:
        float: aとbの和
    """
    return a+b

docstringをしっかりと書いておくことには明確な利点があります。

これを自動で収集してAPIリファレンスを作ってくれるツールや、マウスオーバーでdocstringを表示してくれるIDEやテキストエディタの機能などで役立ちます。

3.3.8. 演習問題#

3.3.8.1. ★☆☆☆☆ Accuracyメソッドの実装#

実装したKNearestNeighborsClassifierクラスに、新しいメソッドaccuracyを追加してください。このメソッドは予測したラベルの配列と正しいラベルの配列の二つを引数に取り、返り値として正答率を返します。

3.3.8.2. ★★☆☆☆ 距離関数の差し替え#

実装したKNearestNeighborsClassifierクラスのcompute_distanceメソッドでは、ユークリッド距離の代わりとして二乗和を使っています。NearestNeighborsClassifierクラスを修正し、クラスの初期化の際に距離関数を切り替えられるようにしてください。また、以下の例のように実行できる必要があります。

# 二乗和を使う場合
KNearestNeighborsClassifier(3)

# l1(マンハッタン)距離を使う場合
KNearestNeighborsClassifier(3, distance_function="l1")

# l2(ユークリッド)距離を使う場合
KNearestNeighborsClassifier(3, distance_function="l2")

# cos距離を使う場合
from scipy.spatial.distance import cosine
cosine_distance = lambda a,B: np.hstack([cosine(a,b) for b in B])

KNearestNeighborsClassifier(3, distance_function=cosine_distance)

# 定義されていない関数名が与えられた場合
KNearestNeighborsClassifier(3, distance_function="存在しない関数名")
# この場合はNotImplementedErrorをraiseしてください。