12345678910111213141516171819202122232425262728293031323334353637383940414243444546 |
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Stress script: runs many concurrent tasks that each acquire, hold and
# release a MemcachedLock.
import os

from base import init_env

# Ordering matters here: the settings module is selected first, init_env() is
# called next, and only then are the remaining modules imported.
# NOTE(review): presumably init_env() bootstraps the project/Django
# environment from DJANGO_SETTINGS_MODULE -- confirm against base.init_env.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "configs.testing")
init_env(interactive=False)

from concurrent.futures import as_completed  # noqa: E402

from faker import Faker  # noqa: E402
from gevent.threadpool import ThreadPoolExecutor  # noqa: E402

from apilib.utils_sys import MemcachedLock  # noqa: E402
TEST_DATA_NUMBER = 10000  # number of tasks (test data items) to submit
TEST_WORKER = 20  # number of worker threads in the pool

# Build the generator once and reuse it; constructing a Faker per name is
# loop-invariant work.
_name_faker = Faker(locale='zh_CN')
nameList = [_name_faker.name() for _ in range(100)]

executor = ThreadPoolExecutor(max_workers=TEST_WORKER)
def call_back(future):
    """Done-callback attached to every submitted task; intentionally a no-op.

    :param future: the finished future passed in by the executor (ignored).
    :return: always ``None``.
    """
    return None
def task():
    """Acquire a randomly keyed ``MemcachedLock``, hold it briefly, release it.

    The lock key is a random string of 1-32 characters, the value is a random
    ``zh_CN`` name, and the expiry passed to the lock is 180.

    If the lock cannot be acquired, the failure is printed and the task ends
    without sleeping or releasing: releasing a lock this task never obtained
    could drop a lock currently held by another worker.

    :return: ``None``.
    """
    # NOTE(review): ``cache`` is not used in this function; the import is kept
    # in case it is relied on for its import-time effect -- confirm and drop.
    from django.core.cache import cache  # noqa: F401
    from apilib.utils_string import get_random_str
    import random
    import time

    locker = MemcachedLock(
        key=get_random_str(random.randint(1, 32)),
        value=str(Faker(locale='zh_CN').name()),
        expire=180,
    )

    if not locker.acquire():
        print('acquire {} failure.'.format(repr(locker)))
        return

    try:
        # Hold the lock for 1-5 seconds to create contention between workers.
        time.sleep(random.randint(1, 5))
    finally:
        # Always give the lock back, even if the hold phase is interrupted.
        locker.release()
# Keep the Future objects themselves. The previous one-liner stored the return
# value of add_done_callback() (None for concurrent.futures.Future), so the
# list never contained futures.
futures = []
for _ in range(TEST_DATA_NUMBER):
    future = executor.submit(task)
    future.add_done_callback(call_back)
    futures.append(future)

try:
    # as_completed() is lazy and was never iterated, so it waited for nothing.
    # Each future is waited on directly instead, which also surfaces task
    # exceptions that were silently dropped before.
    # NOTE(review): gevent's futures appear to support as_completed() only
    # when threading is monkey-patched; result() avoids that dependency --
    # confirm before switching back.
    for future in futures:
        try:
            future.result()
        except Exception as exc:  # report and continue; this is a stress run
            print('task failure: {!r}'.format(exc))
finally:
    executor.shutdown(wait=True)
|