123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175 |
- import urllib3
- #requests doesn't work as it lacks key password checking
- import chardet
- import json
class labkeyInterface:
    """Small HTTP client for a LabKey server, built on a urllib3 PoolManager.

    Create an instance, call init(fname) with a JSON configuration file and
    then use get/head/post/put/mkcol.

    Configuration keys read by this class:
      host                     base URL of the server; "/labkey" is appended
      labkey.user, .password   credentials sent as HTTP basic auth
      SSL (optional)           "ca" plus, optionally, "user" and "key"
                               certificate/key file paths
    """

    # Set to True (on the class or on an instance) to print the URL of every
    # GET, HEAD and PUT request before it is sent.
    debug = False

    # Set-Cookie value recorded by the last getCSRF()/getUserId() call that
    # got a response; None until then, so the write methods can always read it.
    cookie = None

    def init(self, fname):
        """Load the JSON configuration in *fname* and set up the connection pool.

        Raises:
            OSError: the file cannot be opened (a message is printed first).
        """
        try:
            f = open(fname)
        except OSError as e:
            print("Configuration error: OS error({}): {} {}".format(e.errno, fname, e.strerror))
            raise
        # close the file as soon as the configuration has been parsed
        with f:
            self.connectionConfig = json.load(f)
        self.initRemote()

    def initRemote(self):
        """Create self.http, using the certificates from the 'SSL' section if present."""
        self.http = urllib3.PoolManager()
        if 'SSL' not in self.connectionConfig:
            return

        sslConfig = self.connectionConfig['SSL']
        poolArgs = {
            'cert_reqs': 'CERT_REQUIRED',
            'ca_certs': sslConfig['ca'],
        }
        # A client certificate is used only when both the certificate and its
        # key are configured; otherwise only the server is verified.
        if 'user' in sslConfig and 'key' in sslConfig:
            poolArgs['cert_file'] = sslConfig['user']
            poolArgs['key_file'] = sslConfig['key']
        # NOTE: a key password (SSL.keyPwd) is not passed on; per the original
        # author's note that option doesn't work until urllib3 1.25.
        self.http = urllib3.PoolManager(**poolArgs)

    def GetLabkeyUrl(self):
        """Return the configured host with "/labkey" appended."""
        return self.connectionConfig['host'] + "/labkey"

    def GetLabkeyWebdavUrl(self):
        """Return the WebDAV root URL below the LabKey URL."""
        return self.GetLabkeyUrl() + "/_webdav"

    def getBasicAuth(self):
        """Return the configured credentials as a "user:password" string."""
        credentials = self.connectionConfig['labkey']
        return credentials['user'] + ":" + credentials['password']

    def _authHeaders(self):
        """Return request headers carrying only the basic-auth credentials."""
        return urllib3.util.make_headers(basic_auth=self.getBasicAuth())

    def _writeHeaders(self, contentType=None):
        """Return headers for a modifying request: auth, CSRF token and cookie.

        Values that are None (failed whoAmI call, no Set-Cookie received) are
        left out of the header dict instead of being sent as None.
        """
        headers = self._authHeaders()
        if contentType is not None:
            headers["Content-Type"] = contentType
        # getCSRF() also refreshes self.cookie, so call it before reading the cookie
        csrf = self.getCSRF()
        if csrf is not None:
            headers["X-LABKEY-CSRF"] = csrf
        if self.cookie is not None:
            headers["Cookie"] = self.cookie
        return headers

    def _send(self, method, url, headers, **kwargs):
        """Issue a modifying request; print the error and return None on HTTPError."""
        try:
            return self.http.request(method, url, headers=headers, **kwargs)
        except urllib3.exceptions.HTTPError as e:
            print(e)
            return None

    @staticmethod
    def _decodeBody(data):
        """Decode response bytes using the encoding guessed by chardet.

        Falls back to UTF-8 when chardet reports no encoding, and retries as
        ASCII when the guessed encoding fails; a UnicodeDecodeError from that
        last attempt propagates to the caller.
        """
        encoding = chardet.detect(data)['encoding'] or 'utf-8'
        try:
            return data.decode(encoding)
        except UnicodeDecodeError:
            return data.decode('ASCII')

    def get(self, url, binary=False):
        """Send an authenticated GET request.

        Args:
            url: full URL to fetch.
            binary: when true the body is not preloaded
                (preload_content=False), leaving it to the caller to read.

        Returns:
            The urllib3 response, or the HTTPError instance itself when the
            request fails, so calling code can inspect it.
        """
        if self.debug:
            print("GET: {0}".format(url))
        headers = self._authHeaders()
        try:
            return self.http.request('GET', url, headers=headers,
                                     preload_content=not binary)
        except urllib3.exceptions.HTTPError as e:
            print(e)
            # useful for querying code
            return e

    def head(self, url):
        """Send an authenticated HEAD request.

        Returns:
            The urllib3 response, or the HTTPError instance itself when the
            request fails, so calling code can inspect it.
        """
        if self.debug:
            print("HEAD: {0}".format(url))
        headers = self._authHeaders()
        try:
            return self.http.request('HEAD', url, headers=headers)
        except urllib3.exceptions.HTTPError as e:
            print(e)
            # useful for querying code
            return e

    def post(self, url, data):
        """POST *data* (sent as application/json) to *url*.

        Returns the urllib3 response, or None when the request raises HTTPError.
        """
        headers = self._writeHeaders("application/json")
        return self._send('POST', url, headers, body=data)

    def mkcol(self, url):
        """Send a WebDAV MKCOL request for *url*.

        Returns the urllib3 response, or None when the request raises HTTPError.
        """
        headers = self._writeHeaders()
        return self._send('MKCOL', url, headers)

    def put(self, url, data):
        """PUT *data* (sent as application/octet-stream) to *url*.

        Returns the urllib3 response, or None when the request raises HTTPError.
        """
        if self.debug:
            print("PUT: {}".format(url))
        headers = self._writeHeaders("application/octet-stream")
        return self._send('PUT', url, headers, body=data)

    def getCSRF(self):
        """Query login/whoAmI.view and return its CSRF token.

        On success the response's Set-Cookie value is stored in self.cookie
        and the user name and token are printed.

        Returns:
            The "CSRF" entry of the whoAmI JSON, or None when the request
            failed or the body could not be decoded.
        """
        url = self.GetLabkeyUrl() + '/login/whoAmI.view'
        try:
            response = self.get(url)
            # get() returns the HTTPError itself on failure; it has no .data
            payload = response.data
        except AttributeError:
            print('Failed')
            return None
        try:
            jsonData = json.loads(self._decodeBody(payload))
        except UnicodeDecodeError:
            print(payload)
            return None
        self.cookie = response.headers.get('Set-Cookie')
        print('User: {} CSRF: {}'.format(jsonData['displayName'], jsonData['CSRF']))
        return jsonData["CSRF"]

    def getUserId(self):
        """Query login/whoAmI.view and return the parsed JSON reply.

        Despite the name, the whole decoded JSON object is returned, not a
        single id field. self.cookie is updated from the response.

        Returns:
            The decoded JSON, or None when the request failed (AttributeError
            on the object returned by get()).
        """
        url = self.GetLabkeyUrl() + '/login/whoAmI.view'
        try:
            response = self.get(url)
            self.cookie = response.headers.get('Set-Cookie')
            return json.loads(self._decodeBody(response.data))
        except AttributeError:
            return None
|