Source code for turberfield.dynamics.jobqueue

#!/usr/bin/env python3
# encoding: UTF-8

# This file is part of turberfield.
#
# Turberfield is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Turberfield is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with turberfield.  If not, see <http://www.gnu.org/licenses/>.

import asyncio
from asyncio.queues import QueueEmpty
from collections import namedtuple


[docs]class JobQueue: """ You create a JobQueue just as you would a standard Queue from the `asyncio` module. """ Job = namedtuple("Job", ["t", "priority", "id"]) def __init__(self, loop): self.loop = loop self.queue = asyncio.PriorityQueue(loop=loop) self.jobs = {} def get(self): """ Returns a 2-tuple from the queue, consisting of: job A :py:class:`~turberfield.dynamics.JobQueue.Job` object, consisting of `(t, priority, id)`. msg The message delivered by the job. """ job = yield from self.queue.get() return (job, self.jobs.pop(job.id)) def get_nowait(self): job = self.queue.get_nowait() return (job, self.jobs.pop(job.id))
[docs] def put(self, t, priority, msg): """ Place a message on the queue, to be processed at time `t` with priority `priority`. """ self.jobs[id(msg)] = msg yield from self.queue.put(JobQueue.Job(t, priority, id(msg)))
def put_nowait(self, t, priority, msg): self.jobs[id(msg)] = msg self.queue.put_nowait(JobQueue.Job(t, priority, id(msg)))