1 import random
2 from random import shuffle
3 import threading
4 from threading import Thread
5 from time import sleep
6 import pdb
7 import collections
8 from collections import defaultdict
9 from copy import deepcopy
10
12 """
13 A Message
14
15 Attributes:
16 - content: The content of this Message
17 - algorithm: The Algorithm that required the sending of this Message
18 - author: The Process that sent it
19 """
def __init__(self, algorithm, content=None):
    """
    Builds a Message on behalf of an Algorithm.

    @param algorithm: the Algorithm instance that requires this Message to be sent
    @param content: the payload of this Message; any value, None included
    """
    is_algorithm = isinstance(algorithm, Algorithm)
    assert is_algorithm, "Message algorithm must be an instance of Algorithm"
    # No author yet: it is filled in by whoever sends the Message.
    self.author = None
    self.algorithm, self.content = algorithm, content
29
31 return self.__class__.__name__+"("+str(self.content)+")"
32
33
35 """A computing element located at a node of a network graph.
36 Processes are identical except for their UID"""
def __init__(self, UID, state=None, in_nbrs=None, out_nbrs=None):
    """
    Creates a Process identified by UID.

    @param UID: the identifier of this Process
    @param state: [Optional] mapping of initial state variables. Its entries
    are copied into self.state (previously this argument was ignored).
    @param in_nbrs: [Optional] list of incoming neighbors. A non-empty list
    is stored as-is (not copied).
    @param out_nbrs: [Optional] list of outgoing neighbors. A non-empty list
    is stored as-is (not copied).
    """
    self.UID = UID

    # Unknown keys default to an empty dict, so per-algorithm state can be
    # written as self.state[algorithm][var] without any prior setup.
    self.state = defaultdict(dict)
    if state is not None:
        self.state.update(state)

    # `or []` gives every Process its own fresh list when none is supplied.
    self.in_nbrs = in_nbrs or []
    self.out_nbrs = out_nbrs or []

    # in_channel maps an index into in_nbrs to the list of pending Messages.
    self.in_channel = {}
    # Algorithms this Process is currently awake for.
    self.algs = set()
46
48 """Adds a new outgoing neighbor of the Process"""
49 if new_out_nbr not in self.out_nbrs:
50 self.out_nbrs.append(new_out_nbr)
51 if self not in new_out_nbr.in_nbrs:
52 new_out_nbr.in_nbrs.append(self)
53
55 """Adds a new out_nbr of the Process, and adds the
56 Process as an out_nbr of that neighbor"""
57 self.link_to(nbr)
58 nbr.link_to(self)
59
def output(self, key, val, verbose=True):
    """
    Sets the publicly visible value of self.state[key] to val

    @param key: The state variable to set
    @param val: The value to assign to it
    @param verbose: Dictates whether or not to print this event to STDOUT
    """
    self.state[key] = val
    if not verbose:
        return

    label = str(self) + "." + str(key)
    # A single pre-built string is printed so that the statement is valid,
    # and produces the same text, on both Python 2 and Python 3.
    if isinstance(val, list):
        print(label + " are " + str([str(v) for v in val]))
    else:
        print(label + " is " + str(val))
74
76 """Sends a message to all out_nbrs
77
78 @param msg: the message to send
79 """
80 self.send_msg(msg)
81
83 """
84 Sends a Message from Process to some subset of out_nbrs
85
86 @param msg: The message to send.
87 @param out_nbrs: The out_nbrs to send the message to. This may be a
88 subset of the Process's out_nbrs, or None, in which case the message
89 will be sent to all out_nbrs
90
91 Effects:
92 - Sets msg.author = self
93 """
94 msg.author = self
95 if out_nbrs is None:
96 out_nbrs = self.out_nbrs
97 elif isinstance(out_nbrs, Process):
98 out_nbrs = [out_nbrs]
99 if isinstance(out_nbrs, list):
100 msg.algorithm.count_msg(len(out_nbrs))
101 for nbr in out_nbrs:
102 if nbr.in_nbrs.index(self) in nbr.in_channel:
103 nbr.in_channel[nbr.in_nbrs.index(self)].append(msg)
104 else:
105 nbr.in_channel[nbr.in_nbrs.index(self)] = [msg]
106 else:
107 raise Exception("incorrect type for out_nbrs argument of Process.send_msg()")
108
109 - def get_msgs(self, algorithm, in_nbrs = None):
110 """Removes all Messages that relate to a particular Algorithm from the Process'
111 incoming channels (or from some subset of incoming channels). Returns them.
112
113 @param algorithm: the algorithm whose messages this returns
114 @param in_nbrs: the in_nbrs of the Process from whose channels we are getting
115 messages. If None, fetches messages from all channels
116 @return: A list of Messages, msgs, such that every message in msgs has Algorithm
117 algorithm, and author in in_nbrs
118 """
119 if in_nbrs is None:
120 in_nbrs = self.in_nbrs[:]
121 elif isinstance(in_nbrs, Process):
122 in_nbrs = [in_nbrs]
123
124 if isinstance(in_nbrs, list):
125 msgs = []
126 for in_nbr in in_nbrs:
127 idx = self.in_nbrs.index(in_nbr)
128 if idx in self.in_channel:
129 i = 0
130 while i < len(self.in_channel[idx]):
131 msg = self.in_channel[idx][i]
132 if msg.algorithm == algorithm:
133 self.in_channel[idx].pop(i)
134 msgs.append(msg)
135 else:
136 i+=1
137 return msgs
138 else:
139 raise Exception("incorrect type for in_nbrs argument of Process.get_msgs()")
140
def add(self, algorithm):
    """Wakes the Process up with respect to algorithm, and seeds that
    algorithm's "diam" state variable with the Process' value of 'n'."""
    self.algs.add(algorithm)
    n = self.state['n']
    self.state[algorithm]["diam"] = n
145
147 """Causes the Process to halt execution of algorithm"""
148 if algorithm in self.algs:
149 self.algs.remove(algorithm)
150
152 return "P"+str(self.UID)
153
155 return "P" + str(self.UID) + " -> {"+", ".join([str(process) for process in self.out_nbrs]) + "}"
156
157
159 """ A collection of Processes that know n, the # of processes in the network."""
160
162 self.processes = processes
163
def __init__(self, n, index_to_UID = None):
    """
    Builds a network of n disconnected Processes.

    Without index_to_UID the UIDs are 0..n-1, placed in shuffled order;
    otherwise the Process at index i gets UID index_to_UID(i). Every
    Process is told n through its state['n'].

    @param n: the number of Processes to create
    @param index_to_UID: [Optional] function mapping an index to a UID
    """
    if index_to_UID is not None:
        members = [Process(index_to_UID(idx)) for idx in range(n)]
    else:
        members = [Process(uid) for uid in range(n)]
        shuffle(members)
    self.processes = members

    for member in self:
        member.state['n'] = n
177
def add(self, algorithm):
    """Wakes up every Process of the Network with respect to algorithm"""
    for member in self:
        member.add(algorithm)
182
def run(self, algorithm):
    """Executes algorithm on this Network by calling it with the Network
    as its only argument. The result of that call is not returned."""
    network = self
    algorithm(network)
186
188 """Draws the Network"""
189 import math
190 from matplotlib import pyplot as plt
191 n = len(self)
192
193 vals = []
194
195 for k in range(n):
196 vals.append( [math.cos(2*k*math.pi/n), math.sin(2*k*math.pi/n) ] )
197
198 plt.xlim([-1.2, 1.2])
199 plt.ylim([-1.2, 1.2])
200 plt.plot( [v[0] for v in vals], [v[1] for v in vals], 'ro' )
201
202 def line(v1, v2):
203 plt.plot( (v1[0], v2[0]), (v1[1], v2[1] ))
204 for i in range(n):
205 for nbr in self[i].out_nbrs:
206 line(vals[i], vals[self.index(nbr)])
207 frame = plt.gca()
208 frame.axes.get_xaxis().set_visible(False)
209 frame.axes.get_yaxis().set_visible(False)
210 plt.show()
211
213 """
214 @return: A text representation of the state of all the Processes in the Network
215 """
216 return [(str(process), dict(process.state)) for process in self]
217
219 return deepcopy(self)
220
222 return self.processes[i]
223
225 return len(self.processes)
226
228 return str(self.processes)
229
231 return iter(self.processes)
232
234 return self.processes.index(p)
235
236
238 """Abstract superclass for a distributed algorithm."""
239
240 """Verbosity levels
241
242 Random updates >= VERBOSE
243 Process outputs >= DEFAULT
244 Algorithm results >= QUIET
245 SILENT : No output whatsoever
246 """
247 SILENT, QUIET, DEFAULT, VERBOSE = 0, 1, 2, 3
248
249 """Default initialization of self.params"""
250 DEFAULT_PARAMS = {'draw' : False, 'verbosity': DEFAULT}
251
def __init__(self, network = None, params = {}, name = None):
    """
    @param network: [Optional] network. If specified, algorithm is immediately executed on network.
    @param params: [Optional] runtime parameters, layered on top of DEFAULT_PARAMS.
    @param name: [Optional] name of the Algorithm instance. Defaults to class name.
    """
    # Work on a private copy so the class-level defaults are never altered.
    # The shared `params` default is only read, never mutated.
    self.params = deepcopy(Algorithm.DEFAULT_PARAMS)
    self.params.update(params.items())
    self.message_count = 0

    self.name = self.__class__.__name__ if name is None else name
    if network is not None:
        self(network, self.params)
268
270 """Determines what messages a Process, p, will send."""
271 pass
272
274 """Determines what state transition a Process, p, will perform,
275 having received messages, msgs"""
276 pass
277
279 """Returns True iff Process p has halted execution of the algorithm"""
280 return self not in p.algs
281
283 """Determines what final state transition a Process, p, will perform,
284 after the algorithm terminates."""
285 pass
286
288 """Calls cleanup_i on all processes"""
289 for process in self.network:
290 self.cleanup_i(process)
291 if self in process.state:
292 del process.state[self]
293
def __call__(self, network, params = {}):
    """Alias of run: lets an algorithm instance, A, be executed as A(network)"""
    execute = self.run
    execute(network, params)
297
def run(self, network, params = {}):
    """
    Executes the algorithm on the network

    Merges params into self.params, resets the message counter, prints a
    header (and optionally draws the network) when verbosity is >= DEFAULT,
    then records the network and wakes all of its Processes.

    @param network: the network to run the algorithm on
    @param params: runtime parameters; they override entries of self.params
    """
    # Kept as an explicit loop: params may be self.params itself, and
    # re-assigning existing keys while iterating is safe.
    for param, value in params.items():
        self.params[param] = value

    self.message_count = 0
    if self.params['verbosity'] >= Algorithm.DEFAULT:
        header = "Running " + self.name + " on"
        # Single-argument print(...) calls emit the same text on Python 2
        # and Python 3.
        print(len(header) * "-")
        print(header)
        if self.params.get('draw'):
            network.draw()
        print(str(network))

    self.network = network
    network.add(self)
319
326
328 print self.name+" Terminated"
329 msg_complexity = "Message Complexity: " + str(self.message_count)
330 print msg_complexity
331 print "-"*len(msg_complexity)
332
334 self.message_count += message_count
335
336 - def set(self, process, state, value):
338
341
342 - def has(self, process, state):
344
345 - def get(self, process, state):
348
349 - def delete(self, process, state):
352
def output(self, process, key, val):
    """
    Sets the publicly visible value of process.state[key] to val, by
    delegating to process.output.

    The verbose flag handed to process.output is True exactly when this
    Algorithm's verbosity is >= DEFAULT.

    @param process: The Process whose state variable is set
    @param key: The state variable to set
    @param val: The value to assign to it
    """
    announce = self.params['verbosity'] >= Algorithm.DEFAULT
    process.output(key, val, announce)
363
364
366 """
367 We assume that Processes take steps simultaneously,
368 that is, that execution proceeds in synchronous rounds.
369 """
370 - def run(self, network, params = {}):
373
375 self.halted = False
376 self.r = 0
377 while not self.halted:
378 self.r+=1
379 if self.params['verbosity'] >= Algorithm.DEFAULT:
380 print "Round",self.r
381 self.round()
382 self.halt()
383
385 """Executes a single round of the Synchronous Algorithm"""
386 self.msgs()
387 self.trans()
388
390 for process in self.network:
391 if self.halt_i(process): continue
392 self.msgs_i(process)
393
395 for process in self.network:
396 if self.halt_i(process): continue
397 try:
398 self.trans_i(process)
399 except TypeError:
400 self.trans_i(process, process.get_msgs(self))
401
403 print self.name+" Terminated"
404 msg_complexity = "Message Complexity: " + str(self.message_count)
405 print msg_complexity
406 time_complexity = "Time Complexity: " + str(self.r)
407 print time_complexity
408 print "-"*len(time_complexity)
409
410
413
414
416 """
417 We assume that the separate Processes take steps
418 in an arbitrary order, at arbitrary relative speeds.
419 """
420
def run(self, network, params = {}):
    """
    Executes the algorithm on the network, one thread per Process.

    After the common Algorithm.run setup, a thread running
    self.run_process is started for each Process; the call waits for all
    of them to finish and then calls self.halt().

    @param network: the network to run the algorithm on
    @param params: runtime parameters
    """
    Algorithm.run(self, network, params=params)

    workers = []
    for process in network.processes:
        worker = Thread(target=self.run_process, args=(process,))
        worker.start()
        workers.append(worker)

    # Block until every Process' thread has returned.
    for worker in workers:
        worker.join()
    self.halt()
432
434 while True:
435 sleep((random.random()+1.)/5.)
436 self.msgs_i(process)
437 if self.halt_i(process):
438 break
439 try:
440 self.trans_i(process)
441 except TypeError:
442 self.trans_i(process, process.get_msgs(self))
443 if self.halt_i(process):
444 break
445
446
447 -class Compose(Synchronous_Algorithm):
448 """
449 A Synchronous_Algorithm that is the composition of two synchronous algorithms
450 running in parallel.
451 """
452
def __init__(self, A, B, name = None, params = None):
    """
    @param A: an instance of Synchronous_Algorithm
    @param B: an instance of Synchronous_Algorithm
    @param name: [Optional] name of the Algorithm. Defaults to Compose(name of A, name of B)
    @param params: [Optional] Runtime parameters. Defaults to a copy of Algorithm.DEFAULT_PARAMS
    """
    assert isinstance(A, Synchronous_Algorithm), "Not a Synchronous_Algorithm"
    assert isinstance(B, Synchronous_Algorithm), "Not a Synchronous_Algorithm"
    self.A = A
    self.B = B
    self.message_count = 0
    self.params = params or deepcopy(Algorithm.DEFAULT_PARAMS)
    if name is None:
        name = "Compose(" + self.A.name + "," + self.B.name + ")"
    # Assigned unconditionally: previously an explicitly supplied name was
    # dropped and self.name was left unset.
    self.name = name
468
470 self.A.r, self.B.r = self.r, self.r
471 self.A.msgs_i(p)
472 self.B.msgs_i(p)
473
478
480 self.message_count = self.A.message_count + self.B.message_count
481
482 return self.A.halt_i(p) and self.B.halt_i(p)
483
485 self.message_count = self.A.message_count + self.B.message_count
486 self.A.cleanup_i(p)
487 self.B.cleanup_i(p)
488 p.terminate(self)
489
490 - def run(self, network, params = {}):
495
497
498
500 """
501 An Algorithm that is the result of sequentially running two algorithms
502 """
503
def __init__(self, A, B, name = None, params = None):
    """
    @param A: an instance of Algorithm
    @param B: an instance of Algorithm
    @param name: [Optional] name of the Algorithm. Defaults to Chain(A.name, B.name)
    @param params: [Optional] Runtime parameters. Defaults to a copy of Algorithm.DEFAULT_PARAMS
    """
    assert isinstance(A, Algorithm), "Not an Algorithm"
    assert isinstance(B, Algorithm), "Not an Algorithm"
    self.params = params or deepcopy(Algorithm.DEFAULT_PARAMS)
    self.A = A
    self.B = B
    # Initialised here, as Algorithm.__init__ and Compose.__init__ do, so
    # the counter exists even before the first run.
    self.message_count = 0

    self.name = name or "Chain(" + A.name + "," + B.name + ")"
518
def run(self, network, params = {}):
    """
    Runs A and then B on the network, both with this Chain's params.

    The Chain's message count is the sum of the two stages' counts.

    @param network: the network to run the algorithms on
    @param params: runtime parameters
    """
    Algorithm.run(self, network, params)
    # Strictly sequential: B only starts once A.run has returned.
    for stage in (self.A, self.B):
        stage.run(network, params=self.params)
    self.message_count = sum((self.A.message_count, self.B.message_count))
    self.halt()
    Algorithm.cleanup(self)
526
528