#!/usr/bin/python3 # # Examples: # Export all variables into environment:> eval export $(lliurex-detect -e -a) # Test one condition:> lliurex-detect -x thin && echo si || echo no # Test one condition:> if lliurex-detect -x ltsp; then echo si; else echo no; fi # Get flavour:> FLA=$(lliurex-detect -f) # import sys import os import codecs import pwd,grp import argparse import re import json # import glob # Commented: python 3.5 is not available on lliurex 15 import fnmatch # allows search recursively without glob from subprocess import check_output ltsConfFile="/etc/lts.conf" LLIUREX_VERSION_NUMBER="19" LLIUREX_VERSION_CODENAME="bionic" def detect_live(): #test1=False test2=False test3=False if os.path.isdir('/rofs'): #r1=re.compile('^tmpfs\s/cow\stmpfs') r2=re.compile('^/cow\s/\soverlay') r3=re.compile('^/dev/loop0\s/rofs\ssquashfs') with open('/proc/mounts') as mounts_file: for line in mounts_file.readlines(): #if r1.match(line): # test1=True if r2.match(line): test2=True if r3.match(line): test3=True if test2 and test3: pass else: raise Exception('Not live') #def detect_live(): def detect_flavour(): # Use with python > 3.5 with glob new features # cdd_content=[] # file_name= [ filename for filename in glob.iglob('/usr/share/lliurex-cdd/**/cddflavour', recursive=True) ] # if len(file_name) > 0: # for cddfile in file_name: # try: # with open(cddfile,'r') as fileccd: # cdd= [ line.strip() for line in fileccd ] # cdd_content.extend(cdd) # except Exception as e: # raise Exception('cddflavour error '+str(e)) # else: # file_name= [ filename for filename in glob.iglob('/usr/share/lliurex-cdd/**/cddversion', recursive=True) ] # if len(file_name) > 0: # for cddfile in file_name: # try: # with open(cddfile,'r') as fileccd: # cdd= [ line.strip() for line in fileccd ] # cdd_content.extend(cdd) # except Exception as e: # raise Exception('cddversion file error '+str(e)) # else: # raise Exception('cddflavour or cddversion file not found! ') # cdd_content=list(set(cdd_content)) # cdd_content.sort() # return cdd_content # Code for python < 3.5 without glob file_name=[] cdd_content=[] for root,dirnames,filenames in os.walk('/usr/share/lliurex-cdd'): for filename in fnmatch.filter(filenames,'cddflavour'): file_name.append(os.path.join(root,filename)) if len(file_name) > 0: for cddfile in file_name: try: with open(cddfile,'r') as fileccd: cdd= [ line.strip() for line in fileccd ] cdd_content.extend(cdd) except Exception as e: cdd_content.append('None') #raise Exception('cddflavour file error '+str(e)) else: for root,dirnames,filenames in os.walk('/usr/share/lliurex-cdd'): for filename in fnmatch.filter(filenames,'cddversion'): file_name.append(os.path.join(root,filename)) if len(file_name) > 0: for cddfile in file_name: try: with open(cddfile,'r') as fileccd: cdd= [ line.strip() for line in fileccd ] cdd_content.extend(cdd) except Exception as e: cdd_content.append('None') #raise Exception('cddversion file error '+str(e)) else: #raise Exception('cddflavour or cddversion file not found! ') cdd_content.append('None') cdd_content=list(set(cdd_content)) cdd_content.sort() return cdd_content #def detect_flavour def detect_num_cdd(): try: with open(os.devnull, 'w') as devnull: num_cdd=check_output(['dpkg-query','--showformat=\'${Version}\'','--show','lliurex-version-timestamp'],stderr=devnull).decode('utf-8') return num_cdd.strip('\'') except Exception as e1: try: with open('/usr/share/lliurex-cdd/version','r') as cdd_num_file: return cdd_num_file.readline().strip() except Exception as e2: return '0' #raise Exception('Error1: '+e1+' and Error2: '+e2) #def detect_num_cdd(): def get_history_version(): try: with open('/etc/lliurex-cdd-version','r') as etc_cdd_file: return etc_cdd_file.read() except Exception as e: raise Exception('Error: '+e) #def get_history_version() def detect_type(): # # Possible output values: live,thin,semi,fat,unknown # # display=os.environ.get('DISPLAY') #On thin clients display is ip+display so it's at least 7 chars if display != None and len(display)>= 7: client_type='thin' else: if os.path.exists(ltsConfFile): #Attempt to open lts.conf as is more reliable than check environment try: re_true=re.compile('LTSP_FATCLIENT(\s)*=(\s)*true',re.IGNORECASE) re_false=re.compile('LTSP_FATCLIENT(\s)*=(\s)*false',re.IGNORECASE) with open(ltsConfFile) as ltsfile: for line in ltsfile.readlines(): if re_true.match(line): client_type='semi' break elif re_false.match(line): client_type='thin' break else: client_type='unknown' except Exception as e: fatclient=os.environ.get('LTSP_FATCLIENT') if fatclient=='true': client_type="semi" else: if fatclient=='false': client_type="thin" else: client_type="unknown" else: client_type='fat' try: detect_live() client_type=client_type+',live' except: pass return client_type.rstrip() #def detect_type def detect_user(user=''): if user == '' or user == None: user_id=os.getuid() user_name=pwd.getpwuid(user_id)[0] else: try: user_uid=pwd.getpwnam(user)[3] except: raise Exception('user not found!') user_name=user #user_info [0]=>username [1]=>pwd [2]=>uid [3]=>gid [4]=>gecos [5]=>homedir [6]=>shell #grp_info [0]=>name [1]=>pwd [2]=>gid [3]=>member grupos = [ group[0] for group in grp.getgrall() if user_name in group[3] ] with codecs.open('/etc/passwd','r','utf-8') as filepwd: localusers=[ line.split(':')[0] for line in filepwd.readlines() ] ret=user_name if 'admins' in grupos : ret='*'+str(user_name) if user_name in localusers: ret += '/local' else: ret += '/ldap' return ret #def detect_user(): def check_mirror_llxvar(): ret = {} try: try: with open('/var/lib/n4d/variables/LLIUREXMIRROR','r') as fp: content = json.load(fp) except Exception as e: with open('/var/lib/n4d/variables-dir/LLIUREXMIRROR','r') as fp: content = json.load(fp) content = content.get('LLIUREXMIRROR',{}).get('value',{}).get('llx{}'.format(LLIUREX_VERSION_NUMBER),{}) if content: ret.setdefault('status',content.get('status_mirror')) ret.setdefault('size',content.get('mirror_size')) ret.setdefault('progress',content.get('progress')) ret.setdefault('last_update',content.get('last_mirror_date')) return ret except Exception as e: sys.stderr.write('Error reading LLIUREXMIRROR llxvar, {}\n'.format(e)) sys.stderr.flush() return {'status':'Error','size':'Error','progress':'Error','last_update':'Error'} def check_mirror_timestamp(): try: reg = 'lliurex-version-timestamp_[0-9]{2}\.([0-9]+)_[^.]*\.deb$' for f in os.listdir('/net/mirror/llx{}/pool/main/l/lliurex-version-timestamp/'.format(LLIUREX_VERSION_NUMBER)): m = re.findall(reg,f) if m: timestamp = m[0] timestamp = "{}/{}/20{}".format(timestamp[4:6],timestamp[2:4],timestamp[0:2]) return timestamp except Exception as e: sys.stderr.write("Error checking mirror timestamp, {}\n".format(e)) sys.stderr.flush() return 'Error' def check_mirror(): # # Run checks against mirror # mirror_location='/net/mirror/llx{}'.format(LLIUREX_VERSION_NUMBER) #dict = check keys & recurse into values #list = check items #tuple = check items(files or regexp) ; syntax=('type','value') where type = 'regexp' or something else to exact match #string = check items #need_structure={'pool':[{'main':{'l/lliurex-version-timestamp':('regexp','lliurex-version-timestamp_.*\.deb$')}},'universe','multiverse','preschool','restricted'], # 'dists':['xenial','xenial-security','xenial-updates'], # 'lists':[('regexp','.*xenial.*_amd64_Packages$'),('regexp','.*xenial.*_i386_Packages$')]} need_structure={'pool':[{'main':{'l/lliurex-version-timestamp':('regexp','lliurex-version-timestamp_.*\.deb$')}},'universe','multiverse','restricted'], 'dists':['{}'.format(LLIUREX_VERSION_CODENAME.lower()),'{}-security'.format(LLIUREX_VERSION_CODENAME.lower()),'{}-updates'.format(LLIUREX_VERSION_CODENAME.lower())]} try: return os.path.isdir(mirror_location) and check_structure(mirror_location,need_structure) except Exception as e: # As a guest user, /net isn't accessible somehow. Nontheless, if something goes wrong, we return False anyway #print(e) return False def check_structure(path_from,child=''): DBG=False dbg=lambda x: DBG and sys.stdout.write(x+'\n') typeparam=type(child) if typeparam == type(dict()): for item in child: new_path=path_from+'/'+item if not check_structure(path_from,item): return False if not check_structure(new_path,child[item]): return False elif typeparam == type(list()): for item in child: if not check_structure(path_from,item): return False elif typeparam == type(tuple()): mode,value=child if mode=='regexp': reg=re.compile(value) for f in os.listdir(path_from): path_check=path_from+'/'+f dbg('checking file ' + path_check) if os.path.isfile(path_check): if reg.match(f): return True return False else: path_check=path_from+'/'+value dbg('checking file ' + path_check) return os.path.isfile(path_check) elif typeparam == type(str()): path_check=path_from+'/'+child dbg('checking dir ' + path_check) if not os.path.isdir(path_check): return False return True def store_result(results='',namevar='',action='store'): global eval_mode global result global exit_return_code_mode if action == 'init': if eval_mode: result={} return 0 else: result=[] return 0 elif action == 'store': if results=='': raise Exception('Missing param results to store') if eval_mode and namevar=='': raise Exception('Eval mode need namevar param') if eval_mode: if namevar=='USERTYPE': res=results if res[0] == '*': result['PROMOTED_USER']='yes' res=results[1:] else: result['PROMOTED_USER']='no' result['USERNAME']=res.split('/')[0] result['LOGIN_TYPE']=res.split('/')[1] elif namevar=='SESSION_TYPE': res=results.split(',') result['LIVE']='no' result['LTSP']='no' result['THIN']='no' result['SEMI']='no' result['FAT']='no' if len(res) > 1: result['LIVE']='yes' if res[0] != 'fat': result['LTSP']='yes' result[res[0].upper()]='yes' elif namevar=='FLAVOUR': res=results[-1] result['SERVER']='no' result['DESKTOP']='no' result['CLIENT']='no' result['INFANTIL']='no' result['MUSIC']='no' result['PIME']='no' #lliurex 15 specific options & catch all if res.upper() == 'NETWORK-CLIENT-PROMO': result['CLIENT']='yes' elif res.upper() == 'LLIUREX': if 'INFANTIL' in [ x.upper() for x in results ]: result['INFANTIL']='yes' else: result['DESKTOP']='yes' #end lliurex15 specific options elif 'CLIENT' in [ x.upper() for x in results ] and 'EDU' in [ x.upper() for x in results ]: result['CLIENT']='yes' elif 'DESKTOP' in [ x.upper() for x in results ] and 'EDU' in [ x.upper() for x in results ]: result['DESKTOP']='yes' elif '-LITE' in res.upper(): part = res.split('-') if part and isinstance(part,list) and len(part) > 1: result[part[0].upper()]='yes' else: result[res.upper()]='yes' else: result[namevar]=results else: if type(list()) == type(results): res=results[-1] elif type(str()) == type(results): res=results else: raise Exception('Unknown result type to store') if namevar=='FLAVOUR': #lliurex 15 specific options & catch all if res.upper() == 'NETWORK-CLIENT-PROMO': res='client' elif res.upper() == 'LLIUREX': if 'INFANTIL' in [ x.upper() for x in results ]: res='infantil' else: res='desktop' #end lliurex15 specific options elif 'CLIENT' in [ x.upper() for x in results ] and 'EDU' in [ x.upper() for x in results ]: res='client' elif 'DESKTOP' in [ x.upper() for x in results ] and 'EDU' in [ x.upper() for x in results ]: res='desktop' result.append(res) elif action == 'print': if exit_return_code_mode != False: if exit_return_code_mode.upper() == 'LOCAL': if result['LOGIN_TYPE']=='local': sys.exit(0) else: sys.exit(1) elif exit_return_code_mode.upper() == 'LDAP': if result['LOGIN_TYPE']=='ldap': sys.exit(0) else: sys.exit(1) elif exit_return_code_mode.upper() == 'PROMOTED': if result['PROMOTED_USER']=='yes': sys.exit(0) else: sys.exit(1) elif exit_return_code_mode.upper() == 'MIRROR': if result['MIRROR']=='True': sys.exit(0) else: sys.exit(1) else: if result[exit_return_code_mode.upper()]=='yes': sys.exit(0) else: sys.exit(1) if len(result) > 0: if eval_mode: for k,v in result.items(): print(k+'='+v) else: print (','.join(result)) else: raise Exception('Unknow action') #def store_result(): # # MAIN PROGRAM # parser = argparse.ArgumentParser(description='Get information about running environment') parser.add_argument('-a','--all',metavar='',action='store_const',help='Get all information',const=True) parser.add_argument('-m','--mirror',metavar='',action='store_const',help='Check mirror available',const=True) parser.add_argument('-mv','--mirror-version',metavar='',action='store_const',help='Get mirror information',const=True) parser.add_argument('-e','--eval',metavar='',action='store_const',help='Show all information to evaluate in bash variables',const=True) parser.add_argument('-s','--session',metavar='',action='store_const',help='Get current session type',const=True) parser.add_argument('-f','--flavour',metavar='',action='store_const',help='Get the flavour of current system',const=True) parser.add_argument('-u','--usertype',metavar='username',nargs='?',const='',help='Get the usertype from current user or from passed username') code_types=['live','ltsp','fat','semi','thin','desktop','server','client','infantil','pime','music','local','ldap','promoted','mirror'] parser.add_argument('-x','--with-return-code',metavar='[live|ltsp|fat|semi|thin|desktop|server|client|infantil|pime|music|local|ldap|promoted|mirror]',nargs=1,choices=code_types,help='Execute mode testing value passed') # lliurex-version options parser.add_argument('-n','--number',metavar='',action='store_const',const=True,help='Get the cdd number version') parser.add_argument('-v','--version',metavar='',action='store_const',const=True,help='Get the cdd version') parser.add_argument('-t','--test',metavar='cdd_name',help='Test if the cdd is installed') parser.add_argument('--history',metavar='',action='store_const',const=True,help='Get the installed meta\'s history') args=parser.parse_args() #print(args) args_all=args.all #Commented due to compatibility with lliurex-version without parameters #args_all=True #if args.all != True: # for arg in vars(args): # if getattr(args,arg) != None: # args_all=False # break arg_none_for_lliurex_version=True if args.all != True: for arg in vars(args): if getattr(args,arg) != None: arg_none_for_lliurex_version=False break else: arg_none_for_lliurex_version=False eval_mode=args.eval exit_return_code_mode=False if args.with_return_code != None: eval_mode=True args_all=True exit_return_code_mode=args.with_return_code[0] store_result(action='init') try: if args_all or args.session != None: store_result(detect_type(),'SESSION_TYPE') if args_all or args.flavour != None: store_result(detect_flavour(),'FLAVOUR') if args_all or args.usertype != None: store_result(detect_user(args.usertype),'USERTYPE') if args_all or args.mirror != None or args.mirror_version: store_result(str(check_mirror()),'MIRROR') if args_all or args.mirror_version != None: store_result("MIRROR_VERSION_TIMESTAMP={}".format(check_mirror_timestamp()),'MIRROR_TIMESTAMP') llxvar_mirror = check_mirror_llxvar() if llxvar_mirror: store_result("STATUS={status},PROGRESS={progress},SIZE={size},LAST_UPDATE_CHECK={last_update}".format(**llxvar_mirror),'MIRROR_LLXVAR') #lliurex version options if args.number != None: print(detect_num_cdd()) if args.version != None: print(', '.join(detect_flavour())) if arg_none_for_lliurex_version: ret=detect_flavour() ret.append(detect_num_cdd()) print(', '.join(ret)) if args.test != None: try: ret=detect_flavour() except: sys.exit(1) if args.test in ret: sys.exit(0) else: sys.exit(1) if args.history != None: print(get_history_version().rstrip()) #end lliurex-version options store_result(action='print') except Exception as e: print('Error '+str(e)) sys.exit(1) sys.exit(0)