#The name of the main class must match the file name in lowercase
import urllib
import urllib.request
import shutil
import json
import os
from subprocess import call
import sys
import threading
from xml.sax.saxutils import escape, quoteattr
from bs4 import BeautifulSoup
import random
import time
import gi
from gi.repository import Gio
gi.require_version('AppStreamGlib', '1.0')
from gi.repository import AppStreamGlib as appstream


class appimagemanager:
    """Store plugin that lists, installs and removes AppImage bundles.

    The plugin scrapes a repository index page for ``*.AppImage`` links,
    writes one AppStream ``appdata.xml`` file per bundle under
    ``bundles_dir`` and loads those files into an AppStreamGlib store.
    Bundles are installed by downloading them into ``appimage_dir``.

    Every public action returns ``self.result``, a dict holding
    ``{'status': {'status': int, 'msg': str}, 'data': ...}``.
    """

    #Seconds a blocked socket operation may take before it is aborted
    http_timeout=30

    def __init__(self):
        self.dbg=False
        self.progress=0
        self.partial_progress=0
        self.plugin_actions={'install':'appimage','remove':'appimage','pkginfo':'appimage','load':'appimage'}
        self.result={}
        self.result['data']={}
        self.result['status']={}
        self.bundles_dir="/var/lib/lliurexstore/bundles"
        self.bundle_types=['appimg']
        self.appimage_dir='/opt/bundles/appimg'
        self.repository_url='https://dl.bintray.com/probono/AppImages'
        #To get the description of an app we must go to a specific url.
        #$(appname) will be replaced with the app name so the url matches the right one.
        #If another site uses a different url naming convention it must be defined
        #here with the appropriate replacements
        self.info_url='https://bintray.com/probono/AppImages/$(appname)'
        self.disabled=False
        self.count=0
        #Descriptions already fetched, keyed by app name
        self.descriptions_dict={}
        #Shared by all worker threads to guard self.progress and descriptions_dict
        self._lock=threading.Lock()
    #def __init__

    def set_debug(self,dbg=True):
        """Switch debug output on or off."""
        self.dbg=dbg
        self._debug ("Debug enabled")
    #def set_debug

    def _debug(self,msg=''):
        """Print ``msg`` when debug output is enabled."""
        if self.dbg:
            print ('DEBUG appimage: '+msg)
    #def _debug

    def register(self):
        """Return the mapping of supported actions to the bundle kind."""
        return(self.plugin_actions)
    #def register

    def enable(self,state=False):
        """Store ``state`` in ``self.disable``.

        NOTE(review): execute_action reads ``self.disabled`` (with a final
        "d"), so this call currently has no effect on the plugin. The
        intended polarity of ``state`` cannot be told from this file, so the
        attribute is left untouched until the callers are checked.
        """
        self.disable=state
    #def enable

    def execute_action(self,action,applist=None,store=None):
        """Run ``action`` and return the result dict.

        action  -- one of 'load', 'install', 'remove' or 'pkginfo'.
        applist -- list of app_info dicts (each needs a 'package' key);
                   ignored by 'load'.
        store   -- AppStreamGlib store to fill; a new one is created when
                   it is not given.

        For 'load' the result data is the store; for the other actions it
        is the list of processed app_info dicts. Status 9 is set when the
        plugin is disabled.
        """
        if store:
            self.store=store
        else:
            self.store=appstream.Store()
        #A missing applist means "nothing to do", not a crash
        applist=applist or []
        self.progress=0
        self.result['status']={'status':-1,'msg':''}
        self.result['data']=''
        dataList=[]
        if self.disabled:
            self._set_status(9)
            self.result['data']=self.store
        else:
            self._chk_installDir()
            if action=='load':
                self.result['data']=self._load_appimage_store(self.store)
            else:
                for app_info in applist:
                    self.partial_progress=0
                    if action=='install':
                        dataList.append(self._install_appimage(app_info))
                    if action=='remove':
                        dataList.append(self._remove_appimage(app_info))
                    if action=='pkginfo':
                        dataList.append(self._get_info(app_info))
                    self.progress+=int(self.partial_progress/len(applist))
                self.result['data']=list(dataList)
        self.progress=100
        return(self.result)
    #def execute_action

    def _set_status(self,status,msg=''):
        """Record the status code and message of the running action."""
        self.result['status']={'status':status,'msg':msg}
    #def _set_status

    def _callback(self,partial_size=0,total_size=0):
        """Update ``self.progress`` from a download step.

        With both sizes known the progress is the downloaded percentage;
        otherwise it creeps towards the limit. It never exceeds 99 so that
        only execute_action reports completion.
        """
        limit=99
        if partial_size!=0 and total_size!=0:
            self.progress=round(partial_size/total_size,2)*100
        else:
            margin=limit-self.progress
            self.progress=self.progress+round(margin/limit,3)
        if self.progress>limit:
            self.progress=limit
    #def _callback

    def _chk_installDir(self):
        """Create the install dir if needed; return False when that fails."""
        msg_status=True
        if not os.path.isdir(self.appimage_dir):
            try:
                os.makedirs(self.appimage_dir)
            except OSError as e:
                self._debug("Couldn't create "+self.appimage_dir+": "+str(e))
                msg_status=False
        return msg_status
    #def _chk_installDir

    def _install_appimage(self,app_info):
        """Download the bundle named by app_info['package'] and make it executable.

        Sets status 4 when the bundle is already installed, 0 on success
        and 5 when the download fails. Returns the updated app_info.
        """
        app_info=self._get_info(app_info)
        if app_info['state']=='installed':
            self._set_status(4)
            return app_info
        appimage_url=self.repository_url+'/'+app_info['package']
        self._debug("Downloading "+appimage_url)
        dest_path=self.appimage_dir+'/'+app_info['package']
        chunk_size=16*1024
        try:
            with urllib.request.urlopen(appimage_url,timeout=self.http_timeout) as response, open(dest_path,'wb') as out_file:
                #The header may be missing; 0 makes _callback fall back to
                #its size-less progress estimation
                app_size=int(response.info()['Content-Length'] or 0)
                downloaded=0
                while True:
                    chunk=response.read(chunk_size)
                    if not chunk:
                        break
                    out_file.write(chunk)
                    downloaded+=len(chunk)
                    self._callback(downloaded,app_size)
            st=os.stat(dest_path)
            os.chmod(dest_path,st.st_mode|0o111)
            self._set_status(0)
        except Exception as e:
            self._debug("Couldn't install "+app_info['package']+": "+str(e))
            #A partial file would be reported as an installed bundle
            if os.path.isfile(dest_path):
                try:
                    os.remove(dest_path)
                except OSError:
                    pass
            self._set_status(5)
        return app_info
    #def _install_appimage

    def _remove_appimage(self,app_info):
        """Delete the installed bundle named by app_info['package'].

        The bundle is first asked to drop its desktop integration (best
        effort). Sets status 0 on success and 6 when the file can't be
        deleted; the status is left untouched when the file doesn't exist.
        """
        self._debug("Removing "+app_info['package'])
        appimage_path=self.appimage_dir+'/'+app_info['package']
        if os.path.isfile(appimage_path):
            try:
                call([appimage_path,"--remove-appimage-desktop-integration"])
            except Exception as e:
                #Not every bundle supports the switch; removal goes on anyway
                self._debug("Desktop integration not removed: "+str(e))
            try:
                os.remove(appimage_path)
                self._set_status(0)
            except OSError:
                self._set_status(6)
        return(app_info)
    #def _remove_appimage

    def _get_info(self,app_info):
        """Fill app_info['state'] and app_info['size'] and return app_info.

        'state' is 'installed' when the bundle file exists in the install
        dir, 'available' otherwise. 'size' is the Content-Length header
        reported by the repository, or 0 when it can't be obtained.
        """
        app_info['state']='available'
        if os.path.isfile(self.appimage_dir+'/'+app_info['package']):
            app_info['state']='installed'
        #Get size
        appimage_url=self.repository_url+'/'+app_info['package']
        try:
            with urllib.request.urlopen(appimage_url,timeout=self.http_timeout) as response:
                app_info['size']=response.info()['Content-Length'] or 0
        except Exception:
            app_info['size']=0
        self._set_status(0)
        self.partial_progress=100
        return(app_info)
    #def _get_info

    def _load_appimage_store(self,store):
        """Refresh the catalogue and load every bundle type into ``store``."""
        self._download_bundles_catalogue()
        if os.path.exists(self.bundles_dir):
            for bundle_type in self.bundle_types:
                self._debug("Loading %s catalog"%bundle_type)
                store=self._generic_file_load(self.bundles_dir+'/'+bundle_type,store)
        return(store)
    #def _load_appimage_store

    def _generic_file_load(self,target_path,store):
        """Add every ``*appdata.xml`` file found in target_path to ``store``.

        Files that can't be parsed are skipped. Returns the store.
        """
        icon_path='/usr/share/icons/hicolor/128x128'
        if not os.path.isdir(target_path):
            os.makedirs(target_path)
        for target_file in os.listdir(target_path):
            if not target_file.endswith('appdata.xml'):
                continue
            store_path=Gio.File.new_for_path(target_path+'/'+target_file)
            self._debug("Adding file "+target_path+'/'+target_file)
            try:
                store.from_file(store_path,icon_path,None)
            except Exception as e:
                self._debug("Couldn't add file "+target_file+" to store")
                self._debug("Reason: "+str(e))
        return(store)
    #def _generic_file_load

    def _download_bundles_catalogue(self):
        """Regenerate the appdata.xml catalogue from the remote repository.

        A repository that can't be fetched is skipped, so the files written
        by a previous run stay available. Stale entries are only removed
        after every repository of the type was fetched and at least one
        bundle was found. Always returns True.
        """
        progress_bar="#"
        repositories_list={'appimg':[self.repository_url]}
        #Url holding the description of each app; see self.info_url
        info_list={'appimg':self.info_url}
        self.descriptions_dict={}
        for repo_type,repo_urls in repositories_list.items():
            info_url=info_list[repo_type]
            outdir=self.bundles_dir+'/'+repo_type+'/'
            if not self._chk_bundle_dir(outdir):
                self._debug("appImage catalogue could not be fetched: Permission denied")
                continue
            available_apps=[]
            all_fetched=True
            for repo in repo_urls:
                self._debug(("Fetching repo %s")%(repo))
                try:
                    applist=self._generate_applist(self._fetch_repo(repo))
                except Exception as e:
                    self._debug("Couldn't fetch repo "+repo+": "+str(e))
                    all_fetched=False
                    continue
                self._debug("Processing info...")
                self._th_generate_xml_catalog(applist,outdir,info_url,repo_type,progress_bar)
                self._debug("Fetched repo "+repo)
                available_apps.extend(applist)
            #Cleaning against a partial or empty list would wipe valid entries
            if all_fetched and available_apps:
                self._clean_bundle_catalogue(available_apps,outdir)
        return(True)
    #def _download_bundles_catalogue

    def _chk_bundle_dir(self,outdir):
        """Create outdir if needed and tell whether it is fully accessible."""
        if not os.path.isdir(outdir):
            try:
                os.makedirs(outdir)
            except OSError as e:
                self._debug("Couldn't create "+outdir+": "+str(e))
        return(os.access(outdir,os.W_OK|os.R_OK|os.X_OK|os.F_OK))
    #def _chk_bundle_dir

    def _fetch_repo(self,repo):
        """Return the content of the url ``repo`` decoded as utf-8."""
        with urllib.request.urlopen(repo,timeout=self.http_timeout) as f:
            content=(f.read().decode('utf-8'))
        return(content)
    #def _fetch_repo

    def _generate_applist(self,content):
        """Extract the AppImage file names linked from an index page.

        The page is split on spaces and every token ending in ``AppImage"``
        is kept, stripped of its ``href=":`` prefix and of its quotes.
        """
        return [
            token.replace('href=":','').replace('"','')
            for token in content.split(' ')
            if token.endswith('AppImage"')
        ]
    #def _generate_applist

    def _th_generate_xml_catalog(self,applist,outdir,info_url,repo_type,progress_bar=''):
        """Write the xml file of every app in applist using worker threads.

        One thread per app is started, at most ``maxconnections`` of them
        work at the same time, and the call returns once all are done. The
        whole batch adds 30 points to ``self.progress``. ``repo_type`` and
        ``progress_bar`` are accepted for compatibility and not used.
        """
        if not applist:
            return
        maxconnections=10
        semaphore=threading.BoundedSemaphore(value=maxconnections)
        #Shuffled so the requests are not fired in index order
        random_applist=list(applist)
        random.shuffle(random_applist)
        inc=30/len(random_applist)
        workers=[]
        for app in random_applist:
            th=threading.Thread(target=self._th_write_xml,args=(app,outdir,info_url,semaphore,inc))
            th.start()
            workers.append(th)
        for th in workers:
            th.join()
    #def _th_generate_xml_catalog

    def _th_write_xml(self,app,outdir,info_url,semaphore,inc):
        """Thread body: write the xml file of ``app`` unless it already exists.

        ``app`` must look like ``name-version-arch...``; other names are
        skipped. ``self.progress`` grows by ``inc`` in every case and the
        semaphore is always released.
        """
        with semaphore:
            name_splitted=app.split('-')
            if len(name_splitted)<3:
                self._debug("Skipping "+app+": unexpected file name")
            else:
                name=name_splitted[0]
                version=name_splitted[1]
                filename=outdir+app.lower().replace('appimage',"appdata.xml")
                self._debug("checking if we need to download "+filename)
                if not os.path.isfile(filename):
                    try:
                        self._write_xml_file(filename,app,name,version,info_url,self._lock)
                    except OSError as e:
                        self._debug("Couldn't write "+filename+": "+str(e))
            with self._lock:
                self.progress=self.progress+inc
    #def _th_write_xml

    def _write_xml_file(self,filename,app,name,version,info_url,lock):
        """Write the AppStream description of ``app`` to ``filename``.

        filename -- destination path of the xml file.
        app      -- AppImage file name; used as id (lowercased), package
                    name, binary and bundle id.
        name     -- app name, also used for the launchable, keyword and icon.
        version  -- version written in the release entry.
        info_url -- url template handed to _get_description.
        lock     -- lock guarding ``self.descriptions_dict``.

        The summary is made of the first 8 words of the fetched description
        (or of the disclaimer when there is none); the description is always
        the disclaimer. All values are xml-escaped.
        """
        self._debug("Generating "+app+" xml")
        try:
            with lock:
                fetched=self.descriptions_dict.get(name)
            if fetched is None:
                #Fetched outside the lock so downloads still run in parallel
                fetched=self._get_description(name,info_url)
                with lock:
                    self.descriptions_dict[name]=fetched
        except Exception:
            fetched=''
        summary=' '.join(fetched.split(' ')[:8])
        description="This is an AppImage bundle of app "+name+". It hasn't been tested by our developers and comes from a 3rd party dev team. Please use it carefully."
        if not summary:
            summary=' '.join(description.split(' ')[:8])
        xml_lines=[
            '<?xml version="1.0" encoding="UTF-8"?>',
            '<components version="0.10">',
            '<component type="desktop-application">',
            '  <id>'+escape(app.lower())+'</id>',
            '  <pkgname>'+escape(app)+'</pkgname>',
            '  <name>'+escape(name)+'</name>',
            '  <metadata_license>CC0-1.0</metadata_license>',
            '  <provides><binary>'+escape(app)+'</binary></provides>',
            '  <releases>',
            '    <release version='+quoteattr(version)+'/>',
            '  </releases>',
            '  <launchable type="desktop-id">'+escape(name)+'.desktop</launchable>',
            '  <description><p>'+escape(description)+'</p></description>',
            '  <summary>'+escape(summary)+'...</summary>',
            '  <bundle type="appimage">'+escape(app)+'</bundle>',
            '  <keywords>',
            '    <keyword>'+escape(name)+'</keyword>',
            '    <keyword>appimage</keyword>',
            '  </keywords>',
            '  <categories>',
            '    <category>AppImage</category>',
            '  </categories>',
            '  <icon type="cached">'+escape(name)+'_'+escape(name)+'.png</icon>',
            '</component>',
            '</components>',
        ]
        #Single write: an interrupted run is less likely to leave a truncated
        #file that would never be regenerated
        with open(filename,'w',encoding='utf-8') as f:
            f.write('\n'.join(xml_lines)+'\n')
    #def _write_xml_file

    def _get_description(self,app_name,info_url):
        """Return the description published for ``app_name``, or ''.

        ``$(appname)`` in info_url is replaced with the app name and the text
        of the first ``div`` of class ``description-text`` of that page is
        returned with its colons turned into dots. The text is not escaped.
        """
        desc=''
        if '$(appname)' in info_url:
            info_url=info_url.replace('$(appname)',app_name)
        self._debug("Getting description from "+info_url)
        try:
            with urllib.request.urlopen(info_url,timeout=self.http_timeout) as f:
                content=(f.read().decode('utf-8'))
            soup=BeautifulSoup(content,"html.parser")
            description_div=soup.find('div',attrs={"class":"description-text"})
            if description_div is not None:
                desc=description_div.text.replace(':','.')
        except Exception as e:
            self._debug("Can't get description from "+info_url)
            self._debug(str(e))
        return(desc)
    #def _get_description

    def _clean_bundle_catalogue(self,applist,outdir):
        """Delete the xml files in outdir whose app is not in applist."""
        available=set(item.lower() for item in applist)
        for xml_file in os.listdir(outdir):
            if not xml_file.endswith('appdata.xml'):
                continue
            #Reverse of the renaming done when the file was written
            app_name=xml_file.lower().replace('appdata.xml','appimage')
            if app_name in available:
                continue
            stale_path=outdir+'/'+app_name.replace('appimage','appdata.xml')
            try:
                os.remove(stale_path)
            except OSError as e:
                self._debug("Couldn't remove "+stale_path+": "+str(e))
    #def _clean_bundle_catalogue