summary | refs | log | tree | commit | diff
path: root/tests/test_gitrepo.py
blob: d1ec23f3b6ab92d810f368d9851e275a421aab10 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import os
import shutil
import subprocess
import tempfile
import unittest

import gitrepo


class TestGitRepo(unittest.TestCase):
    '''Tests for gitrepo.Repo, run against real git repositories.

    Every test starts from a freshly initialised repository, ``self.main``,
    living under a temporary directory that is removed on cleanup.
    '''

    def setUp(self):
        '''Create a temp dir and ``git init`` the "main" repository in it.'''
        super(TestGitRepo, self).setUp()

        self.tmpdir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.tmpdir)
        self.main = gitrepo.Repo(self.tmpdir, 'main', None)
        os.mkdir(self.main.path)
        subprocess.check_call(['git', 'init'], cwd=self.main.path)
        # Use a repo-local identity so `git commit` succeeds even on machines
        # (CI containers, fresh accounts) with no global git identity set.
        identity = (
            ('user.name', 'Test User'),
            ('user.email', 'test@example.com'),
        )
        for key, value in identity:
            subprocess.check_call(
                ['git', 'config', key, value], cwd=self.main.path)

    def _add_commit(self, path, content, message):
        '''Write ``content`` to ``path`` in the main repo and commit it.

        ``path`` is relative to the main repository's root. Returns the id
        of the commit that was just created.
        '''
        with open(os.path.join(self.main.path, path), 'w') as f:
            f.write(content)
        subprocess.check_call(['git', 'add', '.'], cwd=self.main.path)
        subprocess.check_call(
            ['git', 'commit', '-a', '-m', message], cwd=self.main.path)
        return self._last_commit()

    def _recent_commits(self, path, count):
        '''Return the ids of the last ``count`` commits in the repo at ``path``.

        Ids are listed newest first, exactly as ``git log`` prints them.
        '''
        # universal_newlines=True makes check_output return text rather than
        # bytes on Python 3, so the str-based splitting below works on both
        # Python 2 and Python 3.
        out = subprocess.check_output(
            ['git', 'log', '--format=oneline', '-%d' % count],
            cwd=path, universal_newlines=True)
        # Each line is "<sha> <subject>"; keep only the sha.
        return [line.split(' ', 1)[0] for line in out.splitlines() if line]

    def _last_commit(self):
        '''Return the id of the most recent commit in the main repo.'''
        return self._recent_commits(self.main.path, 1)[0]

    def test_pull_simple(self):
        '''update() brings new upstream commits into an existing clone'''
        self._add_commit('foo1', 'commit1', 'commit1')
        self._add_commit('foo2', 'commit2', 'commit2')

        repo = gitrepo.Repo(self.tmpdir, 'clone', self.main.path)
        repo._clone()

        commits = []
        commits.append(self._add_commit('foo3', 'commit3', 'commit3'))
        commits.append(self._add_commit('foo4', 'commit4', 'commit4'))

        repo.update()

        # git log is newest-first; `commits` was built oldest-first.
        found = self._recent_commits(repo.path, 2)
        self.assertEqual(commits, list(reversed(found)))

    def test_pull_rewrite(self):
        '''update() follows upstream after its history has been rewritten'''
        self._add_commit('foo1', 'commit1', 'commit1')

        repo = gitrepo.Repo(self.tmpdir, 'clone', self.main.path)
        repo._clone()

        commits = []
        commits.append(self._add_commit('foo3', 'commit3', 'commit3'))
        # now add this foo4 to be overwritten
        self._add_commit('foo4', 'commit4', 'commit4')
        repo.update()

        # Drop the foo4 commit upstream and replace it with a different one,
        # so the clone's tip is no longer part of upstream's history.
        subprocess.check_call(
            ['git', 'reset', '--hard', 'HEAD^'], cwd=self.main.path)
        commits.append(self._add_commit('foo4a', 'commit4a', 'commit4a'))
        repo.update()

        # git log is newest-first; `commits` was built oldest-first.
        found = self._recent_commits(repo.path, 2)
        self.assertEqual(commits, list(reversed(found)))

    def test_commits_to_check_empty(self):
        '''works off an empty repo that's never been analyzed'''
        commits = []
        commits.append(self._add_commit('foo', 'foocontent', 'commit1'))
        commits.append(self._add_commit('foo', 'foocontent2', 'commit2'))

        found = [x.id for x in self.main.process_unchecked_commits()]
        self.assertEqual(commits, found)

    def test_commits_to_check_previous(self):
        '''only yields commits added since the repo was last analyzed'''
        self._add_commit('foo', 'foocontent', 'commit1')
        # force last commit file to be updated
        list(self.main.process_unchecked_commits())

        commits = []
        commits.append(self._add_commit('foo', 'foocontent2', 'commit2'))

        found = [x.id for x in self.main.process_unchecked_commits()]
        self.assertEqual(commits, found)

    def test_commits_to_check_rewrite(self):
        '''can handle a history rewrite'''
        commits = []
        commits.append(self._add_commit('foo', 'foocontent', 'commit1'))
        commits.append(self._add_commit('foo', 'foocontent2', 'commit2'))

        last_commit = os.path.join(self.main.path, 'patchwork-last-commit')
        with open(last_commit, 'w') as f:
            f.write('11111111111')  # invalid sha1, so we'll search back

        found = [x.id for x in self.main.process_unchecked_commits()]
        self.assertEqual(commits, found)