from torch import device, cuda import torch from torch import add import torch.nn as nn import utils.CNN_Layers as CustomLayers import torch.nn.functional as F import torch.optim as optim import utils.CNN_methods as CNN import pandas as pd import matplotlib.pyplot as plt import time import numpy as np # from sklearn.metrics import roc_curve, auc class CNN_Net(nn.Module): def __init__(self, input, prps, final_layer_size=5): super(CNN_Net, self).__init__() self.final_layer_size = final_layer_size self.device = device('cuda:0' if cuda.is_available() else 'cpu') print("CNN Initialized. Using: " + str(self.device)) # GETS FIRST IMAGE FOR SIZE data_iter = iter(input) first_batch = next(data_iter) first_features = first_batch[0] image = first_features[0] # LAYERS print(f"CNN Model Initialization. Input size: {image.size()}") self.conv1 = CustomLayers.Conv_elu_maxpool_drop(1, 192, (11, 13, 11), stride=(4,4,4), pool=True, prps=prps) self.conv2 = CustomLayers.Conv_elu_maxpool_drop(192, 384, (5, 6, 5), stride=(1,1,1), pool=True, prps=prps) self.conv3_mid_flow = CustomLayers.Mid_flow(384, 384, prps=prps) self.conv4_sepConv = CustomLayers.Conv_elu_maxpool_drop(384, 96,(3, 4, 3), stride=(1,1,1), pool=True, prps=prps, sep_conv=True) self.conv5_sepConv = CustomLayers.Conv_elu_maxpool_drop(96, 48, (3, 4, 3), stride=(1, 1, 1), pool=True, prps=prps, sep_conv=True) self.fc1 = CustomLayers.Fc_elu_drop(113568, 20, prps=prps, softmax=False) # TODO, concatenate clinical data after this self.fc2 = CustomLayers.Fc_elu_drop(20, final_layer_size, prps=prps, softmax=True) # For now this works as output layer, though may be incorrect # FORWARDS def forward(self, x): x = self.conv1(x) x = self.conv2(x) x = self.conv3_mid_flow(x) x = self.conv4_sepConv(x) x = self.conv5_sepConv(x) # FLATTEN x flatten_size = x.size(1) * x.size(2) * x.size(3) * x.size(4) x = x.view(-1, flatten_size) x = self.fc1(x) x = self.fc2(x) return x # TRAIN def train_model(self, trainloader, testloader, PATH, epochs): self.train() criterion = nn.CrossEntropyLoss(reduction='mean') optimizer = optim.Adam(self.parameters(), lr=1e-5) losses = pd.DataFrame(columns=['Epoch', 'Avg_loss', 'Time']) start_time = time.time() # seconds for epoch in range(epochs): # loop over the dataset multiple times epoch += 1 # Estimate & count training time t = time.strftime("%H:%M:%S", time.gmtime(time.time() - start_time)) t_remain = time.strftime("%H:%M:%S", time.gmtime((time.time() - start_time)/epoch * epochs)) print(f"{epoch/epochs * 100} || {epoch}/{epochs} || Time: {t}/{t_remain}") running_loss = 0.0 # Batches & training for i, data in enumerate(trainloader, 0): # get the inputs; data is a list of [inputs, labels] inputs, labels = data[0].to(self.device), data[1].to(self.device) # zero the parameter gradients optimizer.zero_grad() # forward + backward + optimize outputs = self.forward(inputs) loss = criterion(outputs, labels) # This loss is the mean of losses for the batch loss.backward() optimizer.step() # adds average batch loss to running loss running_loss += loss.item() # mini-batches for progress if(i%10==0 and i!=0): print(f"{i}/{len(trainloader)}, temp. loss:{running_loss / len(trainloader)}") # average loss avg_loss = running_loss / len(trainloader) # Running_loss / number of batches print(f"Avg. loss: {avg_loss}") # loss on validation val_loss = self.evaluate_model(testloader, roc=False) losses = losses.append({'Epoch':int(epoch), 'Avg_loss':avg_loss, 'Val_loss':val_loss, 'Time':time.time() - start_time}, ignore_index=True) print('Finished Training') losses.to_csv('./cnn_net_data.csv') # MAKES EPOCH VS AVG LOSS GRAPH plt.plot(losses['Epoch'], losses['Avg_loss']) plt.xlabel('Epoch') plt.ylabel('Average Loss') plt.title('Average Loss vs Epoch On Training') plt.savefig('./avgloss_epoch_curve.png') plt.show() # MAKES EPOCH VS VALIDATION LOSS GRAPH plt.plot(losses['Epoch'], losses['Val_loss']) plt.xlabel('Epoch') plt.ylabel('Validation Loss') plt.title('Validation Loss vs Epoch On Training') plt.savefig('./valloss_epoch_curve.png') plt.show() torch.save(self.state_dict(), PATH) print("Model saved") # TEST def evaluate_model(self, testloader, roc): correct = 0 total = 0 predictions = [] true_labels = [] criterion = nn.CrossEntropyLoss(reduction='mean') self.eval() # since we're not training, we don't need to calculate the gradients for our outputs with torch.no_grad(): for data in testloader: images, labels = data[0].to(self.device), data[1].to(self.device) # calculate outputs by running images through the network outputs = self.forward(images) # the class with the highest energy is what we choose as prediction loss = criterion(outputs, labels) # mean loss from batch # Gets accuracy _, predicted = torch.max(outputs.data, 1) total += labels.size(0) correct += (predicted == labels).sum().item() # Saves predictions and labels for ROC if(roc): predictions.extend(outputs.data[:,1].cpu().numpy()) # Grabs probability of positive true_labels.extend(labels.cpu().numpy()) print(f'Accuracy of the network on {total} scans: {100 * correct // total}%') if(not roc): print(f'Validation loss: {loss.item()}') else: # ROC thresholds = np.linspace(0, 1, num=50) tpr = [] fpr = [] acc = [] true_labels = np.array(true_labels) for threshold in thresholds: # Thresholding the predictions (meaning all predictions above threshold are considered positive) thresholded_predictions = (predictions >= threshold).astype(int) # Calculating true positives, false positives, true negatives, false negatives true_positives = np.sum((thresholded_predictions == 1) & (true_labels == 1)) false_positives = np.sum((thresholded_predictions == 1) & (true_labels == 0)) true_negatives = np.sum((thresholded_predictions == 0) & (true_labels == 0)) false_negatives = np.sum((thresholded_predictions == 0) & (true_labels == 1)) accuracy = (true_positives + true_negatives) / (true_positives + false_positives + true_negatives + false_negatives) # Calculate TPR and FPR tpr.append(true_positives / (true_positives + false_negatives)) fpr.append(false_positives / (false_positives + true_negatives)) acc.append(accuracy) plt.plot(fpr, tpr, color='darkorange', lw=2, label='ROC curve') plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--') plt.xlim([0.0, 1.0]) plt.ylim([0.0, 1.0]) plt.xlabel('False Positive Rate (1 - Specificity)') plt.ylabel('True Positive Rate (Sensitivity)') plt.title('Receiver Operating Characteristic (ROC) Curve') plt.legend(loc="lower right") plt.savefig('./ROC.png') plt.show() plt.plot(thresholds, acc) plt.xlabel('Thresholds') plt.ylabel('Accuracy') plt.title('Accuracy vs thresholds') plt.savefig('./acc.png') plt.show() # ROC ATTEMPT 2 # fprRoc, tprRoc = roc_curve(true_labels, predictions) # plt.plot(fprRoc, tprRoc) self.train() return(loss.item()) # PREDICT def predict(self, loader): self.eval() with torch.no_grad(): for data in loader: images, labels = data[0].to(self.device), data[1].to(self.device) outputs = self.forward(images) # the class with the highest energy is what we choose as prediction _, predicted = torch.max(outputs.data, 1) self.train() return predicted