test_fileio.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. # encoding: utf-8
  2. """Tests for file IO"""
  3. # Copyright (c) Jupyter Development Team.
  4. # Distributed under the terms of the Modified BSD License.
  5. import io as stdlib_io
  6. import os.path
  7. import stat
  8. import nose.tools as nt
  9. from ipython_genutils.testing.decorators import skip_win32
  10. from ..fileio import atomic_writing
  11. from ipython_genutils.tempdir import TemporaryDirectory
  12. umask = 0
  13. def test_atomic_writing():
  14. class CustomExc(Exception): pass
  15. with TemporaryDirectory() as td:
  16. f1 = os.path.join(td, 'penguin')
  17. with stdlib_io.open(f1, 'w') as f:
  18. f.write(u'Before')
  19. if os.name != 'nt':
  20. os.chmod(f1, 0o701)
  21. orig_mode = stat.S_IMODE(os.stat(f1).st_mode)
  22. f2 = os.path.join(td, 'flamingo')
  23. try:
  24. os.symlink(f1, f2)
  25. have_symlink = True
  26. except (AttributeError, NotImplementedError, OSError):
  27. # AttributeError: Python doesn't support it
  28. # NotImplementedError: The system doesn't support it
  29. # OSError: The user lacks the privilege (Windows)
  30. have_symlink = False
  31. with nt.assert_raises(CustomExc):
  32. with atomic_writing(f1) as f:
  33. f.write(u'Failing write')
  34. raise CustomExc
  35. # Because of the exception, the file should not have been modified
  36. with stdlib_io.open(f1, 'r') as f:
  37. nt.assert_equal(f.read(), u'Before')
  38. with atomic_writing(f1) as f:
  39. f.write(u'Overwritten')
  40. with stdlib_io.open(f1, 'r') as f:
  41. nt.assert_equal(f.read(), u'Overwritten')
  42. if os.name != 'nt':
  43. mode = stat.S_IMODE(os.stat(f1).st_mode)
  44. nt.assert_equal(mode, orig_mode)
  45. if have_symlink:
  46. # Check that writing over a file preserves a symlink
  47. with atomic_writing(f2) as f:
  48. f.write(u'written from symlink')
  49. with stdlib_io.open(f1, 'r') as f:
  50. nt.assert_equal(f.read(), u'written from symlink')
  51. def _save_umask():
  52. global umask
  53. umask = os.umask(0)
  54. os.umask(umask)
  55. def _restore_umask():
  56. os.umask(umask)
  57. @skip_win32
  58. @nt.with_setup(_save_umask, _restore_umask)
  59. def test_atomic_writing_umask():
  60. with TemporaryDirectory() as td:
  61. os.umask(0o022)
  62. f1 = os.path.join(td, '1')
  63. with atomic_writing(f1) as f:
  64. f.write(u'1')
  65. mode = stat.S_IMODE(os.stat(f1).st_mode)
  66. nt.assert_equal(mode, 0o644, '{:o} != 644'.format(mode))
  67. os.umask(0o057)
  68. f2 = os.path.join(td, '2')
  69. with atomic_writing(f2) as f:
  70. f.write(u'2')
  71. mode = stat.S_IMODE(os.stat(f2).st_mode)
  72. nt.assert_equal(mode, 0o620, '{:o} != 620'.format(mode))
  73. def test_atomic_writing_newlines():
  74. with TemporaryDirectory() as td:
  75. path = os.path.join(td, 'testfile')
  76. lf = u'a\nb\nc\n'
  77. plat = lf.replace(u'\n', os.linesep)
  78. crlf = lf.replace(u'\n', u'\r\n')
  79. # test default
  80. with stdlib_io.open(path, 'w') as f:
  81. f.write(lf)
  82. with stdlib_io.open(path, 'r', newline='') as f:
  83. read = f.read()
  84. nt.assert_equal(read, plat)
  85. # test newline=LF
  86. with stdlib_io.open(path, 'w', newline='\n') as f:
  87. f.write(lf)
  88. with stdlib_io.open(path, 'r', newline='') as f:
  89. read = f.read()
  90. nt.assert_equal(read, lf)
  91. # test newline=CRLF
  92. with atomic_writing(path, newline='\r\n') as f:
  93. f.write(lf)
  94. with stdlib_io.open(path, 'r', newline='') as f:
  95. read = f.read()
  96. nt.assert_equal(read, crlf)
  97. # test newline=no convert
  98. text = u'crlf\r\ncr\rlf\n'
  99. with atomic_writing(path, newline='') as f:
  100. f.write(text)
  101. with stdlib_io.open(path, 'r', newline='') as f:
  102. read = f.read()
  103. nt.assert_equal(read, text)