Source code for rebasehelper.patch_helper

# -*- coding: utf-8 -*-
# This tool helps you to rebase package to the latest version
# Copyright (C) 2013-2014 Red Hat, Inc.
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# he Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
# Authors: Petr Hracek <>
#          Tomas Hozza <>

from __future__ import print_function
import os

import git
import six

from rebasehelper.logger import logger
from rebasehelper.utils import ConsoleHelper
from rebasehelper.utils import ProcessHelper
from rebasehelper.utils import GitHelper

patch_tools = {}

[docs]def register_patch_tool(patch_tool): patch_tools[patch_tool.CMD] = patch_tool return patch_tool
[docs]class PatchBase(object): """ Class used for using several patching command tools, ... Each method should overwrite method like run_check """
[docs] @classmethod def match(cls, cmd=None, *args, **kwargs): """Method checks whether it is usefull patch method""" return NotImplementedError()
[docs] @classmethod def run_patch(cls, old_dir, new_dir, rest_sources, git_helper, patches, *args, **kwargs): """Method will check all patches in relevant package""" return NotImplementedError()
[docs]@register_patch_tool class GitPatchTool(PatchBase): """Class for git command used for patching old and new sources""" CMD = 'git' source_dir = "" old_sources = "" new_sources = "" diff_cls = None output_data = None old_repo = None new_repo = None non_interactive = False patches = None prep_section = False exec_prep_script = False patch_sources_by_prep_script = False
[docs] @classmethod def match(cls, cmd=None): if cmd is not None and cmd == cls.CMD: return True else: return False
[docs] @staticmethod def apply_patch(repo, patch_object): """ Function applies patches to old sources It tries apply patch with am command and if it fails then with command --apply """ logger.debug('Applying patch with am') patch_name = patch_object.get_path() patch_option = patch_object.get_option() try: except git.GitCommandError: logger.debug('Applying patch with git-am failed.') try: repo.git.apply(patch_name, patch_option) except git.GitCommandError: repo.git.apply(patch_name, patch_option, reject=True, whitespace='fix') repo.git.add(all=True) repo.index.commit('Patch: {0}'.format(os.path.basename(patch_name)), skip_hooks=True) else: # replace last commit message with patch name to preserve mapping between commits and patches repo.head.reset('HEAD~1', index=False) repo.index.commit('Patch: {0}'.format(os.path.basename(patch_name)), skip_hooks=True)
@classmethod def _update_deleted_patches(cls, deleted_patches, inapplicable_patches): """Function checks patches against rebase-patches""" commits = list(cls.old_repo.iter_commits()) updated_patches = [] for patch in cls.patches: patch_name = patch.get_patch_name() if (not [c for c in commits if c.summary.endswith(patch_name)] and patch_name not in deleted_patches and patch_name not in inapplicable_patches): updated_patches.append(patch_name) return updated_patches @staticmethod def _get_automerged_patches(output): automerged_patches = [] if not output: return automerged_patches patch_name = None for line in output.split('\n'): if line.startswith('Applying:'): patch_name = line.split()[-1] elif line.startswith('Auto-merging'): if patch_name and patch_name not in automerged_patches: automerged_patches.append(patch_name) return automerged_patches @classmethod def _git_rebase(cls): """Function performs git rebase between old and new sources""" # in old_sources do: # 1) git remote add new_sources <path_to_new_sources> # 2) git fetch new_sources # 3) git rebase --onto new_sources/master <root_commit_old_sources> <last_commit_old_sources> if not cls.cont:'git-rebase operation to %s is ongoing...', os.path.basename(cls.new_sources)) upstream = 'new_upstream' cls.old_repo.create_remote(upstream, url=cls.new_sources).fetch() root_commit = cls.old_repo.git.rev_list('HEAD', max_parents=0) last_commit = cls.old_repo.commit('HEAD~{}'.format(1 if cls.prep_section else 0)) try: cls.output_data = cls.old_repo.git.rebase(root_commit, last_commit, onto='{}/master'.format(upstream), stdout_as_string=six.PY3) except git.GitCommandError as e: ret_code = e.status cls.output_data = e.stdout else: ret_code = 0 else:'git-rebase operation continues...') try: cls.output_data = cls.old_repo.git.rebase(skip=True, stdout_as_string=six.PY3) except git.GitCommandError as e: ret_code = e.status cls.output_data = e.stdout else: ret_code = 0 logger.debug(cls.output_data) patch_dictionary = {} modified_patches = [] deleted_patches = [] inapplicable_patches = [] while True: automerged_patches = cls._get_automerged_patches(cls.output_data) for patch_name in automerged_patches: commits = [c for c in cls.old_repo.iter_commits() if c.summary.endswith(patch_name)] if commits: base_name = os.path.join(cls.kwargs['rebased_sources_dir'], patch_name) diff = cls.old_repo.git.diff(commits[0].parents[0], commits[0], stdout_as_string=False) with open(base_name, 'wb') as f: f.write(diff) f.write(b'\n') modified_patches.append(base_name) if ret_code != 0: # Take the patch which failed from .git/rebase-apply/next file try: with open(os.path.join(cls.old_sources, '.git', 'rebase-apply', 'next')) as f: failed_patch = cls.patches[int(f.readline()) - 1].get_patch_name() except IOError: raise RuntimeError('Git rebase failed with unknown reason. Please check log file') if not cls.non_interactive:"Git has problems with rebasing patch %s", failed_patch) GitHelper.run_mergetool(cls.old_repo) else: inapplicable_patches.append(failed_patch) try: cls.output_data = cls.old_repo.git.rebase(skip=True, stdout_as_string=six.PY3) except git.GitCommandError as e: ret_code = e.status cls.output_data = e.stdout else: ret_code = 0 continue base_name = os.path.join(cls.kwargs['rebased_sources_dir'], patch_name) # unstaged changes diff = cls.old_repo.commit().diff(None) if diff: # staged changes diff = cls.old_repo.index.diff(cls.old_repo.commit()) modified_files = [d.a_path for d in diff]'Following files were modified: %s', ', '.join(modified_files)) try: commit = cls.old_repo.index.commit(patch_name, skip_hooks=True) except git.UnmergedEntriesError: inapplicable_patches.append(failed_patch) else: diff = cls.old_repo.git.diff(commit.parents[0], commit, stdout_as_string=False) with open(base_name, 'wb') as f: f.write(diff) f.write(b'\n') modified_patches.append(base_name) else: deleted_patches.append(base_name) if not cls.non_interactive: if not ConsoleHelper.get_message('Do you want to continue with another patch'): raise KeyboardInterrupt try: cls.output_data = cls.old_repo.git.rebase(skip=True, stdout_as_string=six.PY3) except git.GitCommandError as e: ret_code = e.status cls.output_data = e.stdout else: ret_code = 0 else: break deleted_patches = cls._update_deleted_patches(deleted_patches, inapplicable_patches) if deleted_patches: patch_dictionary['deleted'] = deleted_patches if modified_patches: patch_dictionary['modified'] = modified_patches if inapplicable_patches: patch_dictionary['inapplicable'] = inapplicable_patches #TODO correct settings for merge tool in ~/.gitconfig # currently now meld is not started return patch_dictionary
[docs] @classmethod def apply_old_patches(cls): """Function applies a patch to a old/new sources""" for patch in cls.patches:"Applying patch '%s' to '%s'", os.path.basename(patch.get_path()), os.path.basename(cls.source_dir)) try: cls.apply_patch(cls.old_repo, patch) except git.GitCommandError: raise RuntimeError('Failed to patch old sources')
@classmethod def _prepare_prep_script(cls, sources, prep): for src in sources: file_name = os.path.join('SOURCES', os.path.basename(src)) for index, row in enumerate(prep): if file_name in row: src_path = [x for x in row.split() if x.endswith(file_name)] prep[index] = row.replace(src_path[0], src) return prep
[docs] @classmethod def create_prep_script(cls, prep): """Function abstract special things from prep section and apply them to old sources""" logger.debug('Extract prep script') # Check whether patch or git am is used inside %prep section # If yes then execute whole %prep section logger.debug("prep section '%s'", prep) found_patching = [x for x in prep if ' patch ' in x] if found_patching: cls.exec_prep_script = True found_git_am = [x for x in prep if 'git am' in x] if found_git_am: cls.patch_sources_by_prep_script = True logger.debug('Fix %SOURCES tags in prep script') prep = cls._prepare_prep_script(cls.rest_sources, prep) logger.debug('Fix %PATCH tags in prep script') prep = cls._prepare_prep_script([x.get_path() for x in cls.patches], prep) prep_script_path = os.path.join(cls.kwargs['workspace_dir'], 'prep_script') logger.debug("Writing Prep script '%s' to the disc", prep_script_path) try: with open(prep_script_path, "w") as f: f.write("#!/bin/bash\n\n") f.writelines('\n'.join(prep)) os.chmod(prep_script_path, 0o755) except IOError: logger.debug("Unable to write prep script file to '%s'", prep_script_path) return None return prep_script_path
[docs] @classmethod def call_prep_script(cls, prep_script_path): cwd = os.getcwd() os.chdir(cls.old_sources) ProcessHelper.run_subprocess(prep_script_path, output=os.path.join(cls.kwargs['workspace_dir'], 'prep_script.log')) if not cls.patch_sources_by_prep_script: cls.old_repo.git.add(all=True) cls.old_repo.index.commit('prep_script prep_corrections', skip_hooks=True) os.chdir(cwd)
[docs] @classmethod def init_git(cls, directory): """Function initialize old and new Git repository""" repo = git.Repo.init(directory) repo.git.config('', GitHelper.get_user(), local=True) repo.git.config('', GitHelper.get_email(), local=True) repo.git.add(all=True) repo.index.commit('Initial commit', skip_hooks=True) return repo
[docs] @classmethod def run_patch(cls, old_dir, new_dir, rest_sources, patches, prep, **kwargs): """ The function can be used for patching one directory against another """ cls.kwargs = kwargs cls.old_sources = old_dir cls.new_sources = new_dir cls.output_data = None cls.cont = cls.kwargs['continue'] cls.rest_sources = rest_sources cls.patches = patches cls.non_interactive = kwargs.get('non_interactive') if not os.path.isdir(os.path.join(cls.old_sources, '.git')): cls.old_repo = cls.init_git(old_dir) cls.new_repo = cls.init_git(new_dir) cls.source_dir = cls.old_sources prep_path = cls.create_prep_script(prep) if not cls.patch_sources_by_prep_script: cls.apply_old_patches() if cls.exec_prep_script or cls.patch_sources_by_prep_script:'Executing prep script') cls.call_prep_script(prep_path) cls.cont = False else: cls.old_repo = git.Repo(old_dir) cls.new_repo = git.Repo(new_dir) return cls._git_rebase()
[docs]class Patcher(object): """ Class representing a process of applying and generating rebased patch using specific tool. """ def __init__(self, tool=None): """ Constructor :param tool: tool to be used. If not supported, raises NotImplementedError :return: None """ if tool is None: raise TypeError("Expected argument 'tool' (pos 1) is missing") self._patch_tool_name = tool self._tool = None for patch_tool in patch_tools.values(): if patch_tool.match(self._patch_tool_name): self._tool = patch_tool if self._tool is None: raise NotImplementedError("Unsupported patch tool")
[docs] def patch(self, old_dir, new_dir, rest_sources, patches, prep, **kwargs): """ Apply patches and generate rebased patches if needed :param old_dir: path to dir with old patches :param new_dir: path to dir with new patches :param patches: old patches :param rebased_patches: rebased patches :param kwargs: -- :return: """ logger.debug("Patching source by patch tool %s", self._patch_tool_name) return self._tool.run_patch(old_dir, new_dir, rest_sources, patches, prep, **kwargs)