1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
|
import os
import shutil
import subprocess
import tempfile
import unittest
import gitrepo
class TestGitRepo(unittest.TestCase):
    """Exercise ``gitrepo.Repo`` against throw-away git repositories.

    Each test starts with a fresh temporary directory holding an
    initialised (empty) ``main`` repository.  Tests that need a second
    repository create a ``clone`` entry in the same temporary directory.
    """

    def setUp(self):
        """Create the temporary directory and ``git init`` the main repo."""
        super(TestGitRepo, self).setUp()
        self.tmpdir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.tmpdir)

        # NOTE(review): the third Repo argument looks like the URL/path to
        # clone from (the tests below pass self.main.path there) -- confirm
        # against gitrepo.Repo.
        self.main = gitrepo.Repo(self.tmpdir, 'main', None)
        os.mkdir(self.main.path)
        subprocess.check_call(['git', 'init'], cwd=self.main.path)

        # Give the repository its own committer identity so `git commit`
        # does not depend on the global git configuration of the machine
        # running the tests.
        identity = (
            ('user.name', 'Test User'),
            ('user.email', 'test@example.com'),
        )
        for key, value in identity:
            subprocess.check_call(
                ['git', 'config', key, value], cwd=self.main.path)

    def _commit_ids(self, repo_path, count):
        """Return the ids of the newest ``count`` commits, newest first.

        The output of ``git log`` is requested as text
        (``universal_newlines=True``) so that it can be split with ``str``
        separators on Python 3 as well as Python 2.
        """
        out = subprocess.check_output(
            ['git', 'log', '--format=oneline', '-%d' % count],
            cwd=repo_path, universal_newlines=True)
        # Each non-empty line is "<sha1> <subject>"; keep only the sha1.
        return [line.split(' ')[0] for line in out.split('\n') if line]

    def _add_commit(self, path, content, message):
        """Write ``content`` to ``path`` in the main repo and commit it.

        ``path`` is relative to the main repository.  Returns the id of the
        newly created commit.
        """
        with open(os.path.join(self.main.path, path), 'w') as f:
            f.write(content)
        subprocess.check_call(['git', 'add', '.'], cwd=self.main.path)
        subprocess.check_call(
            ['git', 'commit', '-a', '-m', message], cwd=self.main.path)
        return self._last_commit()

    def _last_commit(self):
        """Return the id of the most recent commit in the main repo."""
        return self._commit_ids(self.main.path, 1)[0]

    def test_pull_simple(self):
        """update() brings commits added after the clone into the clone."""
        self._add_commit('foo1', 'commit1', 'commit1')
        self._add_commit('foo2', 'commit2', 'commit2')

        repo = gitrepo.Repo(self.tmpdir, 'clone', self.main.path)
        repo._clone()

        commits = [
            self._add_commit('foo3', 'commit3', 'commit3'),
            self._add_commit('foo4', 'commit4', 'commit4'),
        ]
        repo.update()

        # git log lists newest first; `commits` is oldest first.
        found = self._commit_ids(repo.path, 2)
        self.assertEqual(commits, list(reversed(found)))

    def test_pull_rewrite(self):
        """update() follows the main repo after its history is rewritten."""
        self._add_commit('foo1', 'commit1', 'commit1')

        repo = gitrepo.Repo(self.tmpdir, 'clone', self.main.path)
        repo._clone()

        commits = [self._add_commit('foo3', 'commit3', 'commit3')]
        # now add this foo4 to be overwritten
        self._add_commit('foo4', 'commit4', 'commit4')
        repo.update()

        # Drop commit4 from the main repo and replace it with commit4a, so
        # the clone's tip is no longer part of the main history.
        subprocess.check_call(
            ['git', 'reset', '--hard', 'HEAD^'], cwd=self.main.path)
        commits.append(self._add_commit('foo4a', 'commit4a', 'commit4a'))
        repo.update()

        # git log lists newest first; `commits` is oldest first.
        found = self._commit_ids(repo.path, 2)
        self.assertEqual(commits, list(reversed(found)))

    def test_commits_to_check_empty(self):
        """works off a repo that's never been analyzed"""
        commits = [
            self._add_commit('foo', 'foocontent', 'commit1'),
            self._add_commit('foo', 'foocontent2', 'commit2'),
        ]
        found = [x.id for x in self.main.process_unchecked_commits()]
        self.assertEqual(commits, found)

    def test_commits_to_check_previous(self):
        """only reports commits added since the previous analysis"""
        self._add_commit('foo', 'foocontent', 'commit1')
        # force last commit file to be updated
        list(self.main.process_unchecked_commits())

        commits = [self._add_commit('foo', 'foocontent2', 'commit2')]
        found = [x.id for x in self.main.process_unchecked_commits()]
        self.assertEqual(commits, found)

    def test_commits_to_check_rewrite(self):
        """can handle a history rewrite"""
        commits = [
            self._add_commit('foo', 'foocontent', 'commit1'),
            self._add_commit('foo', 'foocontent2', 'commit2'),
        ]
        last_commit = os.path.join(self.main.path, 'patchwork-last-commit')
        with open(last_commit, 'w') as f:
            f.write('11111111111')  # invalid sha1, so we'll search back
        found = [x.id for x in self.main.process_unchecked_commits()]
        self.assertEqual(commits, found)
|