1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
|
import os
import shutil
import subprocess
import tempfile
import unittest
import gitrepo
class TestGitRepo(unittest.TestCase):
    """Tests for ``gitrepo.Repo`` driven against real git repositories.

    Each test gets a fresh temporary directory containing a ``main``
    repository created with ``git init``.  Clones used by the pull tests are
    created in that same temporary directory.
    """

    def setUp(self):
        """Create a temporary directory holding an empty ``main`` repository."""
        super().setUp()
        self.tmpdir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.tmpdir)

        self.main = gitrepo.Repo(self.tmpdir, "main", None)
        os.mkdir(self.main.path)
        self._git(["init"])

        # ``git commit`` refuses to run without an author identity, and a
        # global ``commit.gpgsign=true`` would require a signing key.  Pin
        # both in the repository-local config so the tests do not depend on
        # the configuration of the machine running them.
        local_config = (
            ("user.name", "Test User"),
            ("user.email", "test@example.com"),
            ("commit.gpgsign", "false"),
        )
        for key, value in local_config:
            self._git(["config", key, value])

    def _git(self, args, cwd=None):
        """Run ``git <args>`` in *cwd* (default: the main repository).

        Raises ``subprocess.CalledProcessError`` if git exits non-zero.
        """
        subprocess.check_call(["git"] + list(args), cwd=cwd or self.main.path)

    def _log_ids(self, path, count):
        """Return the ids of the last *count* commits in *path*, newest first."""
        out = subprocess.check_output(
            ["git", "log", "--format=%H", "-%d" % count], cwd=path, text=True
        )
        return out.split()

    def _add_commit(self, path, content, message):
        """Write *content* to *path* in the main repository and commit it.

        Returns the id of the newly created commit as a string.
        """
        with open(os.path.join(self.main.path, path), "w") as f:
            f.write(content)
        self._git(["add", "."])
        self._git(["commit", "-a", "-m", message])
        return self._last_commit()

    def _last_commit(self):
        """Return the id of the most recent commit in the main repository."""
        return self._log_ids(self.main.path, 1)[0]

    def test_pull_simple(self):
        """update() brings commits made after the clone into the clone."""
        self._add_commit("foo1", "commit1", "commit1")
        self._add_commit("foo2", "commit2", "commit2")

        repo = gitrepo.Repo(self.tmpdir, "clone", self.main.path)
        repo._clone()

        commits = [
            self._add_commit("foo3", "commit3", "commit3"),
            self._add_commit("foo4", "commit4", "commit4"),
        ]
        repo.update()

        # git log lists newest first; ``commits`` is oldest first.
        found = self._log_ids(repo.path, 2)
        self.assertEqual(commits, list(reversed(found)))

    def test_pull_rewrite(self):
        """update() follows the main repository after its history is rewritten."""
        self._add_commit("foo1", "commit1", "commit1")

        repo = gitrepo.Repo(self.tmpdir, "clone", self.main.path)
        repo._clone()

        commits = [self._add_commit("foo3", "commit3", "commit3")]
        # now add this foo4 to be overwritten
        self._add_commit("foo4", "commit4", "commit4")
        repo.update()

        # Drop commit4 from the main repository and replace it with commit4a,
        # so the clone's tip is no longer part of main's history.
        self._git(["reset", "--hard", "HEAD^"])
        commits.append(self._add_commit("foo4a", "commit4a", "commit4a"))
        repo.update()

        # git log lists newest first; ``commits`` is oldest first.
        found = self._log_ids(repo.path, 2)
        self.assertEqual(commits, list(reversed(found)))

    def test_commits_to_check_empty(self):
        """works off an empty repo that's never been analyzed"""
        commits = [
            self._add_commit("foo", "foocontent", "commit1"),
            self._add_commit("foo", "foocontent2", "commit2"),
        ]
        found = [x.id.decode() for x in self.main.process_unchecked_commits()]
        self.assertEqual(commits, found)

    def test_commits_to_check_previous(self):
        """only commits added since the previous analysis are returned"""
        self._add_commit("foo", "foocontent", "commit1")
        # force last commit file to be updated
        list(self.main.process_unchecked_commits())

        commits = [self._add_commit("foo", "foocontent2", "commit2")]
        found = [x.id.decode() for x in self.main.process_unchecked_commits()]
        self.assertEqual(commits, found)

    def test_commits_to_check_rewrite(self):
        """can handle a history rewrite"""
        commits = [
            self._add_commit("foo", "foocontent", "commit1"),
            self._add_commit("foo", "foocontent2", "commit2"),
        ]
        last_commit = os.path.join(self.main.path, "patchwork-last-commit")
        with open(last_commit, "w") as f:
            f.write("11111111111")  # invalid sha1, so we'll search back
        found = [x.id.decode() for x in self.main.process_unchecked_commits()]
        self.assertEqual(commits, found)
|