Source code for dummymp.taskmgr

#!/usr/bin/env python
# DummyMP - Multiprocessing Library for Dummies!
# Copyright 2014 Albert Huang.
# 
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# 
#   http://www.apache.org/licenses/LICENSE-2.0
# 
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing
# permissions and limitations under the License.
# 
# DummyMP Library - Task Manager
#   multiprocessing library for dummies!
#   (library for easily running functions in parallel)
# 

import logging
import time
from multiprocessing import Process, Queue

from . import config
from .detect import *
from .process import _runner

[docs]def process_queue(): """Process inter-process messages. Process the inter-process :py:class:`multiprocessing.Queue` objects, which receive messages from the spawned process for logging events and function returns. Args: None Note: Warnings are emitted to log if an invalid message is received. """ # Get main process logger logger = logging.getLogger() # Loop through queues... for dummymp_queue in config.dummymp_queues: # Make sure there's something to fetch from the queue! if not dummymp_queue.empty(): # Make a request to get the queue, with a timeout to ensure # no blocking (or long waiting) qout = dummymp_queue.get(timeout = 0.001) # Check if it's a list or not if type(qout) != list: logger.warning("WARNING: Received invalid message from process! This may be a bug! Message: %s" % str(qout)) continue # Check the message type IDs! # Format: [ [ DUMMYMP_MSG_TYPE_ID, SYSTEM_PID, INTERNAL_ID ], DATA... ] if qout[0][0] == config.DUMMYMP_LOG_ID: # Append PID info text qout[1].msg = ("[PID %i] " % qout[0][1]) + qout[1].msg # Emit the modified log record # BUG: seems like logger.handle() doesn't do any # filtering - therefore, we need to filter it ourselves. if logger.isEnabledFor(qout[1].levelno): logger.handle(qout[1]) elif qout[0][0] == config.DUMMYMP_RET_ID: # Store return into return dictionary config.dummymp_rets[qout[0][2]] = qout[1] else: logger.warning("WARNING: Received invalid message from process! (Invalid message type ID!) This may be a bug! Message: %s" % str(qout))
[docs]def process_process(): """Process the execution queue and inter-process messages. Process the execution queue by starting processes in said queue, handle processes that have completed, and process inter-process messages via :py:func:`process_queue()`. (In plain English: start the queued processes, check processes to see if they are done running, and grab any inter-process messages sent from the spawned process.) Args: None Returns: bool: A boolean indicating whether the execution queue has completed or not. Returns True if it has completed, False if it has not. This return value can be used in a while loop to block until processes have completed. (This is somewhat similar to multiprocessing's :py:meth:`multiprocessing.Process.join()` if used in a loop.) """ nproc = 0 # Get main process logger logger = logging.getLogger() # Loop through processes via index! while nproc < len(config.dummymp_procs): dummymp_proc = config.dummymp_procs[nproc] # Check if process is complete! (In this case, ensure that # the process is not in a start queue and it isn't alive # anymore!) if (not dummymp_proc in config.dummymp_start_procs) and (not dummymp_proc.is_alive()): # Run process_queue() to fetch the remaining queue items # from the process. process_queue() # Remove the queue and process pi = config.dummymp_procs.index(dummymp_proc) # Make sure to close the queue! # ...but first, check to make sure the queue is empty. if not config.dummymp_queues[pi].empty(): process_queue() # Once we're sure, let's close things up! config.dummymp_queues[pi].close() config.dummymp_queues.pop(pi) config.dummymp_procs.pop(pi) logger.debug("Process complete!") # Add to the completed count and remove from running count... config.total_completed += 1 config.total_running -= 1 # Make any callbacks, if necessary. if config.PROCESS_END_CALLBACK: config.PROCESS_END_CALLBACK(config.total_completed, config.total_running, config.total_procs) # Deincrement index counter, since we just removed a process # from the list. nproc -= 1 # Increment nproc += 1 # Fetch available CPUs avail_cpus = getCPUAvail() - config.total_running # Check if we need to update CPU avail if not needUpdateCPUAvail(): nproc = 0 # Loop through process execution queue while nproc < len(config.dummymp_start_procs): dummymp_proc_entry = config.dummymp_start_procs[nproc] # Check to make sure we can meet max_processes limit # (0 means no limit set) if (config.max_processes == 0) or (config.total_running < config.max_processes): # If there's no available CPUs, check to make sure that a # process isn't already running, and that the mode set is # not GENEROUS. if ((avail_cpus == 0) and (config.total_running == 0) and (config.DUMMYMP_MODE != config.DUMMYMP_GENEROUS)): # Force a single process to run! avail_cpus += 1 logger.debug("Not in generous mode, so forcing one task to run.") # Check if we have any available (or "available") CPUs! if avail_cpus > 0: logger.debug("%i CPUs available, spawning process!" % avail_cpus) # Deincrement counter avail_cpus -= 1 # Setup Queue # We create the Queue and Process here so that we can # prevent the error from opening too many Queue objects # in multiprocessing.Pipe: # IOError: handle out of range in select() # Bug: http://bugs.python.org/issue10527 q = Queue() # Extract internal PID, function, final_args, and # final_kwargs int_pid = dummymp_proc_entry[0] func = dummymp_proc_entry[1] final_args = dummymp_proc_entry[2] final_kwargs = dummymp_proc_entry[3] # Now add some arguments to the front: # Function to actually run final_args.insert(0, func) # Queue final_args.insert(0, q) # Process ID final_args.insert(0, int_pid) # Create Process object p = Process(target = _runner, args = final_args, kwargs = final_kwargs) # Save it config.dummymp_queues.append(q) config.dummymp_procs.append(p) # Start the process... p.start() # ...and remove it from the starting queue. config.dummymp_start_procs.remove(dummymp_proc_entry) # Increment running counter... config.total_running += 1 # Make any callbacks, if necessary. if config.PROCESS_START_CALLBACK: config.PROCESS_START_CALLBACK(config.total_completed, config.total_running, config.total_procs) # Deincrement index counter, since we just removed a process # from the start queue list. nproc -= 1 else: logger.debug("Max processes limit of %i reached, waiting for process to terminate." % config.max_processes) # Increment nproc += 1 # Check to see if we are done! if len(config.dummymp_procs) == 0: logger.debug("All processes complete, returning True.") return True return False
[docs]def process_until_done(): """Process the execution queue until all have been completed. Process the execution queue until it has indicated that all processes in the queue have been completed. (This is somewhat similar to multiprocessing's :py:meth:`multiprocessing.Process.join()`.) Args: None """ # Run process_queue() and process_process() until process_process() # returns False (when it completes the process queue) while not process_process(): process_queue() time.sleep(0.001)
Fork me on GitHub