rsync.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. """
  2. 1:N rsync implementation on top of execnet.
  3. (c) 2006-2009, Armin Rigo, Holger Krekel, Maciej Fijalkowski
  4. """
  5. import os
  6. import stat
  7. from hashlib import md5
  8. try:
  9. from queue import Queue
  10. except ImportError:
  11. from Queue import Queue
  12. import execnet.rsync_remote
  13. class RSync(object):
  14. """ This class allows to send a directory structure (recursively)
  15. to one or multiple remote filesystems.
  16. There is limited support for symlinks, which means that symlinks
  17. pointing to the sourcetree will be send "as is" while external
  18. symlinks will be just copied (regardless of existance of such
  19. a path on remote side).
  20. """
  21. def __init__(self, sourcedir, callback=None, verbose=True):
  22. self._sourcedir = str(sourcedir)
  23. self._verbose = verbose
  24. assert callback is None or hasattr(callback, '__call__')
  25. self._callback = callback
  26. self._channels = {}
  27. self._receivequeue = Queue()
  28. self._links = []
  29. def filter(self, path):
  30. return True
  31. def _end_of_channel(self, channel):
  32. if channel in self._channels:
  33. # too early! we must have got an error
  34. channel.waitclose()
  35. # or else we raise one
  36. raise IOError('connection unexpectedly closed: {} '.format(
  37. channel.gateway))
  38. def _process_link(self, channel):
  39. for link in self._links:
  40. channel.send(link)
  41. # completion marker, this host is done
  42. channel.send(42)
  43. def _done(self, channel):
  44. """ Call all callbacks
  45. """
  46. finishedcallback = self._channels.pop(channel)
  47. if finishedcallback:
  48. finishedcallback()
  49. channel.waitclose()
  50. def _list_done(self, channel):
  51. # sum up all to send
  52. if self._callback:
  53. s = sum([self._paths[i] for i in self._to_send[channel]])
  54. self._callback("list", s, channel)
  55. def _send_item(self, channel, data):
  56. """ Send one item
  57. """
  58. modified_rel_path, checksum = data
  59. modifiedpath = os.path.join(self._sourcedir, *modified_rel_path)
  60. try:
  61. f = open(modifiedpath, 'rb')
  62. data = f.read()
  63. except IOError:
  64. data = None
  65. # provide info to progress callback function
  66. modified_rel_path = "/".join(modified_rel_path)
  67. if data is not None:
  68. self._paths[modified_rel_path] = len(data)
  69. else:
  70. self._paths[modified_rel_path] = 0
  71. if channel not in self._to_send:
  72. self._to_send[channel] = []
  73. self._to_send[channel].append(modified_rel_path)
  74. # print "sending", modified_rel_path, data and len(data) or 0, checksum
  75. if data is not None:
  76. f.close()
  77. if checksum is not None and checksum == md5(data).digest():
  78. data = None # not really modified
  79. else:
  80. self._report_send_file(channel.gateway, modified_rel_path)
  81. channel.send(data)
  82. def _report_send_file(self, gateway, modified_rel_path):
  83. if self._verbose:
  84. print("{} <= {}".format(gateway, modified_rel_path))
  85. def send(self, raises=True):
  86. """ Sends a sourcedir to all added targets. Flag indicates
  87. whether to raise an error or return in case of lack of
  88. targets
  89. """
  90. if not self._channels:
  91. if raises:
  92. raise IOError("no targets available, maybe you "
  93. "are trying call send() twice?")
  94. return
  95. # normalize a trailing '/' away
  96. self._sourcedir = os.path.dirname(os.path.join(self._sourcedir, 'x'))
  97. # send directory structure and file timestamps/sizes
  98. self._send_directory_structure(self._sourcedir)
  99. # paths and to_send are only used for doing
  100. # progress-related callbacks
  101. self._paths = {}
  102. self._to_send = {}
  103. # send modified file to clients
  104. while self._channels:
  105. channel, req = self._receivequeue.get()
  106. if req is None:
  107. self._end_of_channel(channel)
  108. else:
  109. command, data = req
  110. if command == "links":
  111. self._process_link(channel)
  112. elif command == "done":
  113. self._done(channel)
  114. elif command == "ack":
  115. if self._callback:
  116. self._callback("ack", self._paths[data], channel)
  117. elif command == "list_done":
  118. self._list_done(channel)
  119. elif command == "send":
  120. self._send_item(channel, data)
  121. del data
  122. else:
  123. assert "Unknown command %s" % command
  124. def add_target(self, gateway, destdir,
  125. finishedcallback=None, **options):
  126. """ Adds a remote target specified via a gateway
  127. and a remote destination directory.
  128. """
  129. for name in options:
  130. assert name in ('delete',)
  131. def itemcallback(req):
  132. self._receivequeue.put((channel, req))
  133. channel = gateway.remote_exec(execnet.rsync_remote)
  134. channel.reconfigure(py2str_as_py3str=False, py3str_as_py2str=False)
  135. channel.setcallback(itemcallback, endmarker=None)
  136. channel.send((str(destdir), options))
  137. self._channels[channel] = finishedcallback
  138. def _broadcast(self, msg):
  139. for channel in self._channels:
  140. channel.send(msg)
  141. def _send_link(self, linktype, basename, linkpoint):
  142. self._links.append((linktype, basename, linkpoint))
  143. def _send_directory(self, path):
  144. # dir: send a list of entries
  145. names = []
  146. subpaths = []
  147. for name in os.listdir(path):
  148. p = os.path.join(path, name)
  149. if self.filter(p):
  150. names.append(name)
  151. subpaths.append(p)
  152. mode = os.lstat(path).st_mode
  153. self._broadcast([mode] + names)
  154. for p in subpaths:
  155. self._send_directory_structure(p)
  156. def _send_link_structure(self, path):
  157. linkpoint = os.readlink(path)
  158. basename = path[len(self._sourcedir) + 1:]
  159. if linkpoint.startswith(self._sourcedir):
  160. self._send_link(
  161. "linkbase", basename,
  162. linkpoint[len(self._sourcedir) + 1:])
  163. else:
  164. # relative or absolute link, just send it
  165. self._send_link("link", basename, linkpoint)
  166. self._broadcast(None)
  167. def _send_directory_structure(self, path):
  168. try:
  169. st = os.lstat(path)
  170. except OSError:
  171. self._broadcast((None, 0, 0))
  172. return
  173. if stat.S_ISREG(st.st_mode):
  174. # regular file: send a mode/timestamp/size pair
  175. self._broadcast((st.st_mode, st.st_mtime, st.st_size))
  176. elif stat.S_ISDIR(st.st_mode):
  177. self._send_directory(path)
  178. elif stat.S_ISLNK(st.st_mode):
  179. self._send_link_structure(path)
  180. else:
  181. raise ValueError("cannot sync {!r}".format(path))