from __future__ import division

# use cPickle when using python2 for better performance
try:
    import cPickle as pickle
except ImportError:
    import pickle

import logging
import os
import random
import re
from collections import defaultdict

PUNCTUATION = re.compile(r"([\.,;!?])")


def tokenize(text):
    text = PUNCTUATION.sub(r" \1 ", text)
    return text.split()
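
# illustrative hand-run of the helper above (not called elsewhere in this module):
#   tokenize("Hi, there!") -> ['Hi', ',', 'there', '!']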


class StringContinuationImpossibleError(Exception):
    pass


# NB: the separator is a regular expression, so the default period must be
# escaped; a bare '.' would match every character.
def _wordIter(text, separator=r'\.'):
    """
    An iterator over the 'words' in the given text, as defined by
    the regular expression given as separator.
    """
    exp = re.compile(separator)
    pos = 0
    for occ in exp.finditer(text):
        sub = text[pos:occ.start()].strip()
        if sub:
            yield sub
        pos = occ.start() + 1
    if pos < len(text):
        # take care of the last part
        sub = text[pos:].strip()
        if sub:
            yield sub
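
# illustrative hand-run of the iterator above:
#   list(_wordIter("One. Two.", '[.!?]')) -> ['One', 'Two']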


def _one():
    return 1.0


def _one_dict():
    return defaultdict(_one)


def _db_factory():
    """ DB data structure: {word tuple: {next word: probability}} """
    return defaultdict(_one_dict)


class MarkovChain(object):

    def __init__(self, dbFilePath=None):
        self.dbFilePath = dbFilePath
        if not dbFilePath:
            self.dbFilePath = os.path.join(os.path.dirname(__file__), "markovdb")
        try:
            with open(self.dbFilePath, 'rb') as dbfile:
                self.db = pickle.load(dbfile)
        except (IOError, ValueError):
            logging.warning('Database file corrupt or not found, using empty database')
            # module-level factories (rather than lambdas) keep the nested
            # defaultdict picklable, so that dumpdb() can save it later
            self.db = _db_factory()

    def generateDatabase(self, textSample, sentenceSep='[.!?\n]', n=2):
        """ Generate word probability database from raw content string """
        # I'm using the database to temporarily store word counts
        textSample = _wordIter(textSample, sentenceSep)  # get an iterator for the 'sentences'
        # We're using '' as a special symbol for the beginning of a sentence
        self.db[('',)][''] = 0.0
        for line in textSample:
            words = line.strip().split()  # split words in line
            if len(words) == 0:
                continue
            # first word follows a sentence end
            self.db[("",)][words[0]] += 1

            for order in range(1, n + 1):
                for i in range(len(words) - 1):
                    if i + order >= len(words):
                        continue
                    word = tuple(words[i:i + order])
                    self.db[word][words[i + order]] += 1

                # last word precedes a sentence end
                self.db[tuple(words[len(words) - order:len(words)])][""] += 1

        # We've now got the db filled with parametrized word counts
        # We still need to normalize this to represent probabilities
        for word in self.db:
            wordsum = 0
            for nextword in self.db[word]:
                wordsum += self.db[word][nextword]
            if wordsum != 0:
                for nextword in self.db[word]:
                    self.db[word][nextword] /= wordsum
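
    # Illustrative hand-run of the method above on a toy corpus (note that
    # unseen counts start at 1.0 via the defaultdict factory, so every observed
    # transition reaches 2.0 before normalization): after
    # generateDatabase("a b. a c.", n=1) the db contains
    #   self.db[('',)]  == {'': 0.0, 'a': 1.0}   # '' marks sentence starts
    #   self.db[('a',)] == {'b': 0.5, 'c': 0.5}
    #   self.db[('b',)] == {'': 1.0}             # and sentence ends
    #   self.db[('c',)] == {'': 1.0}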

    def dumpdb(self):
        try:
            with open(self.dbFilePath, 'wb') as dbfile:
                pickle.dump(self.db, dbfile)
            # It looks like db was written successfully
            return True
        except IOError:
            logging.warning('Database file could not be written')
            return False

    def generateString(self):
        """ Generate a "sentence" with the database of known text """
        return self._accumulateWithSeed(('',))

    def generateStringWithSeed(self, seed):
        """ Generate a "sentence" with the database and a given word """
        # using str.split here means we're constructing the list in memory,
        # but as the generated sentence only depends on the last word of the
        # seed, I'm assuming seeds tend to be rather short.
        words = seed.split()
        if (words[-1],) not in self.db:
            # the only way continuation can fail is if the last word is not known
            raise StringContinuationImpossibleError('Could not continue string: '
                                                    + seed)
        return self._accumulateWithSeed(words)

    def _accumulateWithSeed(self, seed):
        """ Accumulate the generated sentence with the given seed
        (a sequence of words) """
        nextWord = self._nextWord(seed)
        sentence = list(seed) if seed else []
        while nextWord:
            sentence.append(nextWord)
            nextWord = self._nextWord(sentence)
        return ' '.join(sentence).strip()
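
    # e.g. with the toy db sketched above, _accumulateWithSeed(('',)) returns
    # "a b" or "a c"; the '' start symbol disappears in the final strip()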

    def _nextWord(self, lastwords):
        lastwords = tuple(lastwords)
        if lastwords != ('',):
            # back off to shorter and shorter contexts until we find a known one
            while lastwords not in self.db:
                lastwords = lastwords[1:]
                if not lastwords:
                    return ''
        probmap = self.db[lastwords]
        sample = random.random()
        # since rounding errors might make us miss out on some words
        maxprob = 0.0
        maxprobword = ""
        for candidate in probmap:
            # remember which word had the highest probability
            # this is the word we'll default to if we can't find anything else
            if probmap[candidate] > maxprob:
                maxprob = probmap[candidate]
                maxprobword = candidate
            if sample > probmap[candidate]:
                sample -= probmap[candidate]
            else:
                return candidate
        # getting here means we haven't found a matching word. :(
        return maxprobword
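

if __name__ == '__main__':
    # Minimal usage sketch: 'corpus.txt' is a placeholder path, point it at
    # any plain-text sample of your own.
    mc = MarkovChain()
    with open('corpus.txt') as f:
        mc.generateDatabase(f.read())
    mc.dumpdb()  # persist the trained model for the next run
    print(mc.generateString())
    try:
        print(mc.generateStringWithSeed('the'))
    except StringContinuationImpossibleError:
        print('Seed word not found in the corpus')
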
# pylama:ignore=D