Source code for rebasehelper.helpers.download_helper

# -*- coding: utf-8 -*-
#
# This tool helps you rebase your package to the latest version
# Copyright (C) 2013-2019 Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# Authors: Petr Hráček <phracek@redhat.com>
#          Tomáš Hozza <thozza@redhat.com>
#          Nikola Forró <nforro@redhat.com>
#          František Nečas <fifinecas@seznam.cz>

import logging
import os
import sys
import time
import urllib.error
import urllib.request
from typing import cast

import requests

from rebasehelper.exceptions import DownloadError
from rebasehelper.logger import CustomLogger


logger: CustomLogger = cast(CustomLogger, logging.getLogger(__name__))


[docs]class DownloadHelper: """Class for downloading files and performing HTTP requests."""
[docs] @staticmethod def progress(download_total, downloaded, start_time, show_size=True): """Prints current progress and estimated remaining time of a download to the standard output. Args: download_total (int): Total download size in bytes. downloaded (int): Size of the already downloaded portion of a file in bytes. start_time (float): Time when the download started in seconds since epoch. show_size (bool): Whether to show the number of downloaded bytes. """ bar_width = 32 infinite_step = 256 * 1024 # move every 256 kilobytes delta = time.time() - start_time def format_time(t): h, rem = divmod(int(t), 3600) m, s = divmod(rem, 60) return '{:0>2d}:{:0>2d}:{:0>2d}'.format(h, m, s) def format_size(s): units = [' ', 'K', 'M', 'G', 'T'] i = 0 while s >= 1024.0 and i < len(units) - 1: s /= 1024.0 i += 1 return '{:>7.2F}{}'.format(s, units[i]) if download_total < 0: # infinite progress bar pct = ' ' * 4 pos = int(downloaded / infinite_step) % (bar_width - 5) bar = '[{}<=>{}]'.format(' ' * pos, ' ' * (bar_width - 5 - pos)) ts = ' in {}'.format(format_time(delta)) else: r = float(downloaded) / float(download_total) if download_total else 0.0 pct = '{:>3d}%'.format(int(r * 100)) pos = int(r * (bar_width - 3)) bar = '[{}>{}]'.format('=' * pos, ' ' * (bar_width - 3 - pos)) ts = 'eta {}'.format(format_time(delta / r - delta) if r > 0.0 else ' ' * 7 + '?') size = format_size(downloaded) if show_size else '' # no point to log progress, write directly to stdout sys.stdout.write('\r{}{} {} {} '.format(pct, bar, size, ts)) sys.stdout.flush()
[docs] @staticmethod def request(url, **kwargs): """Performs an HTTP request or an FTP RETR command. Args: url (str): HTTP, HTTPS or FTP URL. **kwargs: Keyword arguments to be passed to requests.session.get(). Returns: requests.Response: Response object. """ class FTPAdapter(requests.adapters.BaseAdapter): def send(self, request, stream=False, timeout=None, verify=True, # pylint: disable=unused-argument cert=None, proxies=None): # pylint: disable=unused-argument response = requests.models.Response() response.request = request response.connection = self try: resp = urllib.request.urlopen(request.url) except urllib.error.URLError as e: response.status_code = 400 response.reason = e.reason else: response.status_code = 200 response.headers = requests.structures.CaseInsensitiveDict(getattr(resp, 'headers', {})) response.raw = resp response.url = resp.url return response def close(self): pass session = requests.Session() session.mount('ftp://', FTPAdapter()) try: return session.get(url, **kwargs) except requests.exceptions.RequestException as e: logger.error('%s: %s', type(e).__name__, str(e)) return None
[docs] @staticmethod def download_file(url, destination_path, blocksize=8192): """Downloads a file from HTTP, HTTPS or FTP URL. Args: url (str): URL to be downloaded. destination_path (str): Path to where the downloaded file will be stored. blocksize (int): Block size in bytes. """ r = DownloadHelper.request(url, stream=True) if r is None: raise DownloadError("An unexpected error occurred during the download.") if not 200 <= r.status_code < 300: raise DownloadError(r.reason) file_size = int(r.headers.get('content-length', -1)) if r.headers.get('content-encoding', 'identity') != 'identity': # use infinite progress bar in case of compressed content, there is no # way to determine the uncompressed size in advance file_size = -1 # file exists, check the size if os.path.exists(destination_path): if file_size < 0 or file_size != os.path.getsize(destination_path): logger.verbose("The destination file '%s' exists, but sizes don't match! Removing it.", destination_path) os.remove(destination_path) else: logger.verbose("The destination file '%s' exists, and the size is correct! Skipping download.", destination_path) return try: with open(destination_path, 'wb') as local_file: logger.info('Downloading file from URL %s', url) download_start = time.time() downloaded = 0 # report progress DownloadHelper.progress(file_size, downloaded, download_start) # do the actual download for chunk in r.iter_content(chunk_size=blocksize): downloaded += len(chunk) local_file.write(chunk) # report progress DownloadHelper.progress(file_size, downloaded, download_start) sys.stdout.write('\n') sys.stdout.flush() except KeyboardInterrupt as e: os.remove(destination_path) raise e