cmodulepullpipe.py 1.1 KB

123456789101112131415161718192021222324252627282930313233343536
  1. # -*- test-case-name: twisted.python.test.test_sendmsg -*-
  2. # Copyright (c) Twisted Matrix Laboratories.
  3. # See LICENSE for details.
  4. import sys, os
  5. from struct import unpack
  6. from twisted.python.sendmsg import recv1msg
  7. def recvfd(socketfd):
  8. """
  9. Receive a file descriptor from a L{send1msg} message on the given C{AF_UNIX}
  10. socket.
  11. @param socketfd: An C{AF_UNIX} socket, attached to another process waiting
  12. to send sockets via the ancillary data mechanism in L{send1msg}.
  13. @param fd: C{int}
  14. @return: a 2-tuple of (new file descriptor, description).
  15. @rtype: 2-tuple of (C{int}, C{str})
  16. """
  17. data, flags, ancillary = recv1msg(socketfd)
  18. [(cmsg_level, cmsg_type, packedFD)] = ancillary
  19. # cmsg_level and cmsg_type really need to be SOL_SOCKET / SCM_RIGHTS, but
  20. # since those are the *only* standard values, there's not much point in
  21. # checking.
  22. [unpackedFD] = unpack("i", packedFD)
  23. return (unpackedFD, data)
  24. if __name__ == '__main__':
  25. fd, description = recvfd(int(sys.argv[1]))
  26. os.write(fd, "Test fixture data: %s.\n" % (description,))
  27. os.close(fd)