#!/usr/bin/env python import sys,os sys.path.insert(0,u'/usr/lib/valentin/') import logging import hwdetector.utils.log as log import tarfile import datetime import time import tempfile import json import argparse import zlib import base64 from ruleset import ruleset try: import threading import gi gi.require_version(u'Gtk',u'3.0') from gi.repository import Gtk,GObject GObject.threads_init() except Exception as e: log.error(u'Unable to import Gtk libraries: {}'.format(e)) def run_in_debug(): gettrace=getattr(sys,u'gettrace',None) if gettrace is None: return False elif gettrace(): return True else: return None def indent_var(st=u'',var={},indent=0): creturn=u'\n' if indent==0: creturn=creturn*2 indentation=u'\t'*indent try: if isinstance(var,dict): for x in sorted(var.keys()): if not (x.lower().startswith(u'helper') and indent==0): st += u'{}{}\n{}{}'.format(indentation,x,indent_var(var=var[x],indent=indent+1),creturn) elif isinstance(var,list) or isinstance(var,tuple): if len(var) == 2 and var[0] == u'__gz__': content=zlib.decompress(base64.b64decode(var[1])).decode(u'utf-8') replaced = content.replace(u'\nu',indentation+u'\n'+indentation) st += u'{}{}'.format(indentation,replaced) else: if isinstance(var,tuple): st += u'{}{}'.format(indentation,var) else: for x in sorted(var): st += u'{}\n'.format(indent_var(var=x,indent=indent+1)) elif isinstance(var,str): try: var = var.encode(u'utf-8') except: var = var.decode(u'utf-8') replaced = var.replace(u'\nu',indentation+u'\n'+indentation) st += u'{}{}'.format(indentation,replaced) elif isinstance(var,unicode): replaced = var.replace(u'\nu',indentation+u'\n'+indentation) st += u'{}{}'.format(indentation,replaced) elif isinstance(var,bool) or isinstance(var,int) or isinstance(var,float) or var == None: st += u'{}{}'.format(indentation,var) else: raise Exception(u'Unknown type var') return st except Exception as e: return u'' def make_file(capabilities,*args,**kwargs): try: txt = json.dumps(capabilities,indent=4,separators=(u',',u':'),sort_keys=True) name = tempfile.mkstemp()[1] with open(name,u'w') as f: f.write(txt.encode(u'utf-8')) return name except Exception as e: return False def make_tar(file,*args,**kwargs): try: datestr=u'{:%Y%m%d%H%M}'.format(datetime.datetime.now()) if kwargs[u'fname']: filename = kwargs[u'fname'] else: filename = u'valentin-info-{}.tar.gz'.format(datestr) if os.path.exists(file): with tarfile.open(filename,u'w:gz') as f: f.add(file,arcname=u'debug-info-{}.txt'.format(datestr),recursive=False) logger_message_file=log.filename_to_log if os.path.exists(logger_message_file): f.add(logger_message_file,arcname=os.path.basename(u'valentin-debug-log-messages.txt')) return filename except Exception as e: return False def run_detection(*args,**kwargs): log.debug("Importing hwdetector") import hwdetector log.debug("Instantiate HwDetector") hwd=hwdetector.HwDetector() #hwd.all_plugins_are_needed=True if not run_in_debug(): log.info(u'USING MAX_RUNNING_TIME !!!') hwd.MAX_RUNNING_TIME = 30 #hwd.fake_capabilities={u'IAMGOD':u'yes'} log.debug("Calling run plugins") #ret = hwd.run(needs=[u'ALL_TESTS']) #ret = hwd.run(needs=[u'LLXSYSTEM_TEST',u'LLXNETWORK_TEST']) ret = hwd.run() log.info("Total running time: {}".format(hwd.RUNNING_TIME)) return (ret,hwd.capabilities) def load_file(*args,**kwargs): if kwargs.get(u'fname',None): filename = kwargs[u'fname'] else: return False if os.path.exists(filename): ftxt=None if tarfile.is_tarfile(filename): with tarfile.open(filename,u'r:gz') as tar: target_file=None for tarinfo in tar.getmembers(): if u'debug-info' in tarinfo.name.lower(): target_file=tarinfo.name break if target_file: ftxt = tar.extractfile(target_file).read() else: ftxt = None with open(filename,u'r') as f: ftxt = f.read() return ftxt else: return False def run_analysis(*args,**kwargs): if kwargs.get(u'capabilities',None): fileinfo = kwargs.get(u'capabilities') if not kwargs.get(u'ruleset',None): fileruleset=u'/usr/share/valentin/valentin.rules' if not os.path.exists(fileruleset): return False else: fileruleset=kwargs.get(u'ruleset')[0] rs = ruleset() try: rs.load_ruleset(fileruleset=fileruleset,data=fileinfo) rs.make_tree() rs.make_suggestion() except Exception as e: log.error(e) return True class Handler: def __init__(self,gui): self.gui = gui def destroy(self,*args,**kwargs): log.debug(u'Exitting gui') sys.exit(0) def key_clicked(self,tselection,tpath,tviewcolumn,*args,**kwargs): model,lpaths=tselection.get_selected_rows() id_selected=model[lpaths][0] if self.gui.coldict[id_selected][u'child']: tview=tselection.get_tree_view() for sel in lpaths: if tview.row_expanded(sel): tview.collapse_row(sel) else: tview.expand_row(sel,False) self.gui.showText(text=indent_var(var=self.gui.coldict[id_selected][u'data'])) else: self.gui.showText(id_selected) class Gui: def __init__(self,*args,**kwargs): self.builder=None self.handler = Handler(self) self._load_glade(**kwargs) #self.buffer=Gtk.TextBuffer() #self.buffer.set_text(self.text) self.textview=self.builder.get_object(u'textview1') self.buffer=self.textview.get_buffer() GObject.idle_add(self.buffer.set_text,u'Loading...',priority=GObject.PRIORITY_DEFAULT) #self.textview.set_buffer(self.buffer) self.id=0 self.done=False threading.Thread(target=self.updateText).start() pass def updateText(self,*args,**kwargs): while not self.done: time.sleep(0.2) #self.buffer.set_text(u'{} {}'.format(self.text,self.id)) #self.textview.set_buffer(self.buffer) GObject.idle_add(self.buffer.set_text,u'{} {} items'.format(u'Parsing data ...',self.id),priority=GObject.PRIORITY_DEFAULT) #self.buffer.set_text(u'') GObject.idle_add(self.buffer.set_text,u'',priority=GObject.PRIORITY_LOW) #self.textview.set_buffer(self.buffer) def _store_data(self,data,id=[0]): if not self.store: return None me=id[0] self.id=me if isinstance(data,dict): for key in sorted(data.keys()): if key.startswith(u'HELPER'): continue id[0]+=1 id_child=id[0] col=self.store.append(self.coldict[me][u'col'],[id_child,key]) self.coldict[me][u'child'].append(id_child) self.coldict[id_child]={u'parent':me,u'col':col,u'data':data[key],u'me':id_child,u'child':[]} self._store_data(data=data[key],id=id) elif isinstance(data,list): if len(data) > 1: if len(data)==2 and data[0] == u'__gz__': self.coldict[me][u'data']=zlib.decompress(base64.b64decode(data[1])) self._store_data(data=self.coldict[me][u'data'],id=id) else: for x in data: id[0]+=1 id_child=id[0] col=self.store.append(self.coldict[me][u'col'],[id_child,u'[{}]'.format(str(data.index(x))).encode(u'utf-8').decode(u'utf-8')]) self.coldict[me][u'child'].append(id_child) self.coldict[id_child]={u'parent':me,u'col':col,u'data':x,u'me':id_child,u'child':[]} self._store_data(data=x,id=id) elif len(data) == 1: self.coldict[me][u'data']=data[0] self._store_data(data=data[0],id=id) elif isinstance(data,tuple): self._store_data(data=list(data),id=id) pass elif isinstance(data,str): try: data=data.encode(u'utf-8').decode(u'utf-8') self.coldict[me][u'data']=data except: pass elif isinstance(data,unicode): try: data.decode(u'utf-8') except: data=data.encode(u'utf-8').decode(u'utf-8') self.coldict[me][u'data']=data else: self._store_data(data=str(data).encode(u'utf-8').decode(u'utf-8'),id=id) return True def showData(self,data,*args,**kwargs): self.treeview=self.builder.get_object(u'treeview1') self.store=self.builder.get_object(u'treestore1') self.storesort=self.builder.get_object(u'treemodelsort1') self.storesort.set_sort_column_id(1,Gtk.SortType.ASCENDING) self.coldict={0:{u'parent':0,u'col':None,u'data':data,u'me':0,u'child':[]}} self._store_data(data=data) self.treeview.set_model(self.storesort) self.done=True def showText(self,id=0,text=u'',*args,**kwargs): if text==u'': if id == 0 or not isinstance(id,int): data=u'' else: coldata=self.coldict[id] data=coldata[u'data'] else: data=text if isinstance(data,unicode) or isinstance(data,str): pass else: data=str(data).encode(u'utf-8').decode(u'utf-8') #self.buffer.set_text(data) GObject.idle_add(self.buffer.set_text,data,priority=GObject.PRIORITY_DEFAULT) #self.textview.set_buffer(self.buffer) return True def _show_window(self,*args,**kwargs): if not (self.builder): return self.mainwindow=self.builder.get_object(u'mainwindow') self.builder.connect_signals(self.handler) self.treeview=self.builder.get_object(u'treeview1') self.treeview.set_model(Gtk.TreeStore(int,str)) self.mainwindow.show_all() pass def _load_glade(self,*args,**kwargs): self.gladefile=kwargs.get(u'gladefile',None) log.info(u'Loading glade file {}'.format(self.gladefile)) if self.gladefile and os.path.exists(self.gladefile): try: self.builder=Gtk.Builder() self.builder.add_from_file(self.gladefile) log.info(u'Loaded gladefile') self._show_window() except Exception as e: log.error(u'Can\'t load gladefile') self.builder=None else: log.error(u'can\'t find gladefile') sys.exit(1) pass if __name__ == u'__main__': try: parser = argparse.ArgumentParser(description=u'Simple system diagnostic tool') dlevels = [u'debug',u'info',u'warning',u'error'] parser.add_argument(u'-d',u'--debug-level',metavar=u'debug|info|warning|error',nargs=u'?',choices=dlevels,help=u'Set the debug level (default: warning)') parser.add_argument(u'-c',u'--coloured',action=u'store_const',help=u'Colorize logger messages',const=True) parser.add_argument(u'-f',u'--with-file',metavar=u'filename',nargs=u'?',help=u'Filename for results file (default valentin-info-(date).tar.gz') parser.add_argument(u'-o',u'--to-stdout',action=u'store_const',help=u'Output results to stdout',const=True) parser.add_argument(u'-w',u'--to-stdout-raw',action=u'store_const',help=u'Output results to stdout in json format',const=True) parser.add_argument(u'-g',u'--graphical-mode',action=u'store_const',help=u'Show results in graphical mode',const=True) parser.add_argument(u'-a',u'--analyze-file',metavar=u'filename',nargs=1,help=u'Filename to analyze') parser.add_argument(u'-x',u'--run-detection',action=u'store_const',help=u'Run detection',const=True) parser.add_argument(u'-s',u'--suggest',action=u'store_const',help=u'Suggest actions to repair system',const=True) parser.add_argument(u'-r',u'--ruleset',metavar=u'filename',nargs=1,help=u'Load ruleset when suggest user actions') args = parser.parse_args() if not args.debug_level: #log.set_level(logging.WARNING) log.disable() else: levels={u'debug':logging.DEBUG,u'info':logging.INFO,u'warning':logging.WARNING,u'error':logging.ERROR} log.set_level(levels[args.debug_level.lower()]) if args.coloured: log.set_color(True) kw=vars(args) fname=None if args.with_file: fname=args.with_file if u'.' not in fname: fname = fname + u'.tar.gz' kw.update({u'fname':fname}) if args.analyze_file: fname=args.analyze_file[0] kw.update({u'fname':fname}) capabilities=load_file(**kw) if capabilities: capabilities = json.loads(capabilities) ret = True log.info("File {} loaded".format(fname)) kw.update({u'capabilities':capabilities}) else: log.error("File {} can't be loaded".format(fname)) ret = False else: if args.run_detection: ret,capabilities=run_detection(**kw) log.info(u'Detection done!') if args.with_file: try: file = make_file(capabilities,**kw) fname = make_tar(file,**kw) log.info("File {} created".format(fname)) except Exception as e: log.error("File creation unsuccessful "+str(e)) kw.update({u'capabilities':capabilities}) else: sys.stderr.write(u'Missing detection option\n') ret = False if ret: if args.to_stdout or args.to_stdout_raw: if args.to_stdout: pr=indent_var(var=capabilities) elif args.to_stdout_raw: for x in (x for x in capabilities.keys() if x.lower().startswith(u'helper')): del capabilities[x] def clear_compressed(var): try: if isinstance(var,tuple) or isinstance(var,list): if len(var) == 2 and var[0]==u'__gz__': try: var=zlib.decompress(base64.b64decode(var[1])).decode(u'utf-8') except: pass return var var=list(var) for i in range(len(var)): var[i]=clear_compressed(var[i]) elif isinstance(var,dict): for x in var: var[x]=clear_compressed(var[x]) return var except Exception as e: log.error(e) capabilities=clear_compressed(capabilities) pr=json.dumps(capabilities,indent=4,separators=(u',',u':'),sort_keys=True) sys.stdout.write(pr.encode(u'utf-8')) sys.stderr.write(u'\n') elif args.graphical_mode: GObject.threads_init() gui = Gui(gladefile=os.path.dirname(__file__)+u'/../lib/valentin/varviewer.glade') threading.Thread(target=gui.showData,args=(capabilities,)).start() sys.exit(Gtk.main()) if args.suggest: ret=run_analysis(**kw) if ret: log.info("Exit code = 0") sys.exit(0) else: log.info("Exit code = 1") sys.exit(1) except Exception as e: log.error("Exception occurred: {}".format(e)) log.error("Exit code = 1") sys.exit(1)