test_memcached_lock.py 1.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546
  1. # -*- coding: utf-8 -*-
  2. # !/usr/bin/env python
  3. import os
  4. from base import init_env
  5. os.environ.setdefault("DJANGO_SETTINGS_MODULE", "configs.testing")
  6. init_env(interactive = False)
  7. from concurrent.futures import as_completed
  8. from faker import Faker
  9. from gevent.threadpool import ThreadPoolExecutor
  10. from apilib.utils_sys import MemcachedLock
  11. TEST_DATA_NUMBER = 10000 # 测试的数据条目
  12. TEST_WORKER = 20 # 测试数据的worker
  13. nameList = [Faker(locale = 'zh_CN').name() for i in range(100)]
  14. executor = ThreadPoolExecutor(max_workers=TEST_WORKER)
  15. def call_back(future):
  16. pass
  17. def task():
  18. from django.core.cache import cache
  19. from apilib.utils_string import get_random_str
  20. import random
  21. locker = MemcachedLock(key = get_random_str(random.randint(1, 32)), value = str(Faker(locale = 'zh_CN').name()), expire = 180)
  22. if not locker.acquire():
  23. print('acquire {} failure.'.format(repr(locker)))
  24. import time
  25. time.sleep(random.randint(1, 5))
  26. locker.release()
  27. futures = [executor.submit(task).add_done_callback(call_back) for i in range(TEST_DATA_NUMBER)]
  28. as_completed(futures)
  29. executor.shutdown(wait=True)