import urllib3 #requests doesnt work as it lacks key password checking import chardet import json class labkeyInterface: def init(self,fname): #fname is a file with configuration try: f=open(fname) except OSError as e: print("Configuration error: OS error({}): {} {}".format(e.errno,fname, e.strerror)) raise self.connectionConfig=json.load(f) self.initRemote() def initRemote(self): self.http=urllib3.PoolManager() if 'SSL' in self.connectionConfig: try: self.http = urllib3.PoolManager(\ cert_file=self.connectionConfig['SSL']['user'],\ cert_reqs='CERT_REQUIRED',\ key_file=self.connectionConfig['SSL']['key'],\ ca_certs=self.connectionConfig['SSL']['ca']) except KeyError: self.http = urllib3.PoolManager(\ cert_reqs='CERT_REQUIRED',\ ca_certs=self.connectionConfig['SSL']['ca']) #password=self.connectionConfig['SSL']['keyPwd'],\ doesnt work until 1.25 def GetLabkeyUrl(self): return self.connectionConfig['host']+"/labkey" def GetLabkeyWebdavUrl(self): return self.GetLabkeyUrl()+"/_webdav" def getBasicAuth(self): user=self.connectionConfig['labkey']['user'] pwd=self.connectionConfig['labkey']['password'] return user+":"+pwd def get(self,url,binary=False): debug=False if debug: print("GET: {0}".format(url)) headers=urllib3.util.make_headers(basic_auth=self.getBasicAuth()) try: if not binary: return self.http.request('GET',url,headers=headers) else: return self.http.request('GET',url,headers=headers,preload_content=False) #f contains json as a return value #f contains json as a return value except urllib3.exceptions.HTTPError as e: print(e) #useful for querying code return e def head(self,url): debug=False if debug: print("HEAD: {0}".format(url)) headers=urllib3.util.make_headers(basic_auth=self.getBasicAuth()) try: return self.http.request('HEAD',url,headers=headers) except urllib3.exceptions.HTTPError as e: print(e) #usefule for querying code return e def post(self,url,data): debug=False headers=urllib3.util.make_headers(basic_auth=self.getBasicAuth()) headers["Content-Type"]="application/json" #add csrf;also sets self.cookie headers["X-LABKEY-CSRF"]=self.getCSRF() headers["Cookie"]=self.cookie try: return self.http.request('POST',url,headers=headers,body=data) #f contains json as a return value except urllib3.exceptions.HTTPError as e: print(e) def mkcol(self,url): debug=False headers=urllib3.util.make_headers(basic_auth=self.getBasicAuth()) #add csrf;also sets self.cookie headers["X-LABKEY-CSRF"]=self.getCSRF() headers["Cookie"]=self.cookie try: return self.http.request('MKCOL',url,headers=headers) #f contains json as a return value except urllib3.exceptions.HTTPError as e: print(e) def put(self,url,data): debug=False if debug: print("PUT: {}").format(url) headers=urllib3.util.make_headers(basic_auth=self.getBasicAuth()) headers["Content-Type"]="application/octet-stream" #add csrf headers["X-LABKEY-CSRF"]=self.getCSRF() headers["Cookie"]=self.cookie try: return self.http.request('PUT',url,headers=headers,body=data) #f contains json as a return value except urllib3.exceptions.HTTPError as e: print(e) def getCSRF(self): url=self.GetLabkeyUrl()+'/login/whoAmI.view' try: response=self.get(url) encoding=chardet.detect(response.data)['encoding'] #print(f'Got encoding: {encoding}') jsonData=json.loads(response.data.decode(encoding)) except AttributeError: print('Failed') return None except UnicodeDecodeError: try: encoding='ASCII' jsonData=json.loads(response.data.decode(encoding)) except UnicodeDecodeError: print(response.data) return None self.cookie=response.getheader('Set-Cookie') print('User: {} CSRF: {}'.format(jsonData['displayName'],jsonData['CSRF'])) return jsonData["CSRF"] def getUserId(self): url=self.GetLabkeyUrl()+'/login/whoAmI.view' try: response=self.get(url) self.cookie=response.getheader('Set-Cookie') encoding=chardet.detect(response.data)['encoding'] jsonData=json.loads(response.data.decode(encoding)) return jsonData except AttributeError: return None