historyapp.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. # encoding: utf-8
  2. """
  3. An application for managing IPython history.
  4. To be invoked as the `ipython history` subcommand.
  5. """
  6. from __future__ import print_function
  7. import os
  8. import sqlite3
  9. from traitlets.config.application import Application
  10. from IPython.core.application import BaseIPythonApplication
  11. from traitlets import Bool, Int, Dict
  12. from IPython.utils.io import ask_yes_no
  13. trim_hist_help = """Trim the IPython history database to the last 1000 entries.
  14. This actually copies the last 1000 entries to a new database, and then replaces
  15. the old file with the new. Use the `--keep=` argument to specify a number
  16. other than 1000.
  17. """
  18. clear_hist_help = """Clear the IPython history database, deleting all entries.
  19. Because this is a destructive operation, IPython will prompt the user if they
  20. really want to do this. Passing a `-f` flag will force clearing without a
  21. prompt.
  22. This is an handy alias to `ipython history trim --keep=0`
  23. """
  24. class HistoryTrim(BaseIPythonApplication):
  25. description = trim_hist_help
  26. backup = Bool(False,
  27. help="Keep the old history file as history.sqlite.<N>"
  28. ).tag(config=True)
  29. keep = Int(1000,
  30. help="Number of recent lines to keep in the database."
  31. ).tag(config=True)
  32. flags = Dict(dict(
  33. backup = ({'HistoryTrim' : {'backup' : True}},
  34. backup.help
  35. )
  36. ))
  37. aliases=Dict(dict(
  38. keep = 'HistoryTrim.keep'
  39. ))
  40. def start(self):
  41. profile_dir = self.profile_dir.location
  42. hist_file = os.path.join(profile_dir, 'history.sqlite')
  43. con = sqlite3.connect(hist_file)
  44. # Grab the recent history from the current database.
  45. inputs = list(con.execute('SELECT session, line, source, source_raw FROM '
  46. 'history ORDER BY session DESC, line DESC LIMIT ?', (self.keep+1,)))
  47. if len(inputs) <= self.keep:
  48. print("There are already at most %d entries in the history database." % self.keep)
  49. print("Not doing anything. Use --keep= argument to keep fewer entries")
  50. return
  51. print("Trimming history to the most recent %d entries." % self.keep)
  52. inputs.pop() # Remove the extra element we got to check the length.
  53. inputs.reverse()
  54. if inputs:
  55. first_session = inputs[0][0]
  56. outputs = list(con.execute('SELECT session, line, output FROM '
  57. 'output_history WHERE session >= ?', (first_session,)))
  58. sessions = list(con.execute('SELECT session, start, end, num_cmds, remark FROM '
  59. 'sessions WHERE session >= ?', (first_session,)))
  60. con.close()
  61. # Create the new history database.
  62. new_hist_file = os.path.join(profile_dir, 'history.sqlite.new')
  63. i = 0
  64. while os.path.exists(new_hist_file):
  65. # Make sure we don't interfere with an existing file.
  66. i += 1
  67. new_hist_file = os.path.join(profile_dir, 'history.sqlite.new'+str(i))
  68. new_db = sqlite3.connect(new_hist_file)
  69. new_db.execute("""CREATE TABLE IF NOT EXISTS sessions (session integer
  70. primary key autoincrement, start timestamp,
  71. end timestamp, num_cmds integer, remark text)""")
  72. new_db.execute("""CREATE TABLE IF NOT EXISTS history
  73. (session integer, line integer, source text, source_raw text,
  74. PRIMARY KEY (session, line))""")
  75. new_db.execute("""CREATE TABLE IF NOT EXISTS output_history
  76. (session integer, line integer, output text,
  77. PRIMARY KEY (session, line))""")
  78. new_db.commit()
  79. if inputs:
  80. with new_db:
  81. # Add the recent history into the new database.
  82. new_db.executemany('insert into sessions values (?,?,?,?,?)', sessions)
  83. new_db.executemany('insert into history values (?,?,?,?)', inputs)
  84. new_db.executemany('insert into output_history values (?,?,?)', outputs)
  85. new_db.close()
  86. if self.backup:
  87. i = 1
  88. backup_hist_file = os.path.join(profile_dir, 'history.sqlite.old.%d' % i)
  89. while os.path.exists(backup_hist_file):
  90. i += 1
  91. backup_hist_file = os.path.join(profile_dir, 'history.sqlite.old.%d' % i)
  92. os.rename(hist_file, backup_hist_file)
  93. print("Backed up longer history file to", backup_hist_file)
  94. else:
  95. os.remove(hist_file)
  96. os.rename(new_hist_file, hist_file)
  97. class HistoryClear(HistoryTrim):
  98. description = clear_hist_help
  99. keep = Int(0,
  100. help="Number of recent lines to keep in the database.")
  101. force = Bool(False,
  102. help="Don't prompt user for confirmation"
  103. ).tag(config=True)
  104. flags = Dict(dict(
  105. force = ({'HistoryClear' : {'force' : True}},
  106. force.help),
  107. f = ({'HistoryTrim' : {'force' : True}},
  108. force.help
  109. )
  110. ))
  111. aliases = Dict()
  112. def start(self):
  113. if self.force or ask_yes_no("Really delete all ipython history? ",
  114. default="no", interrupt="no"):
  115. HistoryTrim.start(self)
  116. class HistoryApp(Application):
  117. name = u'ipython-history'
  118. description = "Manage the IPython history database."
  119. subcommands = Dict(dict(
  120. trim = (HistoryTrim, HistoryTrim.description.splitlines()[0]),
  121. clear = (HistoryClear, HistoryClear.description.splitlines()[0]),
  122. ))
  123. def start(self):
  124. if self.subapp is None:
  125. print("No subcommand specified. Must specify one of: %s" % \
  126. (self.subcommands.keys()))
  127. print()
  128. self.print_description()
  129. self.print_subcommands()
  130. self.exit(1)
  131. else:
  132. return self.subapp.start()