#!/usr/bin/python

# Copyright (C) 2009 Canonical Ltd.

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 3,
# as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import dbus
import gobject
import dbus.service
import logging
import os
logging.basicConfig(level=logging.DEBUG)
from dbus.mainloop.glib import DBusGMainLoop
from usbcreator.misc import *

USBCREATOR_IFACE = 'com.ubuntu.USBCreator'
PROPS_IFACE = 'org.freedesktop.DBus.Properties'
DEVICE_IFACE = 'org.freedesktop.UDisks.Device'
DISKS_IFACE = 'org.freedesktop.UDisks'


def unmount_all(parent):
    '''Unmounts the device or any partitions of the device.

    parent is the udisks object path of the disk.  Every enumerated device
    whose 'partition-slave' property equals parent and which is mounted is
    unmounted first, then parent itself if it is mounted.

    Errors from FilesystemUnmount are deliberately not caught (see below).
    '''
    bus = dbus.SystemBus()
    udisks = bus.get_object(DISKS_IFACE, '/org/freedesktop/UDisks')
    devices = udisks.EnumerateDevices(dbus_interface=DISKS_IFACE)
    for device in devices:
        dev = bus.get_object(DISKS_IFACE, device)
        props = dbus.Interface(dev, dbus.PROPERTIES_IFACE)
        if (props.Get(device, 'partition-slave') == parent
                and props.Get(device, 'device-is-mounted')):
            logging.debug('Unmounting %s', device)
            # We explicitly avoid catching errors here so that failure to
            # unmount a partition causes the format method to fail with the
            # error floating up to the frontend.
            dev.FilesystemUnmount([], dbus_interface=DEVICE_IFACE)
    dev = bus.get_object(DISKS_IFACE, parent)
    iface = dbus.PROPERTIES_IFACE
    if dev.Get(parent, 'device-is-mounted', dbus_interface=iface):
        logging.debug('Unmounting %s', parent)
        dev.FilesystemUnmount([], dbus_interface=DEVICE_IFACE)


def check_system_internal(device):
    '''Refuse to operate on a device that udisks marks as system-internal.

    device is a device file path; it is resolved to a udisks object with
    FindDeviceByDeviceFile.

    Raises dbus.DBusException('com.ubuntu.USBCreator.Error.SystemInternal')
    when the 'device-is-system-internal' property is true.
    '''
    bus = dbus.SystemBus()
    udisks = bus.get_object(DISKS_IFACE, '/org/freedesktop/UDisks')
    udisks = dbus.Interface(udisks, DISKS_IFACE)
    device = udisks.FindDeviceByDeviceFile(device)
    deviceobj = bus.get_object(DISKS_IFACE, device)
    if deviceobj.Get(device, 'device-is-system-internal',
                     dbus_interface=PROPS_IFACE):
        raise dbus.DBusException('com.ubuntu.USBCreator.Error.SystemInternal')


class USBCreator(dbus.service.Object):
    '''D-Bus service object exported at /com/ubuntu/USBCreator on the
    system bus under the name com.ubuntu.USBCreator.

    Each exported method that performs a privileged operation first asks
    polkit (via check_polkit) whether the calling bus client is authorized.
    '''

    def __init__(self):
        bus_name = dbus.service.BusName(USBCREATOR_IFACE, bus=dbus.SystemBus())
        dbus.service.Object.__init__(self, bus_name, '/com/ubuntu/USBCreator')
        # Lazily created proxies for the bus daemon and the polkit authority
        # (see check_polkit).
        self.dbus_info = None
        self.polkit = None

    # TODO return boolean success
    @dbus.service.method(USBCREATOR_IFACE, in_signature='sbs', out_signature='',
                         sender_keyword='sender', connection_keyword='conn')
    def InstallBootloader(self, device, allow_system_internal, grub_location,
                          sender=None, conn=None):
        '''Install a bootloader to the boot code area, either grub or syslinux.

        The function takes a partition device file of the form /dev/sda1
        and an optional grub_location argument for where grub is located.

        For GRUB, it's expected that GRUB already exists, all that is
        installed is the bootsector code from grub-setup.

        For syslinux:
        Installs syslinux to the partition boot code area and writes the
        syslinux boot code to the disk code area.  The latter is done to
        handle cases where a bootloader is accidentally installed to the
        MBR, and to handle some buggy BIOSes.

        Requires the 'com.ubuntu.usbcreator.bootloader' polkit action.
        Unless allow_system_internal is true, system-internal devices are
        rejected (see check_system_internal).
        '''
        self.check_polkit(sender, conn, 'com.ubuntu.usbcreator.bootloader')
        if not allow_system_internal:
            check_system_internal(device)

        bus = dbus.SystemBus()
        device_file = device
        udisks = bus.get_object(DISKS_IFACE, '/org/freedesktop/UDisks')
        udisks = dbus.Interface(udisks, DISKS_IFACE)
        device = udisks.FindDeviceByDeviceFile(device)
        deviceobj = bus.get_object(DISKS_IFACE, device)

        # Find the parent of the partition.
        parent = deviceobj.Get(device, 'partition-slave',
                               dbus_interface=PROPS_IFACE)
        parentobj = bus.get_object(DISKS_IFACE, parent)
        # From here on 'parent' is the parent's device file, not its udisks
        # object path.
        parent = parentobj.Get(parent, 'device-file',
                               dbus_interface=PROPS_IFACE)

        if grub_location:
            # Expect that boot.img and core.img both pre-built in
            # grub_location.
            popen(['dd', 'if=%s' % os.path.join(grub_location, 'boot.img'),
                   'of=%s' % parent, 'bs=446', 'count=1', 'conv=sync'])
            popen(['dd', 'if=%s' % os.path.join(grub_location, 'core.img'),
                   'of=%s' % parent, 'bs=512', 'count=62', 'seek=1',
                   'conv=sync'])
        else:
            popen(['syslinux', '-f', device_file])
            # Write the syslinux MBR.
            popen(['dd', 'if=/usr/lib/syslinux/mbr.bin', 'of=%s' % parent,
                   'bs=446', 'count=1', 'conv=sync'])

        # Mark the target partition as bootable.
        num = deviceobj.Get(device, 'partition-number',
                            dbus_interface=PROPS_IFACE)
        try:
            popen(['/sbin/parted', parent, 'set', str(num), 'boot', 'on'])
        except USBCreatorProcessException:
            # Don't worry about not being able to re-read the partition table.
            # TODO: As this will still be a problem for KVM users, this should
            # be fixed by unmounting all the partitions before we get to this
            # point, then remounting the target partition after.
            pass

    @dbus.service.method(USBCREATOR_IFACE, in_signature='sb', out_signature='',
                         sender_keyword='sender', connection_keyword='conn')
    def Format(self, device, allow_system_internal, sender=None, conn=None):
        '''Repartition and format the disk that device belongs to.

        device is a device file.  If it names a partition, the partition's
        parent disk is used instead.  Everything on the disk is unmounted,
        a new 'mbr' partition table is created with one 'vfat' partition
        flagged 'boot' that starts at offset 0 and is requested with the
        full device size, and the first 446 bytes of the disk are zeroed.

        Requires the 'com.ubuntu.usbcreator.format' polkit action.  Unless
        allow_system_internal is true, system-internal devices are rejected.
        '''
        self.check_polkit(sender, conn, 'com.ubuntu.usbcreator.format')
        if not allow_system_internal:
            check_system_internal(device)
        # TODO evand 2009-08-25: Needs a confirmation dialog.
        # XXX test with a device that doesn't have a partition table.
        bus = dbus.SystemBus()
        udisks = bus.get_object(DISKS_IFACE, '/org/freedesktop/UDisks')
        device = udisks.FindDeviceByDeviceFile(device,
                                               dbus_interface=DISKS_IFACE)
        dev = bus.get_object(DISKS_IFACE, device)
        # Use the disk if asked to format a partition.
        if dev.Get(device, 'device-is-partition', dbus_interface=PROPS_IFACE):
            device = dev.Get(device, 'partition-slave',
                             dbus_interface=PROPS_IFACE)
            dev = bus.get_object(DISKS_IFACE, device)
        dev_file = dev.Get(device, 'device-file', dbus_interface=PROPS_IFACE)
        # TODO LOCK
        unmount_all(device)
        size = dev.Get(device, 'device-size', dbus_interface=PROPS_IFACE)
        dev.PartitionTableCreate('mbr', [], dbus_interface=DEVICE_IFACE,
                                 timeout=600)
        dev.PartitionCreate(0, size, '0x0c', '', ['boot'], [], 'vfat', [],
                            dbus_interface=DEVICE_IFACE)
        # Zero out the MBR.  Will require fancy privileges.
        popen(['dd', 'if=/dev/zero', 'of=%s' % dev_file, 'bs=446', 'count=1'])
        # TODO UNLOCK

    @dbus.service.method(USBCREATOR_IFACE, in_signature='ssb', out_signature='',
                         sender_keyword='sender', connection_keyword='conn')
    def Image(self, source, target, allow_system_internal,
              sender=None, conn=None):
        '''Copy source onto target with dd using a 1M block size.

        Requires the 'com.ubuntu.usbcreator.image' polkit action.  Unless
        allow_system_internal is true, a system-internal target is rejected.
        '''
        self.check_polkit(sender, conn, 'com.ubuntu.usbcreator.image')
        if not allow_system_internal:
            check_system_internal(target)
        cmd = ['dd', 'if=%s' % str(source), 'of=%s' % str(target), 'bs=1M']
        popen(cmd)

    @dbus.service.method(USBCREATOR_IFACE, in_signature='s', out_signature='s',
                         sender_keyword='sender', connection_keyword='conn')
    def MountISO(self, device, sender=None, conn=None):
        '''Loop-mount device on a fresh temporary directory.

        Returns the path of the temporary directory used as mount point.
        Requires the 'com.ubuntu.usbcreator.mount' polkit action.

        If the mount command fails, the temporary directory is removed
        again and the error is propagated to the caller.
        '''
        self.check_polkit(sender, conn, 'com.ubuntu.usbcreator.mount')
        import tempfile
        ret = tempfile.mkdtemp()
        device = device.encode('utf-8')
        mounted = False
        try:
            popen(['mount', '-o', 'loop', device, ret])
            mounted = True
        finally:
            if not mounted:
                # Don't leave a stray mount point behind when mounting
                # failed; the original error keeps propagating.
                try:
                    os.rmdir(ret)
                except OSError:
                    pass
        return ret

    @dbus.service.method(USBCREATOR_IFACE, in_signature='s', out_signature='',
                         sender_keyword='sender', connection_keyword='conn')
    def UnmountFile(self, device, sender=None, conn=None):
        '''Run umount on device.

        Requires the 'com.ubuntu.usbcreator.mount' polkit action, like the
        other mount-related methods; umount is run with the helper's
        privileges, so it must not be reachable without authorization.
        '''
        self.check_polkit(sender, conn, 'com.ubuntu.usbcreator.mount')
        popen(['umount', device])

    @dbus.service.method(USBCREATOR_IFACE, in_signature='s', out_signature='',
                         sender_keyword='sender', connection_keyword='conn')
    def Unmount(self, device, sender=None, conn=None):
        '''Unmount device and all of its partitions (see unmount_all).

        Requires the 'com.ubuntu.usbcreator.mount' polkit action.
        '''
        self.check_polkit(sender, conn, 'com.ubuntu.usbcreator.mount')
        unmount_all(device)

    @dbus.service.method(USBCREATOR_IFACE, in_signature='s', out_signature='',
                         sender_keyword='sender', connection_keyword='conn')
    def RemountRW(self, device, sender=None, conn=None):
        '''Remount device read-write using the mount command.

        Requires the 'com.ubuntu.usbcreator.mount' polkit action.
        '''
        # Until udisks supports remounting devices.
        self.check_polkit(sender, conn, 'com.ubuntu.usbcreator.mount')
        popen(['mount', '-o', 'remount,rw', device])

    @dbus.service.method(USBCREATOR_IFACE, in_signature='', out_signature='',
                         sender_keyword='sender', connection_keyword='conn')
    def Shutdown(self, sender=None, conn=None):
        '''Stop the helper by quitting the module-level main loop.'''
        logging.debug('Shutting down.')
        loop.quit()

    # Taken from Jockey 0.5.3.
    def check_polkit(self, sender, conn, priv):
        '''Verify that the D-Bus caller is authorized for the action priv.

        sender and conn are the values dbus-python passes for
        sender_keyword/connection_keyword.  When both are None the method
        was called locally rather than over D-Bus and no check is made.

        Raises dbus.DBusException('com.ubuntu.USBCreator.Error.NotAuthorized')
        if polkit denies the request; other D-Bus errors are propagated.
        '''
        if sender is None and conn is None:
            return
        if self.dbus_info is None:
            self.dbus_info = dbus.Interface(conn.get_object(
                'org.freedesktop.DBus', '/org/freedesktop/DBus/Bus', False),
                'org.freedesktop.DBus')
        # The pid is only used for the diagnostic message below.
        pid = self.dbus_info.GetConnectionUnixProcessID(sender)
        if self.polkit is None:
            self.polkit = dbus.Interface(dbus.SystemBus().get_object(
                'org.freedesktop.PolicyKit1',
                '/org/freedesktop/PolicyKit1/Authority', False),
                'org.freedesktop.PolicyKit1.Authority')
        try:
            # Identify the caller by its unique bus name rather than by
            # ('unix-process', pid, start-time=0): a pid can be reused or
            # the process can exec a setuid binary between the pid lookup
            # and the authorization check.
            #
            # we don't need is_challenge return here, since we call with
            # AllowUserInteraction
            (is_auth, _is_challenge, details) = self.polkit.CheckAuthorization(
                ('system-bus-name',
                 {'name': dbus.String(sender, variant_level=1)}),
                priv, {'': ''}, dbus.UInt32(1), '', timeout=600)
        except dbus.DBusException as e:
            if (e.get_dbus_name() ==
                    'org.freedesktop.DBus.Error.ServiceUnknown'):
                # polkitd timed out, connect again
                self.polkit = None
                return self.check_polkit(sender, conn, priv)
            else:
                raise

        if not is_auth:
            logging.debug('_check_polkit_privilege: sender %s on connection %s '
                          'pid %i is not authorized for %s: %s',
                          sender, conn, pid, priv, str(details))
            raise dbus.DBusException(
                'com.ubuntu.USBCreator.Error.NotAuthorized')


DBusGMainLoop(set_as_default=True)
helper = USBCreator()
loop = gobject.MainLoop()
loop.run()