#!/usr/bin/env python3
"""analyticsd: daemon that drains the web front-end's temporary MySQL tables
(tmp_clients / tmp_packages) into the permanent analytics tables
(Client_Versions / RecvPackages), throttling itself based on CPU load,
process/server memory, MySQL heap usage and internal queue pressure.

Reads DB credentials from the PHP config file, forks worker threads per
client flavour, and (optionally) daemonizes with a pidfile/lockfile.
"""

import time
#import threading
#from multiprocessing.dummy import Pool as ThreadPool
from multiprocessing.dummy import Process, Lock
#import multiprocessing
import sys
import MySQLdb as mdb
import signal
import os
import queue
import hashlib
import gc
import psutil
from functools import wraps
import re
import daemon
import lockfile
import logging
import logging.handlers
from logging import config
from functools import reduce
import traceback

##### START EDITABLE VARS #####
DEBUG = False
MIN_LOG_LEVEL = logging.INFO
DAEMON = True
FILE = '/usr/lib/analytics-server/analytics/config.php'
LIMIT = 70.0  # PERCENTAGE LOAD LIMIT
WARNING_MIN_HEAP = 256  # MiB; warn if MySQL max_heap_table_size is below this
##### END EDITABLE VARS #####

DEBUG_PRINT_ALIVE_COUNTER = 60       # print "alive" roughly once per 60 main-loop keepalives
THREADED = True                      # one worker per flavour vs a single 'main' worker
MAX_SLOW_CHECK_CLIENTS = 30          # upper bound for the slow-poll backoff counter
USE_FILE_EVENT_LOG = False
USE_FILE_EVENT_DB = False
USE_CONFIG_FILE_LOG = False
CONFIG_FILE_LOG = '/var/run/analyticsd_config.log'
FILE_EVENT_LOG = '/var/run/analyticsd_event.log'

# Resolve the effective log level from the DEBUG/MIN_LOG_LEVEL knobs.
if DEBUG:
    if MIN_LOG_LEVEL:
        loglevel = MIN_LOG_LEVEL
    else:
        loglevel = logging.DEBUG
else:
    loglevel = logging.INFO

LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'verbose': {
            'format': '%(levelname)s %(module)s %(message)s'
            #'format': '%(levelname)s %(module)s P%(process)d T%(thread)d %(message)s'
        },
    },
    'handlers': {
        'stdout': {
            'class': 'logging.StreamHandler',
            'stream': sys.stdout,
            'formatter': 'verbose',
        },
        'sys-logger6': {
            'class': 'logging.handlers.SysLogHandler',
            'address': '/dev/log',
            'facility': "local6",
            'formatter': 'verbose',
        },
    },
    'loggers': {
        'analyticsd-logger': {
            'handlers': ['sys-logger6', 'stdout'],
            'level': loglevel,
            'propagate': True,
        },
    }
}


def to_str(x):
    """Return *x* decoded through unicode_escape if it is a str, else unchanged."""
    if isinstance(x, str):
        return to_utf(x).decode('unicode_escape')
    else:
        return x


def to_utf(x):
    """Return *x* encoded as UTF-8 bytes if it is a str, else unchanged."""
    if isinstance(x, str):
        return x.encode('utf-8')
    else:
        return x


def printea(msg="", level='critical'):
    """Log *msg* through the module logger at the named *level* (default critical)."""
    if level == 'critical':
        logger.critical(msg)
    elif level == 'error':
        logger.error(msg)
    elif level == 'warning':
        logger.warning(msg)
    elif level == 'info':
        logger.info(msg)
    else:
        logger.debug(msg)


def keepalive(who=""):
    """Touch /var/run/analyticsd.keepalive with the current epoch and
    occasionally log an "alive" line for the main worker."""
    global DEBUG_PRINT_ALIVE_COUNTER
    t = str(int(time.time()))
    if DEBUG:
        pass
        # too much verbose
        # printea('on {} analyticsd({}) is alive, all good :-)'.format(t,who),'debug')
    else:
        if who == 'main':
            if DEBUG_PRINT_ALIVE_COUNTER > 0:
                DEBUG_PRINT_ALIVE_COUNTER = DEBUG_PRINT_ALIVE_COUNTER - 1
            else:
                DEBUG_PRINT_ALIVE_COUNTER = 60
                printea('on {} analyticsd({}) is alive, all good :-)'.format(t, who), 'info')
    fp = open('/var/run/analyticsd.keepalive', 'w')
    fp.write(t)
    fp.close()


config.dictConfig(LOGGING)
logger = logging.getLogger('analyticsd-logger')

DBNAME = None
USER = None
PASS = None
IP = None
EMPTY_PAUSED_SLEEP = 1   # seconds a worker sleeps when paused or its queue is empty
CHECK_LOAD_TIME = 5      # seconds between expensive stat collections in get_load
MAX_RETRIES = 5          # attempts made by the with_retry decorator
TIMED_ON = False         # enable @timed start/end tracing

# Scrape the PHP front-end config for the DB credentials ($dbhost/$dbname/$dbuser/$dbpass).
try:
    with open(FILE, 'r') as f:
        for line in f:
            if not IP:
                IP = re.search(r"^\s*[$]dbhost\s*=\s*'(\w+)'\s*;\s*$", line)
                if IP:
                    IP = IP.group(1)
            if not DBNAME:
                DBNAME = re.search(r"^\s*[$]dbname\s*=\s*'(\w+)'\s*;\s*$", line)
                if DBNAME:
                    DBNAME = DBNAME.group(1)
            if not USER:
                USER = re.search(r"^\s*[$]dbuser\s*=\s*'(\w+)'\s*;\s*$", line)
                if USER:
                    USER = USER.group(1)
            if not PASS:
                PASS = re.search(r"^\s*[$]dbpass\s*=\s*'(\w+)'\s*;\s*$", line)
                if PASS:
                    PASS = PASS.group(1)
    # NOTE(review): `or` means this only fires when ALL four values are missing;
    # a partially-parsed config slips through (probably `and` was intended) — confirm.
    if not (IP or DBNAME or USER or PASS):
        printea("Couldn't get database configuration from {}".format(FILE))
    else:
        printea("Using IP:{} DBNAME:{} USER:{} PASS:{}".format(IP, DBNAME, USER, PASS), 'debug')
except Exception as e:
    printea("Couldn't parse {} Error:\"{}\"".format(FILE, str(e)))
    sys.exit(1)

if not (IP or DBNAME or USER or PASS):
    printea("Couldn't get database configuration from {}".format(FILE))
    sys.exit(1)

# Worker thread names; every flavour gets its own DB worker when THREADED.
if THREADED:
    THREADS = ['main', 'server', 'client', 'desktop', 'other']
else:
    THREADS = ['main']


class DB():
    """One MySQL connection plus an input queue, bound to a named worker.

    The 'main' worker pulls pending clients/packages out of the temporary
    tables and dispatches them (per flavour) to the other workers' queues;
    each flavour worker inserts them into the permanent tables.
    """

    def __init__(self, mon, t='main'):
        self._initialized = False   # first worker pass triggers lazy connect
        self.t = t                  # worker/thread name
        self.reconnect = 0          # consecutive reconnection attempts
        self.empty = False          # queue was found empty on last pass
        self.conn = None
        self.mon = mon              # owning Monitor
        self.q = queue.Queue()
        self.processed = 0          # counter for stats
        self.need_clean = False     # Monitor asks for gc.collect() after commit
        printea('Database worker {} initialized'.format(t), 'info')

    def timed(func):
        """Decorator: trace start/end timestamps when TIMED_ON is set."""
        @wraps(func)
        def wrapper(*args, **kwargs):
            if TIMED_ON:
                printea("Start({}): @{}".format(func.__name__, time.time()), 'debug')
            ret = func(*args, **kwargs)
            if TIMED_ON:
                printea("End ({}): @{}".format(func.__name__, time.time()), 'debug')
            return ret
        return wrapper

    def with_retry(func):
        """Decorator: retry *func* up to MAX_RETRIES with quadratic backoff.

        On final failure (or if the monitor is terminating) it shuts the
        whole daemon down via mon.term() and exits the worker thread.
        """
        @wraps(func)
        def wrapper(*args, **kwargs):
            if 'retry' not in kwargs:
                kwargs['retry'] = 1
            if 'mon' not in kwargs:
                kwargs['mon'] = None
            try:
                return func(*args, **kwargs)
            except Exception as e:
                printea("Error in ({}) retry={}/{}: {}".format(func.__name__, kwargs['retry'], MAX_RETRIES, str(e)))
                if kwargs['retry'] == MAX_RETRIES or (kwargs['mon'] and kwargs['mon'].terminate):
                    printea("Fatal error in ({}), max retries exceded".format(func.__name__))
                    if kwargs['mon']:
                        kwargs['mon'].term()
                    sys.exit(1)
                    return None
                else:
                    # quadratic backoff: 1,4,9,16... seconds
                    time.sleep(kwargs['retry'] ** 2)
                    kwargs['retry'] += 1
                    return wrapper(*args, **kwargs)
        return wrapper

    def with_debug(func):
        """Decorator: log the SQL text of any call made with query=..."""
        @wraps(func)
        def wrapper(*args, **kwargs):
            if 'query' in kwargs:
                printea("executing query: {}".format(kwargs['query']), 'debug')
            return func(*args, **kwargs)
        return wrapper

    @with_debug
    def execute(self, *args, **kwargs):
        """Run kwargs['query'] on this worker's cursor.

        OperationalError triggers a reconnect (init_db); other errors are
        re-raised wrapped with the offending query text.
        """
        if 'query' not in kwargs:
            printea("Warning execute called whithout query", 'info')
            return None
        try:
            return self.cur.execute(kwargs['query'])
        except mdb.OperationalError as e:
            printea('({}) Operational error on mysql, error is: {}'.format(self.t, e), 'error')
            self.init_db()
        except Exception as e:
            raise Exception('Error executing: Error=({}) Query=({}) '.format(str(e), kwargs['query']))

    def init_db(self):
        """(Re)open the MySQL connection, disable autocommit, use READ COMMITTED.

        The 'main' worker additionally publishes the server variables on the
        monitor (used later for max_heap_table_size checks).
        """
        try:
            self.conn = mdb.connect(IP, USER, PASS, DBNAME)
            self.conn.autocommit(False)
            self.cur = self.conn.cursor()
            self.cur.execute("SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED")
            variables = self.check_server_variables()
            if variables:
                self.mon.server_variables = variables
            printea("Connected succesfully {} thread_id={}".format(self.t, self.conn.thread_id()), 'info')
        except mdb.Error as e:
            printea("Error {}: {}".format(e.args[0], e.args[1]))
            raise Exception(e)

    def check_server_variables(self):
        """Return {name: value} of MySQL global variables (main worker only)."""
        if self.t != 'main':
            return None
        else:
            printea('Getting server variables', 'info')
            result = {}
            self.cur.execute("show global variables")
            printea('Setting {} vars'.format(self.cur.rowcount), 'debug')
            if self.cur.rowcount > 0:
                for i in range(self.cur.rowcount):
                    var_name, var_value = self.cur.fetchone()
                    result.setdefault(var_name, var_value)
            return result

    def get_config(self):
        """Read the whole Config table into a {name: value} dict."""
        values = {}
        self.cur.execute('select name,value from Config')
        for val in self.cur.fetchall():
            values.setdefault(val[0], val[1])
        return values

    @with_retry
    def put_config(self, *args, **kwargs):
        """Upsert kwargs['values'] ({name: value}) into the Config table."""
        values = kwargs.get('values')
        vals = []
        for x in values.keys():
            vals.append("('{}','{}')".format(x, values[x]))
        try:
            self.execute(query="insert into Config(`name`, `value`) values {} on duplicate key update name=VALUES(name), value=VALUES(value);".format(','.join(vals)))
            self.conn.commit()
        except Exception as e:
            printea(e, 'error')

    @with_retry
    def check_temporary_tables_size(self, *args, **kwargs):
        """Return (size_MiB, rows) for kwargs['table_name'] (load worker only)."""
        table_name = kwargs.get('table_name')
        if self.t != 'load':
            return None
        else:
            size = 0
            rows = 0
            self.execute(query='select floor((data_length + index_length) / 1048576) `size`,table_rows from information_schema.TABLES where table_schema = "analytics" and table_name = "{}";'.format(table_name))
            res = self.cur.fetchone()
            size = float(res[0])
            rows = int(res[1])
            return (int(size), rows)

    def close_db(self):
        """Close this worker's connection if open."""
        if self.conn:
            self.conn.close()
            printea("Closed connection {}".format(self.t), 'info')

    def reduce_flavour(self, version, flavour):
        """Collapse a raw flavour string to server/client/desktop/other
        according to the (already reduced) major version."""
        if version == '15':
            if 'server' not in flavour and 'client' not in flavour and 'desktop' in flavour:
                return 'desktop'
            elif 'server' in flavour:
                return 'server'
            elif 'client' in flavour:
                return 'client'
        elif version == '16' or version == '19':
            if 'server' in flavour:
                return 'server'
            elif 'client' in flavour:
                return 'client'
            elif 'desktop' in flavour:
                return 'desktop'
        return 'other'

    def reduce_version(self, version):
        """Collapse a raw version string to its known major ('15'/'16'/'19') or 'other'."""
        if version[0:2] in ['15', '16', '19']:
            return version[0:2]
        else:
            return 'other'

    def gen_uuid(self, *args, **kwargs):
        """Derive a 64-bit int id from the SHA1 of the '-'-joined arguments."""
        try:
            string = to_utf(u'-'.join([str(x) for x in args]))
            h = hashlib.sha1(string)
            return int(h.hexdigest()[0:16], 16)
        except Exception as e:
            printea(traceback.format_exc(), 'critical')

    @timed
    @with_retry
    def get_client(self, *args, **kwargs):
        """Fetch up to select_window pending rows from tmp_clients.

        Returns a list of normalized client dicts, or True when the table
        had no pending rows (sentinel meaning "empty").
        """
        try:
            query = "SELECT id,date,user,version,sabor,arch,mem,vga,cpu,ncpu,ltsp,mode from tmp_clients where status=1 LIMIT {}".format(int(self.mon.select_window))
            self.execute(query=query)
            ret = []
            if self.cur.rowcount > 0:
                for i in range(self.cur.rowcount):
                    v_id, v_date, v_user, v_version, v_flavour, v_arch, v_mem, v_vga, v_cpu, v_ncpu, v_ltsp, v_mode = self.cur.fetchone()
                    version = self.reduce_version(v_version)
                    flavour = self.reduce_flavour(version, v_flavour)
                    uuid = self.gen_uuid(v_date.month, v_date.year, v_user, v_version, v_flavour, v_mode)
                    # Map missing values to SQL literals for the later INSERT.
                    if not v_arch:
                        v_arch = 'NULL'
                    if not v_mem:
                        v_mem = 'NULL'
                    if not v_vga:
                        v_vga = 'NULL'
                    if not v_cpu:
                        v_cpu = 'NULL'
                    if not v_ncpu:
                        v_ncpu = 'NULL'
                    if v_ltsp == '1' or v_ltsp == True:
                        v_ltsp = 'TRUE'
                    elif v_ltsp == '0' or v_ltsp == False:
                        v_ltsp = 'FALSE'
                    else:
                        v_ltsp = 'NULL'
                    if not v_mode:
                        v_mode = 'NULL'
                    else:
                        v_mode = "'" + v_mode + "'"
                    ret.append({'uuid': uuid, 'id': v_id, 'date': v_date, 'uid': v_user, 'version': version, 'flavour': flavour, 'rversion': v_version, 'rflavour': v_flavour, 'arch': v_arch, 'mem': v_mem, 'vga': v_vga, 'cpu': v_cpu, 'ncpu': v_ncpu, 'ltsp': v_ltsp, 'mode': v_mode})
                return ret
            else:
                return True
        except Exception as e:
            raise Exception("Error getting client: {}".format(e))

    @timed
    @with_retry
    def get_apps(self, *args, **kwargs):
        """Fetch tmp_packages rows for kwargs['clients'], grouped by flavour.

        Returns {flavour: {'apps': [...], 'clients': [...], 'timestamp': None}}.
        """
        if 'clients' not in kwargs:
            printea("Warning executed without named parameter clients", 'info')
            return None
        ret = {}
        try:
            cids_to_rel_fla = {}
            for c in kwargs['clients']:
                if c['id'] not in cids_to_rel_fla:
                    cids_to_rel_fla[c['id']] = (c['version'], c['flavour'])
                if c['flavour'] not in ret:
                    ret[c['flavour']] = {'apps': [], 'clients': [], 'timestamp': None}
            for c in kwargs['clients']:
                ret[c['flavour']]['clients'].append(c)
            query = "SELECT client,date,app,value FROM tmp_packages WHERE client IN ({}) ".format(','.join(map(str, cids_to_rel_fla.keys())))
            self.execute(query=query)
            if self.cur.rowcount > 0:
                for i in range(self.cur.rowcount):
                    row = self.cur.fetchone()
                    clid = row[0]
                    rel, fla = cids_to_rel_fla[clid]
                    uuid = self.gen_uuid(row[1].month, row[1].year, rel, fla, row[2])
                    ret[fla]['apps'].append({'uuid': uuid, 'release': rel, 'flavour': fla, 'id': clid, 'date': row[1], 'app': row[2], 'value': row[3]})
            return ret
        except Exception as e:
            raise Exception("Error getting apps: {}".format(e))

    @timed
    @with_retry
    def put_client(self, *args, **kwargs):
        """Insert kwargs['client_list'] into Client_Versions (dedup by uuid)."""
        if 'client_list' not in kwargs:
            raise Exception("Called without named parameter client_list")
        values = []
        for cli in kwargs['client_list']:
            values.append("({},'{}','{}','{}','{}','{}','{}','{}',{},'{}','{}',{},{},{})".format(cli['uuid'], cli['date'].strftime('%Y-%m-%d'), cli['uid'], cli['rversion'], cli['rflavour'], cli['version'], cli['flavour'], cli['arch'], cli['mem'], cli['vga'], cli['cpu'], cli['ncpu'], cli['ltsp'], cli['mode']))
        query = "INSERT INTO Client_Versions(`uuid`,`date`,`Client_uid`,`string_release`,`string_flavour`,`Releases_name`,`Flavours_name`,`arch`,`mem`,`vga`,`cpu`,`ncpu`,`ltsp`,`mode`) VALUES {} on duplicate key update uuid=uuid".format(','.join(map(str, values)))
        self.execute(query=query)
        return True

    @timed
    @with_retry
    def put_apps(self, *args, **kwargs):
        """Aggregate kwargs['apps'] by uuid and upsert counts into RecvPackages."""
        if 'apps' not in kwargs:
            raise Exception("Called without named parameter apps")
        app_list = {}
        for app in kwargs['apps']:
            #uuid = self.gen_uuid(app['date'].month,app['date'].year,app['release'],app['flavour'],app['app'])
            if str(app['uuid']) not in app_list:
                app_list[str(app['uuid'])] = {'uuid': app['uuid'], 'date': app['date'], 'release': app['release'], 'flavour': app['flavour'], 'app': app['app'], 'value': app['value']}
            else:
                app_list[str(app['uuid'])]['value'] += app['value']
        values = []
        for app in app_list:
            item = app_list[app]
            values.append("({},'{}','{}','{}','{}',{})".format(item['uuid'], item['date'].strftime('%Y-%m-%d'), item['release'], item['flavour'], item['app'], item['value']))
        query = "INSERT INTO RecvPackages(`uuid`,`date`,`Releases_name`,`Flavours_name`,`string`,`count`) VALUES {} ON DUPLICATE KEY UPDATE count = count + VALUES(count)".format(','.join(map(str, values)))
        self.execute(query=query)
        return True

    @timed
    @with_retry
    def del_client(self, *args, **kwargs):
        """Delete processed ids (kwargs['client_list']) from tmp_clients."""
        if 'client_list' not in kwargs:
            raise Exception("Called without named parameter client_list")
        query = "DELETE FROM tmp_clients WHERE id IN ({}) and status=1".format(','.join(map(str, kwargs['client_list'])))
        self.execute(query=query)
        return True

    @timed
    @with_retry
    def del_apps(self, *args, **kwargs):
        """Delete processed client ids (kwargs['client_list']) from tmp_packages."""
        if 'client_list' not in kwargs:
            raise Exception("Called without named parameter client_list")
        query = "DELETE FROM tmp_packages WHERE client IN ({})".format(','.join(map(str, kwargs['client_list'])))
        self.execute(query=query)
        return True

    @timed
    def reset_autoinc(self):
        """TRUNCATE both temporary tables once AUTO_INCREMENT nears the 16-bit
        limit and all queues are drained, to reset the counter."""
        try:
            query = "SELECT AUTO_INCREMENT FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '{}' AND TABLE_NAME = 'tmp_packages'".format(DBNAME)
            self.execute(query=query)
            ainc = self.cur.fetchone()[0]
            #query = "SELECT count(*) from tmp_clients"
            #self.execute(query=query)
            #cli_size=self.cur.fetchone()[0]
            #query = "SELECT count(*) from tmp_packages"
            #self.execute(query=query)
            #pkg_size=self.cur.fetchone()[0]
            #printea("Cleaning memory tables: autoinc={} count_tmp_clients={} count_tmp_packages={} queues_empty={}".format(ainc,cli_size,pkg_size,self.mon.all_queues_empty()),'debug')
            if ainc > 65500 and self.mon.all_queues_empty():  # and cli_size == 0 and pkg_size == 0:
                query = "TRUNCATE TABLE tmp_clients"
                self.execute(query=query)
                query = "TRUNCATE TABLE tmp_packages"
                self.execute(query=query)
            return True
        except Exception as e:
            raise Exception("Error reseting auto_increment: {}".format(e))

    @timed
    def process_main_thread(self, *args, **kwargs):
        """Main worker pass: fetch pending clients+apps and dispatch per flavour.

        Returns False when no clients were pending, True otherwise.
        """
        # Slow-poll backoff: skip real work for slow_check_clients iterations.
        if self.mon.slow_check_clients > 0:
            self.mon.slow_check_clients -= 1
            time.sleep(1)
            return True
        else:
            self.mon.slow_check_clients = self.mon.default_slow_check_clients
        clis = self.get_client(mon=self.mon)
        if clis == True:
            #No clients found (empty)
            self.empty = True
            # if > 65500 auto_increment was reset
            self.mon.schedule(event='NO_MORE_CLIENTS')
            return False
        #clients found
        self.mon.schedule(event='HAVE_CLIENTS', nclients=len(clis))
        # if clients found get apps
        lapps = self.get_apps(clients=clis, mon=self.mon)
        #lapps can be empty
        self.del_client(client_list=[cli['id'] for cli in clis], error_func=self.mon.term)
        #If deletion was failed , thread was died
        lapps_tmp = {'apps': [], 'clients': []}
        for fla in lapps:
            if THREADED:
                lapps[fla]['timestamp'] = time.time()
                self.mon.db[fla].q.put(lapps[fla], True)
            else:
                lapps_tmp['apps'].extend(lapps[fla]['apps'])
                lapps_tmp['clients'].extend(lapps[fla]['clients'])
                # NOTE(review): in non-threaded mode this re-queues the (shared,
                # growing) lapps_tmp dict once per flavour — confirm intent.
                self.mon.db['main'].q.put(lapps_tmp, True)
        #if DEBUG:
        self.processed += len(clis)
        return True

    @timed
    def process_all_threads(self, *args, **kwargs):
        """Flavour worker pass: pop one batch and write it to the DB.

        Waits until the main worker's batch timestamp has been committed
        before inserting, to preserve ordering.
        """
        lapps = self.q.get(True)
        #print "Running {}".format(self.t)
        if THREADED:
            while (lapps['timestamp'] > self.mon.commited):
                time.sleep(0.001)
        if len(lapps['clients']) != 0:
            printea('Thread {} putting client'.format(self.t), 'debug')
            #IF FAIL, AFTER RETRIES THREAD DIES
            if not self.put_client(client_list=lapps['clients'], mon=self.mon):
                self.q.put(lapps, True)  # USELESS
                return False  # USELESS
            if len(lapps['apps']) != 0:
                printea('Thread {} putting clientapps'.format(self.t), 'debug')
                if not (self.put_apps(apps=lapps['apps'], mon=self.mon)):
                    self.q.put(lapps, True)  # USELESS
                    return False  # USELESS
            if not self.del_apps(client_list=[cli['id'] for cli in lapps['clients']], mon=self.mon):
                self.q.put(lapps, True)  # USELESS
                return False  # USELESS
        #if DEBUG:
        self.processed += len(lapps['clients'])
        return True

    def process(self, *args, **kwargs):
        """Dispatch one unit of work for this worker inside a transaction."""
        keepalive(self.t)
        # warning too much verbose, printea("Running thread {}".format(self.t),'debug')
        if self.t == 'main' and not self.mon.terminate:
            ret = self.process_main_thread(*args, **kwargs)
            if ret == False:
                #No more clients available
                return True
            if ret == True:
                #Clients was put on queues, need process more without waiting, main queue always empty
                if THREADED:
                    return True  # Need return to avoid put empty flag and wait into main thread
        #after this poing, code for all threads
        if not self.q.empty():
            ret = self.process_all_threads(*args, **kwargs)
            if ret == False:
                # USELESS? THREADS was died
                printea("Error threads")
            return ret
        else:
            # Recreate the queue to release its internal buffers, then flag empty.
            del self.q
            self.q = queue.Queue()
            self.empty = True
            return True

    def worker(self):
        """Thread body: loop process() inside commit/rollback until shutdown,
        lazily (re)connecting and backing off up to 100 reconnect attempts."""
        printea("Starting worker {} processing".format(self.t), 'info')
        while not (self.mon.terminate and self.empty):
            if self.mon.paused or self.empty:
                #if self.empty and self.t == 'main':
                #    self.reset_autoinc()
                if self.mon.paused:
                    printea("Paused by high load {}".format(self.t), 'debug')
                # Too much verbose
                #if self.empty:
                #    printea("Empty queue {} sleeping by now".format(self.t),'debug')
                if self.empty:
                    self.empty = False
                time.sleep(EMPTY_PAUSED_SLEEP)
            else:
                try:
                    if self.conn == None:
                        # Lazy first connection / reconnection signalling via exceptions.
                        if not self._initialized:
                            self._initialized = True
                            raise EnvironmentError('Initializing connector {}'.format(self.t))
                        else:
                            raise Warning('Connection not available')
                    else:
                        self.conn.begin()
                        if self.process():
                            self.conn.commit()
                            self.mon.commited = time.time()
                            self.reconnect = 0
                            if self.need_clean:
                                gc.collect()
                        else:
                            self.conn.rollback()
                except EnvironmentError as e:
                    printea(e, 'info')
                    self.init_db()
                except Warning as e:
                    printea(e, 'warning')
                    self.init_db()
                except Exception as e:
                    try:
                        if self.conn != None:
                            self.conn.rollback()
                    except:
                        printea("Can't rollback last actions", 'info')
                        pass
                    #if e[0] != 2006:
                    #    printea("Exception processing worker({}): {}".format(self.t,e))
                    #if e[0] == 2006: # SERVER GONE AWAY
                    #    printea("Worker ({}) detected server gone away !!!: {}".format(self.t,e))
                    printea("Trying to recover connection ({}), {}".format(self.t, e))
                    if self.reconnect == 100:
                        printea("Worker ({}) says: lost connection to database, reconnection not possible, terminating all processes".format(self.t))
                        self.mon.term()
                    else:
                        self.reconnect += 1
                        printea('Reconnect({}) to mysql sleeping({})'.format(self.reconnect, self.reconnect * self.reconnect))
                        time.sleep(self.reconnect * self.reconnect)
                        try:
                            self.init_db()
                            printea("Recovered worker {} connection".format(self.t), 'info')
                        except:
                            printea('Unable to initialize worker {}'.format(self.t))
                            pass


class Monitor():
    """Owns the workers, the adaptive throttling state and the stats loop."""

    def __init__(self):
        self.MAX_QUEUE_UTILIZATION = 100
        self.USE_MAX_QUEUES = True
        self.MAX_SELECT_WINDOW = (2 ** 13) + 1   # upper bound for select_window doubling
        self.MIN_SELECT_WINDOW = 32
        self.MEM_USED = 0
        self.MAX_MEM = 512                       # MiB cap for this process
        self.MIN_FREE_MEM_SERVER = 100           # MiB free on host before alerting
        self.USE_MAX_MEM = True
        self.lock = Lock()
        self.terminate = False
        self.finished = False
        self.paused = False
        self.select_window = self.MIN_SELECT_WINDOW
        self.commited = time.time()
        self.procesed = 0
        self.procesed_per_sec = [0] * 10
        self.load = 0
        self.server_variables = None  # initialized by main worker
        self.temporary_tables_size = {'clients': 0.0, 'packages': 0.0, 'sum': 0.0}
        self.temporary_tables_rows = {'clients': 0, 'packages': 0, 'sum': 0}
        self.db_tables_size = {'clients': 0.0, 'packages': 0.0, 'sum': 0.0}
        self.db_tables_rows = {'clients': 0, 'packages': 0, 'sum': 0}
        self.default_slow_check_clients = 1
        self.slow_check_clients = self.default_slow_check_clients
        self.server_mem = 0
        self.loadlist = [0.0] * 100              # rolling CPU-percent window
        self.max_heap = None                     # MySQL max_heap_table_size in MiB
        self.heap_alert_count = 0
        self.last_events = []
        signal.signal(signal.SIGQUIT, self.term)
        signal.signal(signal.SIGTERM, self.term)
        signal.signal(signal.SIGINT, self.term)
        self.db = {}
        self.threads = {}
        for x in THREADS:
            self.db[x] = DB(self, x)
            #try:
            #    self.db[x].init_db()
            #except Exception as e:
            #    printea('Error initializing database connections: {}'.format(str(e)))
            #    sys.exit(1)
        self.cfg = None
        printea("Monitor initialized with {} threads".format(len(THREADS)), 'info')

    def windowctl(self, *args, **kwargs):
        """Grow ('+') or shrink ('-') the select window within its bounds."""
        if args[0] == '+':
            if self.select_window * 2 < self.MAX_SELECT_WINDOW:
                self.select_window *= 2
            self.select_window = int(self.select_window)
        if args[0] == '-':
            if self.select_window > self.MIN_SELECT_WINDOW:
                self.select_window /= 2
            self.select_window = int(self.select_window)

    def slowcheckctl(self, *args, **kwargs):
        """Adjust the default slow-poll counter: 'reset', grow '+', shrink '-'."""
        if args[0] == 'reset':
            self.default_slow_check_clients = 0
        if args[0] == '+':
            if self.default_slow_check_clients < MAX_SLOW_CHECK_CLIENTS:
                if self.default_slow_check_clients == 0:
                    self.default_slow_check_clients = 1
                else:
                    self.default_slow_check_clients = self.default_slow_check_clients * 2
        if args[0] == '-':
            if self.default_slow_check_clients > 0:
                # NOTE(review): true division may leave a float here (py3) —
                # probably // was intended; confirm.
                self.default_slow_check_clients = self.default_slow_check_clients / 2

    def append_event_log(self, *args, **kwargs):
        """(DEBUG only) record 'epoch:event' strings, optionally to FILE_EVENT_LOG."""
        if DEBUG:
            if USE_FILE_EVENT_LOG:
                try:
                    fp = open(FILE_EVENT_LOG, 'a')
                except:
                    fp = False
            else:
                fp = False
            separator = ':'
            for x in args:
                the_time = str(int(time.time()))
                the_value = str(x)
                the_string = the_time + separator + the_value
                self.last_events.append(the_string)
                if fp:
                    fp.write('{}\n'.format(the_string))
            if fp:
                fp.close()

    def schedule(self, *args, **kwargs):
        """React to events by tuning the window/backoff/pause state.

        Events: NO_MORE_CLIENTS, HAVE_CLIENTS(nclients=...), CHECK_SANITY.
        """
        if kwargs['event'] == 'NO_MORE_CLIENTS':
            self.append_event_log(kwargs['event'])
            self.windowctl('-')
            self.slowcheckctl('+')
        if kwargs['event'] == 'HAVE_CLIENTS':
            self.append_event_log(kwargs['event'])
            # Window saturated -> grow; otherwise shrink back towards the minimum.
            if kwargs['nclients'] == self.select_window:
                self.windowctl('+')
            else:
                self.windowctl('-')
            self.slowcheckctl('reset')
        #
        #    if self.USE_MAX_QUEUES and self.sum_q_sizes() > self.MAX_QUEUE_UTILIZATION:
        #        self.windowctl('+')
        #
        #    if self.USE_MAX_MEM and self.MEM_USED > self.MAX_MEM:
        #        self.windowctl('-')
        #        self.slowcheckctl('+')
        #    else:
        #        self.windowctl('+')
        #
        #    elif kwargs['nclients'] < self.select_window/2:
        #        self.windowctl('-')
        #
        #    self.slowcheckctl('reset')
        if kwargs['event'] == 'CHECK_SANITY':
            # CPU SETTINGS
            if not self.terminate and self.load > LIMIT:
                self.append_event_log('LOAD_LIMIT_REACHED')
                self.paused = True
            else:
                self.paused = False
            # DB MEM SETTINGS: drain faster when the MEMORY tables近 approach max_heap.
            if self.max_heap and self.temporary_tables_size['sum']:
                if self.temporary_tables_size['sum'] > self.max_heap * 0.3:
                    self.heap_alert_count += 1
                    self.append_event_log('MAX_HEAP_ALERT')
                    self.windowctl('+')
                    self.slowcheckctl('-')
                    if self.heap_alert_count > 50:
                        self.append_event_log('EXTREME_HEAP_RULES')
                        self.slowcheckctl('reset')
                    if self.paused:
                        #printea('Hitting max temporary table size unpausing','critical')
                        self.paused = False
                else:
                    self.heap_alert_count = 0
            # SERVER MEM
            if self.server_mem and self.server_mem < self.MIN_FREE_MEM_SERVER:
                self.append_event_log('SERVER_MEM_ALERT')
                #printea('Hitting max memory from server collecting and reducing window','critical')
                self.windowctl('-')
                self.slowcheckctl('+')
                self.USE_MAX_QUEUES = True
                for x in THREADS:
                    self.db[x].need_clean = True
                gc.collect()
            else:
                # self.USE_MAX_QUEUES=False
                for x in THREADS:
                    self.db[x].need_clean = False
            if self.USE_MAX_MEM and self.MEM_USED > self.MAX_MEM:
                self.append_event_log('MAX_MEM_ALERT')
                self.windowctl('-')
                self.slowcheckctl('+')
            # QUEUES
            if self.USE_MAX_QUEUES and self.sum_q_sizes() > self.MAX_QUEUE_UTILIZATION:
                self.append_event_log('MAX_QUEUES_REACHED')
                self.windowctl('+')

    def get_cpu_load(self):
        """Push a cpu_percent sample into the rolling window; return its mean/100."""
        self.loadlist.append(psutil.cpu_percent())
        self.loadlist = self.loadlist[1:]
        avg = 0.0
        for x in self.loadlist:
            avg += x
        return round(avg / 100.0, 2)

    def term(self, *args, **kwargs):
        """Signal handler / programmatic shutdown: flag all loops to stop."""
        printea("Begin kill the program, wait please...", 'info')
        self.terminate = True

    def prepare_threads(self):
        """Start the load-monitor thread plus one worker thread per DB."""
        global DAEMON
        self.threads['load'] = Process(target=self.get_load)
        self.threads['load'].daemon = DAEMON
        self.threads['load'].start()
        for x in THREADS:
            self.threads[x] = Process(target=self.db[x].worker)
            self.threads[x].daemon = DAEMON
            self.threads[x].start()

    def get_mem_usage(self):
        """Return this process' RSS in MiB (old/new psutil API compatible)."""
        process = psutil.Process(os.getpid())
        try:
            mem = process.memory_info()[0] / float(2 ** 20)
        except:
            mem = process.get_memory_info()[0] / float(2 ** 20)
        return mem

    def get_server_free_mem(self):
        """Return the host's free memory in MiB."""
        mem = psutil.virtual_memory()
        return mem.free / (2 ** 20)

    def print_stats(self):
        """Publish counters to the Config table and log a one-line summary."""
        global CHECK_LOAD_TIME
        # if DEBUG:
        if True:
            out = "Processed: "
            out2 = ''
            total = 0
            for x in THREADS:
                out2 += '{}={} '.format(x, self.db[x].processed)
                self.cfg.store('processed ' + x, self.db[x].processed)
                if THREADED:
                    # 'main' only dispatches; counting it would double-count.
                    if x != 'main':
                        total += self.db[x].processed
                else:
                    total += self.db[x].processed
            out += "TOTAL={} {}".format(total, out2)
            self.cfg.store('processed total', total)
            proc_per_sec = int((total - self.procesed) / CHECK_LOAD_TIME)
            self.procesed_per_sec.append(proc_per_sec)
            self.procesed_per_sec = self.procesed_per_sec[-10:]
            f = lambda x, y: x + y
            suma_proc = reduce(f, self.procesed_per_sec)
            self.cfg.store('processing at', int(suma_proc / 10))
            self.procesed = total
            total = 0
            if THREADED:
                out += "Queues: "
                sizes = self.get_q_sizes()
                out2 = ''
                for x in sizes:
                    self.cfg.store('queue ' + x, sizes[x])
                    out2 += '{}={} '.format(x, sizes[x])
                    total += sizes[x]
                self.cfg.store('queue totals', total)
                out += "TOTAL={} {}".format(total, out2)
            self.cfg.store('select window', self.select_window)
            self.cfg.store('mem used', self.MEM_USED)
            self.cfg.store('load', self.load)
            if (self.paused):
                self.cfg.store('paused', 1)
            else:
                self.cfg.store('paused', 0)
            self.cfg.store('temp_clients_size', int(self.temporary_tables_size['clients']))
            self.cfg.store('temp_packages_size', int(self.temporary_tables_size['packages']))
            self.cfg.store('temp_clients_rows', int(self.temporary_tables_rows['clients']))
            self.cfg.store('temp_packages_rows', int(self.temporary_tables_rows['packages']))
            self.cfg.store('db_clients_size', int(self.db_tables_size['clients']))
            self.cfg.store('db_packages_size', int(self.db_tables_size['packages']))
            self.cfg.store('default_slow_check_clients', int(self.default_slow_check_clients))
            self.cfg.store('slow_check_clients', int(self.slow_check_clients))
            if DEBUG:
                if USE_FILE_EVENT_DB:
                    max_event = len(self.last_events)
                    if max_event > 20:
                        max_event = 20
                    for i in range(0, max_event):
                        self.cfg.store('event{:02d}'.format(i), self.last_events[i])
                    self.last_events = self.last_events[-20:]
            if THREADED:
                printea("{} {}SelectSize={} ProcessingAt={} Mem={} Load={} Paused={} Tsize={} Trows={}".format(time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime()), out, self.select_window, int(suma_proc / 10), int(self.MEM_USED), self.load, self.paused, self.temporary_tables_size['sum'], self.temporary_tables_rows['sum']), 'info')
                #sys.stdout.write("{}{} {}SelectSize={} ProcessingAt={} Mem={}\n".format('\r'*100,time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED)))
            else:
                printea("{} {}SelectSize={} ProcessingAt={} Mem={} Load={} Paused={} Tsize={} Trows={}".format(time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime()), out, self.select_window, int(suma_proc / 10), int(self.MEM_USED), self.load, self.paused, self.temporary_tables_size['sum'], self.temporary_tables_rows['sum']), 'info')
                #sys.stdout.write("{}{} {}SelectSize={} ProcessingAt={} Mem={}\n".format('\r'*100,time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED)))

    def get_q_sizes(self):
        """Return {thread_name: queue_size}."""
        sizes = {}
        for x in THREADS:
            sizes[x] = self.db[x].q.qsize()
        return sizes

    def sum_q_sizes(self):
        """Return the total number of queued batches across all workers."""
        sizes = self.get_q_sizes()
        f = lambda x, y: x + y
        return reduce(f, [sizes[x] for x in sizes])

    def all_queues_empty(self):
        """True when no worker has pending batches."""
        sizes = self.get_q_sizes()
        for x in sizes:
            if sizes[x] != 0:
                return False
        return True

    def get_load(self):
        # runs on separate thread
        """Monitoring loop: sample CPU each second, run CHECK_SANITY, and every
        CHECK_LOAD_TIME seconds collect memory/table stats and publish them."""
        db = DB(self, 'load')
        db.init_db()
        self.cfg = Config(db)
        ctime = 0
        while not (self.terminate and self.finished):  # and not self.paused
            self.cfg.store('keepalive', int(time.time()))
            time.sleep(1)
            self.load = self.get_cpu_load()
            ctime += 1
            self.schedule(event='CHECK_SANITY')
            if ctime > CHECK_LOAD_TIME:
                self.cfg.write()
                ctime = 0.0
                self.MEM_USED = self.get_mem_usage()
                self.server_mem = self.get_server_free_mem()
                if self.server_variables and 'max_heap_table_size' in self.server_variables:
                    self.max_heap = int(self.server_variables['max_heap_table_size']) / (2 ** 20)
                    if self.max_heap < WARNING_MIN_HEAP:
                        printea("******************** MIN HEAP IS TOO SMALL -> {} < {} ********************".format(self.max_heap, WARNING_MIN_HEAP), 'warning')
                self.temporary_tables_size['clients'], self.temporary_tables_rows['clients'] = db.check_temporary_tables_size(mon=self, table_name='tmp_clients')
                self.temporary_tables_size['packages'], self.temporary_tables_rows['packages'] = db.check_temporary_tables_size(mon=self, table_name='tmp_packages')
                self.db_tables_size['clients'], self.db_tables_rows['clients'] = db.check_temporary_tables_size(mon=self, table_name='Client_Versions')
                self.db_tables_size['packages'], self.db_tables_rows['packages'] = db.check_temporary_tables_size(mon=self, table_name='RecvPackages')
                self.temporary_tables_size['sum'] = self.temporary_tables_size['clients'] + self.temporary_tables_size['packages']
                self.temporary_tables_rows['sum'] = self.temporary_tables_rows['clients'] + self.temporary_tables_rows['packages']
                self.db_tables_size['sum'] = self.db_tables_size['clients'] + self.db_tables_size['packages']
                self.db_tables_rows['sum'] = self.db_tables_rows['clients'] + self.db_tables_rows['packages']
                qempty = self.all_queues_empty()
                if self.temporary_tables_rows['clients'] == 0 and self.temporary_tables_rows['packages'] == 0 and qempty:
                    db.reset_autoinc()
                self.print_stats()
                #self.load=os.getloadavg()[0]
                if qempty:
                    gc.collect()
            #end if
        #end while
        db.close_db()

    def start(self):
        """Launch all threads."""
        self.prepare_threads()

    def end(self):
        """Join workers, close their connections and emit final stats."""
        for x in self.db:
            self.threads[x].join()
            self.db[x].close_db()
        self.finished = True
        self.print_stats()


class Config():
    """Mirror of the DB Config table as attributes on this object."""

    def __init__(self, connection):
        self._db = connection  # DB worker used for reads/writes (may be falsy)
        self.read()

    def store(self, var, value):
        """Set attribute *var* (spaces -> underscores), coercing numerics to int."""
        var = var.replace(' ', '_')
        if isinstance(value, str):
            if value.isnumeric():
                setattr(self, var, int(value))
            else:
                setattr(self, var, str(value))
        else:
            setattr(self, var, int(value))

    def write(self):
        """Persist all public attributes to the Config table (name<=20, value<=45
        chars) and optionally dump them to CONFIG_FILE_LOG."""
        values = {}
        for x in self.get_internal_vars():
            values.setdefault(str(x)[:20], str(getattr(self, x))[:45])
        if self._db:
            self._db.put_config(values=values)
        the_string = "CONFIGURATION\n"
        if USE_CONFIG_FILE_LOG:
            fnbuild = lambda kv: str(kv[0]) + '=' + str(kv[1]) + "\n"
            the_string += reduce((lambda x, y: x + y), map(fnbuild, values.items()))
            try:
                with open(CONFIG_FILE_LOG, 'w') as fp:
                    fp.write(the_string)
            except:
                pass

    def read(self):
        """Load the Config table into attributes, coercing numeric strings."""
        if self._db:
            config = self._db.get_config()
            for key in config.keys():
                if config[key].isnumeric():
                    setattr(self, key, int(config[key]))
                else:
                    setattr(self, key, config[key])
        else:
            printea('No config yet')

    def get_internal_vars(self):
        """Return attribute names not starting with '_' (the public config)."""
        return list(filter(lambda x: x[0] != '_', self.__dict__.keys()))

    def print(self):
        """Dump all public attributes to stdout."""
        for v in self.get_internal_vars():
            print('{} = {}'.format(v, getattr(self, v)))


def main(*args, **kwargs):
    """Entry point: write the pidfile, start the Monitor, wait for shutdown."""
    gc.enable()
    if DAEMON:
        fp = open('/var/run/analyticsd.pid', 'w')
        fp.write(str(os.getpid()))
        fp.close()
    m = Monitor()
    m.start()
    printea("start done", 'info')
    while not m.terminate:
        time.sleep(0.5)
    m.end()
    printea("Exitting...", 'info')


if __name__ == "__main__":
    # Refuse to start if another python3 analyticsd process is already running.
    exit = 0
    keyword = 'analyticsd'
    interpreter = 'python3'
    for proc in psutil.process_iter():
        a = False
        b = False
        try:
            cmd = proc.cmdline()
        except:
            cmd = proc.cmdline
        for argument in cmd:
            #print('{} {} {}'.format(proc.cmdline,keyword,argument[-len(keyword):]))
            if interpreter in argument[-len(interpreter):]:
                a = True
            if keyword in argument[-len(keyword):]:
                b = True
        if a and b:
            exit = exit + 1
    if exit > 1:
        printea('Another daemon is running', 'error')
        sys.exit(1)
    lck = '/var/run/analyticsd'
    if DAEMON:
        if os.path.isfile(lck + '.lock'):
            printea('Lockfile {} detected, unable to start'.format(lck), 'error')
            sys.exit(1)
        else:
            try:
                # files_preserve keeps the syslog socket open across the daemon fork.
                with daemon.DaemonContext(detach_process=True, working_directory='/tmp', umask=0o002, pidfile=lockfile.FileLock(lck), files_preserve=[logger.handlers[0].socket.fileno()]):
                    main()
            except Exception as e:
                printea(e)
                sys.exit(1)
    else:
        main()