mmap_dict.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. import json
  2. import mmap
  3. import os
  4. import struct
  5. _INITIAL_MMAP_SIZE = 1 << 20
  6. _pack_integer_func = struct.Struct(b'i').pack
  7. _pack_double_func = struct.Struct(b'd').pack
  8. _unpack_integer = struct.Struct(b'i').unpack_from
  9. _unpack_double = struct.Struct(b'd').unpack_from
  10. # struct.pack_into has atomicity issues because it will temporarily write 0 into
  11. # the mmap, resulting in false reads to 0 when experiencing a lot of writes.
  12. # Using direct assignment solves this issue.
  13. def _pack_double(data, pos, value):
  14. data[pos:pos + 8] = _pack_double_func(value)
  15. def _pack_integer(data, pos, value):
  16. data[pos:pos + 4] = _pack_integer_func(value)
  17. def _read_all_values(data, used=0):
  18. """Yield (key, value, pos). No locking is performed."""
  19. if used <= 0:
  20. # If not valid `used` value is passed in, read it from the file.
  21. used = _unpack_integer(data, 0)[0]
  22. pos = 8
  23. while pos < used:
  24. encoded_len = _unpack_integer(data, pos)[0]
  25. # check we are not reading beyond bounds
  26. if encoded_len + pos > used:
  27. raise RuntimeError('Read beyond file size detected, file is corrupted.')
  28. pos += 4
  29. encoded_key = data[pos : pos + encoded_len]
  30. padded_len = encoded_len + (8 - (encoded_len + 4) % 8)
  31. pos += padded_len
  32. value = _unpack_double(data, pos)[0]
  33. yield encoded_key.decode('utf-8'), value, pos
  34. pos += 8
  35. class MmapedDict(object):
  36. """A dict of doubles, backed by an mmapped file.
  37. The file starts with a 4 byte int, indicating how much of it is used.
  38. Then 4 bytes of padding.
  39. There's then a number of entries, consisting of a 4 byte int which is the
  40. size of the next field, a utf-8 encoded string key, padding to a 8 byte
  41. alignment, and then a 8 byte float which is the value.
  42. Not thread safe.
  43. """
  44. def __init__(self, filename, read_mode=False):
  45. self._f = open(filename, 'rb' if read_mode else 'a+b')
  46. self._fname = filename
  47. capacity = os.fstat(self._f.fileno()).st_size
  48. if capacity == 0:
  49. self._f.truncate(_INITIAL_MMAP_SIZE)
  50. capacity = _INITIAL_MMAP_SIZE
  51. self._capacity = capacity
  52. self._m = mmap.mmap(self._f.fileno(), self._capacity,
  53. access=mmap.ACCESS_READ if read_mode else mmap.ACCESS_WRITE)
  54. self._positions = {}
  55. self._used = _unpack_integer(self._m, 0)[0]
  56. if self._used == 0:
  57. self._used = 8
  58. _pack_integer(self._m, 0, self._used)
  59. else:
  60. if not read_mode:
  61. for key, _, pos in self._read_all_values():
  62. self._positions[key] = pos
  63. @staticmethod
  64. def read_all_values_from_file(filename):
  65. with open(filename, 'rb') as infp:
  66. # Read the first block of data, including the first 4 bytes which tell us
  67. # how much of the file (which is preallocated to _INITIAL_MMAP_SIZE bytes) is occupied.
  68. data = infp.read(mmap.PAGESIZE)
  69. used = _unpack_integer(data, 0)[0]
  70. if used > len(data): # Then read in the rest, if needed.
  71. data += infp.read(used - len(data))
  72. return _read_all_values(data, used)
  73. def _init_value(self, key):
  74. """Initialize a value. Lock must be held by caller."""
  75. encoded = key.encode('utf-8')
  76. # Pad to be 8-byte aligned.
  77. padded = encoded + (b' ' * (8 - (len(encoded) + 4) % 8))
  78. value = struct.pack('i{0}sd'.format(len(padded)).encode(), len(encoded), padded, 0.0)
  79. while self._used + len(value) > self._capacity:
  80. self._capacity *= 2
  81. self._f.truncate(self._capacity)
  82. self._m = mmap.mmap(self._f.fileno(), self._capacity)
  83. self._m[self._used:self._used + len(value)] = value
  84. # Update how much space we've used.
  85. self._used += len(value)
  86. _pack_integer(self._m, 0, self._used)
  87. self._positions[key] = self._used - 8
  88. def _read_all_values(self):
  89. """Yield (key, value, pos). No locking is performed."""
  90. return _read_all_values(data=self._m, used=self._used)
  91. def read_all_values(self):
  92. """Yield (key, value). No locking is performed."""
  93. for k, v, _ in self._read_all_values():
  94. yield k, v
  95. def read_value(self, key):
  96. if key not in self._positions:
  97. self._init_value(key)
  98. pos = self._positions[key]
  99. # We assume that reading from an 8 byte aligned value is atomic
  100. return _unpack_double(self._m, pos)[0]
  101. def write_value(self, key, value):
  102. if key not in self._positions:
  103. self._init_value(key)
  104. pos = self._positions[key]
  105. # We assume that writing to an 8 byte aligned value is atomic
  106. _pack_double(self._m, pos, value)
  107. def close(self):
  108. if self._f:
  109. self._m.close()
  110. self._m = None
  111. self._f.close()
  112. self._f = None
  113. def mmap_key(metric_name, name, labelnames, labelvalues):
  114. """Format a key for use in the mmap file."""
  115. # ensure labels are in consistent order for identity
  116. labels = dict(zip(labelnames, labelvalues))
  117. return json.dumps([metric_name, name, labels], sort_keys=True)