Deep Learning - Backpropagation Algorithm to Classify MNIST


LAVI
MNIST-Backpropagation-Algorithm

Prerequisites

  • Parameters vs. hyperparameters:
    • Parameter values are produced by the learning algorithm during training, e.g., the weights and biases.
    • Hyperparameters are values that must be set before the learning algorithm runs, e.g., the learning rate, the number of hidden layers/neurons, and the mini-batch size.
    • Hyperparameters help the learning algorithm find appropriate or optimal parameter values.
  • One-hot encoding:
    • In classification problems, labels are commonly represented as one-hot vectors: exactly one component of the vector is 1 and the rest are 0.
    • For example, in a three-class problem, classes 1, 2, and 3 are encoded as (1, 0, 0), (0, 1, 0), and (0, 0, 1), respectively.
    • The process of converting class labels into one-hot vectors is called one-hot encoding.
  • Training/validation/test accuracy:
    • After backpropagation stops, the network's weights and biases are frozen; for every sample we compute the output vector and take the class of the largest output component as the predicted class.
    • If the training set has 1,000 samples and 900 of them are classified correctly, the training accuracy is 900/1000 = 90%.
    • Validation/test accuracy is computed the same way: the number of correct predictions divided by the size of the validation/test set.
  • Common stopping criteria:
    • Stop when the maximum number of epochs is exceeded.
    • Stop when the average of some error measure on the training set is small enough, e.g., average cross-entropy, root-mean-square error, or mean absolute error.
    • Stop when, as the number of epochs grows, training accuracy keeps rising while accuracy on the held-out validation set starts to fall; this check detects over-training and the resulting overfitting.
  • The stochastic backpropagation algorithm (a sketch of its per-sample update rule follows this list).
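
As a quick reference (my own summary in my own notation, not the course handout), the per-sample update that stochastic backpropagation performs for sigmoid hidden layers and a softmax output trained with cross-entropy, which is exactly the setup used in the code below, is

$$\delta^{L} = a^{L} - y, \qquad \delta^{l} = \big((W^{l+1})^{\top}\delta^{l+1}\big) \odot a^{l}(1-a^{l}),$$
$$W^{l} \leftarrow W^{l} - \eta\,\delta^{l}(a^{l-1})^{\top}, \qquad b^{l} \leftarrow b^{l} - \eta\,\delta^{l},$$

where $a^{l}$ is the activation of layer $l$, $y$ is the one-hot label, and $\eta$ is the learning rate; the update is applied once per training sample.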

Backpropagation Algorithm by PyTorch

The instructor wants us to learn how the backpropagation algorithm learns the weights and biases of a multi-layer network by performing 9-class classification on the MNIST (Modified National Institute of Standards and Technology database) handwritten-digit dataset.

The dataset is a subset of MNIST prepared by the instructor, with 60,000 training samples and 10,000 test samples; each sample is a 28x28 grayscale image, i.e., 784 input attributes.

Import packages

import torch
from torch.utils import data as data_
import torchvision
import torchvision.transforms as transforms
import torch.nn as nn
import torch.optim as optim
import pandas as pd
import numpy as np

Set the number of epochs, read the file, split and preprocess the data

Because the assignment only provides a training dataset, I split off 20% of it as a validation dataset.

The data is normalized while it is being read in.


# Set the number of epochs to run
epochs = 100

# Read the CSV file with pandas' pd.read_csv
df = pd.read_csv('mnist_train9.csv')

# df.iloc (integer-location indexing) selects data by integer position:
# [:, 1:] takes every row and the columns from index 1 to the end (the pixels)
# dtype is float32
# .values/255 rescales the pixel values to [0, 1], which helps training and
# reduces the risk of vanishing or exploding gradients
# finally the array is converted into a torch tensor
train_data_x = torch.tensor(df.iloc[:, 1:].values/255, dtype=torch.float32)
# [:, 0] reads the label column only (column index 0)
# pd.get_dummies performs one-hot encoding with pandas
train_data_y = torch.tensor(pd.get_dummies(df.iloc[:, 0]).values, dtype=torch.float32)

# Use 80% of the data as the training set and 20% as the validation set
train_data_amount = (len(train_data_x) // 10) * 8

# The validation set takes everything from the 80% mark to the end
validation_data_x = train_data_x[train_data_amount:]
validation_data_y = train_data_y[train_data_amount:]

# The training set takes everything from the start up to the 80% mark
train_data_x = train_data_x[:train_data_amount]
train_data_y = train_data_y[:train_data_amount]

Define the neural network model

The number of neurons, the number of layers, and the activation functions can all be adjusted to fine-tune the model.
The weights are deliberately initialized from a normal distribution; since the weights are random on every run, gradient descent can converge to different local optima.

# Define the neural network model
class NeuralNetwork(nn.Module):
    def __init__(self):
        # super() delegates the call to the parent class, here PyTorch's nn.Module,
        # so we call its __init__() to initialize the module properly
        super(NeuralNetwork, self).__init__()

        # Build a 3-layer network; the in/out sizes of each layer are:
        self.fc1 = nn.Linear(784, 30)  # 784 in -> 30 out
        self.fc2 = nn.Linear(30, 28)   # 30 in -> 28 out
        self.fc3 = nn.Linear(28, 9)    # 28 in -> 9 out

        # Initialize every layer's weights from a normal distribution
        # with mean 0.0 and standard deviation 0.1
        nn.init.normal_(self.fc1.weight, mean=0.0, std=0.1)
        nn.init.normal_(self.fc2.weight, mean=0.0, std=0.1)
        nn.init.normal_(self.fc3.weight, mean=0.0, std=0.1)

    def forward(self, x):
        x = x.view(-1, 784)             # flatten each sample into 784 features (keeps the batch dimension)
        x = torch.sigmoid(self.fc1(x))  # sigmoid on the first layer's output
        x = torch.sigmoid(self.fc2(x))  # sigmoid on the second layer's output
        x = self.fc3(x)
        # softmax on the third layer's output; note that nn.CrossEntropyLoss
        # applies log-softmax internally, so it normally expects raw logits
        return torch.softmax(x, dim=1)

Design the early-stop condition

My early-stop condition: whenever the validation loss fails to improve, I record it; once the count exceeds the patience I set, training stops early.


# Early-stop condition
class Early_Stop_checker:
    def __init__(self, patience=1):
        self.patience = patience                 # number of epochs to tolerate without an improvement in validation loss
        self.counter = 0                         # counts how many epochs in a row the validation loss has not improved
        self.min_validation_loss = float('inf')  # smallest validation loss seen over all epochs

    def early_stop(self, validation_loss):
        if validation_loss < self.min_validation_loss:    # current validation loss beats the best so far: update min_validation_loss
            self.min_validation_loss = validation_loss
            self.counter = 0                              # reset the counter after an improvement
        elif validation_loss > self.min_validation_loss:  # current validation loss is worse than the best so far
            self.counter += 1                             # increment the counter
            if self.counter >= self.patience:             # once the counter reaches the patience, stop early
                return True
        return False

Declare the model, loss function, and optimizer

Use SGD (stochastic gradient descent).
Set the learning rate to 0.01.
Stop early when the validation loss has not improved for three consecutive epochs.

# Initialize the model, loss function, and optimizer
model = NeuralNetwork()
# Cross-entropy loss
criterion = nn.CrossEntropyLoss()
# Stochastic gradient descent with a learning rate of 0.01
optimizer = optim.SGD(model.parameters(), lr = 0.01)

early_stop_checker = Early_Stop_checker(patience=3)

Train the model

Because I use stochastic gradient descent, the data is fed one sample at a time (switching to mini-batches would also work; a sketch follows the training code below).

The three standard backpropagation calls look intimidating but are actually straightforward:
optimizer.zero_grad() clears all gradients
loss.backward() computes the gradient of every parameter via backpropagation
optimizer.step() applies the gradient-descent parameter update

The loss is computed once per epoch; when the validation loss meets the early-stopping condition designed above, training stops early.

# Train the model
for epoch in range(epochs):
    # Stochastic gradient descent processes one sample at a time;
    # batch or mini-batch training would process several samples per update
    for i in range(len(train_data_x)):
        # switch to training mode with model.train()
        model.train()

        # clear all gradients
        optimizer.zero_grad()
        # feed one sample of train_data_x into the model and store its prediction in predict
        # torch.unsqueeze adds a dimension: torch.tensor(x) = [1, 2, 3, 4] becomes [[1, 2, 3, 4]]
        predict = model(torch.unsqueeze(train_data_x[i], 0))

        # compute the loss: cross-entropy between the prediction (ŷ) and the target y
        # the one-hot y is unsqueezed, then argmax(dim=1) turns it back into a class index
        loss = criterion(predict, torch.unsqueeze(train_data_y[i], 0).argmax(dim=1))

        # backpropagation
        loss.backward()   # compute the gradient of every parameter
        optimizer.step()  # apply the gradient-descent update

    # compute the loss after this epoch's updates
    # torch.no_grad() disables gradient tracking
    with torch.no_grad():
        # switch to evaluation mode with model.eval()
        model.eval()

        # ----------------------------- training ----------------------------------------------- #
        train_loss = 0

        for i in range(len(train_data_x)):
            # feed the training data through the model again, this time without gradients or updates
            train_predict = model(torch.unsqueeze(train_data_x[i], 0))
            # accumulate the loss (averaged below)
            train_loss += criterion(train_predict, torch.unsqueeze(train_data_y[i], 0).argmax(dim=1))

        # average the accumulated loss over all training samples
        train_loss = train_loss/len(train_data_x)

        # ----------------------------- validation ----------------------------------------------- #
        validation_loss = 0

        # validation does the same thing, except the model never trained on these samples
        for i in range(len(validation_data_x)):
            # feed the validation data into the trained model
            validation_predict = model(torch.unsqueeze(validation_data_x[i], 0))
            # accumulate the validation loss
            validation_loss += criterion(validation_predict, torch.unsqueeze(validation_data_y[i], 0).argmax(dim=1))

        # average the validation loss
        validation_loss = validation_loss/len(validation_data_x)

        # early stopping: stop training early to avoid overfitting
        if early_stop_checker.early_stop(validation_loss):
            print(f"Early stopping at epoch: {epoch}")
            break

        # print the current epoch, train loss, and validation loss
        print("----------------------")
        print(f"Epoch {epoch + 1}/{epochs}")
        print(f"Train Loss: {train_loss:.4f}")
        print(f"Validation Loss: {validation_loss:.4f}")
        print("----------------------")
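
As noted above, the same training could be done with mini-batches instead of one sample at a time. A minimal sketch, assuming the tensors and objects defined earlier and the view(-1, 784) flatten in forward; the batch size of 32 and the use of torch.utils.data (imported above as data_) are my own choices, not part of the assignment code:

# Hypothetical mini-batch variant: wrap the tensors in a TensorDataset and
# let a DataLoader shuffle and batch them.
train_set = data_.TensorDataset(train_data_x, train_data_y)
train_loader = data_.DataLoader(train_set, batch_size=32, shuffle=True)

for epoch in range(epochs):
    model.train()
    for batch_x, batch_y in train_loader:
        optimizer.zero_grad()
        batch_predict = model(batch_x)                                 # forward pass on the whole batch
        batch_loss = criterion(batch_predict, batch_y.argmax(dim=1))   # one-hot labels back to class indices
        batch_loss.backward()                                          # backpropagate the batch loss
        optimizer.step()                                               # one parameter update per mini-batch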

Compute the trained model's accuracy


train_correct = 0
validation_correct = 0

# compute train accuracy and validation accuracy
for i in range(len(train_data_x)):
    train_predict = model(torch.unsqueeze(train_data_x[i], 0))
    # torch.max(train_predict, 1) returns the maximum value and its index; that index is the predicted label
    # (train_predict holds the probabilities that the sample is 1, 2, ..., 9, so the cell with the largest probability marks the predicted digit)
    _, predicted_label = torch.max(train_predict, 1)

    # if the prediction is correct, increment the number of correct predictions
    if(predicted_label == torch.argmax(train_data_y[i])):
        train_correct += 1

for i in range(len(validation_data_x)):
    validation_predict = model(torch.unsqueeze(validation_data_x[i], 0))

    # check whether the validation sample is predicted correctly
    _, predicted_label = torch.max(validation_predict, 1)

    # if the validation prediction is correct, increment the count
    if(predicted_label == torch.argmax(validation_data_y[i])):
        validation_correct += 1

# accuracy is the fraction of samples predicted correctly
train_accuracy = train_correct / len(train_data_x)
validation_accuracy = validation_correct / len(validation_data_x)

# print that training finished, the final epoch, and the final train/validation loss and accuracy
print("----------------------")
print('Finished Training')
print(f"Epoch result {epoch}")
print(f"Train Loss: {train_loss:.4f}")
print(f"Train Accuracy: {train_accuracy * 100:.2f}%")
print(f"Validation Loss: {validation_loss:.4f}")
print(f"Validation Accuracy: {validation_accuracy * 100:.2f}%")
print("----------------------")

Test the model

Read the test data, preprocess it, run the trained model to predict each sample, and save the predictions.

# read the test data
df = pd.read_csv('mnist_test9.csv')

# same as before, but the test data has no label column, so [:, :] reads every row and every column
test_data_x = torch.tensor(df.iloc[:, :].values/255, dtype=torch.float32)

# the prediction for each test sample will be written to a CSV file, so collect the results
test_predict_result = []
classification = [1, 2, 3, 4, 5, 6, 7, 8, 9]

# testing is also an evaluation mode, so gradients are not needed
with torch.no_grad():
    model.eval()
    for i in range(len(test_data_x)):
        # feed each test sample into the model
        test_predict = model(torch.unsqueeze(test_data_x[i], 0))
        # get the predicted label for this sample
        _, predicted_label = torch.max(test_predict, 1)

        # record the predicted label
        test_predict_result.append(classification[predicted_label.numpy().item()])

Export the predictions and the model

# convert test_predict_result into a pandas DataFrame with a column named Label
test_predict_result = pd.DataFrame({'Label': test_predict_result})
# write test_predict_result to a CSV file
test_predict_result.to_csv('test_predict_result.csv', index=False)

# save the trained model in the same directory as this script
PATH = './mnist_nn.pth'
torch.save(model.state_dict(), PATH)
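
For later inference, the saved state dict can be loaded back into a freshly built network; a minimal sketch using the PATH saved above:

# Rebuild the architecture and restore the trained weights
loaded_model = NeuralNetwork()
loaded_model.load_state_dict(torch.load(PATH))
loaded_model.eval()  # switch to evaluation mode before making predictions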

Complete program

import torch
from torch.utils import data as data_
import torchvision
import torchvision.transforms as transforms
import torch.nn as nn
import torch.optim as optim
import pandas as pd
import numpy as np

# Set the number of epochs to run
epochs = 100

# Read the CSV file with pandas' pd.read_csv
df = pd.read_csv('mnist_train9.csv')

# df.iloc (integer-location indexing) selects data by integer position:
# [:, 1:] takes every row and the columns from index 1 to the end (the pixels)
# dtype is float32
# .values/255 rescales the pixel values to [0, 1], which helps training and
# reduces the risk of vanishing or exploding gradients
# finally the array is converted into a torch tensor
train_data_x = torch.tensor(df.iloc[:, 1:].values/255, dtype=torch.float32)
# [:, 0] reads the label column only (column index 0)
# pd.get_dummies performs one-hot encoding with pandas
train_data_y = torch.tensor(pd.get_dummies(df.iloc[:, 0]).values, dtype=torch.float32)

# Use 80% of the data as the training set and 20% as the validation set
train_data_amount = (len(train_data_x) // 10) * 8

# The validation set takes everything from the 80% mark to the end
validation_data_x = train_data_x[train_data_amount:]
validation_data_y = train_data_y[train_data_amount:]

# The training set takes everything from the start up to the 80% mark
train_data_x = train_data_x[:train_data_amount]
train_data_y = train_data_y[:train_data_amount]


# Define the neural network model
class NeuralNetwork(nn.Module):
    def __init__(self):
        # super() delegates the call to the parent class, here PyTorch's nn.Module,
        # so we call its __init__() to initialize the module properly
        super(NeuralNetwork, self).__init__()

        # Build a 3-layer network; the in/out sizes of each layer are:
        self.fc1 = nn.Linear(784, 30)  # 784 in -> 30 out
        self.fc2 = nn.Linear(30, 28)   # 30 in -> 28 out
        self.fc3 = nn.Linear(28, 9)    # 28 in -> 9 out

        # Initialize every layer's weights from a normal distribution
        # with mean 0.0 and standard deviation 0.1
        nn.init.normal_(self.fc1.weight, mean=0.0, std=0.1)
        nn.init.normal_(self.fc2.weight, mean=0.0, std=0.1)
        nn.init.normal_(self.fc3.weight, mean=0.0, std=0.1)

    def forward(self, x):
        x = x.view(-1, 784)             # flatten each sample into 784 features (keeps the batch dimension)
        x = torch.sigmoid(self.fc1(x))  # sigmoid on the first layer's output
        x = torch.sigmoid(self.fc2(x))  # sigmoid on the second layer's output
        x = self.fc3(x)
        # softmax on the third layer's output; note that nn.CrossEntropyLoss
        # applies log-softmax internally, so it normally expects raw logits
        return torch.softmax(x, dim=1)

# Early-stop condition
class Early_Stop_checker:
    def __init__(self, patience=1):
        self.patience = patience                 # number of epochs to tolerate without an improvement in validation loss
        self.counter = 0                         # counts how many epochs in a row the validation loss has not improved
        self.min_validation_loss = float('inf')  # smallest validation loss seen over all epochs

    def early_stop(self, validation_loss):
        if validation_loss < self.min_validation_loss:    # current validation loss beats the best so far: update min_validation_loss
            self.min_validation_loss = validation_loss
            self.counter = 0                              # reset the counter after an improvement
        elif validation_loss > self.min_validation_loss:  # current validation loss is worse than the best so far
            self.counter += 1                             # increment the counter
            if self.counter >= self.patience:             # once the counter reaches the patience, stop early
                return True
        return False

# Initialize the model, loss function, and optimizer
model = NeuralNetwork()
# Cross-entropy loss
criterion = nn.CrossEntropyLoss()
# Stochastic gradient descent with a learning rate of 0.01
optimizer = optim.SGD(model.parameters(), lr = 0.01)


early_stop_checker = Early_Stop_checker(patience=3)

# Train the model
for epoch in range(epochs):
    # Stochastic gradient descent processes one sample at a time;
    # batch or mini-batch training would process several samples per update
    for i in range(len(train_data_x)):
        # switch to training mode with model.train()
        model.train()

        # clear all gradients
        optimizer.zero_grad()
        # feed one sample of train_data_x into the model and store its prediction in predict
        # torch.unsqueeze adds a dimension: torch.tensor(x) = [1, 2, 3, 4] becomes [[1, 2, 3, 4]]
        predict = model(torch.unsqueeze(train_data_x[i], 0))

        # compute the loss: cross-entropy between the prediction (ŷ) and the target y
        # the one-hot y is unsqueezed, then argmax(dim=1) turns it back into a class index
        loss = criterion(predict, torch.unsqueeze(train_data_y[i], 0).argmax(dim=1))

        # backpropagation
        loss.backward()   # compute the gradient of every parameter
        optimizer.step()  # apply the gradient-descent update

    # compute the loss after this epoch's updates
    # torch.no_grad() disables gradient tracking
    with torch.no_grad():
        # switch to evaluation mode with model.eval()
        model.eval()

        # ----------------------------- training ----------------------------------------------- #
        train_loss = 0

        for i in range(len(train_data_x)):
            # feed the training data through the model again, this time without gradients or updates
            train_predict = model(torch.unsqueeze(train_data_x[i], 0))
            # accumulate the loss (averaged below)
            train_loss += criterion(train_predict, torch.unsqueeze(train_data_y[i], 0).argmax(dim=1))

        # average the accumulated loss over all training samples
        train_loss = train_loss/len(train_data_x)

        # ----------------------------- validation ----------------------------------------------- #
        validation_loss = 0

        # validation does the same thing, except the model never trained on these samples
        for i in range(len(validation_data_x)):
            # feed the validation data into the trained model
            validation_predict = model(torch.unsqueeze(validation_data_x[i], 0))
            # accumulate the validation loss
            validation_loss += criterion(validation_predict, torch.unsqueeze(validation_data_y[i], 0).argmax(dim=1))

        # average the validation loss
        validation_loss = validation_loss/len(validation_data_x)

        # early stopping: stop training early to avoid overfitting
        if early_stop_checker.early_stop(validation_loss):
            print(f"Early stopping at epoch: {epoch}")
            break

        # print the current epoch, train loss, and validation loss
        print("----------------------")
        print(f"Epoch {epoch + 1}/{epochs}")
        print(f"Train Loss: {train_loss:.4f}")
        print(f"Validation Loss: {validation_loss:.4f}")
        print("----------------------")

train_correct = 0
validation_correct = 0

# compute train accuracy and validation accuracy
for i in range(len(train_data_x)):
    train_predict = model(torch.unsqueeze(train_data_x[i], 0))
    # torch.max(train_predict, 1) returns the maximum value and its index; that index is the predicted label
    # (train_predict holds the probabilities that the sample is 1, 2, ..., 9, so the cell with the largest probability marks the predicted digit)
    _, predicted_label = torch.max(train_predict, 1)

    # if the prediction is correct, increment the number of correct predictions
    if(predicted_label == torch.argmax(train_data_y[i])):
        train_correct += 1

for i in range(len(validation_data_x)):
    validation_predict = model(torch.unsqueeze(validation_data_x[i], 0))

    # check whether the validation sample is predicted correctly
    _, predicted_label = torch.max(validation_predict, 1)

    # if the validation prediction is correct, increment the count
    if(predicted_label == torch.argmax(validation_data_y[i])):
        validation_correct += 1

# accuracy is the fraction of samples predicted correctly
train_accuracy = train_correct / len(train_data_x)
validation_accuracy = validation_correct / len(validation_data_x)

# print that training finished, the final epoch, and the final train/validation loss and accuracy
print("----------------------")
print('Finished Training')
print(f"Epoch result {epoch}")
print(f"Train Loss: {train_loss:.4f}")
print(f"Train Accuracy: {train_accuracy * 100:.2f}%")
print(f"Validation Loss: {validation_loss:.4f}")
print(f"Validation Accuracy: {validation_accuracy * 100:.2f}%")
print("----------------------")

# ---------------------- Testing -----------------------------------------------#
# read the test data
df = pd.read_csv('mnist_test9.csv')

# same as before, but the test data has no label column, so [:, :] reads every row and every column
test_data_x = torch.tensor(df.iloc[:, :].values/255, dtype=torch.float32)

# the prediction for each test sample will be written to a CSV file, so collect the results
test_predict_result = []
classification = [1, 2, 3, 4, 5, 6, 7, 8, 9]

# testing is also an evaluation mode, so gradients are not needed
with torch.no_grad():
    model.eval()
    for i in range(len(test_data_x)):
        # feed each test sample into the model
        test_predict = model(torch.unsqueeze(test_data_x[i], 0))
        # get the predicted label for this sample
        _, predicted_label = torch.max(test_predict, 1)

        # record the predicted label
        test_predict_result.append(classification[predicted_label.numpy().item()])

# convert test_predict_result into a pandas DataFrame with a column named Label
test_predict_result = pd.DataFrame({'Label': test_predict_result})
# write test_predict_result to a CSV file
test_predict_result.to_csv('test_predict_result.csv', index=False)

# save the trained model in the same directory as this script
PATH = './mnist_nn.pth'
torch.save(model.state_dict(), PATH)

Backpropagation Algorithm by Python

In addition, I have written a version hand-coded directly in Python.
It follows the instructor's stochastic backpropagation algorithm given above.
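
One detail worth spelling out, because the backward pass below relies on it: with a softmax output $a^{L} = \mathrm{softmax}(n^{3})$ and cross-entropy loss $E = -\sum_{k} y_{k}\ln a^{L}_{k}$, the standard derivation gives

$$\delta^{L} = \frac{\partial E}{\partial n^{3}} = a^{L} - y,$$

which is why the code starts the backward pass from delta_L = (aL - y_tmp.transpose()) instead of multiplying by a separate softmax derivative.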

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# Sigmoid activation function
def Sigmoid(n):
    return 1 / (1 + np.exp(-n))

# One-hot encoding
def One_Hot_Encoding(y):
    return pd.get_dummies(y).to_numpy()

# Softmax: e^zi / Σ e^zj
def Softmax(aL):
    # size: aL = 9 * 1 (a single column vector)
    for col in range(1):
        total = 0.0
        for row in range(len(aL)):
            total += np.exp(aL[row][col])
        for row in range(len(aL)):
            aL[row][col] = np.exp(aL[row][col]) / total
    return aL


# Load the training data
df = pd.read_csv('mnist_train9.csv')

# row size: 40500, col size: 785
rowx, colx = df.shape

# take 80% of the data for training, 20% for validation
training_data_amount = (rowx // 10) * 8

# pixel columns, scaled down ([all data] / 1000) so the sigmoid inputs stay small
training_data_x = df.to_numpy()[ : training_data_amount, 1:] / 1000
verify_data_x = df.to_numpy()[training_data_amount : , 1:] / 1000

# labels (column 0)
training_data_y = df.to_numpy()[ : training_data_amount, 0]
verify_data_y = df.to_numpy()[training_data_amount : , 0]

# one-hot encode y
training_data_y = One_Hot_Encoding(training_data_y)
verify_data_y = One_Hot_Encoding(verify_data_y)


# Parameters / hyperparameters
epoch = 40
learning_rate = 0.01

# initialize the weights with normally distributed random numbers
w1 = np.random.normal(loc = 0, scale = 0.1, size = (30, 784))
w2 = np.random.normal(loc = 0, scale = 0.1, size = (28, 30))
w3 = np.random.normal(loc = 0, scale = 0.1, size = (9, 28))

# initialize the biases
b1 = np.random.rand(30, 1)
b2 = np.random.rand(28, 1)
b3 = np.random.rand(9, 1)

# output information
output_epoch = []
output_training_correct_rate = []
output_verify_correct_rate = []


# training model ===================================

epoch_result = 0
for i in range(epoch):

    for j in range(len(training_data_x)):

        # size: (1, 784)
        x_tmp = []
        x_tmp.append(training_data_x[j])
        x_tmp = np.array(x_tmp)

        # size: (1, 9)
        y_tmp = []
        y_tmp.append(training_data_y[j])
        y_tmp = np.array(y_tmp)

        # feedforward -----------------------------------

        # size: w1 = 30 * 784, x_tmp.transpose() = 784 * 1, b1 = 30 * 1
        # size: a1 = 30 * 1
        n1 = w1 @ x_tmp.transpose() + b1
        a1 = Sigmoid(n1)

        # size: w2 = 28 * 30, a1 = 30 * 1, b2 = 28 * 1
        # size: a2 = 28 * 1
        n2 = w2 @ a1 + b2
        a2 = Sigmoid(n2)

        # size: w3 = 9 * 28, a2 = 28 * 1, b3 = 9 * 1
        # size: aL = 9 * 1
        n3 = w3 @ a2 + b3
        aL = Softmax(n3)

        # backward ------------------------------------

        # size: aL = 9 * 1, y_tmp = 1 * 9
        # size: delta_L = 9 * 1
        delta_L = (aL - y_tmp.transpose())

        # size: w3 = 9 * 28, delta_L = 9 * 1, a2 = 28 * 1
        # size: delta_2 = 28 * 1
        delta_2 = (w3.transpose() @ delta_L) * (a2 * (1 - a2))

        # size: w2 = 28 * 30, delta_2 = 28 * 1, a1 = 30 * 1
        # size: delta_1 = 30 * 1
        delta_1 = (w2.transpose() @ delta_2) * (a1 * (1 - a1))

        # update
        # size: w3 = 9 * 28, delta_L = 9 * 1, a2 = 28 * 1
        # size: b3 = 9 * 1, delta_L = 9 * 1
        w3 = w3 - learning_rate * (delta_L @ a2.transpose())
        b3 = b3 - learning_rate * delta_L

        # size: w2 = 28 * 30, delta_2 = 28 * 1, a1 = 30 * 1
        # size: b2 = 28 * 1, delta_2 = 28 * 1
        w2 = w2 - learning_rate * (delta_2 @ a1.transpose())
        b2 = b2 - learning_rate * delta_2

        # size: w1 = 30 * 784, delta_1 = 30 * 1, x_tmp = 1 * 784
        # size: b1 = 30 * 1, delta_1 = 30 * 1
        w1 = w1 - learning_rate * (delta_1 @ x_tmp)
        b1 = b1 - learning_rate * delta_1

    # calculate accuracy to watch for overfitting ==================

    # training data
    training_correct_num = 0
    for j in range(len(training_data_x)):

        x_tmp = []
        x_tmp.append(training_data_x[j])
        x_tmp = np.array(x_tmp)

        y_tmp = []
        y_tmp.append(training_data_y[j])
        y_tmp = np.array(y_tmp)

        n1 = w1 @ x_tmp.transpose() + b1
        a1 = Sigmoid(n1)

        n2 = w2 @ a1 + b2
        a2 = Sigmoid(n2)

        n3 = w3 @ a2 + b3
        aL = Softmax(n3)

        # count correct predictions

        max_value_aL = -2
        max_idx_aL = 0
        for k in range(len(aL)):
            if(max_value_aL < aL[k][0]):
                max_value_aL = aL[k][0]
                max_idx_aL = k

        max_idx_y = 0
        for k in range(len(y_tmp[0])):
            if(y_tmp[0][k] == 1):
                max_idx_y = k
                break

        if(max_idx_aL == max_idx_y):
            training_correct_num += 1

    # validation data -----------------------------
    verify_correct_num = 0

    for j in range(len(verify_data_x)):

        x_tmp = []
        x_tmp.append(verify_data_x[j])
        x_tmp = np.array(x_tmp)

        y_tmp = []
        y_tmp.append(verify_data_y[j])
        y_tmp = np.array(y_tmp)

        n1 = w1 @ x_tmp.transpose() + b1
        a1 = Sigmoid(n1)

        n2 = w2 @ a1 + b2
        a2 = Sigmoid(n2)

        n3 = w3 @ a2 + b3
        aL = Softmax(n3)

        # count correct predictions

        max_value_aL = -2
        max_idx_aL = 0
        for k in range(len(aL)):
            if(max_value_aL < aL[k][0]):
                max_value_aL = aL[k][0]
                max_idx_aL = k

        max_idx_y = 0
        for k in range(len(y_tmp[0])):
            if(y_tmp[0][k] == 1):
                max_idx_y = k
                break

        if(max_idx_aL == max_idx_y):
            verify_correct_num += 1

    training_correct_rate = training_correct_num / len(training_data_x)
    verify_correct_rate = verify_correct_num / len(verify_data_x)

    # stop early to avoid overfitting
    if(training_correct_rate > 0.96 or verify_correct_rate > 0.96):
        epoch_result = i+1
        break

    # record information for the comparison plot
    output_epoch.append(i+1)
    output_training_correct_rate.append(training_correct_rate)
    output_verify_correct_rate.append(verify_correct_rate)

    # print("Epoch now:", i+1)
    # print("Training Data Correct rate:", training_correct_num / len(training_data_x))
    # print("Verify Data Correct rate:" , verify_correct_num / (len(verify_data_x)))
    # print()

"""
# draw the comparison plot ===================================================

plt.plot(output_epoch, output_training_correct_rate, color='indianred')
plt.plot(output_epoch, output_verify_correct_rate, color='#7eb54e')

# str_title = "Training Correct Rate by Learning Rate " + str(learning_rate)
plt.title("Training Correct Rate vs Verify Correct Rate")

plt.xlabel("Epoch")
plt.ylabel("Correct Rate")

plt.legend(["Training Correct Rate", "Verify Correct Rate"], loc = "lower right")
plt.grid()
plt.savefig('output9.png')
plt.show()

"""

# Predict Test9 ============================================================

df = pd.read_csv('mnist_test9.csv')

# row size: 9020, col size: 784
rowx, colx = df.shape

test_data_x = df.to_numpy()[:, :] / 1000

classification = [1, 2, 3, 4, 5, 6, 7, 8, 9]

ans_result = []
for i in range(len(test_data_x)):
    x_tmp = []
    x_tmp.append(test_data_x[i])
    x_tmp = np.array(x_tmp)

    n1 = w1 @ x_tmp.transpose() + b1
    a1 = Sigmoid(n1)

    n2 = w2 @ a1 + b2
    a2 = Sigmoid(n2)

    n3 = w3 @ a2 + b3
    aL = Softmax(n3)

    aL_idx_classification = np.argmax(aL)
    ans_result.append(classification[aL_idx_classification])

ans_result = np.array(ans_result)
ans_result_df = pd.DataFrame(ans_result)
ans_result_df.to_csv('ans9.csv', index=False, header=False)

# output result
hidden_layer_neurons = [w1.shape[0], w2.shape[0], w3.shape[0]]
print("Final Result:")
print("Epoch:", epoch_result, ", Learning Rate:", learning_rate, ", Hidden Layer:", hidden_layer_neurons)
print("Training Data Correct Rate:", training_correct_num / len(training_data_x))
print("Validation Data Correct Rate:", verify_correct_num / len(verify_data_x))

Reference

  • Prof. 黃貞瑛's Deep Learning course
  • The TA tutorial sessions by 許瀚丰 and 應名宥
  • Discussions with my classmates 吳建中 and 詹閎安