import torch
from torch import device, cuda
import torch.nn as nn
import torch.optim as optim
import utils.CNN_Layers as CustomLayers
import pandas as pd
import matplotlib.pyplot as plt
import time
from sklearn.metrics import roc_curve, auc
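
# NOTE: utils.CNN_Layers is not shown in this file; the interface below is
# inferred from the call sites further down and should be treated as an
# assumption, not a documented API:
#   Conv_elu_maxpool_drop(in_ch, out_ch, kernel, stride, pool, prps, sep_conv=False)
#       3D convolution -> ELU -> max-pool -> dropout (separable conv when sep_conv=True)
#   Mid_flow(in_ch, out_ch, prps)
#       middle block at constant channel width
#   Fc_elu_drop(in_features, out_features, prps, softmax)
#       fully connected -> ELU -> dropout, optionally followed by softmax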


class CNN_Net(nn.Module):
    def __init__(self, prps, final_layer_size=5):
        super(CNN_Net, self).__init__()
        self.final_layer_size = final_layer_size
        self.device = device('cuda:0' if cuda.is_available() else 'cpu')
        print(f"CNN initialized. Using: {self.device}")

        # LAYERS
        self.conv1 = CustomLayers.Conv_elu_maxpool_drop(1, 192, (11, 13, 11), stride=(4, 4, 4), pool=True, prps=prps)
        self.conv2 = CustomLayers.Conv_elu_maxpool_drop(192, 384, (5, 6, 5), stride=(1, 1, 1), pool=True, prps=prps)
        self.conv3_mid_flow = CustomLayers.Mid_flow(384, 384, prps=prps)
        self.conv4_sepConv = CustomLayers.Conv_elu_maxpool_drop(384, 96, (3, 4, 3), stride=(1, 1, 1), pool=True,
                                                                prps=prps, sep_conv=True)
        self.conv5_sepConv = CustomLayers.Conv_elu_maxpool_drop(96, 48, (3, 4, 3), stride=(1, 1, 1), pool=True,
                                                                prps=prps, sep_conv=True)
        self.fc1 = CustomLayers.Fc_elu_drop(113568, 20, prps=prps, softmax=False)  # TODO, concatenate clinical data after this
        self.fc2 = CustomLayers.Fc_elu_drop(20, final_layer_size, prps=prps, softmax=True)  # For now this works as output layer, though may be incorrect

    # FORWARDS
    def forward(self, x):
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.conv3_mid_flow(x)
        x = self.conv4_sepConv(x)
        x = self.conv5_sepConv(x)

        # FLATTEN x to one feature vector per sample, keeping the batch dimension
        x = x.view(x.size(0), -1)

        x = self.fc1(x)
        x = self.fc2(x)
        return x

    # TRAIN
    def train_model(self, trainloader, testloader, PATH, epochs):
        self.train()
        criterion = nn.CrossEntropyLoss(reduction='mean')
        optimizer = optim.Adam(self.parameters(), lr=1e-5)
        losses = pd.DataFrame(columns=['Epoch', 'Avg_loss', 'Val_loss', 'Time'])
        start_time = time.time()  # seconds

        for epoch in range(1, epochs + 1):  # loop over the dataset multiple times
            # Elapsed time and a rough estimate of the total training time
            elapsed = time.time() - start_time
            t = time.strftime("%H:%M:%S", time.gmtime(elapsed))
            t_total = time.strftime("%H:%M:%S", time.gmtime(elapsed / epoch * epochs))
            print(f"{epoch / epochs * 100:.1f}% || {epoch}/{epochs} || Time: {t}/{t_total}")

            running_loss = 0.0

            # Batches & training
            for i, data in enumerate(trainloader, 0):
                # get the inputs; data is a list of [inputs, labels]
                inputs, labels = data[0].to(self.device), data[1].to(self.device)

                # zero the parameter gradients
                optimizer.zero_grad()

                # forward + backward + optimize
                outputs = self.forward(inputs)
                loss = criterion(outputs, labels)  # This loss is the mean of losses for the batch
                loss.backward()
                optimizer.step()

                # adds average batch loss to running loss
                running_loss += loss.item()

                # progress report every 10 mini-batches; divide by batches seen so far
                if i % 10 == 0 and i != 0:
                    print(f"{i}/{len(trainloader)}, temp. loss: {running_loss / (i + 1)}")

            # average loss (running_loss / number of batches)
            avg_loss = running_loss / len(trainloader)
            print(f"Avg. loss: {avg_loss}")

            # loss on validation
            val_loss = self.evaluate_model(testloader, roc=False)

            # DataFrame.append was removed in pandas 2.0; use pd.concat instead
            losses = pd.concat([losses, pd.DataFrame([{'Epoch': int(epoch), 'Avg_loss': avg_loss,
                                                       'Val_loss': val_loss, 'Time': time.time() - start_time}])],
                               ignore_index=True)

        print('Finished Training')
        losses.to_csv('./cnn_net_data.csv')

        # MAKES EPOCH VS TRAINING & VALIDATION LOSS GRAPH
        plt.plot(losses['Epoch'], losses['Avg_loss'], label="Loss on Training")
        plt.plot(losses['Epoch'], losses['Val_loss'], label="Loss on Validation")
        plt.xlabel('Epoch')
        plt.ylabel('Average Loss')
        plt.title('Loss vs Epoch On Training & Validation data')
        plt.legend(loc="upper right")  # without this call the labels above are never drawn
        plt.savefig('./avgloss_epoch_curve.png')
        plt.show()

        torch.save(self.state_dict(), PATH)
        print("Model saved")

    # TEST
    def evaluate_model(self, testloader, roc):
        correct = 0
        total = 0
        running_loss = 0.0
        predictions = []
        true_labels = []
        criterion = nn.CrossEntropyLoss(reduction='mean')
        self.eval()

        # since we're not training, we don't need to calculate the gradients for our outputs
        with torch.no_grad():
            for data in testloader:
                images, labels = data[0].to(self.device), data[1].to(self.device)

                # calculate outputs by running images through the network
                outputs = self.forward(images)
                running_loss += criterion(outputs, labels).item()  # accumulate mean batch losses

                # the class with the highest score is what we choose as prediction
                _, predicted = torch.max(outputs.data, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

                # Saves predictions and labels for ROC (assumes binary labels)
                if roc:
                    predictions.extend(outputs.data[:, 1].cpu().numpy())  # Grabs probability of positive
                    true_labels.extend(labels.cpu().numpy())

        # average over all batches, not just the loss of the last batch
        avg_val_loss = running_loss / len(testloader)
        print(f'Accuracy of the network on {total} scans: {100 * correct / total:.2f}%')

        if not roc:
            print(f'Validation loss: {avg_val_loss}')
        else:
            # ROC: roc_curve picks the thresholds itself and returns TPR/FPR per threshold
            fpr, tpr, thresholds = roc_curve(true_labels, predictions)
            # Calculate AUC
            roc_auc = auc(fpr, tpr)

            plt.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC curve (AUC: {roc_auc:.3f})')
            plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
            plt.xlim([0.0, 1.005])
            plt.ylim([0.0, 1.005])
            plt.xlabel('False Positive Rate (1 - Specificity)')
            plt.ylabel('True Positive Rate (Sensitivity)')
            plt.title('Receiver Operating Characteristic (ROC) Curve')
            plt.legend(loc="lower right")
            plt.savefig('./ROC.png')
            plt.show()

        self.train()
        return avg_val_loss

    # PREDICT
    def predict(self, loader):
        self.eval()
        all_predicted = []
        with torch.no_grad():
            for data in loader:
                images = data[0].to(self.device)  # labels, if present, are ignored here
                outputs = self.forward(images)
                # the class with the highest score is what we choose as prediction
                _, predicted = torch.max(outputs.data, 1)
                all_predicted.append(predicted)
        self.train()
        return torch.cat(all_predicted)  # predictions for every batch, not just the last one
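

# ---------------------------------------------------------------------------
# Minimal usage sketch (NOT part of the original module). Everything below is
# an assumption for illustration: the real keys expected in `prps` and the
# dataloader construction live elsewhere in this repo.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    prps = {'drop_rate': 0.5}  # hypothetical hyperparameter dict consumed by utils.CNN_Layers
    model = CNN_Net(prps, final_layer_size=2)  # 2 output classes, so the ROC path applies

    # trainloader / testloader are assumed to yield (scan, label) batches where
    # scan is a 5D tensor (batch, 1, depth, height, width) whose size matches
    # the 113568 input features hard-coded into fc1.
    # model.train_model(trainloader, testloader, PATH='./cnn_net.pth', epochs=20)
    # model.evaluate_model(testloader, roc=True)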