# barrier1.py import thread import pybarrier from pybarrier import barrier from pybarrier import event # The rest of the file is a test case, that runs a number of parallelized # quicksorts in parallel. If it works, you'll get about 600 lines of # tracing output, with a line like # test passed! 209 threads created in all # as the last line. The content and order of preceding lines will # vary across runs. def _new_thread(func, *args): global TID tid.acquire(); id = TID = TID+1; tid.release() io.acquire(); alive.append(id); \ print 'starting thread', id, '--', len(alive), 'alive'; \ io.release() thread.start_new_thread( func, (id,) + args ) def _qsort(tid, a, l, r, finished): # sort a[l:r]; post finished when done io.acquire(); print 'thread', tid, 'qsort', l, r; io.release() if r-l > 1: pivot = a[l] j = l+1 # make a[l:j] <= pivot, and a[j:r] > pivot for i in range(j, r): if a[i] <= pivot: a[j], a[i] = a[i], a[j] j = j + 1 a[l], a[j-1] = a[j-1], pivot l_subarray_sorted = event() r_subarray_sorted = event() _new_thread(_qsort, a, l, j-1, l_subarray_sorted) _new_thread(_qsort, a, j, r, r_subarray_sorted) l_subarray_sorted.wait() r_subarray_sorted.wait() io.acquire(); print 'thread', tid, 'qsort done'; \ alive.remove(tid); io.release() finished.post() def _randarray(tid, a, finished): io.acquire(); print 'thread', tid, 'randomizing array'; \ io.release() for i in range(1, len(a)): wh.acquire(); j = randint(0,i); wh.release() a[i], a[j] = a[j], a[i] io.acquire(); print 'thread', tid, 'randomizing done'; \ alive.remove(tid); io.release() finished.post() def _check_sort(a): if a != range(len(a)): raise ValueError, ('a not sorted', a) def _run_one_sort(tid, a, bar, done): # randomize a, and quicksort it # for variety, all the threads running this enter a barrier # at the end, and post `done' after the barrier exits io.acquire(); print 'thread', tid, 'randomizing', a; \ io.release() finished = event() _new_thread(_randarray, a, finished) finished.wait() io.acquire(); print 'thread', tid, 'sorting', a; io.release() finished.clear() _new_thread(_qsort, a, 0, len(a), finished) finished.wait() _check_sort(a) io.acquire(); print 'thread', tid, 'entering barrier'; \ io.release() bar.enter() io.acquire(); print 'thread', tid, 'leaving barrier'; \ io.release() io.acquire(); alive.remove(tid); io.release() bar.enter() # make sure they've all removed themselves from alive ## before 'done' is posted bar.enter() # just to be cruel done.post() def test(): global TID, tid, io, wh, randint, alive import random randint = random.randint TID = 0 # thread ID (1, 2, ...) tid = thread.allocate_lock() # for changing TID io = thread.allocate_lock() # for printing, and 'alive' wh = thread.allocate_lock() # for calls to random alive = [] # IDs of active threads NSORTS = 5 arrays = [] for i in range(NSORTS): arrays.append( range( (i+1)*10 ) ) bar = barrier(NSORTS) finished = event() for i in range(NSORTS): _new_thread(_run_one_sort, arrays[i], bar, finished) finished.wait() print 'all threads done, and checking results ...' if alive: raise ValueError, ('threads still alive at end', alive) for i in range(NSORTS): a = arrays[i] if len(a) != (i+1)*10: raise ValueError, ('length of array', i, 'screwed up') _check_sort(a) print 'test passed!', TID, 'threads created in all' if __name__ == '__main__': test() # end of module