Source code for bytestag.events_test
from bytestag.events import EventReactor, Observer, EventID, asynchronous
import threading
import unittest
[docs]class TestEventReactor(unittest.TestCase):
[docs] def test_reactor(self):
'''It should process 1 event and then stop'''
my_id = EventID('my_id')
self.test_value = False
def my_callback(event_id):
self.assertEqual(my_id, event_id)
self.test_value = True
event_reactor = EventReactor()
event_reactor.register_handler(my_id, my_callback)
event_reactor.put(my_id)
event_reactor.put(EventReactor.STOP_ID)
event_reactor.start()
self.assertTrue(self.test_value)
[docs]class TestObserver(unittest.TestCase):
# TODO: test one shot
[docs] def test_observer(self):
'''It should activate the callback functions'''
observer = Observer()
def f1(s):
self.f1 = s
def f2(s):
self.f2 = s
observer.register(f1)
observer.register(f2)
observer('kitteh')
self.assertEqual('kitteh', self.f1)
self.assertEqual('kitteh', self.f2)
[docs]class TestAsync(unittest.TestCase):
[docs] def test_async(self):
v = None
@asynchronous()
def f():
nonlocal v
v = True # @UnusedVariable
thread = f()
self.assertIsInstance(thread, threading.Thread)
thread.join()
self.assertEqual(v, True)
@unittest.skip('Causes exception printout on Nose unit test')
[docs] def test_async_error(self):
v = None
@asynchronous()
def f():
nonlocal v
v = True # @UnusedVariable
raise Exception('my intentional exception')
thread = f()
self.assertIsInstance(thread, threading.Thread)
thread.join()
self.assertEqual(v, True)