123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308 |
- # Copyright (c) Twisted Matrix Laboratories.
- # See LICENSE for details.
- """
- Tests for L{twisted._threads._threadworker}.
- """
- from __future__ import absolute_import, division, print_function
- import gc
- import weakref
- from twisted.trial.unittest import SynchronousTestCase
- from threading import ThreadError, local
- from .. import ThreadWorker, LockWorker, AlreadyQuit
class FakeQueueEmpty(Exception):
    """
    Raised by L{FakeQueue.get} when there are no items left to hand out.
    """
class WouldDeadlock(Exception):
    """
    Raised by L{FakeLock.acquire} when the lock is already held; with a real
    lock, this second acquisition would deadlock.
    """
class FakeThread(object):
    """
    A test double for L{threading.Thread} that records its target and
    whether it was started, but never runs anything by itself.

    @ivar target: The callable this thread was created to run.
    @type target: L{callable}

    @ivar started: C{True} once L{FakeThread.start} has been called.
    @type started: L{bool}
    """

    def __init__(self, target):
        """
        Remember C{target} and begin in the not-started state.

        @param target: The callable this thread was created to run.
        """
        self.started = False
        self.target = target

    def start(self):
        """
        Mark this thread as started; the target is not invoked.
        """
        self.started = True
class FakeQueue(object):
    """
    A non-blocking, first-in first-out stand-in for L{Queue} that provides
    only C{put} and C{get}.

    @ivar items: A list of items placed by C{put} but not yet retrieved by
        C{get}, oldest first.
    @type items: L{list}
    """

    def __init__(self):
        """
        Start out with nothing queued.
        """
        self.items = []

    def put(self, item):
        """
        Append an item so that a later L{FakeQueue.get} can return it.

        @param item: any object
        """
        self.items.append(item)

    def get(self):
        """
        Remove and return the oldest queued item.

        @return: the earliest item passed to C{put} that has not yet been
            returned.

        @raise FakeQueueEmpty: if nothing is queued.
        """
        if self.items:
            return self.items.pop(0)
        raise FakeQueueEmpty()
class FakeLock(object):
    """
    A test double for L{threading.Lock} that raises instead of blocking.

    @ivar acquired: C{True} while the lock is held.
    @type acquired: L{bool}
    """

    def __init__(self):
        """
        Begin in the released state.
        """
        self.acquired = False

    def acquire(self):
        """
        Take the lock.

        @raise WouldDeadlock: if the lock is already held.
        """
        if not self.acquired:
            self.acquired = True
        else:
            raise WouldDeadlock()

    def release(self):
        """
        Give the lock back.

        @raise ThreadError: if the lock is not currently held.
        """
        if self.acquired:
            self.acquired = False
        else:
            raise ThreadError()
class ThreadWorkerTests(SynchronousTestCase):
    """
    Tests for L{ThreadWorker}.
    """

    def setUp(self):
        """
        Build a L{ThreadWorker} on top of a L{FakeQueue} and a thread factory
        that records every L{FakeThread} it creates and starts.
        """
        self.fakeQueue = FakeQueue()
        self.fakeThreads = []

        def startThread(target):
            # Record the fake so tests can run its target by hand.
            fake = FakeThread(target=target)
            fake.start()
            self.fakeThreads.append(fake)
            return fake

        self.worker = ThreadWorker(startThread, self.fakeQueue)

    def test_startsThreadAndPerformsWork(self):
        """
        L{ThreadWorker} uses the thread-starting callable it was given to
        start exactly one thread, and that thread's target pulls work from
        the queue the worker was given.
        """
        self.assertEqual(len(self.fakeThreads), 1)
        self.assertEqual(self.fakeThreads[0].started, True)

        outcome = {"done": False}

        def doIt():
            outcome["done"] = True

        self.worker.do(doIt)
        # The fake thread never runs by itself, so the work is only queued.
        self.assertEqual(outcome["done"], False)
        # Running the target by hand drains the queue; the fake queue signals
        # exhaustion by raising rather than blocking.
        self.assertRaises(FakeQueueEmpty, self.fakeThreads[0].target)
        self.assertEqual(outcome["done"], True)

    def test_quitPreventsFutureCalls(self):
        """
        Once L{ThreadWorker.quit} has been called, both L{ThreadWorker.quit}
        and L{ThreadWorker.do} raise L{AlreadyQuit}.
        """
        self.worker.quit()
        self.assertRaises(AlreadyQuit, self.worker.quit)
        self.assertRaises(AlreadyQuit, self.worker.do, list)
class LockWorkerTests(SynchronousTestCase):
    """
    Tests for L{LockWorker}.
    """

    def test_fakeDeadlock(self):
        """
        The L{FakeLock} test fixture will alert us if there's a potential
        deadlock.
        """
        lock = FakeLock()
        lock.acquire()
        self.assertRaises(WouldDeadlock, lock.acquire)

    def test_fakeDoubleRelease(self):
        """
        The L{FakeLock} test fixture will alert us if there's a potential
        double-release.
        """
        lock = FakeLock()
        # Releasing a lock that was never acquired is an error...
        self.assertRaises(ThreadError, lock.release)
        lock.acquire()
        self.assertEqual(None, lock.release())
        # ...and so is releasing it a second time.
        self.assertRaises(ThreadError, lock.release)

    def test_doExecutesImmediatelyWithLock(self):
        """
        L{LockWorker.do} immediately performs the work it's given, while the
        lock is acquired.
        """
        storage = local()
        lock = FakeLock()
        worker = LockWorker(lock, storage)

        def work():
            work.done = True
            # Snapshot the lock state as seen from inside the work.
            work.acquired = lock.acquired

        work.done = False
        worker.do(work)
        self.assertEqual(work.done, True)
        self.assertEqual(work.acquired, True)
        # The lock must have been released again once the work finished.
        self.assertEqual(lock.acquired, False)

    def test_doUnwindsReentrancy(self):
        """
        If L{LockWorker.do} is called recursively, it postpones the inner call
        until the outer one is complete.
        """
        lock = FakeLock()
        worker = LockWorker(lock, local())
        levels = []
        acquired = []

        def work():
            # C{work.level} is the current nesting depth of C{work}; had the
            # inner call run immediately, the second entry would be 2.
            work.level += 1
            levels.append(work.level)
            acquired.append(lock.acquired)
            if len(levels) < 2:
                worker.do(work)
            work.level -= 1

        work.level = 0
        worker.do(work)
        self.assertEqual(levels, [1, 1])
        self.assertEqual(acquired, [True, True])

    def test_quit(self):
        """
        L{LockWorker.quit} frees the resources associated with its lock and
        causes further calls to C{do} and C{quit} to fail.
        """
        lock = FakeLock()
        ref = weakref.ref(lock)
        worker = LockWorker(lock, local())
        # Drop the test's own strong reference; the lock is still alive at
        # this point, as the next assertion checks.
        lock = None
        self.assertIsNot(ref(), None)
        worker.quit()
        gc.collect()
        # After quitting, nothing keeps the lock alive any more.
        self.assertIs(ref(), None)
        self.assertRaises(AlreadyQuit, worker.quit)
        self.assertRaises(AlreadyQuit, worker.do, list)

    def test_quitWhileWorking(self):
        """
        If L{LockWorker.quit} is invoked during a call to L{LockWorker.do}, all
        recursive work scheduled with L{LockWorker.do} will be completed with
        the lock still held, and the lock will be released afterwards.
        """
        lock = FakeLock()
        ref = weakref.ref(lock)
        worker = LockWorker(lock, local())

        def phase1():
            # Schedule more work, then quit while still inside do().
            worker.do(phase2)
            worker.quit()
            self.assertRaises(AlreadyQuit, worker.do, list)
            phase1.complete = True

        phase1.complete = False

        def phase2():
            phase2.complete = True
            # Snapshot the lock state as seen from inside the queued work.
            phase2.acquired = lock.acquired

        phase2.complete = False
        worker.do(phase1)
        self.assertEqual(phase1.complete, True)
        self.assertEqual(phase2.complete, True)
        # The work queued before quit() must still have run under the lock.
        self.assertEqual(phase2.acquired, True)
        self.assertEqual(lock.acquired, False)
        # phase2 closes over C{lock}, so rebinding the name here also drops
        # the closure's reference to the lock object.
        lock = None
        gc.collect()
        self.assertIs(ref(), None)

    def test_quitWhileGettingLock(self):
        """
        If L{LockWorker.do} is called concurrently with L{LockWorker.quit}, and
        C{quit} wins the race before C{do} gets the lock attribute, then
        L{AlreadyQuit} will be raised.
        """

        class RacyLockWorker(LockWorker):
            # C{_lock} is replaced by a property whose getter calls quit()
            # first, so any read of the attribute simulates C{quit} winning
            # the race.  The value lives in the instance C{__dict__} so the
            # accessors do not recurse through the property.
            # NOTE(review): presumably LockWorker.__init__ assigns
            # C{self._lock}, which populates the entry the getter reads --
            # confirm against the implementation.
            def _lock_get(self):
                self.quit()
                return self.__dict__['_lock']

            def _lock_set(self, value):
                self.__dict__['_lock'] = value

            _lock = property(_lock_get, _lock_set)

        worker = RacyLockWorker(FakeLock(), local())
        self.assertRaises(AlreadyQuit, worker.do, list)
|