Here is the final result. I would be glad if it could help someone.
# Copyright (c) 2012, Jeremie Le Hen
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import collections
import heapq
import threading
import time
class TimelyQueue:
"""
Implements a similar but stripped down interface of Queue which
delivers items on time only.
"""
def __init__(self, resolution=5):
"""
`resolution' is an optimization to avoid wasting CPU cycles when
something is about to happen in less than X ms.
"""
self.timerthread = threading.Thread(target=self.__timer)
self.timerthread.daemon = True
self.resolution = float(resolution) / 1000
self.queue = []
self.triggered = collections.deque()
self.putcond = threading.Condition()
self.getcond = threading.Condition()
# Optimization to avoid waking the thread uselessly.
self.putwaketime = 0
self.terminating = False
self.timerthread.start()
def put(self, when, item):
"""
`when' is a Unix time from Epoch.
"""
with self.putcond:
heapq.heappush(self.queue, (when, item))
if when < self.putwaketime or self.putwaketime == 0:
self.putcond.notify()
def get(self, timeout=None):
"""
Timely return the next object on the queue.
"""
with self.getcond:
if len(self.triggered) > 0:
when, item = self.triggered.popleft()
return item
self.getcond.wait(timeout)
try:
when, item = self.triggered.popleft()
except IndexError:
return None
return item
def qsize(self):
"""
Self explanatory.
"""
with self.putcond:
return len(self.queue)
def terminate(self):
"""
Request the embedded thread to terminate.
"""
with self.putcond:
self.terminating = True
self.putcond.notifyAll()
def __timer(self):
with self.putcond:
maxwait = None
while True:
curtime = time.time()
try:
when, item = self.queue[0]
maxwait = when - curtime
self.putwaketime = when
except IndexError:
maxwait = None
self.putwaketime = 0
self.putcond.wait(maxwait)
if self.terminating:
return
curtime = time.time()
while True:
# Don't dequeue now, we are not sure to use it yet.
try:
when, item = self.queue[0]
except IndexError:
break
if when > curtime + self.resolution:
break
self.triggered.append(heapq.heappop(self.queue))
if len(self.triggered) > 0:
with self.getcond:
self.getcond.notify(len(self.triggered))
if __name__ == "__main__":
q = TimelyQueue()
N = 100000
t0 = time.time()
for i in range(N):
q.put(time.time() + 2, i)
dt = time.time() - t0
print "put done in %.3fs (%.2f put/sec)" % (dt, N / dt)
t0 = time.time()
i = 0
while i < N:
a = q.get(3)
if i == 0:
dt = time.time() - t0
print "start get after %.3fs" % dt
t0 = time.time()
i += 1
dt = time.time() - t0
print "get done in %.3fs (%.2f get/sec)" % (dt, N / dt)
q.terminate()
# Give change to the thread to exit properly, otherwise we may get
# a stray interpreter exception.
time.sleep(0.1)