#!/usr/bin/env python3 import sys,os sys.path.insert(0,'/usr/lib/valentin/') import logging import hwdetector.utils.log as log import tarfile import datetime import time import tempfile import json import argparse import zlib import base64 from ruleset import ruleset try: import threading import gi gi.require_version('Gtk','3.0') from gi.repository import Gtk,GObject #GObject.threads_init() except Exception as e: log.error('Unable to import Gtk libraries: {}'.format(e)) def run_in_debug(): gettrace=getattr(sys,'gettrace',None) if gettrace is None: return False elif gettrace(): return True else: return None def indent_var(st='',var={},indent=0): creturn='\n' if indent==0: creturn=creturn*2 indentation='\t'*indent try: if isinstance(var,dict): for x in sorted(var.keys()): if not (x.lower().startswith('helper') and indent==0): st += '{}{}\n{}{}'.format(indentation,x,indent_var(var=var[x],indent=indent+1),creturn) elif isinstance(var,list) or isinstance(var,tuple): if len(var) == 2 and var[0] == '__gz__': content=zlib.decompress(base64.b64decode(var[1])).decode('utf-8') replaced = content.replace('\nu',indentation+'\n'+indentation) st += '{}{}'.format(indentation,replaced) else: if isinstance(var,tuple): st += '{}{}'.format(indentation,var) else: for x in sorted(var): st += '{}\n'.format(indent_var(var=x,indent=indent+1)) elif isinstance(var,str): try: var = var.encode('utf-8') except: var = var.decode('utf-8') replaced = var.replace('\nu',indentation+'\n'+indentation) st += '{}{}'.format(indentation,replaced) elif isinstance(var,str): replaced = var.replace('\nu',indentation+'\n'+indentation) st += '{}{}'.format(indentation,replaced) elif isinstance(var,bool) or isinstance(var,int) or isinstance(var,float) or var == None: st += '{}{}'.format(indentation,var) else: raise Exception('Unknown type var') return st except Exception as e: return '' def make_file(capabilities,*args,**kwargs): try: txt = json.dumps(capabilities,indent=4,separators=(',',':'),sort_keys=True) name = tempfile.mkstemp()[1] with open(name,'w') as f: f.write(txt.encode('utf-8')) return name except Exception as e: return False def make_tar(file,*args,**kwargs): try: datestr='{:%Y%m%d%H%M}'.format(datetime.datetime.now()) if kwargs['fname']: filename = kwargs['fname'] else: filename = 'valentin-info-{}.tar.gz'.format(datestr) if os.path.exists(file): with tarfile.open(filename,'w:gz') as f: f.add(file,arcname='debug-info-{}.txt'.format(datestr),recursive=False) logger_message_file=log.filename_to_log if os.path.exists(logger_message_file): f.add(logger_message_file,arcname=os.path.basename('valentin-debug-log-messages.txt')) return filename except Exception as e: return False def run_detection(*args,**kwargs): print('run detection') log.debug("Importing hwdetector") import hwdetector log.debug("Instantiate HwDetector") hwd=hwdetector.HwDetector() #hwd.all_plugins_are_needed=True if not run_in_debug(): log.info('USING MAX_RUNNING_TIME !!!') hwd.MAX_RUNNING_TIME = 30 #hwd.fake_capabilities={u'IAMGOD':u'yes'} log.debug("Calling run plugins") #ret = hwd.run(needs=[u'ALL_TESTS']) #ret = hwd.run(needs=[u'LLXSYSTEM_TEST',u'LLXNETWORK_TEST']) ret = hwd.run() log.info("Total running time: {}".format(hwd.RUNNING_TIME)) return (ret,hwd.capabilities) def load_file(*args,**kwargs): if kwargs.get('fname',None): filename = kwargs['fname'] else: return False if os.path.exists(filename): ftxt=None if tarfile.is_tarfile(filename): with tarfile.open(filename,'r:gz') as tar: target_file=None for tarinfo in tar.getmembers(): if 'debug-info' in tarinfo.name.lower(): target_file=tarinfo.name break if target_file: ftxt = tar.extractfile(target_file).read() else: ftxt = None with open(filename,'r') as f: ftxt = f.read() return ftxt else: return False def run_analysis(*args,**kwargs): if kwargs.get('capabilities',None): fileinfo = kwargs.get('capabilities') if not kwargs.get('ruleset',None): fileruleset='/usr/share/valentin/valentin.rules' if not os.path.exists(fileruleset): return False else: fileruleset=kwargs.get('ruleset')[0] rs = ruleset() try: rs.load_ruleset(fileruleset=fileruleset,data=fileinfo) rs.make_tree() rs.make_suggestion() except Exception as e: log.error(e) return True class Handler: def __init__(self,gui): self.gui = gui def destroy(self,*args,**kwargs): log.debug('Exitting gui') sys.exit(0) def key_clicked(self,tselection,tpath,tviewcolumn,*args,**kwargs): model,lpaths=tselection.get_selected_rows() id_selected=model[lpaths][0] if self.gui.coldict[id_selected]['child']: tview=tselection.get_tree_view() for sel in lpaths: if tview.row_expanded(sel): tview.collapse_row(sel) else: tview.expand_row(sel,False) self.gui.showText(text=indent_var(var=self.gui.coldict[id_selected]['data'])) else: self.gui.showText(id_selected) class Gui: def __init__(self,*args,**kwargs): self.builder=None self.handler = Handler(self) self._load_glade(**kwargs) #self.buffer=Gtk.TextBuffer() #self.buffer.set_text(self.text) self.textview=self.builder.get_object('textview1') self.buffer=self.textview.get_buffer() GObject.idle_add(self.buffer.set_text,'Loading...',priority=GObject.PRIORITY_DEFAULT) #self.textview.set_buffer(self.buffer) self.id=0 self.done=False threading.Thread(target=self.updateText).start() pass def updateText(self,*args,**kwargs): while not self.done: time.sleep(0.2) #self.buffer.set_text(u'{} {}'.format(self.text,self.id)) #self.textview.set_buffer(self.buffer) GObject.idle_add(self.buffer.set_text,'{} {} items'.format('Parsing data ...',self.id),priority=GObject.PRIORITY_DEFAULT) #self.buffer.set_text(u'') GObject.idle_add(self.buffer.set_text,'',priority=GObject.PRIORITY_LOW) #self.textview.set_buffer(self.buffer) def _store_data(self,data,id=[0]): if not self.store: return None me=id[0] self.id=me if isinstance(data,dict): for key in sorted(data.keys()): if key.startswith('HELPER'): continue id[0]+=1 id_child=id[0] col=self.store.append(self.coldict[me]['col'],[id_child,key]) self.coldict[me]['child'].append(id_child) self.coldict[id_child]={'parent':me,'col':col,'data':data[key],'me':id_child,'child':[]} self._store_data(data=data[key],id=id) elif isinstance(data,list): if len(data) > 1: if len(data)==2 and data[0] == '__gz__': self.coldict[me]['data']=zlib.decompress(base64.b64decode(data[1])) self._store_data(data=self.coldict[me]['data'],id=id) else: for x in data: id[0]+=1 id_child=id[0] col=self.store.append(self.coldict[me]['col'],[id_child,'[{}]'.format(str(data.index(x))).encode('utf-8').decode('utf-8')]) self.coldict[me]['child'].append(id_child) self.coldict[id_child]={'parent':me,'col':col,'data':x,'me':id_child,'child':[]} self._store_data(data=x,id=id) elif len(data) == 1: self.coldict[me]['data']=data[0] self._store_data(data=data[0],id=id) elif isinstance(data,tuple): self._store_data(data=list(data),id=id) pass elif isinstance(data,str): try: data=data.encode('utf-8').decode('utf-8') self.coldict[me]['data']=data except: pass elif isinstance(data,str): try: data.decode('utf-8') except: data=data.encode('utf-8').decode('utf-8') self.coldict[me]['data']=data else: self._store_data(data=str(data).encode('utf-8').decode('utf-8'),id=id) return True def showData(self,data,*args,**kwargs): self.treeview=self.builder.get_object('treeview1') self.store=self.builder.get_object('treestore1') self.storesort=self.builder.get_object('treemodelsort1') self.storesort.set_sort_column_id(1,Gtk.SortType.ASCENDING) self.coldict={0:{'parent':0,'col':None,'data':data,'me':0,'child':[]}} self._store_data(data=data) self.treeview.set_model(self.storesort) self.done=True def showText(self,id=0,text='',*args,**kwargs): if text=='': if id == 0 or not isinstance(id,int): data='' else: coldata=self.coldict[id] data=coldata['data'] else: data=text if isinstance(data,str) or isinstance(data,str): pass else: data=str(data).encode('utf-8').decode('utf-8') #self.buffer.set_text(data) GObject.idle_add(self.buffer.set_text,data,priority=GObject.PRIORITY_DEFAULT) #self.textview.set_buffer(self.buffer) return True def _show_window(self,*args,**kwargs): if not (self.builder): return self.mainwindow=self.builder.get_object('mainwindow') self.builder.connect_signals(self.handler) self.treeview=self.builder.get_object('treeview1') self.treeview.set_model(Gtk.TreeStore(int,str)) self.mainwindow.show_all() pass def _load_glade(self,*args,**kwargs): self.gladefile=kwargs.get('gladefile',None) log.info('Loading glade file {}'.format(self.gladefile)) if self.gladefile and os.path.exists(self.gladefile): try: self.builder=Gtk.Builder() self.builder.add_from_file(self.gladefile) log.info('Loaded gladefile') self._show_window() except Exception as e: log.error('Can\'t load gladefile') self.builder=None else: log.error('can\'t find gladefile') sys.exit(1) pass if __name__ == '__main__': try: parser = argparse.ArgumentParser(description='Simple system diagnostic tool') dlevels = ['debug','info','warning','error'] parser.add_argument('-d','--debug-level',metavar='debug|info|warning|error',nargs='?',choices=dlevels,help='Set the debug level (default: warning)') parser.add_argument('-c','--coloured',action='store_const',help='Colorize logger messages',const=True) parser.add_argument('-f','--with-file',metavar='filename',nargs='?',help='Filename for results file (default valentin-info-(date).tar.gz') parser.add_argument('-o','--to-stdout',action='store_const',help='Output results to stdout',const=True) parser.add_argument('-w','--to-stdout-raw',action='store_const',help='Output results to stdout in json format',const=True) parser.add_argument('-g','--graphical-mode',action='store_const',help='Show results in graphical mode',const=True) parser.add_argument('-a','--analyze-file',metavar='filename',nargs=1,help='Filename to analyze') parser.add_argument('-x','--run-detection',action='store_const',help='Run detection',const=True) parser.add_argument('-s','--suggest',action='store_const',help='Suggest actions to repair system',const=True) parser.add_argument('-r','--ruleset',metavar='filename',nargs=1,help='Load ruleset when suggest user actions') args = parser.parse_args() if not args.debug_level: #log.set_level(logging.WARNING) log.disable() else: levels={'debug':logging.DEBUG,'info':logging.INFO,'warning':logging.WARNING,'error':logging.ERROR} log.set_level(levels[args.debug_level.lower()]) if args.coloured: log.set_color(True) kw=vars(args) fname=None if args.with_file: fname=args.with_file if '.' not in fname: fname = fname + '.tar.gz' kw.update({'fname':fname}) if args.analyze_file: fname=args.analyze_file[0] kw.update({'fname':fname}) capabilities=load_file(**kw) if capabilities: capabilities = json.loads(capabilities) ret = True log.info("File {} loaded".format(fname)) kw.update({'capabilities':capabilities}) else: log.error("File {} can't be loaded".format(fname)) ret = False else: if args.run_detection: ret,capabilities=run_detection(**kw) log.info('Detection done!') if args.with_file: try: file = make_file(capabilities,**kw) fname = make_tar(file,**kw) log.info("File {} created".format(fname)) except Exception as e: log.error("File creation unsuccessful "+str(e)) kw.update({'capabilities':capabilities}) else: sys.stderr.write('Missing detection option\n') ret = False if ret: if args.to_stdout or args.to_stdout_raw: if args.to_stdout: pr=indent_var(var=capabilities) elif args.to_stdout_raw: for x in (x for x in list(capabilities.keys()) if x.lower().startswith('helper')): del capabilities[x] def clear_compressed(var): try: if isinstance(var,tuple) or isinstance(var,list): if len(var) == 2 and var[0]=='__gz__': try: var=zlib.decompress(base64.b64decode(var[1])).decode('utf-8') except: pass return var var=list(var) for i in range(len(var)): var[i]=clear_compressed(var[i]) elif isinstance(var,dict): for x in var: var[x]=clear_compressed(var[x]) return var except Exception as e: log.error(e) capabilities=clear_compressed(capabilities) pr=json.dumps(capabilities,indent=4,separators=(',',':'),sort_keys=True) sys.stdout.write(str(pr)) sys.stderr.write('\n') elif args.graphical_mode: GObject.threads_init() gui = Gui(gladefile=os.path.dirname(__file__)+'/../lib/valentin/varviewer.glade') threading.Thread(target=gui.showData,args=(capabilities,)).start() sys.exit(Gtk.main()) if args.suggest: ret=run_analysis(**kw) if ret: log.info("Exit code = 0") sys.exit(0) else: log.info("Exit code = 1") sys.exit(1) except Exception as e: log.error("Exception occurred: {}".format(e)) log.error("Exit code = 1") sys.exit(1)