Source code for rebasehelper.archive

# -*- coding: utf-8 -*-
#
# This tool helps you to rebase package to the latest version
# Copyright (C) 2013-2014 Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# he Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# Authors: Petr Hracek <phracek@redhat.com>
#          Tomas Hozza <thozza@redhat.com>

from __future__ import print_function
import tarfile
import zipfile
import bz2
import os
import shutil

import six

try:
    import lzma
except ImportError:
    from backports import lzma

from rebasehelper.logger import logger


# supported archive types
archive_types = {}


[docs]def register_archive_type(archive): archive_types[archive.EXTENSION] = archive return archive
[docs]class ArchiveTypeBase(object): """ Base class for various archive types """ EXTENSION = ""
[docs] @classmethod def match(cls, filename=None, *args, **kwargs): """ Checks if the filename matches the archive type. If yes, returns True, otherwise returns False. """ if filename is not None and filename.endswith(cls.EXTENSION): return True else: return False
[docs] @classmethod def open(cls, filename=None, *args, **kwargs): """ Opens archive with the given filename and returns the proper archive type object. """ raise NotImplementedError()
[docs] @classmethod def extract(cls, filename=None, *args, **kwargs): """ Extracts the archive into the given path :param path: Path where to extract the archive to. :return: """ raise NotImplementedError()
[docs]@register_archive_type class TarXzArchiveType(ArchiveTypeBase): """ .tar.xz archive type """ EXTENSION = ".tar.xz"
[docs] @classmethod def open(cls, filename=None): if filename is None: raise TypeError("Expected argument 'filename' (pos 1) is missing") xz_file = lzma.LZMAFile(filename, "r") return tarfile.open(mode='r', fileobj=xz_file)
[docs] @classmethod def extract(cls, archive=None, filename=None, path=None, *args, **kwargs): if archive is None: raise TypeError("Expected argument 'archive' (pos 1) is missing") archive.extractall(path)
[docs]@register_archive_type class Bz2ArchiveType(ArchiveTypeBase): """ .bz2 archive type """ EXTENSION = ".bz2"
[docs] @classmethod def open(cls, filename=None): if filename is None: raise TypeError("Expected argument 'filename' (pos 1) is missing") if filename.endswith('.tar.bz2'): return tarfile.TarFile.open(filename) else: return bz2.BZ2File(filename)
[docs] @classmethod def extract(cls, archive=None, filename=None, path=None, *args, **kwargs): if archive is None: raise TypeError("Expected argument 'archive' (pos 1) is missing") if filename.endswith('tar.bz2'): archive.extractall(path) else: data = archive.read() if not os.path.exists(path): os.mkdir(path) with open(os.path.join(path, filename[:-4]), 'wb') as f: f.write(data)
[docs]@register_archive_type class TarBz2ArchiveType(Bz2ArchiveType): """ .tar.bz2 archive type """ EXTENSION = ".tar.bz2"
[docs]@register_archive_type class TarGzArchiveType(TarBz2ArchiveType): """ .tar.gz archive type """ EXTENSION = ".tar.gz"
[docs] @classmethod def open(cls, filename=None): if filename is None: raise TypeError("Expected argument 'filename' (pos 1) is missing") return tarfile.TarFile.open(filename)
[docs] @classmethod def extract(cls, archive=None, filename=None, path=None, *args, **kwargs): if archive is None: raise TypeError("Expected argument 'archive' (pos 1) is missing") archive.extractall(path)
[docs]@register_archive_type class TgzArchiveType(TarGzArchiveType): """ .tgz archive type """ EXTENSION = ".tgz"
[docs]@register_archive_type class ZipArchiveType(ArchiveTypeBase): """ .zip archive type """ EXTENSION = ".zip"
[docs] @classmethod def match(cls, filename=None): if filename is not None and zipfile.is_zipfile(filename): return True else: return False
[docs] @classmethod def open(cls, filename=None): if filename is None: raise TypeError("Expected argument 'filename' (pos 1) is missing") return zipfile.ZipFile(filename, "r")
[docs] @classmethod def extract(cls, archive=None, filename=None, path=None, *args, **kwargs): if archive is None: raise TypeError("Expected argument 'archive' (pos 1) is missing") archive.extractall(path)
[docs]@register_archive_type class GemPseudoArchiveType(ArchiveTypeBase): """ .gem files are not archives - this is a pseudo type """ EXTENSION = ".gem"
[docs] @classmethod def open(cls, filename=None): pass
[docs] @classmethod def extract(cls, archive=None, filename=None, path=None, *args, **kwargs): if archive is not None: raise RuntimeError("In Gem pseudo file types, the archive (pos 1) argument is not used, but passed.") final_dir = os.path.join(path, os.path.basename(filename.rstrip(cls.EXTENSION))) os.makedirs(final_dir) shutil.copy(filename, final_dir)
[docs]class Archive(object): """ Class representing an archive with sources """ def __init__(self, filename=None): if filename is None: raise TypeError("Expected argument 'filename' (pos 1) is missing") self._filename = filename self._archive_type = None for archive_type in archive_types.values(): if archive_type.match(self._filename): self._archive_type = archive_type if self._archive_type is None: raise NotImplementedError("Unsupported archive type")
[docs] def extract_archive(self, path=None): """ Extracts the archive into the given path :param path: Path where to extract the archive to. :return: """ if path is None: TypeError("Expected argument 'path' (pos 1) is missing") logger.debug("Extracting '%s' into '%s'", self._filename, path) try: LZMAError = lzma.LZMAError except AttributeError: LZMAError = lzma.error try: archive = self._archive_type.open(self._filename) except (tarfile.ReadError, LZMAError) as e: raise IOError(six.text_type(e)) self._archive_type.extract(archive, self._filename, path) try: archive.close() except AttributeError: # pseudo archive types don't return real file-like object pass
[docs] @classmethod def get_supported_archives(cls): """Return list of supported archive types""" return archive_types.keys()