Package datk :: Package core :: Module distalgs
[hide private]
[frames] | no frames]

Source Code for Module datk.core.distalgs

  1  import random 
  2  from random import shuffle 
  3  import threading 
  4  from threading import Thread 
  5  from time import sleep 
  6  import pdb 
  7  import collections 
  8  from collections import defaultdict 
  9  from copy import deepcopy 
 10   
class Message:
    """
    A Message

    Attributes:
        - content: The content of this Message
        - algorithm: The Algorithm that required the sending of this Message
        - author: The Process that sent it
    """

    def __init__(self, algorithm, content=None):
        """
        @param algorithm: the Algorithm that required the sending of this Message
        @param content: The content of this Message. Can be any type, including None.
        """
        assert isinstance(algorithm, Algorithm), "Message algorithm must be an instance of Algorithm"
        self.content = content
        self.algorithm = algorithm
        # Unknown until the message is actually sent; Process.send_msg
        # overwrites this with the sending Process.
        self.author = None

    def __str__(self):
        """@return: the class name followed by the content in parentheses."""
        return self.__class__.__name__ + "(" + str(self.content) + ")"

class Process:
    """A computing element located at a node of a network graph.
    Processes are identical except for their UID"""

    def __init__(self, UID, state=None, in_nbrs=None, out_nbrs=None):
        """
        @param UID: the identifier of this Process
        @param state: accepted for interface compatibility; not read here
        @param in_nbrs: [Optional] list of Processes with an edge into this one
        @param out_nbrs: [Optional] list of Processes this one has an edge to
        """
        self.UID = UID
        self.state = defaultdict(dict)  # algorithm : state dict

        # `or []` gives every Process its own fresh list when the caller
        # passes nothing (or an empty list); a non-empty list is used as-is.
        self.in_nbrs = in_nbrs or []
        self.out_nbrs = out_nbrs or []

        # Maps the index of a sender within self.in_nbrs to the list of
        # Messages from that sender that have not been fetched yet.
        self.in_channel = {}
        # Algorithms this Process is currently awake for.
        self.algs = set()

    # NOTE: lines 46-59 of the original listing are missing from this
    # extract (the numbering jumps from 45 to 60). Whatever was defined
    # there must be restored from the upstream module.

    def output(self, key, val, verbose=True):
        """
        Sets the publicly visible value of self.state[key] to val

        @param key: The state variable to set
        @param val: The value to assign to it
        @param verbose: Dictates whether or not to print this event to STDOUT
        """
        self.state[key] = val
        if verbose:
            label = str(self) + "." + str(key)
            # A single pre-formatted argument prints the same text under
            # both the Python 2 print statement and the Python 3 function.
            if isinstance(val, list):
                print("%s are %s" % (label, [str(v) for v in val]))
            else:
                print("%s is %s" % (label, val))

    def send_to_all_neighbors(self, msg):
        """Sends a message to all out_nbrs

        @param msg: the message to send
        """
        self.send_msg(msg)

    def send_msg(self, msg, out_nbrs=None):
        """
        Sends a Message from Process to some subset of out_nbrs

        @param msg: The message to send.
        @param out_nbrs: The out_nbrs to send the message to. This may be a
        list of Processes, a single Process, or None, in which case the
        message will be sent to all out_nbrs
        @raise Exception: if out_nbrs is neither None, a Process, nor a list
        @raise ValueError: if this Process is not in a recipient's in_nbrs

        Effects:
            - Sets msg.author = self
            - Adds the number of recipients to msg.algorithm's message count
        """
        msg.author = self
        if out_nbrs is None:
            out_nbrs = self.out_nbrs
        elif isinstance(out_nbrs, Process):
            out_nbrs = [out_nbrs]
        if not isinstance(out_nbrs, list):
            raise Exception("incorrect type for out_nbrs argument of Process.send_msg()")

        msg.algorithm.count_msg(len(out_nbrs))
        for nbr in out_nbrs:
            # The channel key is this sender's position in the recipient's
            # in_nbrs; look it up once rather than once per dict access.
            channel = nbr.in_nbrs.index(self)
            nbr.in_channel.setdefault(channel, []).append(msg)

    def get_msgs(self, algorithm, in_nbrs=None):
        """Removes all Messages that relate to a particular Algorithm from the Process'
        incoming channels (or from some subset of incoming channels). Returns them.

        @param algorithm: the algorithm whose messages this returns
        @param in_nbrs: the in_nbrs of the Process from whose channels we are getting
        messages (a list or a single Process). If None, fetches messages from all channels
        @return: A list of Messages, msgs, such that every message in msgs has Algorithm
        algorithm, and author in in_nbrs
        @raise Exception: if in_nbrs is neither None, a Process, nor a list
        """
        if in_nbrs is None:
            in_nbrs = self.in_nbrs[:]
        elif isinstance(in_nbrs, Process):
            in_nbrs = [in_nbrs]
        if not isinstance(in_nbrs, list):
            raise Exception("incorrect type for in_nbrs argument of Process.get_msgs()")

        msgs = []
        for in_nbr in in_nbrs:
            idx = self.in_nbrs.index(in_nbr)
            if idx not in self.in_channel:
                continue
            channel = self.in_channel[idx]
            # Matching messages are popped in place (instead of rebuilding
            # the list) so a message appended to the channel during the
            # scan is never discarded.
            i = 0
            while i < len(channel):
                msg = channel[i]
                if msg.algorithm == algorithm:
                    channel.pop(i)
                    msgs.append(msg)
                else:
                    i += 1
        return msgs

    def add(self, algorithm):
        """Causes the Process to wake up with respect to algorithm"""
        self.algs.add(algorithm)
        # Seeds the per-algorithm "diam" entry with the value stored under 'n'.
        self.state[algorithm]["diam"] = self.state['n']

    def terminate(self, algorithm):
        """Causes the Process to halt execution of algorithm"""
        # discard() is a no-op when the algorithm is not (or no longer) present.
        self.algs.discard(algorithm)

    def __str__(self):
        return "P" + str(self.UID)

    def __repr__(self):
        return "P" + str(self.UID) + " -> {" + ", ".join([str(process) for process in self.out_nbrs]) + "}"

class Network:
    """ A collection of Processes that know n, the # of processes in the network."""

    # The original listing also defined ``__init__(self, processes)`` just
    # before this one. Python keeps only the last definition of a name in
    # a class body, so that overload could never be called; it is omitted.
    def __init__(self, n, index_to_UID=None):
        """
        Creates a network of n disconnected Processes,
        with random distinct UIDs, or as specified by
        the index_to_UID function

        @param n: the number of Processes to create
        @param index_to_UID: [Optional] function mapping an index in
        range(n) to the UID of the Process at that index
        """
        if index_to_UID is None:
            self.processes = [Process(i) for i in range(n)]
            shuffle(self.processes)
        else:
            self.processes = [Process(index_to_UID(i)) for i in range(n)]
        # Every Process records the network size under state['n'].
        for process in self:
            process.state['n'] = n

    def add(self, algorithm):
        """Awakens all Processes in the Network with respect to algorithm"""
        for process in self:
            process.add(algorithm)

    def run(self, algorithm):
        """Runs algorithm on the Network"""
        algorithm(self)

    def draw(self):
        """Draws the Network"""
        # math and matplotlib are imported locally, inside this method.
        import math
        from matplotlib import pyplot as plt

        n = len(self)
        # Process k is placed at angle 2*pi*k/n on the unit circle.
        vals = [[math.cos(2*k*math.pi/n), math.sin(2*k*math.pi/n)]
                for k in range(n)]

        plt.xlim([-1.2, 1.2])
        plt.ylim([-1.2, 1.2])
        plt.plot([v[0] for v in vals], [v[1] for v in vals], 'ro')

        def line(v1, v2):
            plt.plot((v1[0], v2[0]), (v1[1], v2[1]))

        # One segment per (process, out-neighbor) pair.
        for i in range(n):
            for nbr in self[i].out_nbrs:
                line(vals[i], vals[self.index(nbr)])

        frame = plt.gca()
        frame.axes.get_xaxis().set_visible(False)
        frame.axes.get_yaxis().set_visible(False)
        plt.show()

    def state(self):
        """
        @return: A text representation of the state of all the Processes in the Network
        """
        return [(str(process), dict(process.state)) for process in self]

    def clone(self):
        """@return: a deep copy of this Network"""
        return deepcopy(self)

    def __getitem__(self, i):
        return self.processes[i]

    def __len__(self):
        return len(self.processes)

    def __repr__(self):
        return str(self.processes)

    def __iter__(self):
        return iter(self.processes)

    def index(self, p):
        """@return: the position of Process p in the Network"""
        return self.processes.index(p)

class Algorithm:
    """Abstract superclass for a distributed algorithm."""

    # Verbosity levels
    #
    #   Random updates      >= VERBOSE
    #   Process outputs     >= DEFAULT
    #   Algorithm results   >= QUIET
    #   SILENT : No output whatsoever
    SILENT, QUIET, DEFAULT, VERBOSE = 0, 1, 2, 3

    # Default initialization of self.params
    DEFAULT_PARAMS = {'draw': False, 'verbosity': DEFAULT}

    def __init__(self, network=None, params=None, name=None):
        """
        @param network: [Optional] network. If specified, algorithm is immediately executed on network.
        @param params: [Optional] runtime parameters.
        @param name: [Optional] name of the Algorithm instance. Defaults to class name.
        """
        # Start from a private copy of the defaults, then overlay the
        # caller's parameters.
        self.params = deepcopy(Algorithm.DEFAULT_PARAMS)
        self.params.update(params or {})
        self.message_count = 0

        self.name = name
        if name is None:
            self.name = self.__class__.__name__
        if network is not None:
            self(network, self.params)

    def msgs_i(self, p):
        """Determines what messages a Process, p, will send."""
        pass

    def trans_i(self, p, msgs):
        """Determines what state transition a Process, p, will perform,
        having received messages, msgs"""
        pass

    def halt_i(self, p):
        """Returns True iff Process p has halted execution of the algorithm"""
        return self not in p.algs

    def cleanup_i(self, p):
        """Determines what final state transition a Process, p, will perform,
        after the algorithm terminates."""
        pass

    def cleanup(self):
        """Calls cleanup_i on all processes"""
        for process in self.network:
            self.cleanup_i(process)
            # Drop this algorithm's private state dict from the process.
            if self in process.state:
                del process.state[self]

    def __call__(self, network, params=None):
        """Same as run, allows an algorithm, A, to be executed like this: A()"""
        # run() is always handed a dict, never None.
        self.run(network, {} if params is None else params)

    def run(self, network, params=None):
        """
        Executes the algorithm on the network

        @param network: the network to run in
        @param params: runtime parameters
        """
        self.params.update(params or {})

        self.message_count = 0
        if self.params['verbosity'] >= Algorithm.DEFAULT:
            header = "Running " + self.name + " on"
            print(len(header) * "-")
            print(header)
            if self.params.get('draw'):
                network.draw()
            print(str(network))

        self.network = network
        network.add(self)

    def halt(self):
        """If every Process has halted, marks the algorithm as halted, runs
        cleanup and (at verbosity >= QUIET) prints the termination report."""
        # A list (not a generator) so halt_i is evaluated for every process.
        if all([self.halt_i(process) for process in self.network]):
            self.halted = True
            self.cleanup()
            if self.params['verbosity'] >= Algorithm.QUIET:
                self.print_algorithm_terminated()

    # The ``def`` line of this method (listing line 327) is missing from
    # the extract; the name is the one halt() calls above.
    def print_algorithm_terminated(self):
        """Prints the termination banner and the message complexity."""
        print(self.name + " Terminated")
        msg_complexity = "Message Complexity: " + str(self.message_count)
        print(msg_complexity)
        print("-" * len(msg_complexity))

    def count_msg(self, message_count):
        """Adds message_count to the running total of messages sent."""
        self.message_count += message_count

    def set(self, process, state, value):
        """Sets this algorithm's state variable `state` on process to value."""
        process.state[self][state] = value

    def increment(self, process, state, inc=1):
        """Adds inc to an already-set state variable; KeyError if it is unset."""
        process.state[self][state] += inc

    def has(self, process, state):
        """@return: True iff process has this algorithm's state variable `state`."""
        return state in process.state[self]

    def get(self, process, state):
        """@return: the value of the state variable, or None if it is unset."""
        if self.has(process, state):
            return process.state[self][state]
        return None

    def delete(self, process, state):
        """Removes the state variable from process if it is set."""
        if self.has(process, state):
            del process.state[self][state]

    def output(self, process, key, val):
        """
        Sets the publicly visible value of process.state[key] to val

        This command is verbose if Algorithm's verbosity is >= DEFAULT

        @param key: The state variable to set
        @param val: The value to assign to it
        """
        process.output(key, val, self.params['verbosity'] >= Algorithm.DEFAULT)

class Synchronous_Algorithm(Algorithm):
    """
    We assume that Processes take steps simultaneously,
    that is, that execution proceeds in synchronous rounds.
    """

    def run(self, network, params=None):
        """Performs the common Algorithm.run setup, then executes rounds
        until the algorithm halts.

        @param network: the network to run in
        @param params: [Optional] runtime parameters
        """
        Algorithm.run(self, network, params or {})
        self.execute()

    def execute(self):
        """Runs rounds, numbered from 1 in self.r, until halt() sets self.halted."""
        self.halted = False
        self.r = 0
        while not self.halted:
            self.r += 1
            if self.params['verbosity'] >= Algorithm.DEFAULT:
                print("Round %s" % self.r)
            self.round()
            self.halt()

    def round(self):
        """Executes a single round of the Synchronous Algorithm"""
        self.msgs()
        self.trans()

    def msgs(self):
        """Calls msgs_i on every Process that has not halted."""
        for process in self.network:
            if self.halt_i(process):
                continue
            self.msgs_i(process)

    def trans(self):
        """Calls trans_i on every Process that has not halted."""
        for process in self.network:
            if self.halt_i(process):
                continue
            # NOTE(review): a TypeError raised *inside* a one-argument
            # trans_i also lands in the except branch; confirm that this
            # arity probe is acceptable.
            try:  # Checks if function trans_i(self, p) is defined
                self.trans_i(process)
            except TypeError:  # Otherwise, tries function trans_i(self, p, msgs)
                self.trans_i(process, process.get_msgs(self))

    # The ``def`` line of this method (listing line 402) is missing from
    # the extract; the body matches an override of the termination report
    # that additionally prints the round count.
    def print_algorithm_terminated(self):
        """Prints the termination banner, message complexity and time complexity."""
        print(self.name + " Terminated")
        msg_complexity = "Message Complexity: " + str(self.message_count)
        print(msg_complexity)
        time_complexity = "Time Complexity: " + str(self.r)
        print(time_complexity)
        print("-" * len(time_complexity))

class Do_Nothing(Synchronous_Algorithm):
    """A Synchronous_Algorithm in which every Process terminates on its
    first state transition."""

    def trans_i(self, p, messages):
        """Terminates p with respect to this algorithm; messages are ignored."""
        p.terminate(self)

class Asynchronous_Algorithm(Algorithm):
    """
    We assume that the separate Processes take steps
    in an arbitrary order, at arbitrary relative speeds.
    """

    def run(self, network, params=None):
        """Performs the common Algorithm.run setup, then runs each Process
        in its own thread and waits for all of them before calling halt().

        @param network: the network to run in
        @param params: [Optional] runtime parameters
        """
        Algorithm.run(self, network, params=params or {})

        threads = []
        for process in network.processes:
            thread = Thread(target=self.run_process, args=(process,))
            threads.append(thread)
            thread.start()

        for thread in threads:
            thread.join()
        self.halt()

    def run_process(self, process):
        """Repeatedly sends messages and performs transitions for process
        until halt_i(process) is True.

        @param process: the Process this loop drives
        """
        while True:
            # Sleep between 0.2 and 0.4 seconds before each step.
            sleep((random.random() + 1.) / 5.)
            self.msgs_i(process)
            if self.halt_i(process):
                break
            # NOTE(review): a TypeError raised *inside* a one-argument
            # trans_i also lands in the except branch; confirm that this
            # arity probe is acceptable.
            try:  # Checks if function trans_i(self, p) is defined
                self.trans_i(process)
            except TypeError:  # Otherwise, tries function trans_i(self, p, msgs)
                self.trans_i(process, process.get_msgs(self))
            if self.halt_i(process):
                break

class Compose(Synchronous_Algorithm):
    """
    A Synchronous_Algorithm that is the composition of two synchronous algorithms
    running in parallel.
    """

    def __init__(self, A, B, name=None, params=None):
        """
        @param A: an instance of Synchronous_Algorithm
        @param B: an instance of Synchronous_Algorithm
        @param name: [Optional] name of the Algorithm. Defaults to Compose(name of A, name of B)
        @param params: [Optional] Runtime parameters
        """
        assert isinstance(A, Synchronous_Algorithm), "Not a Synchronous_Algorithm"
        assert isinstance(B, Synchronous_Algorithm), "Not a Synchronous_Algorithm"
        self.A = A
        self.B = B
        self.message_count = 0
        # Overlay the caller's parameters on a copy of the defaults so
        # that keys run() reads (e.g. 'verbosity') are always present,
        # even when only some parameters are supplied.
        self.params = deepcopy(Algorithm.DEFAULT_PARAMS)
        self.params.update(params or {})
        # self.name must be set in both cases: run() reads it.
        if name is None:
            name = "Compose(" + self.A.name + "," + self.B.name + ")"
        self.name = name

    def msgs_i(self, p):
        """Sends the messages of both A and B for p, after copying the
        current round number to them."""
        self.A.r, self.B.r = self.r, self.r
        self.A.msgs_i(p)
        self.B.msgs_i(p)

    def trans_i(self, p, msgs):
        """Performs the transitions of both A and B for p, each with its
        own messages. The msgs argument itself is not used."""
        self.A.r, self.B.r = self.r, self.r
        self.A.trans_i(p, p.get_msgs(self.A))
        self.B.trans_i(p, p.get_msgs(self.B))

    def halt_i(self, p):
        """Returns True iff p has halted both A and B. Also refreshes
        message_count as the sum of A's and B's counts."""
        self.message_count = self.A.message_count + self.B.message_count

        return self.A.halt_i(p) and self.B.halt_i(p)

    def cleanup_i(self, p):
        """Runs the cleanup of A and B on p, then terminates p with
        respect to the composition itself."""
        self.message_count = self.A.message_count + self.B.message_count
        self.A.cleanup_i(p)
        self.B.cleanup_i(p)
        p.terminate(self)

    def run(self, network, params=None):
        """Performs the common Algorithm.run setup, wakes every Process
        for A and B as well, then executes rounds until halted.

        @param network: the network to run in
        @param params: [Optional] runtime parameters
        """
        Algorithm.run(self, network, params or {})
        self.network.add(self.A)
        self.network.add(self.B)
        self.execute()

    def __repr__(self):
        return self.name

class Chain(Algorithm):
    """
    An Algorithm that is the result of sequentially running two algorithms
    """

    def __init__(self, A, B, name=None, params=None):
        """
        @param A: an instance of Algorithm
        @param B: an instance of Algorithm
        @param name: [Optional] name of the Algorithm. Defaults to Chain(A.name, B.name)
        @param params: [Optional] Runtime parameters
        """
        assert isinstance(A, Algorithm), "Not an Algorithm"
        assert isinstance(B, Algorithm), "Not an Algorithm"
        # Overlay the caller's parameters on a copy of the defaults so
        # that keys run() reads (e.g. 'verbosity') are always present,
        # even when only some parameters are supplied.
        self.params = deepcopy(Algorithm.DEFAULT_PARAMS)
        self.params.update(params or {})
        self.A = A
        self.B = B
        self.message_count = 0

        self.name = name or "Chain(" + A.name + "," + B.name + ")"

    def run(self, network, params=None):
        """Runs A and then B on network, both with this Chain's params,
        and records the sum of their message counts.

        @param network: the network to run in
        @param params: [Optional] runtime parameters
        """
        Algorithm.run(self, network, params or {})
        self.A.run(network, params=self.params)
        self.B.run(network, params=self.params)
        self.message_count = self.A.message_count + self.B.message_count
        # NOTE(review): nothing in this method removes the Chain itself
        # from process.algs, so on a non-empty network halt() looks like a
        # no-op (no termination report). Confirm whether the processes
        # should terminate(self) first.
        self.halt()
        Algorithm.cleanup(self)

    def __repr__(self):
        return self.name
