from torch import device, cuda import torch from torch import add import torch.nn as nn import utils.CNN_Layers as CustomLayers import torch.nn.functional as F import torch.optim as optim import pandas as pd import matplotlib.pyplot as plt import time import numpy as np from sklearn.metrics import roc_curve, auc, confusion_matrix, classification_report import seaborn as sns class CNN_Net(nn.Module): def __init__(self, prps, final_layer_size=5): super(CNN_Net, self).__init__() self.final_layer_size = final_layer_size self.device = device('cuda:0' if cuda.is_available() else 'cpu') print("CNN Initialized. Using: " + str(self.device)) # LAYERS print(f"CNN Model Initialization") self.conv1 = CustomLayers.Conv_elu_maxpool_drop(1, 192, (11, 13, 11), stride=(4,4,4), pool=True, prps=prps) self.conv2 = CustomLayers.Conv_elu_maxpool_drop(192, 384, (5, 6, 5), stride=(1,1,1), pool=True, prps=prps) self.conv3_mid_flow = CustomLayers.Mid_flow(384, 384, prps=prps) self.conv4_sepConv = CustomLayers.Conv_elu_maxpool_drop(384, 96,(3, 4, 3), stride=(1,1,1), pool=True, prps=prps, sep_conv=True) self.conv5_sepConv = CustomLayers.Conv_elu_maxpool_drop(96, 48, (3, 4, 3), stride=(1, 1, 1), pool=True, prps=prps, sep_conv=True) self.fc1 = CustomLayers.Fc_elu_drop(113568, 20, prps=prps, softmax=False) # TODO, concatenate clinical data after this self.fc2 = CustomLayers.Fc_elu_drop(20, final_layer_size, prps=prps, softmax=True) # For now this works as output layer, though may be incorrect # FORWARDS def forward(self, x): x = self.conv1(x) x = self.conv2(x) x = self.conv3_mid_flow(x) x = self.conv4_sepConv(x) x = self.conv5_sepConv(x) # FLATTEN x flatten_size = x.size(1) * x.size(2) * x.size(3) * x.size(4) x = x.view(-1, flatten_size) x = self.fc1(x) x = self.fc2(x) return x # TRAIN def train_model(self, trainloader, testloader, PATH, epochs): self.train() criterion = nn.CrossEntropyLoss(reduction='mean') optimizer = optim.Adam(self.parameters(), lr=1e-5) losses = pd.DataFrame(columns=['Epoch', 'Avg_loss', 'Time']) start_time = time.time() # seconds for epoch in range(epochs): # loop over the dataset multiple times epoch += 1 # Estimate & count training time t = time.strftime("%H:%M:%S", time.gmtime(time.time() - start_time)) t_remain = time.strftime("%H:%M:%S", time.gmtime((time.time() - start_time)/epoch * epochs)) print(f"{epoch/epochs * 100} || {epoch}/{epochs} || Time: {t}/{t_remain}") running_loss = 0.0 # Batches & training for i, data in enumerate(trainloader, 0): # get the inputs; data is a list of [inputs, labels] inputs, labels = data[0].to(self.device), data[1].to(self.device) # zero the parameter gradients optimizer.zero_grad() # forward + backward + optimize outputs = self.forward(inputs) loss = criterion(outputs, labels) # This loss is the mean of losses for the batch loss.backward() optimizer.step() # adds average batch loss to running loss running_loss += loss.item() # mini-batches for progress if(i%10==0 and i!=0): print(f"{i}/{len(trainloader)}, temp. loss:{running_loss / len(trainloader)}") # average loss avg_loss = running_loss / len(trainloader) # Running_loss / number of batches print(f"Avg. loss: {avg_loss}") # loss on validation val_loss = self.evaluate_model(testloader, roc=False) losses = losses.append({'Epoch':int(epoch), 'Avg_loss':avg_loss, 'Val_loss':val_loss, 'Time':time.time() - start_time}, ignore_index=True) print('Finished Training') losses.to_csv('./cnn_net_data.csv') # MAKES EPOCH VS AVG LOSS GRAPH plt.plot(losses['Epoch'], losses['Avg_loss'], label="Loss on Training") plt.xlabel('Epoch') plt.ylabel('Average Loss') plt.title('Loss vs Epoch On Training & Validation data') # MAKES EPOCH VS VALIDATION LOSS GRAPH plt.plot(losses['Epoch'], losses['Val_loss'], label="Loss on Validation") plt.savefig('./avgloss_epoch_curve.png') plt.show() torch.save(self.state_dict(), PATH) print("Model saved") # TEST def evaluate_model(self, testloader, roc): correct = 0 total = 0 predictionsLabels = [] predictionsProbabilities = [] true_labels = [] criterion = nn.CrossEntropyLoss(reduction='mean') self.eval() # since we're not training, we don't need to calculate the gradients for our outputs with torch.no_grad(): for data in testloader: images, labels = data[0].to(self.device), data[1].to(self.device) # calculate outputs by running images through the network outputs = self.forward(images) # the class with the highest energy is what we choose as prediction loss = criterion(outputs, labels) # mean loss from batch # Gets accuracy _, predicted = torch.max(outputs.data, 1) total += labels.size(0) correct += (predicted == labels).sum().item() # Saves predictionsProbabilities and labels for ROC if(roc): predictionsLabels.extend(predicted.cpu().numpy()) predictionsProbabilities.extend(outputs.data[:, 1].cpu().numpy()) # Grabs probability of positive true_labels.extend(labels.cpu().numpy()) print(f'Accuracy of the network on {total} scans: {100 * correct // total}%') if(not roc): print(f'Validation loss: {loss.item()}') else: # ROC # Calculate TPR and FPR fpr, tpr, thresholds = roc_curve(true_labels, predictionsProbabilities) # Calculate AUC roc_auc = auc(fpr, tpr) plt.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC curve (AUC: {roc_auc})') plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--') plt.xlim([0.0, 1.005]) plt.ylim([0.0, 1.005]) plt.xlabel('False Positive Rate (1 - Specificity)') plt.ylabel('True Positive Rate (Sensitivity)') plt.title('Receiver Operating Characteristic (ROC) Curve') plt.legend(loc="lower right") plt.savefig('./ROC.png') plt.show() # Calculate confusion matrix cm = confusion_matrix(true_labels, predictionsLabels) # Plot confusion matrix plt.figure(figsize=(8, 6)) sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', cbar=False) plt.xlabel('Predicted labels') plt.ylabel('True labels') plt.title('Confusion Matrix') plt.savefig('./confusion_matrix.png') plt.show() # Classification Report report = classification_report(true_labels, predictionsLabels) print(report) self.train() return(loss.item()) # PREDICT def predict(self, loader): self.eval() with torch.no_grad(): for data in loader: images, labels = data[0].to(self.device), data[1].to(self.device) outputs = self.forward(images) # the class with the highest energy is what we choose as prediction _, predicted = torch.max(outputs.data, 1) self.train() return predicted