# sftp_file.py
# Copyright (C) 2003-2007 Robey Pointer <robeypointer@gmail.com>
#
# This file is part of paramiko.
#
# Paramiko is free software; you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation; either version 2.1 of the License, or (at your option)
# any later version.
#
# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.

"""
SFTP file object
"""
from __future__ import with_statement

from binascii import hexlify
from collections import deque
import socket
import threading
import time

from paramiko.common import DEBUG
from paramiko.file import BufferedFile
from paramiko.py3compat import long
from paramiko.sftp import (
    CMD_CLOSE, CMD_READ, CMD_DATA, SFTPError, CMD_WRITE, CMD_STATUS,
    CMD_FSTAT, CMD_ATTRS, CMD_FSETSTAT, CMD_EXTENDED,
)
from paramiko.sftp_attr import SFTPAttributes
  35. class SFTPFile (BufferedFile):
  36. """
  37. Proxy object for a file on the remote server, in client mode SFTP.
  38. Instances of this class may be used as context managers in the same way
  39. that built-in Python file objects are.
  40. """
  41. # Some sftp servers will choke if you send read/write requests larger than
  42. # this size.
  43. MAX_REQUEST_SIZE = 32768
  44. def __init__(self, sftp, handle, mode='r', bufsize=-1):
  45. BufferedFile.__init__(self)
  46. self.sftp = sftp
  47. self.handle = handle
  48. BufferedFile._set_mode(self, mode, bufsize)
  49. self.pipelined = False
  50. self._prefetching = False
  51. self._prefetch_done = False
  52. self._prefetch_data = {}
  53. self._prefetch_extents = {}
  54. self._prefetch_lock = threading.Lock()
  55. self._saved_exception = None
  56. self._reqs = deque()
  57. def __del__(self):
  58. self._close(async=True)
  59. def close(self):
  60. """
  61. Close the file.
  62. """
  63. self._close(async=False)
  64. def _close(self, async=False):
  65. # We allow double-close without signaling an error, because real
  66. # Python file objects do. However, we must protect against actually
  67. # sending multiple CMD_CLOSE packets, because after we close our
  68. # handle, the same handle may be re-allocated by the server, and we
  69. # may end up mysteriously closing some random other file. (This is
  70. # especially important because we unconditionally call close() from
  71. # __del__.)
  72. if self._closed:
  73. return
  74. self.sftp._log(DEBUG, 'close(%s)' % hexlify(self.handle))
  75. if self.pipelined:
  76. self.sftp._finish_responses(self)
  77. BufferedFile.close(self)
  78. try:
  79. if async:
  80. # GC'd file handle could be called from an arbitrary thread
  81. # -- don't wait for a response
  82. self.sftp._async_request(type(None), CMD_CLOSE, self.handle)
  83. else:
  84. self.sftp._request(CMD_CLOSE, self.handle)
  85. except EOFError:
  86. # may have outlived the Transport connection
  87. pass
  88. except (IOError, socket.error):
  89. # may have outlived the Transport connection
  90. pass
  91. def _data_in_prefetch_requests(self, offset, size):
  92. k = [x for x in list(self._prefetch_extents.values())
  93. if x[0] <= offset]
  94. if len(k) == 0:
  95. return False
  96. k.sort(key=lambda x: x[0])
  97. buf_offset, buf_size = k[-1]
  98. if buf_offset + buf_size <= offset:
  99. # prefetch request ends before this one begins
  100. return False
  101. if buf_offset + buf_size >= offset + size:
  102. # inclusive
  103. return True
  104. # well, we have part of the request. see if another chunk has
  105. # the rest.
  106. return self._data_in_prefetch_requests(
  107. buf_offset + buf_size,
  108. offset + size - buf_offset - buf_size)
  109. def _data_in_prefetch_buffers(self, offset):
  110. """
  111. if a block of data is present in the prefetch buffers, at the given
  112. offset, return the offset of the relevant prefetch buffer. otherwise,
  113. return None. this guarantees nothing about the number of bytes
  114. collected in the prefetch buffer so far.
  115. """
  116. k = [i for i in self._prefetch_data.keys() if i <= offset]
  117. if len(k) == 0:
  118. return None
  119. index = max(k)
  120. buf_offset = offset - index
  121. if buf_offset >= len(self._prefetch_data[index]):
  122. # it's not here
  123. return None
  124. return index
  125. def _read_prefetch(self, size):
  126. """
  127. read data out of the prefetch buffer, if possible. if the data isn't
  128. in the buffer, return None. otherwise, behaves like a normal read.
  129. """
  130. # while not closed, and haven't fetched past the current position,
  131. # and haven't reached EOF...
  132. while True:
  133. offset = self._data_in_prefetch_buffers(self._realpos)
  134. if offset is not None:
  135. break
  136. if self._prefetch_done or self._closed:
  137. break
  138. self.sftp._read_response()
  139. self._check_exception()
  140. if offset is None:
  141. self._prefetching = False
  142. return None
  143. prefetch = self._prefetch_data[offset]
  144. del self._prefetch_data[offset]
  145. buf_offset = self._realpos - offset
  146. if buf_offset > 0:
  147. self._prefetch_data[offset] = prefetch[:buf_offset]
  148. prefetch = prefetch[buf_offset:]
  149. if size < len(prefetch):
  150. self._prefetch_data[self._realpos + size] = prefetch[size:]
  151. prefetch = prefetch[:size]
  152. return prefetch
  153. def _read(self, size):
  154. size = min(size, self.MAX_REQUEST_SIZE)
  155. if self._prefetching:
  156. data = self._read_prefetch(size)
  157. if data is not None:
  158. return data
  159. t, msg = self.sftp._request(
  160. CMD_READ,
  161. self.handle,
  162. long(self._realpos),
  163. int(size)
  164. )
  165. if t != CMD_DATA:
  166. raise SFTPError('Expected data')
  167. return msg.get_string()
  168. def _write(self, data):
  169. # may write less than requested if it would exceed max packet size
  170. chunk = min(len(data), self.MAX_REQUEST_SIZE)
  171. sftp_async_request = self.sftp._async_request(
  172. type(None),
  173. CMD_WRITE,
  174. self.handle,
  175. long(self._realpos),
  176. data[:chunk]
  177. )
  178. self._reqs.append(sftp_async_request)
  179. if (
  180. not self.pipelined or
  181. (len(self._reqs) > 100 and self.sftp.sock.recv_ready())
  182. ):
  183. while len(self._reqs):
  184. req = self._reqs.popleft()
  185. t, msg = self.sftp._read_response(req)
  186. if t != CMD_STATUS:
  187. raise SFTPError('Expected status')
  188. # convert_status already called
  189. return chunk
  190. def settimeout(self, timeout):
  191. """
  192. Set a timeout on read/write operations on the underlying socket or
  193. ssh `.Channel`.
  194. :param float timeout:
  195. seconds to wait for a pending read/write operation before raising
  196. ``socket.timeout``, or ``None`` for no timeout
  197. .. seealso:: `.Channel.settimeout`
  198. """
  199. self.sftp.sock.settimeout(timeout)
  200. def gettimeout(self):
  201. """
  202. Returns the timeout in seconds (as a `float`) associated with the
  203. socket or ssh `.Channel` used for this file.
  204. .. seealso:: `.Channel.gettimeout`
  205. """
  206. return self.sftp.sock.gettimeout()
  207. def setblocking(self, blocking):
  208. """
  209. Set blocking or non-blocking mode on the underiying socket or ssh
  210. `.Channel`.
  211. :param int blocking:
  212. 0 to set non-blocking mode; non-0 to set blocking mode.
  213. .. seealso:: `.Channel.setblocking`
  214. """
  215. self.sftp.sock.setblocking(blocking)
  216. def seekable(self):
  217. """
  218. Check if the file supports random access.
  219. :return:
  220. `True` if the file supports random access. If `False`,
  221. :meth:`seek` will raise an exception
  222. """
  223. return True
  224. def seek(self, offset, whence=0):
  225. """
  226. Set the file's current position.
  227. See `file.seek` for details.
  228. """
  229. self.flush()
  230. if whence == self.SEEK_SET:
  231. self._realpos = self._pos = offset
  232. elif whence == self.SEEK_CUR:
  233. self._pos += offset
  234. self._realpos = self._pos
  235. else:
  236. self._realpos = self._pos = self._get_size() + offset
  237. self._rbuffer = bytes()
  238. def stat(self):
  239. """
  240. Retrieve information about this file from the remote system. This is
  241. exactly like `.SFTPClient.stat`, except that it operates on an
  242. already-open file.
  243. :returns:
  244. an `.SFTPAttributes` object containing attributes about this file.
  245. """
  246. t, msg = self.sftp._request(CMD_FSTAT, self.handle)
  247. if t != CMD_ATTRS:
  248. raise SFTPError('Expected attributes')
  249. return SFTPAttributes._from_msg(msg)
  250. def chmod(self, mode):
  251. """
  252. Change the mode (permissions) of this file. The permissions are
  253. unix-style and identical to those used by Python's `os.chmod`
  254. function.
  255. :param int mode: new permissions
  256. """
  257. self.sftp._log(DEBUG, 'chmod(%s, %r)' % (hexlify(self.handle), mode))
  258. attr = SFTPAttributes()
  259. attr.st_mode = mode
  260. self.sftp._request(CMD_FSETSTAT, self.handle, attr)
  261. def chown(self, uid, gid):
  262. """
  263. Change the owner (``uid``) and group (``gid``) of this file. As with
  264. Python's `os.chown` function, you must pass both arguments, so if you
  265. only want to change one, use `stat` first to retrieve the current
  266. owner and group.
  267. :param int uid: new owner's uid
  268. :param int gid: new group id
  269. """
  270. self.sftp._log(
  271. DEBUG,
  272. 'chown(%s, %r, %r)' % (hexlify(self.handle), uid, gid))
  273. attr = SFTPAttributes()
  274. attr.st_uid, attr.st_gid = uid, gid
  275. self.sftp._request(CMD_FSETSTAT, self.handle, attr)
  276. def utime(self, times):
  277. """
  278. Set the access and modified times of this file. If
  279. ``times`` is ``None``, then the file's access and modified times are
  280. set to the current time. Otherwise, ``times`` must be a 2-tuple of
  281. numbers, of the form ``(atime, mtime)``, which is used to set the
  282. access and modified times, respectively. This bizarre API is mimicked
  283. from Python for the sake of consistency -- I apologize.
  284. :param tuple times:
  285. ``None`` or a tuple of (access time, modified time) in standard
  286. internet epoch time (seconds since 01 January 1970 GMT)
  287. """
  288. if times is None:
  289. times = (time.time(), time.time())
  290. self.sftp._log(DEBUG, 'utime(%s, %r)' % (hexlify(self.handle), times))
  291. attr = SFTPAttributes()
  292. attr.st_atime, attr.st_mtime = times
  293. self.sftp._request(CMD_FSETSTAT, self.handle, attr)
  294. def truncate(self, size):
  295. """
  296. Change the size of this file. This usually extends
  297. or shrinks the size of the file, just like the ``truncate()`` method on
  298. Python file objects.
  299. :param size: the new size of the file
  300. """
  301. self.sftp._log(
  302. DEBUG,
  303. 'truncate(%s, %r)' % (hexlify(self.handle), size))
  304. attr = SFTPAttributes()
  305. attr.st_size = size
  306. self.sftp._request(CMD_FSETSTAT, self.handle, attr)
  307. def check(self, hash_algorithm, offset=0, length=0, block_size=0):
  308. """
  309. Ask the server for a hash of a section of this file. This can be used
  310. to verify a successful upload or download, or for various rsync-like
  311. operations.
  312. The file is hashed from ``offset``, for ``length`` bytes.
  313. If ``length`` is 0, the remainder of the file is hashed. Thus, if both
  314. ``offset`` and ``length`` are zero, the entire file is hashed.
  315. Normally, ``block_size`` will be 0 (the default), and this method will
  316. return a byte string representing the requested hash (for example, a
  317. string of length 16 for MD5, or 20 for SHA-1). If a non-zero
  318. ``block_size`` is given, each chunk of the file (from ``offset`` to
  319. ``offset + length``) of ``block_size`` bytes is computed as a separate
  320. hash. The hash results are all concatenated and returned as a single
  321. string.
  322. For example, ``check('sha1', 0, 1024, 512)`` will return a string of
  323. length 40. The first 20 bytes will be the SHA-1 of the first 512 bytes
  324. of the file, and the last 20 bytes will be the SHA-1 of the next 512
  325. bytes.
  326. :param str hash_algorithm:
  327. the name of the hash algorithm to use (normally ``"sha1"`` or
  328. ``"md5"``)
  329. :param offset:
  330. offset into the file to begin hashing (0 means to start from the
  331. beginning)
  332. :param length:
  333. number of bytes to hash (0 means continue to the end of the file)
  334. :param int block_size:
  335. number of bytes to hash per result (must not be less than 256; 0
  336. means to compute only one hash of the entire segment)
  337. :return:
  338. `str` of bytes representing the hash of each block, concatenated
  339. together
  340. :raises:
  341. ``IOError`` -- if the server doesn't support the "check-file"
  342. extension, or possibly doesn't support the hash algorithm requested
  343. .. note:: Many (most?) servers don't support this extension yet.
  344. .. versionadded:: 1.4
  345. """
  346. t, msg = self.sftp._request(
  347. CMD_EXTENDED, 'check-file', self.handle,
  348. hash_algorithm, long(offset), long(length), block_size)
  349. msg.get_text() # ext
  350. msg.get_text() # alg
  351. data = msg.get_remainder()
  352. return data
  353. def set_pipelined(self, pipelined=True):
  354. """
  355. Turn on/off the pipelining of write operations to this file. When
  356. pipelining is on, paramiko won't wait for the server response after
  357. each write operation. Instead, they're collected as they come in. At
  358. the first non-write operation (including `.close`), all remaining
  359. server responses are collected. This means that if there was an error
  360. with one of your later writes, an exception might be thrown from within
  361. `.close` instead of `.write`.
  362. By default, files are not pipelined.
  363. :param bool pipelined:
  364. ``True`` if pipelining should be turned on for this file; ``False``
  365. otherwise
  366. .. versionadded:: 1.5
  367. """
  368. self.pipelined = pipelined
  369. def prefetch(self, file_size=None):
  370. """
  371. Pre-fetch the remaining contents of this file in anticipation of future
  372. `.read` calls. If reading the entire file, pre-fetching can
  373. dramatically improve the download speed by avoiding roundtrip latency.
  374. The file's contents are incrementally buffered in a background thread.
  375. The prefetched data is stored in a buffer until read via the `.read`
  376. method. Once data has been read, it's removed from the buffer. The
  377. data may be read in a random order (using `.seek`); chunks of the
  378. buffer that haven't been read will continue to be buffered.
  379. :param int file_size:
  380. When this is ``None`` (the default), this method calls `stat` to
  381. determine the remote file size. In some situations, doing so can
  382. cause exceptions or hangs (see `#562
  383. <https://github.com/paramiko/paramiko/pull/562>`_); as a
  384. workaround, one may call `stat` explicitly and pass its value in
  385. via this parameter.
  386. .. versionadded:: 1.5.1
  387. .. versionchanged:: 1.16.0
  388. The ``file_size`` parameter was added (with no default value).
  389. .. versionchanged:: 1.16.1
  390. The ``file_size`` parameter was made optional for backwards
  391. compatibility.
  392. """
  393. if file_size is None:
  394. file_size = self.stat().st_size
  395. # queue up async reads for the rest of the file
  396. chunks = []
  397. n = self._realpos
  398. while n < file_size:
  399. chunk = min(self.MAX_REQUEST_SIZE, file_size - n)
  400. chunks.append((n, chunk))
  401. n += chunk
  402. if len(chunks) > 0:
  403. self._start_prefetch(chunks)
  404. def readv(self, chunks):
  405. """
  406. Read a set of blocks from the file by (offset, length). This is more
  407. efficient than doing a series of `.seek` and `.read` calls, since the
  408. prefetch machinery is used to retrieve all the requested blocks at
  409. once.
  410. :param chunks:
  411. a list of ``(offset, length)`` tuples indicating which sections of
  412. the file to read
  413. :return: a list of blocks read, in the same order as in ``chunks``
  414. .. versionadded:: 1.5.4
  415. """
  416. self.sftp._log(DEBUG, 'readv(%s, %r)' % (hexlify(self.handle), chunks))
  417. read_chunks = []
  418. for offset, size in chunks:
  419. # don't fetch data that's already in the prefetch buffer
  420. if (
  421. self._data_in_prefetch_buffers(offset) or
  422. self._data_in_prefetch_requests(offset, size)
  423. ):
  424. continue
  425. # break up anything larger than the max read size
  426. while size > 0:
  427. chunk_size = min(size, self.MAX_REQUEST_SIZE)
  428. read_chunks.append((offset, chunk_size))
  429. offset += chunk_size
  430. size -= chunk_size
  431. self._start_prefetch(read_chunks)
  432. # now we can just devolve to a bunch of read()s :)
  433. for x in chunks:
  434. self.seek(x[0])
  435. yield self.read(x[1])
  436. # ...internals...
  437. def _get_size(self):
  438. try:
  439. return self.stat().st_size
  440. except:
  441. return 0
  442. def _start_prefetch(self, chunks):
  443. self._prefetching = True
  444. self._prefetch_done = False
  445. t = threading.Thread(target=self._prefetch_thread, args=(chunks,))
  446. t.setDaemon(True)
  447. t.start()
  448. def _prefetch_thread(self, chunks):
  449. # do these read requests in a temporary thread because there may be
  450. # a lot of them, so it may block.
  451. for offset, length in chunks:
  452. num = self.sftp._async_request(
  453. self,
  454. CMD_READ,
  455. self.handle,
  456. long(offset),
  457. int(length))
  458. with self._prefetch_lock:
  459. self._prefetch_extents[num] = (offset, length)
  460. def _async_response(self, t, msg, num):
  461. if t == CMD_STATUS:
  462. # save exception and re-raise it on next file operation
  463. try:
  464. self.sftp._convert_status(msg)
  465. except Exception as e:
  466. self._saved_exception = e
  467. return
  468. if t != CMD_DATA:
  469. raise SFTPError('Expected data')
  470. data = msg.get_string()
  471. while True:
  472. with self._prefetch_lock:
  473. # spin if in race with _prefetch_thread
  474. if num in self._prefetch_extents:
  475. offset, length = self._prefetch_extents[num]
  476. self._prefetch_data[offset] = data
  477. del self._prefetch_extents[num]
  478. if len(self._prefetch_extents) == 0:
  479. self._prefetch_done = True
  480. break
  481. def _check_exception(self):
  482. """if there's a saved exception, raise & clear it"""
  483. if self._saved_exception is not None:
  484. x = self._saved_exception
  485. self._saved_exception = None
  486. raise x