summaryrefslogtreecommitdiffstats
path: root/Lib/Queue.py
blob: 0d69777b447e0434feb4a6faae4544cd0bc86f3d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# A multi-producer, multi-consumer queue.

Empty = 'Queue.Empty' # Exception raised by get_nowait()

class Queue:

	# Initialize a queue object with a given maximum size
	# (If maxsize is <= 0, the maximum size is infinite)
	def __init__(self, maxsize):
		self._init(maxsize)
		self.mutex = thread.allocate_lock()
		self.esema = thread.allocate_lock()
		self.esema.acquire_lock()
		self.fsema = thread.allocate_lock()

	# Get an approximation of the queue size (not reliable!)
	def qsize(self):
		self.mutex.acquire_lock()
		n = self._qsize()
		self.mutex.release_lock()
		return n

	# Check if the queue is empty (not reliable!)
	def empty(self):
		self.mutex.acquire_lock()
		n = self._empty()
		self.mutex.release_lock()
		return n

	# Check if the queue is full (not reliable!)
	def full(self):
		self.mutex.acquire_lock()
		n = self._full()
		self.mutex.release_lock()
		return n

	# Put a new item into the queue
	def put(self, item):
		self.fsema.acquire_lock()
		self.mutex.acquire_lock()
		was_empty = self._empty()
		self._put(item)
		if was_empty:
			self.esema.release_lock()
		if not self._full():
			self.fsema.release_lock()
		self.mutex.release_lock()

	# Get an item from the queue,
	# blocking if necessary until one is available
	def get(self):
		self.esema.acquire_lock()
		self.mutex.acquire_lock()
		was_full = self._full()
		item = self._get()
		if was_full:
			self.fsema.release_lock()
		if not self._empty():
			self.esema.release_lock()
		self.mutex.release_lock()
		return item

	# Get an item from the queue if one is immediately available,
	# raise Empty if the queue is empty or temporarily unavailable
	def get_nowait(self):
		locked = self.esema.acquire_lock(0)
		self.mutex.acquire_lock()
		if self._empty():
			# The queue is empyt -- we can't have esema
			self.mutex.release_lock()
			raise Empty
		if not locked:
			locked = self.esema.acquire_lock(0)
			if not locked:
				# Somebody else has esema
				# but we have mutex --
				# go out of their way
				self.mutex.release_lock()
				raise Empty
		was_full = self._full()
		item = self._get()
		if was_full:
			self.fsema.release_lock()
		if not self._empty():
			self.esema.release_lock()
		self.mutex.release_lock()
		return item

	# XXX Need to define put_nowait() as well.
		

	# Override these methods to implement other queue organizations
	# (e.g. stack or priority queue).
	# These will only be called with appropriate locks held

	# Initialize the queue representation
	def _init(self, maxsize):
		self.maxsize = maxsize
		self.queue = []

	def _qsize(self):
		return len(self.queue)

	# Check wheter the queue is empty
	def _empty(self):
		return not self.queue

	# Check whether the queue is full
	def _full(self):
		return self.maxsize > 0 and len(self.queue) == self.maxsize

	# Put a new item in the queue
	def _put(self, item):
		self.queue.append(item)

	# Get an item from the queue
	def _get(self):
		item = self.queue[0]
		del self.queue[0]
		return item