hashed.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. from __future__ import absolute_import
  2. from kafka.vendor import six
  3. from .base import Partitioner
  4. class Murmur2Partitioner(Partitioner):
  5. """
  6. Implements a partitioner which selects the target partition based on
  7. the hash of the key. Attempts to apply the same hashing
  8. function as mainline java client.
  9. """
  10. def __call__(self, key, partitions=None, available=None):
  11. if available:
  12. return self.partition(key, available)
  13. return self.partition(key, partitions)
  14. def partition(self, key, partitions=None):
  15. if not partitions:
  16. partitions = self.partitions
  17. # https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java#L69
  18. idx = (murmur2(key) & 0x7fffffff) % len(partitions)
  19. return partitions[idx]
  20. class LegacyPartitioner(object):
  21. """DEPRECATED -- See Issue 374
  22. Implements a partitioner which selects the target partition based on
  23. the hash of the key
  24. """
  25. def __init__(self, partitions):
  26. self.partitions = partitions
  27. def partition(self, key, partitions=None):
  28. if not partitions:
  29. partitions = self.partitions
  30. size = len(partitions)
  31. idx = hash(key) % size
  32. return partitions[idx]
  33. # Default will change to Murmur2 in 0.10 release
  34. HashedPartitioner = LegacyPartitioner
  35. # https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L244
  36. def murmur2(data):
  37. """Pure-python Murmur2 implementation.
  38. Based on java client, see org.apache.kafka.common.utils.Utils.murmur2
  39. Args:
  40. data (bytes): opaque bytes
  41. Returns: MurmurHash2 of data
  42. """
  43. # Python2 bytes is really a str, causing the bitwise operations below to fail
  44. # so convert to bytearray.
  45. if six.PY2:
  46. data = bytearray(bytes(data))
  47. length = len(data)
  48. seed = 0x9747b28c
  49. # 'm' and 'r' are mixing constants generated offline.
  50. # They're not really 'magic', they just happen to work well.
  51. m = 0x5bd1e995
  52. r = 24
  53. # Initialize the hash to a random value
  54. h = seed ^ length
  55. length4 = length // 4
  56. for i in range(length4):
  57. i4 = i * 4
  58. k = ((data[i4 + 0] & 0xff) +
  59. ((data[i4 + 1] & 0xff) << 8) +
  60. ((data[i4 + 2] & 0xff) << 16) +
  61. ((data[i4 + 3] & 0xff) << 24))
  62. k &= 0xffffffff
  63. k *= m
  64. k &= 0xffffffff
  65. k ^= (k % 0x100000000) >> r # k ^= k >>> r
  66. k &= 0xffffffff
  67. k *= m
  68. k &= 0xffffffff
  69. h *= m
  70. h &= 0xffffffff
  71. h ^= k
  72. h &= 0xffffffff
  73. # Handle the last few bytes of the input array
  74. extra_bytes = length % 4
  75. if extra_bytes >= 3:
  76. h ^= (data[(length & ~3) + 2] & 0xff) << 16
  77. h &= 0xffffffff
  78. if extra_bytes >= 2:
  79. h ^= (data[(length & ~3) + 1] & 0xff) << 8
  80. h &= 0xffffffff
  81. if extra_bytes >= 1:
  82. h ^= (data[length & ~3] & 0xff)
  83. h &= 0xffffffff
  84. h *= m
  85. h &= 0xffffffff
  86. h ^= (h % 0x100000000) >> 13 # h >>> 13;
  87. h &= 0xffffffff
  88. h *= m
  89. h &= 0xffffffff
  90. h ^= (h % 0x100000000) >> 15 # h >>> 15;
  91. h &= 0xffffffff
  92. return h