import os
import unittest
from __main__ import vtk, qt, ctk, slicer
from slicer.ScriptedLoadableModule import *
#import slicerNetwork
import json
import zipfile
import pathlib
import shutil
import sys
import urllib3
import string

#
# labkeySlicerPythonExtension
#


class labkeyBrowser(ScriptedLoadableModule):
    """Module descriptor for the LabKey file browser.

    Uses ScriptedLoadableModule base class, available at:
    https://github.com/Slicer/Slicer/blob/master/Base/Python/slicer/ScriptedLoadableModule.py
    """

    def __init__(self, parent):
        ScriptedLoadableModule.__init__(self, parent)
        self.parent.title = "labkeyBrowser"  # TODO make this more human readable by adding spaces
        self.parent.categories = ["LabKey"]
        self.parent.dependencies = []
        self.parent.contributors = ["Andrej Studen (UL/FMF)"]  # replace with "Firstname Lastname (Organization)"
        self.parent.helpText = """
    Interface to files in LabKey
    """
        self.parent.acknowledgementText = """
    Developed within the medical physics research programme of the Slovenian research agency.
    """  # replace with organization, grant and thanks.


#
# labkeySlicerPythonExtensionWidget
#


class labkeyBrowserWidget(ScriptedLoadableModuleWidget):
    """GUI of the module: connection settings and a remote file browser.

    Uses ScriptedLoadableModuleWidget base class, available at:
    https://github.com/Slicer/Slicer/blob/master/Base/Python/slicer/ScriptedLoadableModule.py
    """

    def setup(self):
        """Load the labkeyInterface library, connect to LabKey and build the GUI."""
        ScriptedLoadableModuleWidget.setup(self)
        # Instantiate and connect widgets ...
        self.logic = labkeyBrowserLogic(self)

        fhome = os.path.expanduser('~')
        fsetup = os.path.join(fhome, '.labkey', 'slicer', 'setup.json')
        # NOTE(review): this attribute shadows the setup() method on the
        # instance; the name is kept in case other code reads widget.setup.
        try:
            with open(fsetup) as f:
                self.setup = json.load(f)
        except FileNotFoundError:
            self.setup = {}

        paths = self.setup.setdefault('paths', {})
        try:
            libraryPath = paths['labkeyInterface']
        except KeyError:
            # First run: fetch the library and remember where it was stored.
            libraryPath = loadLibrary('labkeyInterface')
            paths['labkeyInterface'] = libraryPath
            # The directory holding setup.json may not exist yet.
            os.makedirs(os.path.dirname(fsetup), exist_ok=True)
            with open(fsetup, 'w') as f:
                json.dump(self.setup, f, indent='\t')
        # The path must be importable in both branches; previously a freshly
        # downloaded library was never added to sys.path.
        if libraryPath not in sys.path:
            sys.path.append(libraryPath)

        import labkeyInterface
        import labkeyDatabaseBrowser
        import labkeyFileBrowser

        #self.network=slicerNetwork.labkeyURIHandler()
        self.network = labkeyInterface.labkeyInterface()
        fconfig = os.path.join(fhome, '.labkey', 'network.json')
        self.network.init(fconfig)
        self.db = labkeyDatabaseBrowser.labkeyDB(self.network)
        self.fb = labkeyFileBrowser.labkeyFileBrowser(self.network)

        #
        # Parameters Area
        #
        connectionCollapsibleButton = ctk.ctkCollapsibleButton()
        connectionCollapsibleButton.text = "Connection"
        self.layout.addWidget(connectionCollapsibleButton)

        connectionFormLayout = qt.QFormLayout(connectionCollapsibleButton)

        self.configDir = os.path.join(os.path.expanduser('~'), ".labkey")

        self.serverURL = qt.QLineEdit("https://merlin.fmf.uni-lj.si")
        #self.serverURL.textChanged.connect(self.updateServerURL)
        connectionFormLayout.addRow("Server: ", self.serverURL)
        #copy initial setting
        #self.updateServerURL(self.serverURL.text);

        self.startDir = os.path.join(os.path.expanduser('~'), "temp", "crt")

        self.userCertButton = qt.QPushButton("Load")
        self.userCertButton.toolTip = "Load user certificate (crt)"
        self.userCertButton.connect('clicked(bool)', self.onUserCertButtonClicked)
        connectionFormLayout.addRow("User certificate:", self.userCertButton)

        self.privateKeyButton = qt.QPushButton("Load")
        self.privateKeyButton.toolTip = "Load private key"
        self.privateKeyButton.connect('clicked(bool)',
                                      self.onPrivateKeyButtonClicked)
        connectionFormLayout.addRow("Private key:", self.privateKeyButton)

        self.caCertButton = qt.QPushButton("Load")
        self.caCertButton.toolTip = "Load CA certificate (crt)"
        self.caCertButton.connect('clicked(bool)', self.onCaCertButtonClicked)
        connectionFormLayout.addRow("CA certificate:", self.caCertButton)

        self.loadConfigButton = qt.QPushButton("Load configuration")
        self.loadConfigButton.toolTip = "Load configuration"
        self.loadConfigButton.connect('clicked(bool)',
                                      self.onLoadConfigButtonClicked)
        connectionFormLayout.addRow("Configuration:", self.loadConfigButton)

        self.loadZipButton = qt.QPushButton("Load certificates (.zip)")
        self.loadZipButton.toolTip = "Load certificates from zip file"
        self.loadZipButton.connect('clicked(bool)',
                                   self.onLoadZipButtonClicked)
        connectionFormLayout.addRow("Load certificates (merlin):",
                                    self.loadZipButton)

        self.saveConfigButton = qt.QPushButton("Save configuration")
        self.saveConfigButton.toolTip = "Save configuration"
        self.saveConfigButton.clicked.connect(self.onSaveConfigButtonClicked)
        connectionFormLayout.addRow("Configuration:", self.saveConfigButton)

        self.resetConfigButton = qt.QPushButton("Reset configuration")
        self.resetConfigButton.toolTip = "Reset configuration"
        self.resetConfigButton.clicked.connect(self.onResetConfigButtonClicked)
        connectionFormLayout.addRow("Configuration:", self.resetConfigButton)

        self.initButton = qt.QPushButton("Init")
        self.initButton.toolTip = "Initialize connection to the server"
        self.initButton.connect('clicked(bool)', self.onInitButtonClicked)
        connectionFormLayout.addRow("Connection:", self.initButton)

        self.authName = qt.QLineEdit("email")
        #self.authName.textChanged.connect(self.updateAuthName)
        connectionFormLayout.addRow("Labkey username: ", self.authName)

        self.authPass = qt.QLineEdit("")
        self.authPass.setEchoMode(qt.QLineEdit.Password)
        #self.authPass.textChanged.connect(self.updateAuthPass)
        connectionFormLayout.addRow("Labkey password: ", self.authPass)

        fileDialogCollapsibleButton = ctk.ctkCollapsibleButton()
        fileDialogCollapsibleButton.text = "Remote files"
        self.layout.addWidget(fileDialogCollapsibleButton)

        # Layout within the dummy collapsible button
        fileDialogFormLayout = qt.QFormLayout(fileDialogCollapsibleButton)

        #add item list for each found file/directory
        self.fileList = qt.QListWidget()
        self.fileList.toolTip = "Select remote file"
        self.fileList.itemDoubleClicked.connect(self.onFileListDoubleClicked)

        #remote dir is a list of path components below the root URL
        self.currentRemoteDir = []

        #add dummy entry
        items = ('.', '..')
        self.populateFileList(items)

        fileDialogFormLayout.addWidget(self.fileList)

        #add selected file display
        self.selectedFile = qt.QLineEdit("")
        self.selectedFile.toolTip = "Selected file"
        fileDialogFormLayout.addRow("Selected file :", self.selectedFile)

        #add possible file Content
        self.fileTypeSelector = qt.QComboBox()
        self.fileTypeSelector.toolTip = "Select file type"
        items = ('VolumeFile', 'SegmentationFile', 'TransformFile')
        self.populateFileTypeSelector(items)
        fileDialogFormLayout.addRow("File type :", self.fileTypeSelector)

        self.keepCachedFileCheckBox = qt.QCheckBox("keep cached file")
        self.keepCachedFileCheckBox.toolTip = "Toggle local storage of labkey files"
        self.keepCachedFileCheckBox.setChecked(True)
        fileDialogFormLayout.addRow("Manage cache :", self.keepCachedFileCheckBox)

        loadFileButton = qt.QPushButton("Load file")
        loadFileButton.toolTip = "Load file"
        loadFileButton.clicked.connect(self.onLoadFileButtonClicked)
        fileDialogFormLayout.addRow("Action :", loadFileButton)

        loadDirButton = qt.QPushButton("Load directory")
        loadDirButton.toolTip = "Load directory"
        loadDirButton.clicked.connect(self.onLoadDirButtonClicked)
        fileDialogFormLayout.addRow("Action :", loadDirButton)

    def populateFileList(self, items):
        """Replace the content of the remote file list with `items` (strings)."""
        self.fileList.clear()
        for it_text in items:
            item = qt.QListWidgetItem(self.fileList)
            item.setText(it_text)
            item.setIcon(qt.QIcon(it_text))

    def populateFileTypeSelector(self, items):
        """Append each entry of `items` to the file type combo box."""
        for item in items:
            self.fileTypeSelector.addItem(item)

    def cleanup(self):
        pass

    def onSelect(self):
        # NOTE(review): leftover from the module template; applyButton,
        # inputSelector and outputSelector are not created in setup().
        self.applyButton.enabled = self.inputSelector.currentNode() and self.outputSelector.currentNode()

    def onApplyButton(self):
        #logic = labkeySlicerPythonExtensionLogic()
        #enableScreenshotsFlag = self.enableScreenshotsFlagCheckBox.checked
        #screenshotScaleFactor = int(self.screenshotScaleFactorSliderWidget.value)
        print("Run the algorithm")
        #logic.run(self.inputSelector.currentNode(), self.outputSelector.currentNode(), enableScreenshotsFlag,screenshotScaleFactor)

    def onUserCertButtonClicked(self):
        """Ask for a user certificate (.crt) and store it on the logic object."""
        filename = qt.QFileDialog.getOpenFileName(None, 'Open user certificate',
                                                  self.startDir, '*.crt')
        #pwd=qt.QInputDialog.getText(None,'Certificate password',
        # 'Enter certificate password',qt.QLineEdit.Password)
        if not filename:
            print("No file selected")
            return

        f = qt.QFile(filename)
        if not f.open(qt.QIODevice.ReadOnly):
            print("Could not open file")
            return

        certList = qt.QSslCertificate.fromPath(filename)
        if len(certList) < 1:
            print("Troubles parsing {0}".format(filename))
            return

        self.logic.cert = qt.QSslCertificate(f)
        print("cert.isNull()={0}".format(self.logic.cert.isNull()))
        self.userCertButton.setText(filename)
        self.authName.setText(self.logic.cert.subjectInfo("emailAddress"))

    def onPrivateKeyButtonClicked(self):
        """Ask for a private key file and its password; store the key on the logic."""
        filename = qt.QFileDialog.getOpenFileName(None, 'Open private key',
                                                  self.startDir, '*.key')
        if not filename:
            print("No file selected")
            return

        f = qt.QFile(filename)
        if not f.open(qt.QIODevice.ReadOnly):
            print("Could not open file")
            return

        self.pwd = qt.QInputDialog.getText(None, 'Private key password',
                                           'Enter key password', qt.QLineEdit.Password)

        self.logic.key = qt.QSslKey(f, qt.QSsl.Rsa, qt.QSsl.Pem, qt.QSsl.PrivateKey,
                                    str(self.pwd))
        self.privateKeyButton.setText(filename)

    def onCaCertButtonClicked(self):
        """Ask for a CA certificate (.crt) and store it on the logic object."""
        filename = qt.QFileDialog.getOpenFileName(None, 'Open authority certificate',
                                                  self.startDir, '*.crt')
        if not filename:
            print("No file selected")
            return

        f = qt.QFile(filename)
        if not f.open(qt.QIODevice.ReadOnly):
            print("Could not open file")
            return

        certList = qt.QSslCertificate.fromPath(filename)
        if len(certList) < 1:
            print("Troubles parsing {0}".format(filename))
            return
        self.logic.caCert = qt.QSslCertificate(f)  #certList[0]
        self.caCertButton.setText(filename)

    def onLoadConfigButtonClicked(self):
        """Load a JSON connection configuration and mirror it in the GUI."""
        filename = qt.QFileDialog.getOpenFileName(None, 'Open configuration file (JSON)',
                                                  self.configDir, '*.json')
        if not filename:
            # Dialog was cancelled; keep the current configuration.
            print("No file selected")
            return

        self.network.init(filename)
        cfg = self.network.connectionConfig

        if 'SSL' in cfg:
            sslSetup = cfg['SSL']
            self.privateKeyButton.setText(sslSetup['key'])
            self.userCertButton.setText(sslSetup['user'])
            self.caCertButton.setText(sslSetup['ca'])

        # 'host' is a top-level key (see generateConfig), independent of SSL.
        if 'host' in cfg:
            self.serverURL.setText(cfg['host'])

        if 'labkey' in cfg:
            labkey = cfg['labkey']
            self.authName.setText(labkey['user'])
            self.authPass.setText(labkey['password'])

        #self.serverURL.setText(self.network.hostname)
        #self.authName.setText(self.network.auth_name)
        #self.authPass.setText(self.network.auth_pass)
        self.loadConfigButton.setText(os.path.basename(filename))

    def onLoadZipButtonClicked(self):
        """Unpack <user>.zip (certificate, key, CA) into ~/.labkey/<user>.

        The archive is expected to contain <user>.crt and <user>.key, where
        <user> is the archive name without extension, plus an entry whose
        name contains 'CA'.
        """
        filename = qt.QFileDialog.getOpenFileName(
            None, 'Open certificate file (zip)',
            self.configDir, '*.zip')
        if not filename:
            # Dialog was cancelled.
            print("No file selected")
            return

        zpath = pathlib.Path(filename)
        user = zpath.stem
        userDir = os.path.join(os.path.expanduser('~'), '.labkey', user)
        # makedirs also creates ~/.labkey when it does not exist yet
        os.makedirs(userDir, exist_ok=True)

        caName = None
        with zipfile.ZipFile(filename, 'r') as zipObj:
            zipObj.extract(user + '.crt', userDir)
            zipObj.extract(user + '.key', userDir)
            for f in zipObj.namelist():
                if 'CA' in f:
                    caName = f
                    zipObj.extract(f, userDir)

        self.serverURL.setText('https://merlin.fmf.uni-lj.si')
        self.privateKeyButton.setText(os.path.join(userDir, user + '.key'))
        self.userCertButton.setText(os.path.join(userDir, user + '.crt'))
        self.pwd = 'notUsed'
        if caName is None:
            # Without a CA entry the button keeps its "Load" text, which
            # generateConfig() treats as "no SSL".
            print("No CA certificate found in {0}".format(filename))
        else:
            self.caCertButton.setText(os.path.join(userDir, caName))
        self.authName.setText("guest")
        self.authPass.setText("guest")

    def onSaveConfigButtonClicked(self):
        """Write the current GUI configuration to ~/.labkey/network.json."""
        connectionConfig = self.generateConfig()

        fhome = os.path.expanduser('~')
        labkeyDir = os.path.join(fhome, ".labkey")
        os.makedirs(labkeyDir, exist_ok=True)
        fconfig = os.path.join(labkeyDir, 'network.json')
        with open(fconfig, 'w') as f:
            json.dump(connectionConfig, f, indent=3)
        print("Done")

    def generateConfig(self):
        """Build the connection configuration dict from the GUI fields.

        Returns a dict with keys 'host', 'context', 'labkey', 'orthanc' and,
        when all three certificate buttons have been set, 'SSL'.
        """
        connectionConfig = {}

        #setup SSL: a button still labelled "Load" means the file was not chosen
        sslButtons = (self.privateKeyButton, self.userCertButton, self.caCertButton)
        if not any(button.text == "Load" for button in sslButtons):
            sslSetup = {}
            sslSetup['user'] = self.userCertButton.text
            sslSetup['key'] = self.privateKeyButton.text
            sslSetup['ca'] = self.caCertButton.text
            # self.pwd only exists once a key or zip has been loaded
            sslSetup['keyPwd'] = getattr(self, 'pwd', 'notUsed')
            connectionConfig['SSL'] = sslSetup

        connectionConfig["host"] = self.serverURL.text
        connectionConfig["context"] = "labkey"

        labkeySetup = {}
        labkeySetup['user'] = self.authName.text
        labkeySetup['password'] = self.authPass.text
        connectionConfig['labkey'] = labkeySetup

        orthancSetup = {}
        if 'merlin' in self.serverURL.text:
            orthancSetup['server'] = 'https://orthanc.fmf.uni-lj.si'
        if 'onko-nix' in self.serverURL.text:
            orthancSetup['server'] = 'http://onko-nix.onko-i.si:8042'
        orthancSetup['user'] = 'ask'
        orthancSetup['password'] = 'askPassword'
        connectionConfig['orthanc'] = orthancSetup

        return connectionConfig

    def onResetConfigButtonClicked(self):
        """Reset all connection fields to their placeholder values."""
        self.serverURL.setText('URL')
        self.privateKeyButton.setText("Load")
        self.userCertButton.setText("Load")
        self.caCertButton.setText("Load")
        self.authName.setText("labkey username")
        self.authPass.setText("password")

    def onInitButtonClicked(self):
        """Connect with the GUI configuration and colour the button by outcome.

        red: no user id returned; green: returned email matches the
        configured user; orange: connected under a different email.
        """
        self.network.connectionConfig = self.generateConfig()
        print(self.network.connectionConfig['labkey']['user'])
        self.network.initRemote()
        remoteId = self.network.getUserId()
        if remoteId is None:
            self.initButton.setStyleSheet("background-color: red")
        elif remoteId['email'] == self.network.connectionConfig['labkey']['user']:
            self.initButton.setStyleSheet("background-color: green")
        else:
            self.initButton.setStyleSheet("background-color: orange")

    #def updateAuthName(self,txt):
    #self.network.auth_name=txt
    #print("Setting username to {0}".format(self.network.auth_name))

    #def updateAuthPass(self,txt):
    #self.network.auth_pass=txt
    #print("Setting password.")

    #def updateServerURL(self,txt):
    #self.network.hostname=txt
    #print("Setting hostname to {}".format(self.network.hostname))

    def onFileListDoubleClicked(self, item):
        """Navigate the remote tree: '..' goes up, '.' refreshes, else descend."""
        if item is None:
            print("Selected items: None")
            return

        iText = item.text()
        print("Selected items: {0} ".format(iText))

        #this is hard -> compose path string from currentRemoteDir and selection
        if iText.startswith('..'):
            #one up; already at root when the list is empty
            if self.currentRemoteDir:
                self.currentRemoteDir.pop()
        elif iText.startswith('.'):
            pass
        else:
            self.currentRemoteDir.append(iText)

        print("Listing {}".format(self.currentRemoteDir))
        remoteDirString = self.fb.GetRootUrl()
        if self.currentRemoteDir:
            remoteDirString += '/' + '/'.join(self.currentRemoteDir)
        # NOTE(review): `ok` is not checked; the return shape of
        # listRemoteDir on failure is not visible here - confirm.
        ok, lst = self.fb.listRemoteDir(remoteDirString)
        flist = [self.fb.baseDir(f) for f in lst]
        print("Got")
        print(flist)
        flist.insert(0, '..')
        flist.insert(0, '.')
        self.populateFileList(flist)
        # QLineEdit.setText needs a string, not the list of path components
        self.selectedFile.setText('/'.join(self.currentRemoteDir))

    def onLoadFileButtonClicked(self):
        """Download the selected remote file to ~/temp and load it into Slicer."""
        if not self.currentRemoteDir:
            # Nothing selected yet; [-1] below would raise IndexError.
            print("No file selected")
            return

        properties = {}
        #local path
        tempDir = os.path.join(os.path.expanduser('~'), 'temp')
        os.makedirs(tempDir, exist_ok=True)
        localPath = os.path.join(tempDir, self.currentRemoteDir[-1])
        print(localPath)

        #remote path
        remotePath = self.fb.GetRootUrl() + '/' + '/'.join(self.currentRemoteDir)
        print(remotePath)

        #copy over
        self.fb.readFileToFile(remotePath, localPath)

        #do slicer magic
        slicer.util.loadNodeFromFile(localPath, self.fileTypeSelector.currentText,
                                     properties, returnNode=False)

        # Honour the "keep cached file" check box; this test was commented
        # out before, so the box had no effect.
        if not self.keepCachedFileCheckBox.checked:
            os.remove(localPath)

    def onLoadDirButtonClicked(self):
        print('Not implemented')


#
# labkeySlicerPythonExtensionLogic
#


class labkeyBrowserLogic(ScriptedLoadableModuleLogic):
    """This class should implement all the actual
    computation done by your module. The interface
    should be such that other python code can import
    this class and make use of the functionality without
    requiring an instance of the Widget.
    Uses ScriptedLoadableModuleLogic base class, available at:
    https://github.com/Slicer/Slicer/blob/master/Base/Python/slicer/ScriptedLoadableModule.py
    """

    def __init__(self, parent):
        ScriptedLoadableModuleLogic.__init__(self, parent)
        self.qnam = qt.QNetworkAccessManager()

    def hasImageData(self, volumeNode):
        """This is a dummy logic method that
        returns true if the passed in volume
        node has valid image data
        """
        if not volumeNode:
            print('no volume node')
            return False
        if volumeNode.GetImageData() is None:
            print('no image data')
            return False
        return True

    def takeScreenshot(self, name, description, type=-1):
        """Grab the widget selected by `type` and store it as an annotation snapshot.

        Requires run() to have set enableScreenshots and screenshotScaleFactor.
        """
        # show the message even if not taking a screen shot
        self.delayDisplay(description)

        if self.enableScreenshots == 0:
            return

        lm = slicer.app.layoutManager()
        # switch on the type to get the requested window
        widget = 0
        if type == slicer.qMRMLScreenShotDialog.FullLayout:
            # full layout
            widget = lm.viewport()
        elif type == slicer.qMRMLScreenShotDialog.ThreeD:
            # just the 3D window
            widget = lm.threeDWidget(0).threeDView()
        elif type == slicer.qMRMLScreenShotDialog.Red:
            # red slice window
            widget = lm.sliceWidget("Red")
        elif type == slicer.qMRMLScreenShotDialog.Yellow:
            # yellow slice window
            widget = lm.sliceWidget("Yellow")
        elif type == slicer.qMRMLScreenShotDialog.Green:
            # green slice window
            widget = lm.sliceWidget("Green")
        else:
            # default to using the full window
            widget = slicer.util.mainWindow()
            # reset the type so that the node is set correctly
            type = slicer.qMRMLScreenShotDialog.FullLayout

        # grab and convert to vtk image data
        qpixMap = qt.QPixmap().grabWidget(widget)
        qimage = qpixMap.toImage()
        imageData = vtk.vtkImageData()
        slicer.qMRMLUtils().qImageToVtkImageData(qimage, imageData)

        annotationLogic = slicer.modules.annotations.logic()
        annotationLogic.CreateSnapShot(name, description, type, self.screenshotScaleFactor, imageData)

    def run(self, inputVolume, outputVolume, enableScreenshots=0, screenshotScaleFactor=1):
        """
        Run the actual algorithm
        """
        self.delayDisplay('Running the aglorithm')

        self.enableScreenshots = enableScreenshots
        self.screenshotScaleFactor = screenshotScaleFactor

        self.takeScreenshot('labkeySlicerPythonExtension-Start', 'Start', -1)

        return True


class labkeyBrowserTest(ScriptedLoadableModuleTest):
    """
    This is the test case for your scripted module.
    Uses ScriptedLoadableModuleTest base class, available at:
    https://github.com/Slicer/Slicer/blob/master/Base/Python/slicer/ScriptedLoadableModule.py
    """

    def setUp(self):
        """ Do whatever is needed to reset the state - typically a scene clear will be enough.
        """
        slicer.mrmlScene.Clear(0)

    def runTest(self):
        """Run as few or as many tests as needed here.
        """
        self.setUp()
        # was test_labkeySlicerPythonExtension1, which does not exist
        self.test_labkeyBrowser()

    def test_labkeyBrowser(self):
        """ Ideally you should have several levels of tests.  At the lowest level
        tests should exercise the functionality of the logic with different inputs
        (both valid and invalid).  At higher levels your tests should emulate the
        way the user would interact with your code and confirm that it still works
        the way you intended.
        One of the most important features of the tests is that it should alert other
        developers when their changes will have an impact on the behavior of your
        module.  For example, if a developer removes a feature that you depend on,
        your test should break so they know that the feature is needed.
        """

        self.delayDisplay("Starting the test")
        #
        # first, get some data
        #
        import urllib.request
        downloads = (
            ('http://slicer.kitware.com/midas3/download?items=5767', 'FA.nrrd', slicer.util.loadVolume),
        )

        for url, name, loader in downloads:
            filePath = slicer.app.temporaryPath + '/' + name
            if not os.path.exists(filePath) or os.stat(filePath).st_size == 0:
                print('Requesting download %s from %s...\n' % (name, url))
                # urlretrieve lives in urllib.request on Python 3
                urllib.request.urlretrieve(url, filePath)
            if loader:
                print('Loading %s...\n' % (name,))
                loader(filePath)
        self.delayDisplay('Finished with download and loading\n')

        volumeNode = slicer.util.getNode(pattern="FA")
        # labkeySlicerPythonExtensionLogic is not defined in this module
        logic = labkeyBrowserLogic(None)
        self.assertTrue(logic.hasImageData(volumeNode))
        self.delayDisplay('Test passed!')


def getHomeDir():
    """Return the home directory from HOME, or HOMEDRIVE+HOMEPATH if HOME is unset.

    Raises KeyError if neither set of variables is defined.
    """
    try:
        return os.environ['HOME']
    except KeyError:
        return os.environ['HOMEDRIVE'] + os.environ['HOMEPATH']


def loadLibrary(name):
    """Download a library archive, unpack it and return the install path.

    The archive for `name` is fetched into ~/temp, extracted under
    ~/.labkey/software/src and its top-level '<name.lower()>-master'
    directory is renamed to `name`.

    Raises KeyError for an unknown `name` and IOError if the server does
    not answer with HTTP 200.
    """
    #load library from git, store it at a default location and
    #return path to the stored location
    # NOTE(review): fetched over plain http and extracted unverified -
    # consider https and/or a checksum.
    remoteSources = {
        "labkeyInterface":
            "http://wiscigt.powertheword.com/labkey/labkeyinterface/-/archive/master/labkeyinterface-master.zip"
    }
    url = remoteSources[name]

    #two steps:
    #1 Download
    tempDir = os.path.join(os.path.expanduser('~'), 'temp')
    os.makedirs(tempDir, exist_ok=True)
    tempFile = os.path.join(tempDir, name + '.zip')

    installDir = os.path.join(os.path.expanduser('~'), '.labkey', 'software', 'src')
    target = os.path.join(installDir, name)

    http = urllib3.PoolManager()
    r = http.request('GET', url, preload_content=False)
    try:
        try:
            if r.status != 200:
                raise IOError(
                    "Download of {0} failed with HTTP status {1}".format(url, r.status))
            chunk_size = 65536
            with open(tempFile, 'wb') as out:
                while True:
                    data = r.read(chunk_size)
                    if not data:
                        break
                    out.write(data)
        finally:
            # hand the connection back even when the download fails
            r.release_conn()

        #2 Unzip
        os.makedirs(installDir, exist_ok=True)
        with zipfile.ZipFile(tempFile, 'r') as zip_ref:
            zip_ref.extractall(installDir)
    finally:
        #cleanup
        if os.path.exists(tempFile):
            os.remove(tempFile)

    zipName = name.lower() + '-master'
    # os.rename cannot replace an existing install; drop the stale copy first
    if os.path.isdir(target):
        shutil.rmtree(target)
    os.rename(os.path.join(installDir, zipName), target)
    return target