test_gil.py 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. from __future__ import division, print_function, absolute_import
  2. import itertools
  3. import threading
  4. import time
  5. import numpy as np
  6. from numpy.testing import assert_equal
  7. import pytest
  8. import scipy.interpolate
  9. class TestGIL(object):
  10. """Check if the GIL is properly released by scipy.interpolate functions."""
  11. def setup_method(self):
  12. self.messages = []
  13. def log(self, message):
  14. self.messages.append(message)
  15. def make_worker_thread(self, target, args):
  16. log = self.log
  17. class WorkerThread(threading.Thread):
  18. def run(self):
  19. log('interpolation started')
  20. target(*args)
  21. log('interpolation complete')
  22. return WorkerThread()
  23. @pytest.mark.slow
  24. @pytest.mark.xfail(reason='race conditions, may depend on system load')
  25. def test_rectbivariatespline(self):
  26. def generate_params(n_points):
  27. x = y = np.linspace(0, 1000, n_points)
  28. x_grid, y_grid = np.meshgrid(x, y)
  29. z = x_grid * y_grid
  30. return x, y, z
  31. def calibrate_delay(requested_time):
  32. for n_points in itertools.count(5000, 1000):
  33. args = generate_params(n_points)
  34. time_started = time.time()
  35. interpolate(*args)
  36. if time.time() - time_started > requested_time:
  37. return args
  38. def interpolate(x, y, z):
  39. scipy.interpolate.RectBivariateSpline(x, y, z)
  40. args = calibrate_delay(requested_time=3)
  41. worker_thread = self.make_worker_thread(interpolate, args)
  42. worker_thread.start()
  43. for i in range(3):
  44. time.sleep(0.5)
  45. self.log('working')
  46. worker_thread.join()
  47. assert_equal(self.messages, [
  48. 'interpolation started',
  49. 'working',
  50. 'working',
  51. 'working',
  52. 'interpolation complete',
  53. ])