# test_nginx.py (793 B)

  1. import requests
  2. import os
  3. from concurrent.futures import ThreadPoolExecutor, as_completed
  4. from base import init_env
  5. init_env(True)
  6. TOTAL = 10000
  7. WORKER = 100
  8. LOGICAL_CODE = "123456"
  9. domain = os.environ.get("MY_DOMAIN")
  10. URL = "http://{}/userLogin?l={}".format(domain, LOGICAL_CODE)
  11. HEADER = {
  12. "user-agent": ""
  13. }
  14. # print URL
  15. # raise Exception("123")
  16. def request_url():
  17. try:
  18. res = requests.get(url=URL, headers=HEADER, timeout = 15)
  19. except Exception as e:
  20. return "error"
  21. else:
  22. print res.status_code
  23. return res.status_code
  24. with ThreadPoolExecutor(max_workers=WORKER) as excutor:
  25. tasks = list()
  26. for i in range(TOTAL):
  27. tasks.append(excutor.submit(request_url))
  28. for task in as_completed(tasks):
  29. print task.result()