naming_storage.py 18 KB


  1. """
  2. Name Server persistent storage implementations.
  3. Pyro - Python Remote Objects. Copyright by Irmen de Jong (irmen@razorvine.net).
  4. """
  5. import re
  6. import logging
  7. import sys
  8. from collections import MutableMapping
  9. from contextlib import closing
  10. from Pyro4.threadutil import Lock
  11. from Pyro4.errors import NamingError
  12. try:
  13. import anydbm as dbm # python 2
  14. except ImportError:
  15. try:
  16. import dbm # python 3
  17. except ImportError:
  18. dbm = None
  19. except Exception as x:
  20. # pypy can generate a distutils error somehow if dbm is not available
  21. dbm = None
  22. try:
  23. import sqlite3
  24. except ImportError:
  25. sqlite3 = None
  26. __all__ = ["SqlStorage", "DbmStorage"]
  27. log = logging.getLogger("Pyro4.naming_storage")
  28. class SqlStorage(MutableMapping):
  29. """
  30. Sqlite-based storage, in just a single (name,uri) table.
  31. Sqlite db connection objects aren't thread-safe, so a new connection is created in every method.
  32. """
  33. def __init__(self, dbfile):
  34. if dbfile == ":memory:":
  35. raise ValueError("We don't support the sqlite :memory: database type. Just use the default volatile in-memory store.")
  36. self.dbfile = dbfile
  37. with closing(sqlite3.connect(dbfile)) as db:
  38. db.execute("PRAGMA foreign_keys=ON")
  39. try:
  40. db.execute("SELECT COUNT(*) FROM pyro_names").fetchone()
  41. except sqlite3.OperationalError:
  42. # the table does not yet exist
  43. self._create_schema(db)
  44. else:
  45. # check if we need to update the existing schema
  46. try:
  47. db.execute("SELECT COUNT(*) FROM pyro_metadata").fetchone()
  48. except sqlite3.OperationalError:
  49. # metadata schema needs to be created and existing data migrated
  50. db.execute("ALTER TABLE pyro_names RENAME TO pyro_names_old")
  51. self._create_schema(db)
  52. db.execute("INSERT INTO pyro_names(name, uri) SELECT name, uri FROM pyro_names_old")
  53. db.execute("DROP TABLE pyro_names_old")
  54. db.commit()
  55. def _create_schema(self, db):
  56. db.execute("""CREATE TABLE pyro_names
  57. (
  58. id integer PRIMARY KEY,
  59. name nvarchar NOT NULL UNIQUE,
  60. uri nvarchar NOT NULL
  61. );""")
  62. db.execute("""CREATE TABLE pyro_metadata
  63. (
  64. object integer NOT NULL,
  65. metadata nvarchar NOT NULL,
  66. FOREIGN KEY(object) REFERENCES pyro_names(id)
  67. );""")
  68. def __getattr__(self, item):
  69. raise NotImplementedError("SqlStorage doesn't implement method/attribute '"+item+"'")
  70. def __getitem__(self, item):
  71. try:
  72. with closing(sqlite3.connect(self.dbfile)) as db:
  73. result = db.execute("SELECT id, uri FROM pyro_names WHERE name=?", (item,)).fetchone()
  74. if result:
  75. dbid, uri = result
  76. metadata = {m[0] for m in db.execute("SELECT metadata FROM pyro_metadata WHERE object=?", (dbid,)).fetchall()}
  77. return uri, metadata
  78. else:
  79. raise KeyError(item)
  80. except sqlite3.DatabaseError as e:
  81. raise NamingError("sqlite error in getitem: "+str(e))
  82. def __setitem__(self, key, value):
  83. uri, metadata = value
  84. try:
  85. with closing(sqlite3.connect(self.dbfile)) as db:
  86. cursor = db.cursor()
  87. cursor.execute("PRAGMA foreign_keys=ON")
  88. dbid = cursor.execute("SELECT id FROM pyro_names WHERE name=?", (key,)).fetchone()
  89. if dbid:
  90. dbid = dbid[0]
  91. cursor.execute("DELETE FROM pyro_metadata WHERE object=?", (dbid,))
  92. cursor.execute("DELETE FROM pyro_names WHERE id=?", (dbid,))
  93. cursor.execute("INSERT INTO pyro_names(name, uri) VALUES(?,?)", (key, uri))
  94. if metadata:
  95. object_id = cursor.lastrowid
  96. for m in metadata:
  97. cursor.execute("INSERT INTO pyro_metadata(object, metadata) VALUES (?,?)", (object_id, m))
  98. cursor.close()
  99. db.commit()
  100. except sqlite3.DatabaseError as e:
  101. raise NamingError("sqlite error in setitem: "+str(e))
  102. def __len__(self):
  103. try:
  104. with closing(sqlite3.connect(self.dbfile)) as db:
  105. return db.execute("SELECT count(*) FROM pyro_names").fetchone()[0]
  106. except sqlite3.DatabaseError as e:
  107. raise NamingError("sqlite error in len: "+str(e))
  108. def __contains__(self, item):
  109. try:
  110. with closing(sqlite3.connect(self.dbfile)) as db:
  111. return db.execute("SELECT EXISTS(SELECT 1 FROM pyro_names WHERE name=? LIMIT 1)", (item,)).fetchone()[0]
  112. except sqlite3.DatabaseError as e:
  113. raise NamingError("sqlite error in contains: "+str(e))
  114. def __delitem__(self, key):
  115. try:
  116. with closing(sqlite3.connect(self.dbfile)) as db:
  117. db.execute("PRAGMA foreign_keys=ON")
  118. dbid = db.execute("SELECT id FROM pyro_names WHERE name=?", (key,)).fetchone()
  119. if dbid:
  120. dbid = dbid[0]
  121. db.execute("DELETE FROM pyro_metadata WHERE object=?", (dbid,))
  122. db.execute("DELETE FROM pyro_names WHERE id=?", (dbid,))
  123. db.commit()
  124. except sqlite3.DatabaseError as e:
  125. raise NamingError("sqlite error in delitem: "+str(e))
  126. def __iter__(self):
  127. try:
  128. with closing(sqlite3.connect(self.dbfile)) as db:
  129. result = db.execute("SELECT name FROM pyro_names")
  130. return iter([n[0] for n in result.fetchall()])
  131. except sqlite3.DatabaseError as e:
  132. raise NamingError("sqlite error in iter: "+str(e))
  133. def clear(self):
  134. try:
  135. with closing(sqlite3.connect(self.dbfile)) as db:
  136. db.execute("PRAGMA foreign_keys=ON")
  137. db.execute("DELETE FROM pyro_metadata")
  138. db.execute("DELETE FROM pyro_names")
  139. db.commit()
  140. with closing(sqlite3.connect(self.dbfile, isolation_level=None)) as db:
  141. db.execute("VACUUM") # this cannot run inside a transaction.
  142. except sqlite3.DatabaseError as e:
  143. raise NamingError("sqlite error in clear: "+str(e))
  144. def optimized_prefix_list(self, prefix, return_metadata=False):
  145. try:
  146. with closing(sqlite3.connect(self.dbfile)) as db:
  147. names = {}
  148. if return_metadata:
  149. for dbid, name, uri in db.execute("SELECT id, name, uri FROM pyro_names WHERE name LIKE ?", (prefix+'%',)).fetchall():
  150. metadata = {m[0] for m in db.execute("SELECT metadata FROM pyro_metadata WHERE object=?", (dbid,)).fetchall()}
  151. names[name] = uri, metadata
  152. else:
  153. for name, uri in db.execute("SELECT name, uri FROM pyro_names WHERE name LIKE ?", (prefix+'%',)).fetchall():
  154. names[name] = uri
  155. return names
  156. except sqlite3.DatabaseError as e:
  157. raise NamingError("sqlite error in optimized_prefix_list: "+str(e))
  158. def optimized_regex_list(self, regex, return_metadata=False):
  159. # defining a regex function isn't much better than simply regexing ourselves over the full table.
  160. return None
  161. def optimized_metadata_search(self, metadata_all=None, metadata_any=None, return_metadata=False):
  162. try:
  163. with closing(sqlite3.connect(self.dbfile)) as db:
  164. if metadata_any:
  165. # any of the given metadata
  166. params = list(metadata_any)
  167. sql = "SELECT id, name, uri FROM pyro_names WHERE id IN (SELECT object FROM pyro_metadata WHERE metadata IN ({seq}))" \
  168. .format(seq=",".join(['?']*len(metadata_any)))
  169. else:
  170. # all of the given metadata
  171. params = list(metadata_all)
  172. params.append(len(metadata_all))
  173. sql = "SELECT id, name, uri FROM pyro_names WHERE id IN (SELECT object FROM pyro_metadata WHERE metadata IN ({seq}) " \
  174. "GROUP BY object HAVING COUNT(metadata)=?)".format(seq=",".join(['?']*len(metadata_all)))
  175. result = db.execute(sql, params).fetchall()
  176. if return_metadata:
  177. names = {}
  178. for dbid, name, uri in result:
  179. metadata = {m[0] for m in db.execute("SELECT metadata FROM pyro_metadata WHERE object=?", (dbid,)).fetchall()}
  180. names[name] = uri, metadata
  181. else:
  182. names = {name: uri for (dbid, name, uri) in result}
  183. return names
  184. except sqlite3.DatabaseError as e:
  185. raise NamingError("sqlite error in optimized_metadata_search: "+str(e))
  186. def remove_items(self, items):
  187. try:
  188. with closing(sqlite3.connect(self.dbfile)) as db:
  189. db.execute("PRAGMA foreign_keys=ON")
  190. for item in items:
  191. dbid = db.execute("SELECT id FROM pyro_names WHERE name=?", (item,)).fetchone()
  192. if dbid:
  193. dbid = dbid[0]
  194. db.execute("DELETE FROM pyro_metadata WHERE object=?", (dbid,))
  195. db.execute("DELETE FROM pyro_names WHERE id=?", (dbid,))
  196. db.commit()
  197. except sqlite3.DatabaseError as e:
  198. raise NamingError("sqlite error in remove_items: "+str(e))
  199. def everything(self, return_metadata=False):
  200. try:
  201. with closing(sqlite3.connect(self.dbfile)) as db:
  202. names = {}
  203. if return_metadata:
  204. for dbid, name, uri in db.execute("SELECT id, name, uri FROM pyro_names").fetchall():
  205. metadata = {m[0] for m in db.execute("SELECT metadata FROM pyro_metadata WHERE object=?", (dbid,)).fetchall()}
  206. names[name] = uri, metadata
  207. else:
  208. for name, uri in db.execute("SELECT name, uri FROM pyro_names").fetchall():
  209. names[name] = uri
  210. return names
  211. except sqlite3.DatabaseError as e:
  212. raise NamingError("sqlite error in everything: "+str(e))
  213. def close(self):
  214. pass
  215. class DbmStorage(MutableMapping):
  216. """
  217. Storage implementation that uses a persistent dbm file.
  218. Because dbm only supports strings as key/value, we encode/decode them in utf-8.
  219. Dbm files cannot be accessed concurrently, so a strict concurrency model
  220. is used where only one operation is processed at the same time
  221. (this is very slow when compared to the in-memory storage)
  222. DbmStorage does NOT support storing metadata! It only accepts empty metadata,
  223. and always returns empty metadata.
  224. """
  225. def __init__(self, dbmfile):
  226. self.dbmfile = dbmfile
  227. db = dbm.open(self.dbmfile, "c", mode=0o600)
  228. db.close()
  229. self.lock = Lock()
  230. def __getattr__(self, item):
  231. raise NotImplementedError("DbmStorage doesn't implement method/attribute '"+item+"'")
  232. def __getitem__(self, item):
  233. item = item.encode("utf-8")
  234. with self.lock:
  235. try:
  236. with closing(dbm.open(self.dbmfile)) as db:
  237. return db[item].decode("utf-8"), frozenset() # always return empty metadata
  238. except dbm.error as e:
  239. raise NamingError("dbm error in getitem: "+str(e))
  240. def __setitem__(self, key, value):
  241. uri, metadata = value
  242. if metadata:
  243. log.warning("DbmStorage doesn't support metadata, silently discarded")
  244. key = key.encode("utf-8")
  245. uri = uri.encode("utf-8")
  246. with self.lock:
  247. try:
  248. with closing(dbm.open(self.dbmfile, "w")) as db:
  249. db[key] = uri
  250. except dbm.error as e:
  251. raise NamingError("dbm error in setitem: "+str(e))
  252. def __len__(self):
  253. with self.lock:
  254. try:
  255. with closing(dbm.open(self.dbmfile)) as db:
  256. return len(db)
  257. except dbm.error as e:
  258. raise NamingError("dbm error in len: "+str(e))
  259. def __contains__(self, item):
  260. item = item.encode("utf-8")
  261. with self.lock:
  262. try:
  263. with closing(dbm.open(self.dbmfile)) as db:
  264. return item in db
  265. except dbm.error as e:
  266. raise NamingError("dbm error in contains: "+str(e))
  267. def __delitem__(self, key):
  268. key = key.encode("utf-8")
  269. with self.lock:
  270. try:
  271. with closing(dbm.open(self.dbmfile, "w")) as db:
  272. del db[key]
  273. except dbm.error as e:
  274. raise NamingError("dbm error in delitem: "+str(e))
  275. def __iter__(self):
  276. with self.lock:
  277. try:
  278. with closing(dbm.open(self.dbmfile)) as db:
  279. return iter([key.decode("utf-8") for key in db.keys()])
  280. except dbm.error as e:
  281. raise NamingError("dbm error in iter: "+str(e))
  282. def clear(self):
  283. with self.lock:
  284. try:
  285. with closing(dbm.open(self.dbmfile, "w")) as db:
  286. if hasattr(db, "clear"):
  287. db.clear()
  288. else:
  289. for key in db.keys():
  290. del db[key]
  291. except dbm.error as e:
  292. raise NamingError("dbm error in clear: "+str(e))
  293. def optimized_prefix_list(self, prefix, return_metadata=False):
  294. with self.lock:
  295. try:
  296. with closing(dbm.open(self.dbmfile)) as db:
  297. result = {}
  298. if hasattr(db, "items"):
  299. for key, value in db.items():
  300. key = key.decode("utf-8")
  301. if key.startswith(prefix):
  302. uri = value.decode("utf-8")
  303. result[key] = (uri, frozenset()) if return_metadata else uri # always return empty metadata
  304. else:
  305. for key in db.keys():
  306. keystr = key.decode("utf-8")
  307. if keystr.startswith(prefix):
  308. uri = db[key].decode("utf-8")
  309. result[keystr] = (uri, frozenset()) if return_metadata else uri # always return empty metadata
  310. return result
  311. except dbm.error as e:
  312. raise NamingError("dbm error in optimized_prefix_list: "+str(e))
  313. def optimized_regex_list(self, regex, return_metadata=False):
  314. try:
  315. regex = re.compile(regex + "$") # add end of string marker
  316. except re.error:
  317. x = sys.exc_info()[1]
  318. raise NamingError("invalid regex: " + str(x))
  319. with self.lock:
  320. try:
  321. with closing(dbm.open(self.dbmfile)) as db:
  322. result = {}
  323. if hasattr(db, "items"):
  324. for key, value in db.items():
  325. key = key.decode("utf-8")
  326. if regex.match(key):
  327. uri = value.decode("utf-8")
  328. result[key] = (uri, frozenset()) if return_metadata else uri # always return empty metadata
  329. else:
  330. for key in db.keys():
  331. keystr = key.decode("utf-8")
  332. if regex.match(keystr):
  333. uri = db[key].decode("utf-8")
  334. result[keystr] = (uri, frozenset()) if return_metadata else uri # always return empty metadata
  335. return result
  336. except dbm.error as e:
  337. raise NamingError("dbm error in optimized_regex_list: "+str(e))
  338. def optimized_metadata_search(self, metadata_all=None, metadata_any=None, return_metadata=False):
  339. if metadata_all or metadata_any:
  340. raise NamingError("DbmStorage doesn't support metadata")
  341. return self.everything(return_metadata)
  342. def remove_items(self, items):
  343. with self.lock:
  344. try:
  345. with closing(dbm.open(self.dbmfile, "w")) as db:
  346. for item in items:
  347. try:
  348. del db[item.encode("utf-8")]
  349. except KeyError:
  350. pass
  351. except dbm.error as e:
  352. raise NamingError("dbm error in remove_items: "+str(e))
  353. def everything(self, return_metadata=False):
  354. with self.lock:
  355. try:
  356. with closing(dbm.open(self.dbmfile)) as db:
  357. result = {}
  358. if hasattr(db, "items"):
  359. for key, value in db.items():
  360. uri = value.decode("utf-8")
  361. result[key.decode("utf-8")] = (uri, frozenset()) if return_metadata else uri # always return empty metadata
  362. else:
  363. for key in db.keys():
  364. uri = db[key].decode("utf-8")
  365. result[key.decode("utf-8")] = (uri, frozenset()) if return_metadata else uri # always return empty metadata
  366. return result
  367. except dbm.error as e:
  368. raise NamingError("dbm error in everything: "+str(e))
  369. def close(self):
  370. pass