#The name of the main class must match the file name in lowercase import os import urllib import shutil import gi from gi.repository import Gio gi.require_version ('Snapd', '1') from gi.repository import Snapd gi.require_version('AppStreamGlib', '1.0') from gi.repository import AppStreamGlib as appstream import time #Needed for async find method, perhaps only on xenial wrap=Gio.SimpleAsyncResult() class snapmanager: def __init__(self): self.dbg=False self.progress=0 self.partial_progress=0 self.plugin_actions={'install':'snap','remove':'snap','pkginfo':'snap','load':'snap'} self.cache_dir="/var/lib/lliurexstore/bundles/cache" self.icons_folder=self.cache_dir+"/icons" self.result={} self.result['data']={} self.result['status']={} self.disabled=False self.icon_cache_enabled=True self.cli_mode=False if not os.path.isdir(self.icons_folder): try: os.makedirs(self.icons_folder) except: self.icon_cache_enabled=False self._debug("Icon cache disabled") #def __init__ def set_debug(self,dbg=True): self.dbg=dbg self._debug ("Debug enabled") #def set_debug def _debug(self,msg=''): if self.dbg: print ('DEBUG snap: '+msg) #def debug def register(self): return(self.plugin_actions) def enable(self,state=False): self.disabled=state def execute_action(self,action,applist=None,store=None,results=0): if store: self.store=store else: self.store=appstream.Store() self.progress=0 self.result['status']={'status':-1,'msg':''} self.result['data']='' if self.disabled==True: self._set_status(9) self.result['data']=self.store else: self.snap_client=Snapd.Client() self.snap_client.connect_sync(None) dataList=[] if action=='load': self.result['data']=self._load_snap_store(self.store) else: for app_info in applist: self.partial_progress=0 if action=='install': dataList.append(self._install_snap(app_info)) if action=='remove': dataList.append(self._remove_snap(app_info)) if action=='pkginfo': dataList.append(self._get_info(app_info)) self.progress+=round(self.partial_progress/len(applist),1) if self.progress>98: self.progress=98 self.result['data']=list(dataList) self.progress=100 return(self.result) def _set_status(self,status,msg=''): self.result['status']={'status':status,'msg':msg} #def _set_status def _callback(self,client,change, _,user_data): # Interate over tasks to determine the aggregate tasks for completion. total = 0 done = 0 for task in change.get_tasks(): total += task.get_progress_total() done += task.get_progress_done() self.progress = round((done/total)*100) #def _callback def _load_snap_store(self,store): pkgs=[] if self.cli_mode: pkgs=self._search_snap("*") else: pkgs=self._search_snap_async("*") self._set_status(1) for pkg in pkgs: store.add_app(self._generate_appstream_app(pkg)) return(store) def _generate_appstream_app(self,pkg): bundle=appstream.Bundle() app=appstream.App() #F*****g appstream have kinds undefined but kind_from_string works... wtf? # bundle.set_kind(appstream.BundleKind.SNAP) bundle.set_kind(bundle.kind_from_string('SNAP')) bundle.set_id(pkg.get_name()+'.snap') app.add_bundle(bundle) app.set_name("C",pkg.get_name()) app.add_pkgname(pkg.get_name()+'.snap') app.add_category("Snap") release=appstream.Release() release.set_version(pkg.get_version()) app.add_release(release) app.set_id("io.snapcraft.%s"%pkg.get_name()+'.snap') # app.set_id(pkg.get_name()+'.snap') app.set_id_kind=appstream.IdKind.DESKTOP description=("This is an Snap bundle of app %s. It hasn't been tested by our developers and comes from a 3rd party dev team. Please use it carefully."%pkg.get_name()) app.set_description("C",description+'
'+pkg.get_description()) app.set_comment("C",pkg.get_summary()) app.add_keyword("C",pkg.get_name()) for word in pkg.get_summary().split(' '): app.add_keyword("C",word) if pkg.get_icon(): app.set_icon_path(pkg.get_icon()) return(app) def _search_cb(self,obj,request,*args): global wrap wrap=request def _search_snap_async(self,tokens): self._debug("Async Searching %s"%tokens) pkgs=None global wrap self.snap_client.find_async(Snapd.FindFlags.MATCH_NAME, tokens, None, self._search_cb,(None,),None) while 'Snapd' not in str(type(wrap)): time.sleep(0.1) snaps=self.snap_client.find_finish(wrap) if type(snaps)!=type([]): pkgs=[snaps] else: pkgs=snaps stable_pkgs=[] for pkg in pkgs: if pkg.get_channel()=='stable': stable_pkgs.append(pkg) return(stable_pkgs) def _search_snap(self,tokens): self._debug("Searching %s"%tokens) pkg=None try: pkgs=self.snap_client.find_sync(Snapd.FindFlags.MATCH_NAME,tokens,None,None) except Exception as e: print(e) self._set_status(1) stable_pkgs=[] for pkg in pkgs: if pkg.get_channel()=='stable': stable_pkgs.append(pkg) self._debug("Done") return(stable_pkgs) #def _search_snap def _download_file(self,url,app_name): target_file=self.icons_folder+'/'+app_name+".png" if not os.path.isfile(target_file): self._debug("Downloading %s to %s"%(url,target_file)) try: with urllib.request.urlopen(url) as response, open(target_file, 'wb') as out_file: bf=16*1024 acumbf=0 file_size=int(response.info()['Content-Length']) while True: if acumbf>=file_size: break shutil.copyfileobj(response, out_file,bf) acumbf=acumbf+bf st = os.stat(target_file) os.chmod(target_file, st.st_mode | 0o111) except Exception as e: self._debug("Unable to download %s"%url) self._debug("Reason: %s"%e) return(target_file) #def _download_file def _get_info(self,app_info): #switch to launch async method when running under a gui #For an unknown reason request will block when sync mode under a gui and async blocks when on cli (really funny) if self.cli_mode: pkgs=self._search_snap(app_info['name']) else: pkgs=self._search_snap_async(app_info['name']) if type(pkgs)==type([]): for pkg in pkgs: if pkg.get_download_size(): app_info['size']=str(pkg.get_download_size()) else: app_info['size']=str(pkg.get_installed_size()) if pkg.get_icon(): if self.icon_cache_enabled: app_info['icon']=self._download_file(pkg.get_icon(),app_info['name']) else: app_info['icon']=pkg.get_icon() if pkg.get_status(): #Not working on xenial # if pkg.get_status()==Snapd.SnapStatus.INSTALLED or pkg.get_status()==3: # app_info['state']='installed' try: self.snap_client.list_one_sync(pkg.get_name()) app_info['state']='installed' except: app_info['state']='available' if pkg.get_screenshots(): screenshot_list=[] for screen in pkg.get_screenshots(): screenshot_list.append(screen.get_url()) app_info["screenshots"]=screenshot_list #Method not working in xenial, license default type assigned in infoManager # if pkg.get_license(): # app_info["license"]=pkg.get_license() break else: app_info['size']='0' self._debug("Info for %s"%app_info) self.partial_progress=100 return(app_info) #def _get_info def _install_snap(self,app_info): self._debug("Installing %s"%app_info['name']) if app_info['state']=='installed': self._set_status(4) else: try: self.snap_client.install_sync(app_info['name'], None, # channel self._callback, (None,), None) # cancellable app_info['state']='installed' self._set_status(0) except Exception as e: print("Install error %s"%e) self._set_status(5) self._debug("Installed %s"%app_info) return app_info #def _install_snap def _remove_snap(self,app_info): if app_info['state']=='available': self._set_status(3) else: try: self.snap_client.remove_sync(app_info['name'], self._callback, (None,), None) # cancellable app_info['state']='available' self._set_status(0) except Exception as e: print("Remove error %s"%e) self._set_status(6) return app_info #def _remove_snap