#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Desktop indicator for LliureX-Up.

The indicator shows a spinner entry while the ``/var/run/lliurexUp.lock``
file exists and, for users in an administrative group on non-client
flavours, periodically simulates a system upgrade and offers to launch the
updater when packages are pending.
"""

import gi
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk, Pango, GdkPixbuf, Gdk, Gio, GObject, GLib
gi.require_version('AppIndicator3', '0.1')
from gi.repository import AppIndicator3 as appindicator
import os
import subprocess
import threading
import sys
import grp
import pwd
import re
import time
gi.require_version('Notify', '0.7')
from gi.repository import Notify
from aptdaemon import client
import syslog
import dbus
from dbus.mainloop.glib import DBusGMainLoop
import pyinotify
from pyinotify import WatchManager, Notifier, ThreadedNotifier, EventsCodes, ProcessEvent
import signal
# Restore the default SIGINT handler so Ctrl+C terminates Gtk.main().
signal.signal(signal.SIGINT, signal.SIG_DFL)
import lliurex.screensaver

import gettext
gettext.textdomain('lliurex-up')
_ = gettext.gettext

RSRC="/usr/share/lliurex-up/"
SP1=RSRC+"rsrc/sp1.png"
SP2=RSRC+"rsrc/sp2.png"
SP3=RSRC+"rsrc/sp3.png"
SP4=RSRC+"rsrc/sp4.png"
SP5=RSRC+"rsrc/sp5.png"
SP6=RSRC+"rsrc/sp6.png"
SP7=RSRC+"rsrc/sp7.png"
SP8=RSRC+"rsrc/sp8.png"
TARGET_FILE="/var/run/lliurexUp.lock"
DISABLE_INDICATOR_TOKEN="/etc/lliurex-up-indicator/disableIndicator.token"

# Spinner frames in display order; each one is shown for 10 ticks.
_SPINNER_FRAMES=(SP1,SP2,SP3,SP4,SP5,SP6,SP7,SP8)
# Pseudo-terminal name (e.g. "pts/12") as printed by ``who``.
_PTS_PATTERN=re.compile(r"(pts/\d+)")


def _run_command(argv):
    """Run *argv* without a shell and return its standard output as text.

    An empty string is returned when the program cannot be started, so a
    missing tool behaves like a command that printed nothing.
    """
    try:
        result=subprocess.run(argv,stdout=subprocess.PIPE,check=False)
    except OSError:
        return ""
    return result.stdout.decode(errors="replace")

#def _run_command


class UpgradeIndicator:
    """Application indicator that reports LliureX-Up activity and updates."""

    ID="net.lliurex.UpIndicator"
    # Membership in any of these groups enables the update menu entry.
    GROUPS=["admins","adm"]
    WATCH_DIR=os.path.expanduser("/var/run")

    def __init__(self,icon_name):
        """Build the indicator, its menu and the periodic worker.

        icon_name: icon name handed to AppIndicator3 for the tray icon.
        """
        self.screensaver_inhibitor = lliurex.screensaver.InhibitScreensaver()
        self.sp_cont=0   # spinner tick counter
        self.sp_img=0    # path of the current spinner frame (0 until first tick)
        self.FREQUENCY=3600
        self.APT_FRECUENCY=1200
        self.updatedInfo=False        # True when this user gets update notices
        self.remoteUpdateInfo=False   # True while a running upgrade is tracked
        self.is_working=False
        self.is_cache_updated=True
        self.last_check=0

        self.lliurexUp_running=os.path.exists(TARGET_FILE)

        self.menuMode()

        self.app_indicator=appindicator.Indicator.new(UpgradeIndicator.ID,icon_name,appindicator.IndicatorCategory.APPLICATION_STATUS)
        self.app_indicator.set_status(appindicator.IndicatorStatus.PASSIVE)
        self.app_indicator.set_title("LliureX-Up")
        self.menu = Gtk.Menu()
        self.menu.add_events(Gdk.EventMask.ALL_EVENTS_MASK)
        self.app_indicator.set_menu(self.menu)

        Notify.init(UpgradeIndicator.ID)

        self.populate_menu()
        self.start_inotify()

        if self.updatedInfo:
            # Watch the apt package cache so a refresh triggers a new check.
            self.fcache=Gio.File.new_for_path("/var/cache/apt/pkgcache.bin")
            self.mcache=self.fcache.monitor_file(Gio.FileMonitorFlags.NONE,None)
            self.mcache.connect("changed",self.on_cache_changed)

        GLib.timeout_add_seconds(5, self.worker)

    #def __init__

    def menuMode(self):
        """Enable update notices for group members on non-client flavours."""
        flavours=self.get_flavour()
        users_group=self.get_user_group()

        if not users_group:
            return
        if 'None' in flavours:
            return
        is_client=any('client' in item for item in flavours)
        if not is_client:
            self.updatedInfo=True

    #def menuMode

    def worker(self):
        """Periodic callback (every 5 s) driving status and upgrade checks.

        Always returns True so GLib keeps the timeout installed.
        """
        if not self.is_working:
            if self.lliurexUp_running:
                self.is_alive()

            if not os.path.exists(os.path.expanduser(DISABLE_INDICATOR_TOKEN)):
                if self.updatedInfo and not self.remoteUpdateInfo:
                    if self.is_cache_updated:
                        self.upgrade()
                        self.last_check=0
                    else:
                        # This callback fires every 5 seconds.
                        self.last_check+=5
                        if self.last_check>self.FREQUENCY:
                            self.last_check=0
                            self.upgrade()

        return True

    #def worker

    def get_flavour(self):
        """Return the comma-separated items printed by ``lliurex-version -v``."""
        output=_run_command(["lliurex-version","-v"])
        return [ x.strip() for x in output.split(',') ]

    #def get_flavour

    def get_user_group(self):
        """Return True when the current user belongs to one of GROUPS.

        Returns False when the user or a group id cannot be resolved.
        """
        # USER may be unset (e.g. some session launchers); fall back to the uid.
        user=os.environ.get("USER")
        try:
            if not user:
                user=pwd.getpwuid(os.getuid()).pw_name
            gid=pwd.getpwnam(user).pw_gid
        except KeyError:
            return False

        for group_id in os.getgrouplist(user, gid):
            try:
                name=grp.getgrgid(group_id).gr_name
            except KeyError:
                # gid without a matching group entry: nothing to compare.
                continue
            if name in UpgradeIndicator.GROUPS:
                return True
        return False

    #def get_user_group

    def _build_menu_item(self,image,text,remote):
        """Return a MenuItem holding *image* and a label, tagged with *remote*."""
        label=Gtk.Label(label=text)
        label.remote=False
        hbox=Gtk.HBox()
        hbox.pack_start(image,False,False,0)
        hbox.pack_end(label,False,False,0)
        item=Gtk.MenuItem()
        item.remote=remote
        item.add(hbox)
        return item

    #def _build_menu_item

    def populate_menu(self):
        """Create the menu entries.

        The first entry (``remote`` True) is the spinner row; the second,
        only added when update notices are enabled, launches the updater.
        """
        img=Gtk.Image.new_from_file(SP1)
        img.remote=True
        item=self._build_menu_item(img,_("Lliurex-Up is being executed"),True)
        self.menu.insert(item,0)

        if self.updatedInfo:
            img=Gtk.Image()
            img.set_from_icon_name("update-low",Gtk.IconSize.MENU)
            img.remote=False
            item=self._build_menu_item(img,_("Update the system now"),False)
            item.connect("activate",self.open_gui)
            self.menu.insert(item,1)

    #def populate_menu

    def start_inotify(self):
        """Start the lock-file watcher in a daemon thread."""
        t=threading.Thread(target=self._inotify)
        t.daemon=True
        t.start()

    #def start_inotify

    def _inotify(self):
        """Watch WATCH_DIR and flag ``lliurexUp_running`` when the lock appears.

        Runs until the notifier raises; it is then stopped and False returned.
        """
        wm=WatchManager()
        mask=pyinotify.IN_CREATE

        class Process_handler(ProcessEvent):

            def __init__(self,main):
                self.main=main

            def process_IN_CREATE(self,event):
                if os.path.expanduser(event.pathname)==TARGET_FILE:
                    self.main.lliurexUp_running=True

        notifier=Notifier(wm,Process_handler(self))
        wm.add_watch(UpgradeIndicator.WATCH_DIR,mask,rec=True)

        while True:
            try:
                notifier.process_events()
                if notifier.check_events():
                    notifier.read_events()
            except Exception:
                # Leave the loop: retrying on a stopped notifier would spin forever.
                notifier.stop()
                return False

    #def _inotify

    def remote_execute(self):
        """Return True when a ``lliurex-upgrade`` process looks remotely started.

        A ``ps -ef`` line counts when it contains ``?`` or names one of the
        pts terminals listed by ``who`` for sessions not marked ``(:0``.
        """
        remote_pts=[]
        for line in _run_command(["who"]).splitlines():
            if "(:0" in line:
                continue
            match=_PTS_PATTERN.search(line)
            if match:
                remote_pts.append(match.group(1))

        for line in _run_command(["ps","-ef"]).splitlines():
            if "lliurex-upgrade" not in line or "grep" in line:
                continue
            if '?' in line:
                return True
            # Compare whole fields so "pts/1" does not match "pts/10".
            fields=line.split()
            if any(pts in fields for pts in remote_pts):
                return True

        return False

    #def remote_execute

    def _show_menu_items(self,remote):
        """Show only the menu entries whose ``remote`` tag equals *remote*."""
        self.menu.show_all()
        for item in self.menu.get_children():
            if item.remote!=remote:
                item.set_visible(False)

    #def _show_menu_items

    def is_alive(self):
        """Start tracking a running LliureX-Up: inhibit the screensaver, show
        the spinner entry for remote runs and poll ``check_status`` every 100 ms.
        """
        self.is_working=True
        self.screensaver_inhibitor.inHibit()
        self.checkRemote=self.remote_execute()
        self.remoteUpdateInfo=True

        if self.checkRemote:
            self._show_menu_items(True)
            self.app_indicator.set_status(appindicator.IndicatorStatus.ACTIVE)
        else:
            self.app_indicator.set_status(appindicator.IndicatorStatus.PASSIVE)

        GLib.timeout_add(100,self.check_status)

        return

    #def is_alive

    def _set_spinner_image(self,path):
        """Load *path* into every menu image tagged ``remote``."""
        for item in self.menu.get_children():
            # MenuItem -> HBox -> first child is the image.
            image=item.get_children()[0].get_children()[0]
            if image.remote:
                image.set_from_file(path)

    #def _set_spinner_image

    def _close_notification(self,notification):
        """One-shot timeout callback closing *notification*; returns False."""
        try:
            notification.close()
        except GLib.Error:
            # Already dismissed by the user or the notification daemon.
            pass
        return False

    #def _close_notification

    def check_status(self):
        """Timeout callback polling the lock file.

        Returns True (keep polling) while the lock exists; otherwise resets
        the tracking flags, releases the screensaver and returns False.
        """
        if os.path.exists(TARGET_FILE):
            if self.checkRemote:
                self.spinner_sync()
                self._set_spinner_image(self.sp_img)
                self.sp_cont=self.sp_cont+1
            return True

        if self.checkRemote:
            self._set_spinner_image(SP1)
            message=_("The upgrade process has ended")
            self.show_message(message)
            self.app_indicator.set_status(appindicator.IndicatorStatus.PASSIVE)
            # Close the notice after 2 s without blocking the GTK main loop.
            GLib.timeout_add_seconds(2,self._close_notification,self.notify)

        self.remoteUpdateInfo=False
        self.lliurexUp_running=False
        self.is_working=False
        self.is_cache_updated=False
        self.screensaver_inhibitor.unInhibit()
        return False

    #def check_status

    def spinner_sync(self):
        """Pick the spinner frame for the current tick.

        The counter wraps after 80; frames advance every 10 ticks and the
        last frame is held on tick 80.
        """
        if self.sp_cont>80:
            self.sp_cont=0

        last=len(_SPINNER_FRAMES)-1
        self.sp_img=_SPINNER_FRAMES[min(self.sp_cont//10,last)]

    #def spinner_sync

    def on_cache_changed(self,monitor,file,other,type):
        """Apt cache notification event"""
        if(type==Gio.FileMonitorEvent.CHANGES_DONE_HINT and self.last_check>self.APT_FRECUENCY):
            #ignore cache updates at intervals smaller than APT_FRECUENCY
            self.is_cache_updated=True

    #def on_cache_changed

    def upgrade(self):
        """
        Performs an upgrade simulation.
        Cache will be updated if needed.

        Failures are logged to syslog and otherwise ignored (best effort).
        """
        self.is_working=True

        try:
            apt_client=client.AptClient()

            if not self.is_cache_updated:
                apt_client.update_cache(wait=True)
                self.is_cache_updated=True

            transaction=apt_client.upgrade_system()
            transaction.simulate()
            #this sync is needed in order to update transaction properties after simulation
            #credits go to Lubuntu Team as this is completly undocumented :)
            transaction.sync()

            # Only the first five dependency lists are considered
            # (presumably install, reinstall, remove, purge, upgrade -- confirm
            # against the aptdaemon transaction documentation).
            pending=any(len(dep)>0 for dep in list(transaction.dependencies)[:5])

            if pending:
                self._show_menu_items(False)
                self.app_indicator.set_status(appindicator.IndicatorStatus.ACTIVE)
                message=_("There are new packages ready to be updated or installed")
                self.show_message(message)

        except Exception as e:
            syslog.syslog(syslog.LOG_WARNING,"lliurex-up-indicator: upgrade check failed: %s"%e)
        finally:
            self.is_working=False
            self.is_cache_updated=False

    #def upgrade

    def show_message(self,message):
        """Show *message* as a transient desktop notification."""
        self.notify=Notify.Notification.new(_("Lliurex-Up"),message, "lliurex-up-indicator")
        self.notify.set_hint("transient", GLib.Variant.new_boolean(True))
        self.notify.show()

    #def show_message

    def open_gui(self,widget):
        """Launch the LliureX-Up GUI unless the lock file already exists."""
        if not os.path.exists(TARGET_FILE):
            self.app_indicator.set_status(appindicator.IndicatorStatus.PASSIVE)
            cmd='lliurex-up-desktop-launcher.py &'
            os.system(cmd)

    #def open_gui

    def quit(self):
        """Leave the GTK main loop."""
        Gtk.main_quit()

    #def quit

#class UpgradeIndicator

if __name__=="__main__":

    lliurexup=UpgradeIndicator("lliurex-up-indicator")
    Gtk.main()