|
- import os
- import unittest
- from __main__ import vtk, qt, ctk, slicer
- from slicer.ScriptedLoadableModule import *
- #import slicerNetwork
- import json
- import zipfile
- import pathlib
- import sys
- import urllib3
- import string
- #
- # labkeyBrowser
- #
class labkeyBrowser(ScriptedLoadableModule):
    """Module descriptor registering the LabKey browser with Slicer.

    Uses ScriptedLoadableModule base class, available at:
    https://github.com/Slicer/Slicer/blob/master/Base/Python/slicer/ScriptedLoadableModule.py
    """

    def __init__(self, parent):
        """Fill in the module metadata (title, category, credits) on *parent*."""
        ScriptedLoadableModule.__init__(self, parent)
        module = self.parent
        # TODO make this more human readable by adding spaces
        module.title = "labkeyBrowser"
        module.categories = ["LabKey"]
        module.dependencies = []
        # replace with "Firstname Lastname (Organization)"
        module.contributors = ["Andrej Studen (UL/FMF)"]
        module.helpText = "\nInterface to files in LabKey\n"
        # replace with organization, grant and thanks.
        module.acknowledgementText = (
            "\n"
            "Developed within the medical physics research programme"
            " of the Slovenian research agency.\n"
        )
- #
- # labkeyBrowserWidget
- #
class labkeyBrowserWidget(ScriptedLoadableModuleWidget):
    """Module panel: connection settings and a remote file browser.

    Uses ScriptedLoadableModuleWidget base class, available at:
    https://github.com/Slicer/Slicer/blob/master/Base/Python/slicer/ScriptedLoadableModule.py
    """

    def setup(self):
        """Locate the labkeyInterface library, connect to it and build the GUI.

        The library location is read from ``~/.labkey/setup.json``; when it is
        not recorded there the library is fetched with ``loadLibrary`` and the
        location is written back to that file.
        """
        ScriptedLoadableModuleWidget.setup(self)
        # Instantiate and connect widgets ...
        self.logic = labkeyBrowserLogic(self)
        fhome = os.path.expanduser('~')
        fsetup = os.path.join(fhome, '.labkey', 'setup.json')
        # NOTE(review): assigning self.setup shadows this method on the
        # instance; the attribute name is kept for backward compatibility.
        try:
            with open(fsetup) as f:
                self.setup = json.load(f)
        except FileNotFoundError:
            self.setup = {}
        paths = self.setup.setdefault('paths', {})
        if 'labkeyInterface' not in paths:
            paths['labkeyInterface'] = loadLibrary('labkeyInterface')
            with open(fsetup, 'w') as f:
                json.dump(self.setup, f, indent='\t')
        # The path must be importable in both cases; previously a freshly
        # downloaded library was recorded but never added to sys.path.
        if paths['labkeyInterface'] not in sys.path:
            sys.path.append(paths['labkeyInterface'])

        import labkeyInterface
        import labkeyDatabaseBrowser
        import labkeyFileBrowser
        #self.network=slicerNetwork.labkeyURIHandler()
        self.network = labkeyInterface.labkeyInterface()
        fconfig = os.path.join(fhome, '.labkey', 'network.json')
        self.network.init(fconfig)
        self.db = labkeyDatabaseBrowser.labkeyDB(self.network)
        self.fb = labkeyFileBrowser.labkeyFileBrowser(self.network)
        #
        # Parameters Area
        #
        connectionCollapsibleButton = ctk.ctkCollapsibleButton()
        connectionCollapsibleButton.text = "Connection"
        self.layout.addWidget(connectionCollapsibleButton)
        connectionFormLayout = qt.QFormLayout(connectionCollapsibleButton)
        self.configDir = os.path.join(os.path.expanduser('~'), ".labkey")
        self.serverURL = qt.QLineEdit("https://merlin.fmf.uni-lj.si")
        #self.serverURL.textChanged.connect(self.updateServerURL)
        connectionFormLayout.addRow("Server: ", self.serverURL)
        #copy initial setting
        #self.updateServerURL(self.serverURL.text);
        # Directory the certificate/key file dialogs open in.
        self.startDir = os.path.join(os.path.expanduser('~'), "temp", "crt")
        self.userCertButton = qt.QPushButton("Load")
        self.userCertButton.toolTip = "Load user certificate (crt)"
        self.userCertButton.connect('clicked(bool)', self.onUserCertButtonClicked)
        connectionFormLayout.addRow("User certificate:", self.userCertButton)
        self.privateKeyButton = qt.QPushButton("Load")
        self.privateKeyButton.toolTip = "Load private key"
        self.privateKeyButton.connect('clicked(bool)',
                                      self.onPrivateKeyButtonClicked)
        connectionFormLayout.addRow("Private key:", self.privateKeyButton)
        self.caCertButton = qt.QPushButton("Load")
        self.caCertButton.toolTip = "Load CA certificate (crt)"
        self.caCertButton.connect('clicked(bool)', self.onCaCertButtonClicked)
        connectionFormLayout.addRow("CA certificate:", self.caCertButton)
        self.loadConfigButton = qt.QPushButton("Load configuration")
        self.loadConfigButton.toolTip = "Load configuration"
        self.loadConfigButton.connect('clicked(bool)',
                                      self.onLoadConfigButtonClicked)
        connectionFormLayout.addRow("Configuration:", self.loadConfigButton)
        self.loadZipButton = qt.QPushButton("Load certificates (.zip)")
        self.loadZipButton.toolTip = "Load certificates from zip file"
        self.loadZipButton.connect('clicked(bool)',
                                   self.onLoadZipButtonClicked)
        connectionFormLayout.addRow("Load certificates (merlin):",
                                    self.loadZipButton)

        self.saveConfigButton = qt.QPushButton("Save configuration")
        self.saveConfigButton.toolTip = "Save configuration"
        self.saveConfigButton.clicked.connect(self.onSaveConfigButtonClicked)
        connectionFormLayout.addRow("Configuration:", self.saveConfigButton)

        self.resetConfigButton = qt.QPushButton("Reset configuration")
        self.resetConfigButton.toolTip = "Reset configuration"
        self.resetConfigButton.clicked.connect(self.onResetConfigButtonClicked)
        connectionFormLayout.addRow("Configuration:", self.resetConfigButton)

        self.initButton = qt.QPushButton("Init")
        self.initButton.toolTip = "Initialize connection to the server"
        self.initButton.connect('clicked(bool)', self.onInitButtonClicked)
        connectionFormLayout.addRow("Connection:", self.initButton)
        self.authName = qt.QLineEdit("email")
        #self.authName.textChanged.connect(self.updateAuthName)
        connectionFormLayout.addRow("Labkey username: ", self.authName)
        self.authPass = qt.QLineEdit("")
        self.authPass.setEchoMode(qt.QLineEdit.Password)
        #self.authPass.textChanged.connect(self.updateAuthPass)
        connectionFormLayout.addRow("Labkey password: ", self.authPass)

        fileDialogCollapsibleButton = ctk.ctkCollapsibleButton()
        fileDialogCollapsibleButton.text = "Remote files"
        self.layout.addWidget(fileDialogCollapsibleButton)
        # Layout within the dummy collapsible button
        fileDialogFormLayout = qt.QFormLayout(fileDialogCollapsibleButton)
        #add item list for each found file/directory
        self.fileList = qt.QListWidget()
        self.fileList.toolTip = "Select remote file"
        self.fileList.itemDoubleClicked.connect(self.onFileListDoubleClicked)
        #remote dir is a list of path components below the root URL
        self.currentRemoteDir = []
        #add dummy entry
        self.populateFileList(('.', '..'))
        fileDialogFormLayout.addWidget(self.fileList)
        #add selected file display
        self.selectedFile = qt.QLineEdit("")
        self.selectedFile.toolTip = "Selected file"
        fileDialogFormLayout.addRow("Selected file :", self.selectedFile)
        #add possible file Content
        self.fileTypeSelector = qt.QComboBox()
        self.fileTypeSelector.toolTip = "Select file type"
        self.populateFileTypeSelector(
            ('VolumeFile', 'SegmentationFile', 'TransformFile'))
        fileDialogFormLayout.addRow("File type :", self.fileTypeSelector)
        self.keepCachedFileCheckBox = qt.QCheckBox("keep cached file")
        self.keepCachedFileCheckBox.toolTip = "Toggle local storage of labkey files"
        self.keepCachedFileCheckBox.setChecked(True)
        fileDialogFormLayout.addRow("Manage cache :", self.keepCachedFileCheckBox)
        loadFileButton = qt.QPushButton("Load file")
        loadFileButton.toolTip = "Load file"
        loadFileButton.clicked.connect(self.onLoadFileButtonClicked)
        fileDialogFormLayout.addRow("Action :", loadFileButton)
        loadDirButton = qt.QPushButton("Load directory")
        loadDirButton.toolTip = "Load directory"
        loadDirButton.clicked.connect(self.onLoadDirButtonClicked)
        fileDialogFormLayout.addRow("Action :", loadDirButton)

    def populateFileList(self, items):
        """Replace the contents of the file list with one entry per name in *items*."""
        self.fileList.clear()
        for it_text in items:
            item = qt.QListWidgetItem(self.fileList)
            item.setText(it_text)
            item.setIcon(qt.QIcon(it_text))

    def populateFileTypeSelector(self, items):
        """Append every name in *items* to the file type combo box."""
        for item in items:
            self.fileTypeSelector.addItem(item)

    def cleanup(self):
        """Nothing to release."""
        pass

    def onSelect(self):
        """Template leftover: enable the apply button when both nodes are chosen.

        NOTE(review): setup() creates none of applyButton, inputSelector or
        outputSelector, so this fails if it is ever called.
        """
        self.applyButton.enabled = self.inputSelector.currentNode() and self.outputSelector.currentNode()

    def onApplyButton(self):
        """Template leftover: only prints a message."""
        #logic = labkeySlicerPythonExtensionLogic()
        #enableScreenshotsFlag = self.enableScreenshotsFlagCheckBox.checked
        #screenshotScaleFactor = int(self.screenshotScaleFactorSliderWidget.value)
        print("Run the algorithm")
        #logic.run(self.inputSelector.currentNode(), self.outputSelector.currentNode(), enableScreenshotsFlag,screenshotScaleFactor)

    def onUserCertButtonClicked(self):
        """Ask for a user certificate (.crt), store it on the logic and show its path."""
        filename = qt.QFileDialog.getOpenFileName(None, 'Open user certificate',
                                                  self.startDir, '*.crt')
        #pwd=qt.QInputDialog.getText(None,'Certificate password',
        # 'Enter certificate password',qt.QLineEdit.Password)
        if not filename:
            print("No file selected")
            return
        f = qt.QFile(filename)
        if not f.open(qt.QIODevice.ReadOnly):
            print("Could not open file")
            return
        # Parsed only to verify that the file holds at least one certificate.
        certList = qt.QSslCertificate.fromPath(filename)
        if len(certList) < 1:
            print("Troubles parsing {0}".format(filename))
            return
        self.logic.cert = qt.QSslCertificate(f)
        print("cert.isNull()={0}".format(self.logic.cert.isNull()))
        self.userCertButton.setText(filename)
        self.authName.setText(self.logic.cert.subjectInfo("emailAddress"))

    def onPrivateKeyButtonClicked(self):
        """Ask for a private key (.key) and its password; store the key on the logic."""
        filename = qt.QFileDialog.getOpenFileName(None, 'Open private key',
                                                  self.startDir, '*.key')
        if not filename:
            print("No file selected")
            return
        f = qt.QFile(filename)
        if not f.open(qt.QIODevice.ReadOnly):
            print("Could not open file")
            return
        self.pwd = qt.QInputDialog.getText(None, 'Private key password',
                                           'Enter key password', qt.QLineEdit.Password)
        self.logic.key = qt.QSslKey(f, qt.QSsl.Rsa, qt.QSsl.Pem, qt.QSsl.PrivateKey,
                                    str(self.pwd))
        self.privateKeyButton.setText(filename)

    def onCaCertButtonClicked(self):
        """Ask for a CA certificate (.crt), store it on the logic and show its path."""
        filename = qt.QFileDialog.getOpenFileName(None, 'Open authority certificate',
                                                  self.startDir, '*.crt')
        if not filename:
            print("No file selected")
            return
        f = qt.QFile(filename)
        if not f.open(qt.QIODevice.ReadOnly):
            print("Could not open file")
            return
        certList = qt.QSslCertificate.fromPath(filename)
        if len(certList) < 1:
            print("Troubles parsing {0}".format(filename))
            return
        self.logic.caCert = qt.QSslCertificate(f)  #certList[0]
        self.caCertButton.setText(filename)

    def onLoadConfigButtonClicked(self):
        """Load a JSON connection configuration and mirror it in the form fields."""
        filename = qt.QFileDialog.getOpenFileName(None, 'Open configuration file (JSON)',
                                                  self.configDir, '*.json')
        # The dialog returns an empty name when cancelled.
        if not filename:
            print("No file selected")
            return

        self.network.init(filename)
        cfg = self.network.connectionConfig
        if 'SSL' in cfg:
            sslSetup = cfg['SSL']
            self.privateKeyButton.setText(sslSetup['key'])
            self.userCertButton.setText(sslSetup['user'])
            self.caCertButton.setText(sslSetup['ca'])

        self.serverURL.setText(cfg['host'])
        if 'labkey' in cfg:
            labkey = cfg['labkey']
            self.authName.setText(labkey['user'])
            self.authPass.setText(labkey['password'])
        #self.serverURL.setText(self.network.hostname)
        #self.authName.setText(self.network.auth_name)
        #self.authPass.setText(self.network.auth_pass)
        self.loadConfigButton.setText(os.path.basename(filename))

    def onLoadZipButtonClicked(self):
        """Unpack a certificate bundle ``<user>.zip`` into ``~/.labkey/<user>``.

        The archive must contain ``<user>.crt``, ``<user>.key`` and an entry
        whose name contains ``CA``. On success the form is filled with the
        extracted paths, the merlin server URL and guest credentials.
        """
        filename = qt.QFileDialog.getOpenFileName(
            None, 'Open certificate file (zip)',
            self.configDir, '*.zip')
        if not filename:
            print("No file selected")
            return
        user = pathlib.Path(filename).stem
        userDir = os.path.join(os.path.expanduser('~'), '.labkey', user)
        # makedirs also creates ~/.labkey when it does not exist yet
        os.makedirs(userDir, exist_ok=True)

        caName = None
        try:
            with zipfile.ZipFile(filename, 'r') as zipObj:
                zipObj.extract(user + '.crt', userDir)
                zipObj.extract(user + '.key', userDir)
                for f in zipObj.namelist():
                    if 'CA' in f:
                        caName = f
                        zipObj.extract(f, userDir)
        except (KeyError, zipfile.BadZipFile) as e:
            # KeyError: an expected member is missing from the archive
            print("Troubles reading {0}: {1}".format(filename, e))
            return
        if caName is None:
            # Without this check os.path.join(userDir, None) raised TypeError.
            print("No CA certificate found in {0}".format(filename))
            return
        self.serverURL.setText('https://merlin.fmf.uni-lj.si')
        self.privateKeyButton.setText(os.path.join(userDir, user + '.key'))
        self.userCertButton.setText(os.path.join(userDir, user + '.crt'))
        self.pwd = 'notUsed'
        self.caCertButton.setText(os.path.join(userDir, caName))
        self.authName.setText("guest")
        self.authPass.setText("guest")

    def onSaveConfigButtonClicked(self):
        """Write the current form contents to ``~/.labkey/network.json``."""
        connectionConfig = self.generateConfig()
        labkeyDir = os.path.join(os.path.expanduser('~'), ".labkey")
        os.makedirs(labkeyDir, exist_ok=True)
        fconfig = os.path.join(labkeyDir, 'network.json')
        with open(fconfig, 'w') as f:
            json.dump(connectionConfig, f, indent=3)
        print("Done")

    def generateConfig(self):
        """Return the connection configuration dict described by the form.

        The ``SSL`` section is included only when all three certificate
        buttons have been given a file (i.e. none still reads "Load").
        """
        connectionConfig = {}

        #setup SSL
        sslButtons = (self.privateKeyButton, self.userCertButton, self.caCertButton)
        if all(button.text != "Load" for button in sslButtons):
            sslSetup = {}
            sslSetup['user'] = self.userCertButton.text
            sslSetup['key'] = self.privateKeyButton.text
            sslSetup['ca'] = self.caCertButton.text
            # self.pwd exists only after a key or zip bundle was loaded
            sslSetup['keyPwd'] = getattr(self, 'pwd', 'notUsed')
            connectionConfig['SSL'] = sslSetup
        connectionConfig["host"] = self.serverURL.text
        connectionConfig["context"] = "labkey"
        labkeySetup = {}
        labkeySetup['user'] = self.authName.text
        labkeySetup['password'] = self.authPass.text
        connectionConfig['labkey'] = labkeySetup
        orthancSetup = {}
        if 'merlin' in self.serverURL.text:
            orthancSetup['server'] = 'https://orthanc.fmf.uni-lj.si'
        if 'onko-nix' in self.serverURL.text:
            orthancSetup['server'] = 'http://onko-nix.onko-i.si:8042'
        orthancSetup['user'] = 'ask'
        orthancSetup['password'] = 'askPassword'
        connectionConfig['orthanc'] = orthancSetup
        return connectionConfig

    def onResetConfigButtonClicked(self):
        """Put placeholder values back into every connection field."""
        self.serverURL.setText('URL')
        self.privateKeyButton.setText("Load")
        self.userCertButton.setText("Load")
        self.caCertButton.setText("Load")
        self.authName.setText("labkey username")
        self.authPass.setText("password")

    def onInitButtonClicked(self):
        """Connect with the form's configuration and colour the button by outcome.

        Red: no user id returned; green: the returned email equals the
        configured user; orange: a different user was returned.
        """
        self.network.connectionConfig = self.generateConfig()
        print(self.network.connectionConfig['labkey']['user'])
        self.network.initRemote()
        remoteId = self.network.getUserId()
        if remoteId is None:
            colour = "red"
        elif remoteId['email'] == self.network.connectionConfig['labkey']['user']:
            colour = "green"
        else:
            colour = "orange"
        self.initButton.setStyleSheet("background-color: " + colour)

    #def updateAuthName(self,txt):
    #self.network.auth_name=txt
    #print("Setting username to {0}".format(self.network.auth_name))
    #def updateAuthPass(self,txt):
    #self.network.auth_pass=txt
    #print("Setting password.")
    #def updateServerURL(self,txt):
    #self.network.hostname=txt
    #print("Setting hostname to {}".format(self.network.hostname))

    def onFileListDoubleClicked(self, item):
        """Navigate the remote tree: '..' goes up, '.' stays, anything else descends.

        The new location is listed and shown both in the file list and in the
        "Selected file" field.
        """
        if item is None:
            print("Selected items: None")
            return
        iText = item.text()
        print("Selected items: {0} ".format(iText))

        #this is hard -> compose path string from currentRemoteDir and selection
        if iText.startswith('..'):
            #one up; already at the root when the list is empty
            if self.currentRemoteDir:
                self.currentRemoteDir.pop()
        elif iText.startswith('.'):
            pass
        else:
            self.currentRemoteDir.append(iText)
        print("Listing {}".format(self.currentRemoteDir))
        remoteDirString = self.fb.GetRootUrl()
        if self.currentRemoteDir:
            remoteDirString += '/' + '/'.join(self.currentRemoteDir)
        # NOTE(review): the status flag is not inspected; confirm what lst
        # holds when listRemoteDir reports failure.
        ok, lst = self.fb.listRemoteDir(remoteDirString)
        flist = ['.', '..'] + [self.fb.baseDir(f) for f in lst]
        print("Got")
        print(flist[2:])
        self.populateFileList(flist)
        # setText needs a string; the path components were passed as a list.
        self.selectedFile.setText('/'.join(self.currentRemoteDir))

    def onLoadFileButtonClicked(self):
        """Download the selected remote file to ``~/temp`` and load it into Slicer.

        The local copy is deleted afterwards unless "keep cached file" is
        checked.
        """
        if not self.currentRemoteDir:
            print("No file selected")
            return
        properties = {}

        #local path
        tempDir = os.path.join(os.path.expanduser('~'), 'temp')
        os.makedirs(tempDir, exist_ok=True)
        localPath = os.path.join(tempDir, self.currentRemoteDir[-1])
        print(localPath)
        #remote path
        remotePath = self.fb.GetRootUrl() + '/' + '/'.join(self.currentRemoteDir)
        print(remotePath)
        #copy over
        self.fb.readFileToFile(remotePath, localPath)

        #do slicer magic
        slicer.util.loadNodeFromFile(localPath, self.fileTypeSelector.currentText,
                                     properties, returnNode=False)

        # Honour the cache checkbox; the file used to be removed unconditionally.
        if not self.keepCachedFileCheckBox.isChecked():
            os.remove(localPath)

    def onLoadDirButtonClicked(self):
        """Placeholder: loading a whole directory is not implemented."""
        print('Not implemented')
- #
- # labkeyBrowserLogic
- #
class labkeyBrowserLogic(ScriptedLoadableModuleLogic):
    """This class should implement all the actual
    computation done by your module. The interface
    should be such that other python code can import
    this class and make use of the functionality without
    requiring an instance of the Widget.
    Uses ScriptedLoadableModuleLogic base class, available at:
    https://github.com/Slicer/Slicer/blob/master/Base/Python/slicer/ScriptedLoadableModule.py
    """

    def __init__(self, parent=None):
        """Create the logic.

        parent -- passed through to ScriptedLoadableModuleLogic; optional so
                  the logic can be created without a widget.
        """
        ScriptedLoadableModuleLogic.__init__(self, parent)
        self.qnam = qt.QNetworkAccessManager()
        # Defaults matching run()'s, so takeScreenshot() is usable before
        # run() has been called.
        self.enableScreenshots = 0
        self.screenshotScaleFactor = 1

    def hasImageData(self, volumeNode):
        """This is a dummy logic method that
        returns true if the passed in volume
        node has valid image data
        """
        if not volumeNode:
            print('no volume node')
            return False
        if volumeNode.GetImageData() is None:
            print('no image data')
            return False
        return True

    def takeScreenshot(self, name, description, type=-1):
        """Show *description* and, if screenshots are enabled, snapshot a view.

        name        -- name given to the snapshot
        description -- text displayed and stored with the snapshot
        type        -- a slicer.qMRMLScreenShotDialog view constant; any other
                       value captures the main window
        """
        # show the message even if not taking a screen shot
        self.delayDisplay(description)
        if self.enableScreenshots == 0:
            return
        lm = slicer.app.layoutManager()
        # switch on the type to get the requested window
        widget = 0
        if type == slicer.qMRMLScreenShotDialog.FullLayout:
            # full layout
            widget = lm.viewport()
        elif type == slicer.qMRMLScreenShotDialog.ThreeD:
            # just the 3D window
            widget = lm.threeDWidget(0).threeDView()
        elif type == slicer.qMRMLScreenShotDialog.Red:
            # red slice window
            widget = lm.sliceWidget("Red")
        elif type == slicer.qMRMLScreenShotDialog.Yellow:
            # yellow slice window
            widget = lm.sliceWidget("Yellow")
        elif type == slicer.qMRMLScreenShotDialog.Green:
            # green slice window
            widget = lm.sliceWidget("Green")
        else:
            # default to using the full window
            widget = slicer.util.mainWindow()
            # reset the type so that the node is set correctly
            type = slicer.qMRMLScreenShotDialog.FullLayout
        # grab and convert to vtk image data
        qpixMap = qt.QPixmap().grabWidget(widget)
        qimage = qpixMap.toImage()
        imageData = vtk.vtkImageData()
        slicer.qMRMLUtils().qImageToVtkImageData(qimage, imageData)
        annotationLogic = slicer.modules.annotations.logic()
        annotationLogic.CreateSnapShot(name, description, type, self.screenshotScaleFactor, imageData)

    def run(self, inputVolume, outputVolume, enableScreenshots=0, screenshotScaleFactor=1):
        """
        Run the actual algorithm

        Currently only records the screenshot settings and takes the start
        screenshot; inputVolume and outputVolume are not used. Returns True.
        """
        self.delayDisplay('Running the aglorithm')
        self.enableScreenshots = enableScreenshots
        self.screenshotScaleFactor = screenshotScaleFactor
        self.takeScreenshot('labkeySlicerPythonExtension-Start', 'Start', -1)
        return True
class labkeyBrowserTest(ScriptedLoadableModuleTest):
    """
    This is the test case for your scripted module.
    Uses ScriptedLoadableModuleTest base class, available at:
    https://github.com/Slicer/Slicer/blob/master/Base/Python/slicer/ScriptedLoadableModule.py
    """

    def setUp(self):
        """ Do whatever is needed to reset the state - typically a scene clear will be enough.
        """
        slicer.mrmlScene.Clear(0)

    def runTest(self):
        """Run as few or as many tests as needed here.
        """
        self.setUp()
        # The test method of this class; the template name called here before
        # (test_labkeySlicerPythonExtension1) does not exist.
        self.test_labkeyBrowser()

    def test_labkeyBrowser(self):
        """Download a sample volume, load it and check it carries image data.

        Ideally you should have several levels of tests. At the lowest level
        tests should exercise the functionality of the logic with different inputs
        (both valid and invalid). At higher levels your tests should emulate the
        way the user would interact with your code and confirm that it still works
        the way you intended.
        One of the most important features of the tests is that it should alert other
        developers when their changes will have an impact on the behavior of your
        module. For example, if a developer removes a feature that you depend on,
        your test should break so they know that the feature is needed.
        """
        self.delayDisplay("Starting the test")
        #
        # first, get some data
        #
        # Python 3 location of urlretrieve (urllib.urlretrieve is Python 2 only)
        import urllib.request
        downloads = (
            ('http://slicer.kitware.com/midas3/download?items=5767', 'FA.nrrd', slicer.util.loadVolume),
        )
        for url, name, loader in downloads:
            filePath = slicer.app.temporaryPath + '/' + name
            # download only when there is no usable cached copy
            if not os.path.exists(filePath) or os.stat(filePath).st_size == 0:
                print('Requesting download %s from %s...\n' % (name, url))
                urllib.request.urlretrieve(url, filePath)
            if loader:
                print('Loading %s...\n' % (name,))
                loader(filePath)
        self.delayDisplay('Finished with download and loading\n')
        volumeNode = slicer.util.getNode(pattern="FA")
        # The logic class of this module; its constructor takes a parent.
        logic = labkeyBrowserLogic(None)
        self.assertTrue(logic.hasImageData(volumeNode))
        self.delayDisplay('Test passed!')
def getHomeDir():
    """Return the current user's home directory.

    Looks at ``HOME`` first, then at ``HOMEDRIVE`` + ``HOMEPATH``, and
    finally falls back to ``os.path.expanduser('~')``.
    """
    try:
        return os.environ['HOME']
    except KeyError:
        pass
    try:
        return os.environ['HOMEDRIVE'] + os.environ['HOMEPATH']
    except KeyError:
        # Neither convention is available in the environment.
        return os.path.expanduser('~')
def loadLibrary(name):
    """Download library *name* and install it under ``~/.labkey/software/src``.

    The zip archive registered for *name* is fetched to ``~/temp``, unpacked
    into the install directory, and the unpacked ``<name>-master`` directory
    is renamed to ``<name>``, replacing an earlier installation.

    name -- key into the table of known remote sources

    Returns the path of the installed library directory.
    Raises KeyError for an unknown *name* and IOError when the server does
    not answer with HTTP status 200.
    """
    # NOTE(security): the archive is fetched over plain HTTP and its content
    # is later imported as code; prefer an HTTPS source if one is available.
    remoteSources = {
        "labkeyInterface":
        "http://wiscigt.powertheword.com/labkey/labkeyinterface/-/archive/master/labkeyinterface-master.zip"
    }
    # Resolve the URL first so an unknown name fails without side effects.
    url = remoteSources[name]

    import shutil

    homeDir = os.path.expanduser('~')

    #1 Download
    tempDir = os.path.join(homeDir, 'temp')
    os.makedirs(tempDir, exist_ok=True)
    tempFile = os.path.join(tempDir, name + '.zip')

    http = urllib3.PoolManager()
    r = http.request('GET', url, preload_content=False)
    chunk_size = 65536
    try:
        if r.status != 200:
            raise IOError("Download of {0} failed with HTTP status {1}".format(url, r.status))
        with open(tempFile, 'wb') as out:
            while True:
                data = r.read(chunk_size)
                if not data:
                    break
                out.write(data)
    finally:
        # hand the connection back to the pool even when the download fails
        r.release_conn()

    #2 Unzip
    installDir = os.path.join(homeDir, '.labkey', 'software', 'src')
    os.makedirs(installDir, exist_ok=True)
    try:
        with zipfile.ZipFile(tempFile, 'r') as zip_ref:
            zip_ref.extractall(installDir)
    finally:
        #cleanup, also when the archive turns out to be unreadable
        os.remove(tempFile)

    extractedDir = os.path.join(installDir, name.lower() + '-master')
    targetDir = os.path.join(installDir, name)
    # os.rename cannot replace an existing non-empty directory
    if os.path.isdir(targetDir):
        shutil.rmtree(targetDir)
    os.rename(extractedDir, targetDir)
    return targetDir
|