aboutsummaryrefslogtreecommitdiff
path: root/waflib/Runner.py
blob: 6f64fed35464c3c95b83cc682ed060eab033e65c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
#! /usr/bin/env python
# encoding: utf-8
# WARNING! Do not edit! http://waf.googlecode.com/git/docs/wafbook/single.html#_obtaining_the_waf_file

import random,atexit
try:
	from queue import Queue
except:
	from Queue import Queue
from waflib import Utils,Task,Errors,Logs
GAP=10
class TaskConsumer(Utils.threading.Thread):
	def __init__(self):
		Utils.threading.Thread.__init__(self)
		self.ready=Queue()
		self.setDaemon(1)
		self.start()
	def run(self):
		try:
			self.loop()
		except:
			pass
	def loop(self):
		while 1:
			tsk=self.ready.get()
			if not isinstance(tsk,Task.TaskBase):
				tsk(self)
			else:
				tsk.process()
pool=Queue()
def get_pool():
	try:
		return pool.get(False)
	except:
		return TaskConsumer()
def put_pool(x):
	pool.put(x)
def _free_resources():
	global pool
	lst=[]
	while pool.qsize():
		lst.append(pool.get())
	for x in lst:
		x.ready.put(None)
	for x in lst:
		x.join()
	pool=None
atexit.register(_free_resources)
class Parallel(object):
	def __init__(self,bld,j=2):
		self.numjobs=j
		self.bld=bld
		self.outstanding=[]
		self.frozen=[]
		self.out=Queue(0)
		self.count=0
		self.processed=1
		self.stop=False
		self.error=[]
		self.biter=None
		self.dirty=False
	def get_next_task(self):
		if not self.outstanding:
			return None
		return self.outstanding.pop(0)
	def postpone(self,tsk):
		if random.randint(0,1):
			self.frozen.insert(0,tsk)
		else:
			self.frozen.append(tsk)
	def refill_task_list(self):
		while self.count>self.numjobs*GAP:
			self.get_out()
		while not self.outstanding:
			if self.count:
				self.get_out()
			elif self.frozen:
				try:
					cond=self.deadlock==self.processed
				except:
					pass
				else:
					if cond:
						msg='check the build order for the tasks'
						for tsk in self.frozen:
							if not tsk.run_after:
								msg='check the methods runnable_status'
								break
						lst=[]
						for tsk in self.frozen:
							lst.append('%s\t-> %r'%(repr(tsk),[id(x)for x in tsk.run_after]))
						raise Errors.WafError('Deadlock detected: %s%s'%(msg,''.join(lst)))
				self.deadlock=self.processed
			if self.frozen:
				self.outstanding+=self.frozen
				self.frozen=[]
			elif not self.count:
				self.outstanding.extend(self.biter.next())
				self.total=self.bld.total()
				break
	def add_more_tasks(self,tsk):
		if getattr(tsk,'more_tasks',None):
			self.outstanding+=tsk.more_tasks
			self.total+=len(tsk.more_tasks)
	def get_out(self):
		tsk=self.out.get()
		if not self.stop:
			self.add_more_tasks(tsk)
		self.count-=1
		self.dirty=True
		return tsk
	def error_handler(self,tsk):
		if not self.bld.keep:
			self.stop=True
		self.error.append(tsk)
	def add_task(self,tsk):
		try:
			self.pool
		except AttributeError:
			self.init_task_pool()
		self.ready.put(tsk)
	def init_task_pool(self):
		pool=self.pool=[get_pool()for i in range(self.numjobs)]
		self.ready=Queue(0)
		def setq(consumer):
			consumer.ready=self.ready
		for x in pool:
			x.ready.put(setq)
		return pool
	def free_task_pool(self):
		def setq(consumer):
			consumer.ready=Queue(0)
			self.out.put(self)
		try:
			pool=self.pool
		except:
			pass
		else:
			for x in pool:
				self.ready.put(setq)
			for x in pool:
				self.get_out()
			for x in pool:
				put_pool(x)
			self.pool=[]
	def start(self):
		self.total=self.bld.total()
		while not self.stop:
			self.refill_task_list()
			tsk=self.get_next_task()
			if not tsk:
				if self.count:
					continue
				else:
					break
			if tsk.hasrun:
				self.processed+=1
				continue
			if self.stop:
				break
			try:
				st=tsk.runnable_status()
			except Exception:
				self.processed+=1
				tsk.err_msg=Utils.ex_stack()
				if not self.stop and self.bld.keep:
					tsk.hasrun=Task.SKIPPED
					if self.bld.keep==1:
						if Logs.verbose>1 or not self.error:
							self.error.append(tsk)
						self.stop=True
					else:
						if Logs.verbose>1:
							self.error.append(tsk)
					continue
				tsk.hasrun=Task.EXCEPTION
				self.error_handler(tsk)
				continue
			if st==Task.ASK_LATER:
				self.postpone(tsk)
			elif st==Task.SKIP_ME:
				self.processed+=1
				tsk.hasrun=Task.SKIPPED
				self.add_more_tasks(tsk)
			else:
				tsk.position=(self.processed,self.total)
				self.count+=1
				tsk.master=self
				self.processed+=1
				if self.numjobs==1:
					tsk.process()
				else:
					self.add_task(tsk)
		while self.error and self.count:
			self.get_out()
		assert(self.count==0 or self.stop)
		self.free_task_pool()