Make sure OrdererQueue returns the correct queue size
This commit is contained in:
parent
d116bbdfe9
commit
b69070275f
1 changed files with 5 additions and 12 deletions
|
@ -136,14 +136,8 @@ class ProcessingQueue(Queue.Queue, object):
|
||||||
def _qsize(self):
|
def _qsize(self):
|
||||||
return self._current_queue._qsize() if self._current_queue else 0
|
return self._current_queue._qsize() if self._current_queue else 0
|
||||||
|
|
||||||
def total_size(self):
|
def _total_qsize(self):
|
||||||
"""
|
return sum(q._qsize() for q in self._queues) if self._queues else 0
|
||||||
Return the approximate total size of all queues (not reliable!)
|
|
||||||
"""
|
|
||||||
self.mutex.acquire()
|
|
||||||
n = sum(q._qsize() for q in self._queues) if self._queues else 0
|
|
||||||
self.mutex.release()
|
|
||||||
return n
|
|
||||||
|
|
||||||
def put(self, item, block=True, timeout=None):
|
def put(self, item, block=True, timeout=None):
|
||||||
"""Put an item into the queue.
|
"""Put an item into the queue.
|
||||||
|
@ -160,17 +154,16 @@ class ProcessingQueue(Queue.Queue, object):
|
||||||
try:
|
try:
|
||||||
if self.maxsize > 0:
|
if self.maxsize > 0:
|
||||||
if not block:
|
if not block:
|
||||||
# Use >= instead of == due to OrderedQueue!
|
if self._total_qsize() == self.maxsize:
|
||||||
if self._qsize() >= self.maxsize:
|
|
||||||
raise Queue.Full
|
raise Queue.Full
|
||||||
elif timeout is None:
|
elif timeout is None:
|
||||||
while self._qsize() >= self.maxsize:
|
while self._total_qsize() == self.maxsize:
|
||||||
self.not_full.wait()
|
self.not_full.wait()
|
||||||
elif timeout < 0:
|
elif timeout < 0:
|
||||||
raise ValueError("'timeout' must be a non-negative number")
|
raise ValueError("'timeout' must be a non-negative number")
|
||||||
else:
|
else:
|
||||||
endtime = _time() + timeout
|
endtime = _time() + timeout
|
||||||
while self._qsize() >= self.maxsize:
|
while self._total_qsize() == self.maxsize:
|
||||||
remaining = endtime - _time()
|
remaining = endtime - _time()
|
||||||
if remaining <= 0.0:
|
if remaining <= 0.0:
|
||||||
raise Queue.Full
|
raise Queue.Full
|
||||||
|
|
Loading…
Reference in a new issue