# -*- coding: utf-8 -*-
#
# This tool helps you to rebase package to the latest version
# Copyright (C) 2013-2014 Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# Authors: Petr Hracek <phracek@redhat.com>
# Tomas Hozza <thozza@redhat.com>
from __future__ import print_function
import tarfile
import zipfile
import bz2
import os
import shutil
import six
try:
import lzma
except ImportError:
from backports import lzma
from rebasehelper.logger import logger
# Registry of supported archive types, keyed by the file name extension
# each type declares in its EXTENSION attribute.
archive_types = {}


def register_archive_type(archive):
    """
    Class decorator which registers an archive type as supported.

    The type is stored in the module-level ``archive_types`` registry under
    its ``EXTENSION`` attribute. Registering another type with the same
    extension replaces the earlier entry.

    :param archive: archive type class providing an ``EXTENSION`` attribute
    :return: the very same class, so the function can be used as a decorator
    """
    archive_types[archive.EXTENSION] = archive
    return archive
class ArchiveTypeBase(object):
    """ Base class for various archive types """

    # File name suffix identifying the archive type; subclasses override it.
    EXTENSION = ""

    @classmethod
    def match(cls, filename=None, *args, **kwargs):
        """
        Checks if the filename matches the archive type.

        The check is purely name based: the filename has to end with the
        EXTENSION of the class. Any extra positional or keyword arguments
        are accepted but ignored.

        :param filename: name of the file to check
        :return: True if filename ends with EXTENSION, otherwise False
                 (also when no filename is given)
        """
        return filename is not None and filename.endswith(cls.EXTENSION)

    @classmethod
    def open(cls, filename=None, *args, **kwargs):
        """
        Opens archive with the given filename and returns the proper
        archive type object.

        :param filename: name of the archive to open
        :raises NotImplementedError: always, subclasses have to provide
                                     the implementation
        """
        raise NotImplementedError()
@register_archive_type
class TarXzArchiveType(ArchiveTypeBase):
    """ .tar.xz archive type """

    EXTENSION = ".tar.xz"

    @classmethod
    def open(cls, filename=None):
        """
        Opens the xz compressed tarball for reading.

        :param filename: name of the archive to open
        :return: tarfile.TarFile object reading from the decompressed stream
        :raises TypeError: if no filename is given
        """
        if filename is None:
            raise TypeError("Expected argument 'filename' (pos 1) is missing")

        xz_file = lzma.LZMAFile(filename, "r")
        try:
            return tarfile.open(mode='r', fileobj=xz_file)
        except Exception:
            # Do not leak the open file handle when the decompressed
            # stream cannot be read as a tar archive.
            xz_file.close()
            raise
@register_archive_type
class Bz2ArchiveType(ArchiveTypeBase):
    """ .bz2 archive type """

    EXTENSION = ".bz2"

    @classmethod
    def open(cls, filename=None):
        """
        Opens the bzip2 compressed file.

        :param filename: name of the file to open
        :return: tarfile.TarFile object if the filename ends with
                 '.tar.bz2', bz2.BZ2File object for any other filename
        :raises TypeError: if no filename is given
        """
        if filename is None:
            raise TypeError("Expected argument 'filename' (pos 1) is missing")

        if filename.endswith('.tar.bz2'):
            return tarfile.TarFile.open(filename)
        return bz2.BZ2File(filename)
@register_archive_type
class TarBz2ArchiveType(Bz2ArchiveType):
    """ .tar.bz2 archive type """

    # open() is inherited from Bz2ArchiveType, which opens file names
    # ending with '.tar.bz2' through tarfile.
    EXTENSION = ".tar.bz2"
# NOTE(review): the base class is TarBz2ArchiveType although open() is
# overridden below - confirm whether ArchiveTypeBase was intended.
@register_archive_type
class TarGzArchiveType(TarBz2ArchiveType):
    """ .tar.gz archive type """

    EXTENSION = ".tar.gz"

    @classmethod
    def open(cls, filename=None):
        """
        Opens the tarball with tarfile.

        :param filename: name of the archive to open
        :return: tarfile.TarFile object
        :raises TypeError: if no filename is given
        """
        if filename is None:
            raise TypeError("Expected argument 'filename' (pos 1) is missing")

        return tarfile.TarFile.open(filename)
@register_archive_type
class TgzArchiveType(TarGzArchiveType):
    """ .tgz archive type """

    # open() is inherited from TarGzArchiveType.
    EXTENSION = ".tgz"
@register_archive_type
class ZipArchiveType(ArchiveTypeBase):
    """ .zip archive type """

    EXTENSION = ".zip"

    @classmethod
    def match(cls, filename=None):
        """
        Checks if the file is a ZIP archive.

        Unlike the name based check of the base class, the decision is
        made by zipfile.is_zipfile() and EXTENSION is not consulted.

        :param filename: name of the file to check
        :return: True if zipfile.is_zipfile() recognizes the file,
                 otherwise False (also when no filename is given)
        """
        return filename is not None and zipfile.is_zipfile(filename)

    @classmethod
    def open(cls, filename=None):
        """
        Opens the ZIP archive for reading.

        :param filename: name of the archive to open
        :return: zipfile.ZipFile object
        :raises TypeError: if no filename is given
        """
        if filename is None:
            raise TypeError("Expected argument 'filename' (pos 1) is missing")

        return zipfile.ZipFile(filename, "r")
@register_archive_type
class GemPseudoArchiveType(ArchiveTypeBase):
    """ .gem files are not archives - this is a pseudo type """

    EXTENSION = ".gem"

    @classmethod
    def open(cls, filename=None):
        """
        Does nothing, there is no archive object for the pseudo type.

        :param filename: name of the .gem file, not used
        :return: always None
        """
        return None
[docs]class Archive(object):
""" Class representing an archive with sources """
def __init__(self, filename=None):
    """
    Finds the registered archive type matching the given filename.

    :param filename: name of the archive with sources
    :raises TypeError: if no filename is given
    :raises NotImplementedError: if no registered archive type matches
    """
    if filename is None:
        raise TypeError("Expected argument 'filename' (pos 1) is missing")

    self._filename = filename
    self._archive_type = None

    candidates = [archive_type for archive_type in archive_types.values()
                  if archive_type.match(self._filename)]
    if not candidates:
        raise NotImplementedError("Unsupported archive type")

    # More than one type can match the same file name (e.g. both ".bz2"
    # and ".tar.bz2"). Pick the most specific, i.e. the longest, extension
    # so the result does not depend on the iteration order of the registry.
    self._archive_type = max(
        candidates,
        key=lambda archive_type: (len(archive_type.EXTENSION),
                                  archive_type.EXTENSION))
@classmethod
def get_supported_archives(cls):
    """
    Return list of supported archive types

    :return: new list with the extensions of all registered archive types
    """
    # dict.keys() is a view on Python 3; build a real list as documented.
    return list(archive_types)