client = $client; // Prepare the destination. $this->destination = $this->prepareTarget($dest); if ($this->destination['scheme'] === 's3') { $this->s3Args = $this->getS3Args($this->destination['path']); } // Prepare the source. if (is_string($source)) { $this->sourceMetadata = $this->prepareTarget($source); $this->source = $source; } elseif ($source instanceof Iterator) { if (empty($options['base_dir'])) { throw new \InvalidArgumentException('You must provide the source' . ' argument as a string or provide the "base_dir" option.'); } $this->sourceMetadata = $this->prepareTarget($options['base_dir']); $this->source = $source; } else { throw new \InvalidArgumentException('source must be the path to a ' . 'directory or an iterator that yields file names.'); } // Validate schemes. if ($this->sourceMetadata['scheme'] === $this->destination['scheme']) { throw new \InvalidArgumentException("You cannot copy from" . " {$this->sourceMetadata['scheme']} to" . " {$this->destination['scheme']}." ); } // Handle multipart-related options. $this->concurrency = isset($options['concurrency']) ? $options['concurrency'] : MultipartUploader::DEFAULT_CONCURRENCY; $this->mupThreshold = isset($options['mup_threshold']) ? $options['mup_threshold'] : 16777216; if ($this->mupThreshold < MultipartUploader::PART_MIN_SIZE) { throw new \InvalidArgumentException('mup_threshold must be >= 5MB'); } // Handle "before" callback option. if (isset($options['before'])) { $this->before = $options['before']; if (!is_callable($this->before)) { throw new \InvalidArgumentException('before must be a callable.'); } } // Handle "debug" option. if (isset($options['debug'])) { if ($options['debug'] === true) { $options['debug'] = fopen('php://output', 'w'); } if (is_resource($options['debug'])) { $this->addDebugToBefore($options['debug']); } } } /** * Transfers the files. */ public function promise() { // If the promise has been created, just return it. if (!$this->promise) { // Create an upload/download promise for the transfer. $this->promise = $this->sourceMetadata['scheme'] === 'file' ? $this->createUploadPromise() : $this->createDownloadPromise(); } return $this->promise; } /** * Transfers the files synchronously. */ public function transfer() { $this->promise()->wait(); } private function prepareTarget($targetPath) { $target = [ 'path' => $this->normalizePath($targetPath), 'scheme' => $this->determineScheme($targetPath), ]; if ($target['scheme'] !== 's3' && $target['scheme'] !== 'file') { throw new \InvalidArgumentException('Scheme must be "s3" or "file".'); } return $target; } /** * Creates an array that contains Bucket and Key by parsing the filename. * * @param string $path Path to parse. * * @return array */ private function getS3Args($path) { $parts = explode('/', str_replace('s3://', '', $path), 2); $args = ['Bucket' => $parts[0]]; if (isset($parts[1])) { $args['Key'] = $parts[1]; } return $args; } /** * Parses the scheme from a filename. * * @param string $path Path to parse. * * @return string */ private function determineScheme($path) { return !strpos($path, '://') ? 'file' : explode('://', $path)[0]; } /** * Normalize a path so that it has UNIX-style directory separators and no trailing / * * @param string $path * * @return string */ private function normalizePath($path) { return rtrim(str_replace('\\', '/', $path), '/'); } private function resolveUri($uri) { $resolved = []; $sections = explode('/', $uri); foreach ($sections as $section) { if ($section === '.' || $section === '') { continue; } if ($section === '..') { array_pop($resolved); } else { $resolved []= $section; } } return ($uri[0] === '/' ? '/' : '') . implode('/', $resolved); } private function createDownloadPromise() { $parts = $this->getS3Args($this->sourceMetadata['path']); $prefix = "s3://{$parts['Bucket']}/" . (isset($parts['Key']) ? $parts['Key'] . '/' : ''); $commands = []; foreach ($this->getDownloadsIterator() as $object) { // Prepare the sink. $objectKey = preg_replace('/^' . preg_quote($prefix, '/') . '/', '', $object); $resolveSink = $this->destination['path'] . '/'; if (isset($parts['Key']) && strpos($objectKey, $parts['Key']) !== 0) { $resolveSink .= $parts['Key'] . '/'; } $resolveSink .= $objectKey; $sink = $this->destination['path'] . '/' . $objectKey; $command = $this->client->getCommand( 'GetObject', $this->getS3Args($object) + ['@http' => ['sink' => $sink]] ); if (strpos( $this->resolveUri($resolveSink), $this->destination['path'] ) !== 0 ) { throw new AwsException( 'Cannot download key ' . $objectKey . ', its relative path resolves outside the' . ' parent directory', $command); } // Create the directory if needed. $dir = dirname($sink); if (!is_dir($dir) && !mkdir($dir, 0777, true)) { throw new \RuntimeException("Could not create dir: {$dir}"); } // Create the command. $commands []= $command; } // Create a GetObject command pool and return the promise. return (new Aws\CommandPool($this->client, $commands, [ 'concurrency' => $this->concurrency, 'before' => $this->before, 'rejected' => function ($reason, $idx, Promise\PromiseInterface $p) { $p->reject($reason); } ]))->promise(); } private function createUploadPromise() { // Map each file into a promise that performs the actual transfer. $files = \Aws\map($this->getUploadsIterator(), function ($file) { return (filesize($file) >= $this->mupThreshold) ? $this->uploadMultipart($file) : $this->upload($file); }); // Create an EachPromise, that will concurrently handle the upload // operations' yielded promises from the iterator. return Promise\each_limit_all($files, $this->concurrency); } /** @return Iterator */ private function getUploadsIterator() { if (is_string($this->source)) { return Aws\filter( Aws\recursive_dir_iterator($this->sourceMetadata['path']), function ($file) { return !is_dir($file); } ); } return $this->source; } /** @return Iterator */ private function getDownloadsIterator() { if (is_string($this->source)) { $listArgs = $this->getS3Args($this->sourceMetadata['path']); if (isset($listArgs['Key'])) { $listArgs['Prefix'] = $listArgs['Key'] . '/'; unset($listArgs['Key']); } $files = $this->client ->getPaginator('ListObjects', $listArgs) ->search('Contents[].Key'); $files = Aws\map($files, function ($key) use ($listArgs) { return "s3://{$listArgs['Bucket']}/$key"; }); return Aws\filter($files, function ($key) { return substr($key, -1, 1) !== '/'; }); } return $this->source; } private function upload($filename) { $args = $this->s3Args; $args['SourceFile'] = $filename; $args['Key'] = $this->createS3Key($filename); $command = $this->client->getCommand('PutObject', $args); $this->before and call_user_func($this->before, $command); return $this->client->executeAsync($command); } private function uploadMultipart($filename) { $args = $this->s3Args; $args['Key'] = $this->createS3Key($filename); $filename = $filename instanceof \SplFileInfo ? $filename->getPathname() : $filename; return (new MultipartUploader($this->client, $filename, [ 'bucket' => $args['Bucket'], 'key' => $args['Key'], 'before_initiate' => $this->before, 'before_upload' => $this->before, 'before_complete' => $this->before, 'concurrency' => $this->concurrency, ]))->promise(); } private function createS3Key($filename) { $filename = $this->normalizePath($filename); $relative_file_path = ltrim( preg_replace('#^' . preg_quote($this->sourceMetadata['path']) . '#', '', $filename), '/\\' ); if (isset($this->s3Args['Key'])) { return rtrim($this->s3Args['Key'], '/').'/'.$relative_file_path; } return $relative_file_path; } private function addDebugToBefore($debug) { $before = $this->before; $sourcePath = $this->sourceMetadata['path']; $s3Args = $this->s3Args; $this->before = static function ( CommandInterface $command ) use ($before, $debug, $sourcePath, $s3Args) { // Call the composed before function. $before and $before($command); // Determine the source and dest values based on operation. switch ($operation = $command->getName()) { case 'GetObject': $source = "s3://{$command['Bucket']}/{$command['Key']}"; $dest = $command['@http']['sink']; break; case 'PutObject': $source = $command['SourceFile']; $dest = "s3://{$command['Bucket']}/{$command['Key']}"; break; case 'UploadPart': $part = $command['PartNumber']; case 'CreateMultipartUpload': case 'CompleteMultipartUpload': $sourceKey = $command['Key']; if (isset($s3Args['Key']) && strpos($sourceKey, $s3Args['Key']) === 0) { $sourceKey = substr($sourceKey, strlen($s3Args['Key']) + 1); } $source = "{$sourcePath}/{$sourceKey}"; $dest = "s3://{$command['Bucket']}/{$command['Key']}"; break; default: throw new \UnexpectedValueException( "Transfer encountered an unexpected operation: {$operation}." ); } // Print the debugging message. $context = sprintf('%s -> %s (%s)', $source, $dest, $operation); if (isset($part)) { $context .= " : Part={$part}"; } fwrite($debug, "Transferring {$context}\n"); }; } }