test_nmea.py 39 KB


  1. # Copyright (c) 2009-2011 Twisted Matrix Laboratories.
  2. # See LICENSE for details.
  3. """
  4. Test cases for using NMEA sentences.
  5. """
  6. from __future__ import absolute_import, division
  7. import datetime
  8. from operator import attrgetter
  9. from zope.interface import implementer
  10. from twisted.python.compat import iteritems, intToBytes
  11. from twisted.positioning import base, nmea, ipositioning
  12. from twisted.positioning.test.receiver import MockPositioningReceiver
  13. from twisted.trial.unittest import TestCase
  14. from twisted.positioning.base import Angles
  15. # Sample sentences
  16. GPGGA = b'$GPGGA,123519,4807.038,N,01131.000,E,1,08,0.9,545.4,M,46.9,M,,*47'
  17. GPRMC = b'$GPRMC,123519,A,4807.038,N,01131.000,E,022.4,084.4,230394,003.1,W*6A'
  18. GPGSA = b'$GPGSA,A,3,19,28,14,18,27,22,31,39,,,,,1.7,1.0,1.3*34'
  19. GPHDT = b'$GPHDT,038.005,T*3B'
  20. GPGLL = b'$GPGLL,4916.45,N,12311.12,W,225444,A*31'
  21. GPGLL_PARTIAL = b'$GPGLL,3751.65,S,14507.36,E*77'
  22. GPGSV_SINGLE = b'$GPGSV,1,1,11,03,03,111,00,04,15,270,00,06,01,010,00,,,,*4b'
  23. GPGSV_EMPTY_MIDDLE = b'$GPGSV,1,1,11,03,03,111,00,,,,,,,,,13,06,292,00*75'
  24. GPGSV_SEQ = GPGSV_FIRST, GPGSV_MIDDLE, GPGSV_LAST = b"""
  25. $GPGSV,3,1,11,03,03,111,00,04,15,270,00,06,01,010,00,13,06,292,00*74
  26. $GPGSV,3,2,11,14,25,170,00,16,57,208,39,18,67,296,40,19,40,246,00*74
  27. $GPGSV,3,3,11,22,42,067,42,24,14,311,43,27,05,244,00,,,,*4D
  28. """.split()
  29. @implementer(ipositioning.INMEAReceiver)
  30. class NMEATestReceiver(object):
  31. """
  32. An NMEA receiver for testing.
  33. Remembers the last sentence it has received.
  34. """
  35. def __init__(self):
  36. self.clear()
  37. def clear(self):
  38. """
  39. Forgets the received sentence (if any), by setting
  40. C{self.receivedSentence} to L{None}.
  41. """
  42. self.receivedSentence = None
  43. def sentenceReceived(self, sentence):
  44. self.receivedSentence = sentence
  45. class CallbackTests(TestCase):
  46. """
  47. Tests if the NMEA protocol correctly calls its sentence callback.
  48. @ivar protocol: The NMEA protocol under test.
  49. @type protocol: L{nmea.NMEAProtocol}
  50. @ivar sentenceTypes: The set of sentence types of all sentences the test's
  51. sentence callback function has been called with.
  52. @type sentenceTypes: C{set}
  53. """
  54. def setUp(self):
  55. receiver = NMEATestReceiver()
  56. self.protocol = nmea.NMEAProtocol(receiver, self._sentenceCallback)
  57. self.sentenceTypes = set()
  58. def _sentenceCallback(self, sentence):
  59. """
  60. Remembers that a sentence of this type was fired.
  61. """
  62. self.sentenceTypes.add(sentence.type)
  63. def test_callbacksCalled(self):
  64. """
  65. The correct callbacks fire, and that *only* those fire.
  66. """
  67. sentencesByType = {
  68. 'GPGGA': [b'$GPGGA*56'],
  69. 'GPGLL': [b'$GPGLL*50'],
  70. 'GPGSA': [b'$GPGSA*42'],
  71. 'GPGSV': [b'$GPGSV*55'],
  72. 'GPHDT': [b'$GPHDT*4f'],
  73. 'GPRMC': [b'$GPRMC*4b']
  74. }
  75. for sentenceType, sentences in iteritems(sentencesByType):
  76. for sentence in sentences:
  77. self.protocol.lineReceived(sentence)
  78. self.assertEqual(self.sentenceTypes, set([sentenceType]))
  79. self.sentenceTypes.clear()
  80. class BrokenSentenceCallbackTests(TestCase):
  81. """
  82. Tests for broken NMEA sentence callbacks.
  83. """
  84. def setUp(self):
  85. receiver = NMEATestReceiver()
  86. self.protocol = nmea.NMEAProtocol(receiver, self._sentenceCallback)
  87. def _sentenceCallback(self, sentence):
  88. """
  89. Raises C{AttributeError}.
  90. """
  91. raise AttributeError("ERROR!!!")
  92. def test_dontSwallowCallbackExceptions(self):
  93. """
  94. An C{AttributeError} in the sentence callback of an C{NMEAProtocol}
  95. doesn't get swallowed.
  96. """
  97. lineReceived = self.protocol.lineReceived
  98. self.assertRaises(AttributeError, lineReceived, b'$GPGGA*56')
  99. class SplitTests(TestCase):
  100. """
  101. Checks splitting of NMEA sentences.
  102. """
  103. def test_withChecksum(self):
  104. """
  105. An NMEA sentence with a checksum gets split correctly.
  106. """
  107. splitSentence = nmea._split(b"$GPGGA,spam,eggs*00")
  108. self.assertEqual(splitSentence, [b'GPGGA', b'spam', b'eggs'])
  109. def test_noCheckum(self):
  110. """
  111. An NMEA sentence without a checksum gets split correctly.
  112. """
  113. splitSentence = nmea._split(b"$GPGGA,spam,eggs*")
  114. self.assertEqual(splitSentence, [b'GPGGA', b'spam', b'eggs'])
  115. class ChecksumTests(TestCase):
  116. """
  117. NMEA sentence checksum verification tests.
  118. """
  119. def test_valid(self):
  120. """
  121. Sentences with valid checksums get validated.
  122. """
  123. nmea._validateChecksum(GPGGA)
  124. def test_missing(self):
  125. """
  126. Sentences with missing checksums get validated.
  127. """
  128. nmea._validateChecksum(GPGGA[:-2])
  129. def test_invalid(self):
  130. """
  131. Sentences with a bad checksum raise L{base.InvalidChecksum} when
  132. attempting to validate them.
  133. """
  134. validate = nmea._validateChecksum
  135. bareSentence, checksum = GPGGA.split(b"*")
  136. badChecksum = intToBytes(int(checksum, 16) + 1)
  137. sentences = [bareSentence + b"*" + badChecksum]
  138. for s in sentences:
  139. self.assertRaises(base.InvalidChecksum, validate, s)
  140. class NMEAReceiverSetup(object):
  141. """
  142. A mixin for tests that need an NMEA receiver (and a protocol attached to
  143. it).
  144. @ivar receiver: An NMEA receiver that remembers the last sentence.
  145. @type receiver: L{NMEATestReceiver}
  146. @ivar protocol: An NMEA protocol attached to the receiver.
  147. @type protocol: L{twisted.positioning.nmea.NMEAProtocol}
  148. """
  149. def setUp(self):
  150. """
  151. Sets up an NMEA receiver.
  152. """
  153. self.receiver = NMEATestReceiver()
  154. self.protocol = nmea.NMEAProtocol(self.receiver)
  155. class GSVSequenceTests(NMEAReceiverSetup, TestCase):
  156. """
  157. Tests for the interpretation of GSV sequences.
  158. """
  159. def test_firstSentence(self):
  160. """
  161. The first sentence in a GSV sequence is correctly identified.
  162. """
  163. self.protocol.lineReceived(GPGSV_FIRST)
  164. sentence = self.receiver.receivedSentence
  165. self.assertTrue(sentence._isFirstGSVSentence())
  166. self.assertFalse(sentence._isLastGSVSentence())
  167. def test_middleSentence(self):
  168. """
  169. A sentence in the middle of a GSV sequence is correctly
  170. identified (as being neither the last nor the first).
  171. """
  172. self.protocol.lineReceived(GPGSV_MIDDLE)
  173. sentence = self.receiver.receivedSentence
  174. self.assertFalse(sentence._isFirstGSVSentence())
  175. self.assertFalse(sentence._isLastGSVSentence())
  176. def test_lastSentence(self):
  177. """
  178. The last sentence in a GSV sequence is correctly identified.
  179. """
  180. self.protocol.lineReceived(GPGSV_LAST)
  181. sentence = self.receiver.receivedSentence
  182. self.assertFalse(sentence._isFirstGSVSentence())
  183. self.assertTrue(sentence._isLastGSVSentence())
  184. class BogusSentenceTests(NMEAReceiverSetup, TestCase):
  185. """
  186. Tests for verifying predictable failure for bogus NMEA sentences.
  187. """
  188. def assertRaisesOnSentence(self, exceptionClass, sentence):
  189. """
  190. Asserts that the protocol raises C{exceptionClass} when it receives
  191. C{sentence}.
  192. @param exceptionClass: The exception class expected to be raised.
  193. @type exceptionClass: C{Exception} subclass
  194. @param sentence: The (bogus) NMEA sentence.
  195. @type sentence: C{str}
  196. """
  197. self.assertRaises(exceptionClass, self.protocol.lineReceived, sentence)
  198. def test_raiseOnUnknownSentenceType(self):
  199. """
  200. Receiving a well-formed sentence of unknown type raises
  201. C{ValueError}.
  202. """
  203. self.assertRaisesOnSentence(ValueError, b"$GPBOGUS*5b")
  204. def test_raiseOnMalformedSentences(self):
  205. """
  206. Receiving a malformed sentence raises L{base.InvalidSentence}.
  207. """
  208. self.assertRaisesOnSentence(base.InvalidSentence, "GPBOGUS")
  209. class NMEASentenceTests(NMEAReceiverSetup, TestCase):
  210. """
  211. Tests for L{nmea.NMEASentence} objects.
  212. """
  213. def test_repr(self):
  214. """
  215. The C{repr} of L{nmea.NMEASentence} objects is correct.
  216. """
  217. sentencesWithExpectedRepr = [
  218. (GPGSA,
  219. "<NMEASentence (GPGSA) {"
  220. "dataMode: A, "
  221. "fixType: 3, "
  222. "horizontalDilutionOfPrecision: 1.0, "
  223. "positionDilutionOfPrecision: 1.7, "
  224. "usedSatellitePRN_0: 19, "
  225. "usedSatellitePRN_1: 28, "
  226. "usedSatellitePRN_2: 14, "
  227. "usedSatellitePRN_3: 18, "
  228. "usedSatellitePRN_4: 27, "
  229. "usedSatellitePRN_5: 22, "
  230. "usedSatellitePRN_6: 31, "
  231. "usedSatellitePRN_7: 39, "
  232. "verticalDilutionOfPrecision: 1.3"
  233. "}>"),
  234. ]
  235. for sentence, expectedRepr in sentencesWithExpectedRepr:
  236. self.protocol.lineReceived(sentence)
  237. received = self.receiver.receivedSentence
  238. self.assertEqual(repr(received), expectedRepr)
  239. class ParsingTests(NMEAReceiverSetup, TestCase):
  240. """
  241. Tests if raw NMEA sentences get parsed correctly.
  242. This doesn't really involve any interpretation, just turning ugly raw NMEA
  243. representations into objects that are more pleasant to work with.
  244. """
  245. def _parserTest(self, sentence, expected):
  246. """
  247. Passes a sentence to the protocol and gets the parsed sentence from
  248. the receiver. Then verifies that the parsed sentence contains the
  249. expected data.
  250. """
  251. self.protocol.lineReceived(sentence)
  252. received = self.receiver.receivedSentence
  253. self.assertEqual(expected, received._sentenceData)
  254. def test_fullRMC(self):
  255. """
  256. A full RMC sentence is correctly parsed.
  257. """
  258. expected = {
  259. 'type': 'GPRMC',
  260. 'latitudeFloat': '4807.038',
  261. 'latitudeHemisphere': 'N',
  262. 'longitudeFloat': '01131.000',
  263. 'longitudeHemisphere': 'E',
  264. 'magneticVariation': '003.1',
  265. 'magneticVariationDirection': 'W',
  266. 'speedInKnots': '022.4',
  267. 'timestamp': '123519',
  268. 'datestamp': '230394',
  269. 'trueHeading': '084.4',
  270. 'dataMode': 'A',
  271. }
  272. self._parserTest(GPRMC, expected)
  273. def test_fullGGA(self):
  274. """
  275. A full GGA sentence is correctly parsed.
  276. """
  277. expected = {
  278. 'type': 'GPGGA',
  279. 'altitude': '545.4',
  280. 'altitudeUnits': 'M',
  281. 'heightOfGeoidAboveWGS84': '46.9',
  282. 'heightOfGeoidAboveWGS84Units': 'M',
  283. 'horizontalDilutionOfPrecision': '0.9',
  284. 'latitudeFloat': '4807.038',
  285. 'latitudeHemisphere': 'N',
  286. 'longitudeFloat': '01131.000',
  287. 'longitudeHemisphere': 'E',
  288. 'numberOfSatellitesSeen': '08',
  289. 'timestamp': '123519',
  290. 'fixQuality': '1',
  291. }
  292. self._parserTest(GPGGA, expected)
  293. def test_fullGLL(self):
  294. """
  295. A full GLL sentence is correctly parsed.
  296. """
  297. expected = {
  298. 'type': 'GPGLL',
  299. 'latitudeFloat': '4916.45',
  300. 'latitudeHemisphere': 'N',
  301. 'longitudeFloat': '12311.12',
  302. 'longitudeHemisphere': 'W',
  303. 'timestamp': '225444',
  304. 'dataMode': 'A',
  305. }
  306. self._parserTest(GPGLL, expected)
  307. def test_partialGLL(self):
  308. """
  309. A partial GLL sentence is correctly parsed.
  310. """
  311. expected = {
  312. 'type': 'GPGLL',
  313. 'latitudeFloat': '3751.65',
  314. 'latitudeHemisphere': 'S',
  315. 'longitudeFloat': '14507.36',
  316. 'longitudeHemisphere': 'E',
  317. }
  318. self._parserTest(GPGLL_PARTIAL, expected)
  319. def test_fullGSV(self):
  320. """
  321. A full GSV sentence is correctly parsed.
  322. """
  323. expected = {
  324. 'type': 'GPGSV',
  325. 'GSVSentenceIndex': '1',
  326. 'numberOfGSVSentences': '3',
  327. 'numberOfSatellitesSeen': '11',
  328. 'azimuth_0': '111',
  329. 'azimuth_1': '270',
  330. 'azimuth_2': '010',
  331. 'azimuth_3': '292',
  332. 'elevation_0': '03',
  333. 'elevation_1': '15',
  334. 'elevation_2': '01',
  335. 'elevation_3': '06',
  336. 'satellitePRN_0': '03',
  337. 'satellitePRN_1': '04',
  338. 'satellitePRN_2': '06',
  339. 'satellitePRN_3': '13',
  340. 'signalToNoiseRatio_0': '00',
  341. 'signalToNoiseRatio_1': '00',
  342. 'signalToNoiseRatio_2': '00',
  343. 'signalToNoiseRatio_3': '00',
  344. }
  345. self._parserTest(GPGSV_FIRST, expected)
  346. def test_partialGSV(self):
  347. """
  348. A partial GSV sentence is correctly parsed.
  349. """
  350. expected = {
  351. 'type': 'GPGSV',
  352. 'GSVSentenceIndex': '3',
  353. 'numberOfGSVSentences': '3',
  354. 'numberOfSatellitesSeen': '11',
  355. 'azimuth_0': '067',
  356. 'azimuth_1': '311',
  357. 'azimuth_2': '244',
  358. 'elevation_0': '42',
  359. 'elevation_1': '14',
  360. 'elevation_2': '05',
  361. 'satellitePRN_0': '22',
  362. 'satellitePRN_1': '24',
  363. 'satellitePRN_2': '27',
  364. 'signalToNoiseRatio_0': '42',
  365. 'signalToNoiseRatio_1': '43',
  366. 'signalToNoiseRatio_2': '00',
  367. }
  368. self._parserTest(GPGSV_LAST, expected)
  369. def test_fullHDT(self):
  370. """
  371. A full HDT sentence is correctly parsed.
  372. """
  373. expected = {
  374. 'type': 'GPHDT',
  375. 'trueHeading': '038.005',
  376. }
  377. self._parserTest(GPHDT, expected)
  378. def test_typicalGSA(self):
  379. """
  380. A typical GSA sentence is correctly parsed.
  381. """
  382. expected = {
  383. 'type': 'GPGSA',
  384. 'dataMode': 'A',
  385. 'fixType': '3',
  386. 'usedSatellitePRN_0': '19',
  387. 'usedSatellitePRN_1': '28',
  388. 'usedSatellitePRN_2': '14',
  389. 'usedSatellitePRN_3': '18',
  390. 'usedSatellitePRN_4': '27',
  391. 'usedSatellitePRN_5': '22',
  392. 'usedSatellitePRN_6': '31',
  393. 'usedSatellitePRN_7': '39',
  394. 'positionDilutionOfPrecision': '1.7',
  395. 'horizontalDilutionOfPrecision': '1.0',
  396. 'verticalDilutionOfPrecision': '1.3',
  397. }
  398. self._parserTest(GPGSA, expected)
  399. class FixUnitsTests(TestCase):
  400. """
  401. Tests for the generic unit fixing method, L{nmea.NMEAAdapter._fixUnits}.
  402. @ivar adapter: The NMEA adapter.
  403. @type adapter: L{nmea.NMEAAdapter}
  404. """
  405. def setUp(self):
  406. self.adapter = nmea.NMEAAdapter(base.BasePositioningReceiver())
  407. def test_noValueKey(self):
  408. """
  409. Tests that when no C{valueKey} is provided, C{unitKey} is used, minus
  410. C{"Units"} at the end.
  411. """
  412. class FakeSentence(object):
  413. """
  414. A fake sentence that just has a "foo" attribute.
  415. """
  416. def __init__(self):
  417. self.foo = 1
  418. self.adapter.currentSentence = FakeSentence()
  419. self.adapter._fixUnits(unitKey="fooUnits", unit="N")
  420. self.assertNotEqual(self.adapter._sentenceData["foo"], 1)
  421. def test_unitKeyButNoUnit(self):
  422. """
  423. Tests that if a unit key is provided but the unit isn't, the unit is
  424. automatically determined from the unit key.
  425. """
  426. class FakeSentence(object):
  427. """
  428. A fake sentence that just has "foo" and "fooUnits" attributes.
  429. """
  430. def __init__(self):
  431. self.foo = 1
  432. self.fooUnits = "N"
  433. self.adapter.currentSentence = FakeSentence()
  434. self.adapter._fixUnits(unitKey="fooUnits")
  435. self.assertNotEqual(self.adapter._sentenceData["foo"], 1)
  436. def test_noValueKeyAndNoUnitKey(self):
  437. """
  438. Tests that when a unit is specified but neither C{valueKey} nor
  439. C{unitKey} is provided, C{ValueError} is raised.
  440. """
  441. self.assertRaises(ValueError, self.adapter._fixUnits, unit="K")
  442. class FixerTestMixin(object):
  443. """
  444. Mixin for tests for the fixers on L{nmea.NMEAAdapter} that adapt
  445. from NMEA-specific notations to generic Python objects.
  446. @ivar adapter: The NMEA adapter.
  447. @type adapter: L{nmea.NMEAAdapter}
  448. """
  449. def setUp(self):
  450. self.adapter = nmea.NMEAAdapter(base.BasePositioningReceiver())
  451. def _fixerTest(self, sentenceData, expected=None, exceptionClass=None):
  452. """
  453. A generic adapter fixer test.
  454. Creates a sentence from the C{sentenceData} and sends that to the
  455. adapter. If C{exceptionClass} is not passed, this is assumed to work,
  456. and C{expected} is compared with the adapter's internal state.
  457. Otherwise, passing the sentence to the adapter is checked to raise
  458. C{exceptionClass}.
  459. @param sentenceData: Raw sentence content.
  460. @type sentenceData: C{dict} mapping C{str} to C{str}
  461. @param expected: The expected state of the adapter.
  462. @type expected: C{dict} or L{None}
  463. @param exceptionClass: The exception to be raised by the adapter.
  464. @type exceptionClass: subclass of C{Exception}
  465. """
  466. sentence = nmea.NMEASentence(sentenceData)
  467. def receiveSentence():
  468. self.adapter.sentenceReceived(sentence)
  469. if exceptionClass is None:
  470. receiveSentence()
  471. self.assertEqual(self.adapter._state, expected)
  472. else:
  473. self.assertRaises(exceptionClass, receiveSentence)
  474. self.adapter.clear()
  475. class TimestampFixerTests(FixerTestMixin, TestCase):
  476. """
  477. Tests conversion from NMEA timestamps to C{datetime.time} objects.
  478. """
  479. def test_simple(self):
  480. """
  481. A simple timestamp is converted correctly.
  482. """
  483. data = {'timestamp': '123456'} # 12:34:56Z
  484. expected = {'_time': datetime.time(12, 34, 56)}
  485. self._fixerTest(data, expected)
  486. def test_broken(self):
  487. """
  488. A broken timestamp raises C{ValueError}.
  489. """
  490. badTimestamps = '993456', '129956', '123499'
  491. for t in badTimestamps:
  492. self._fixerTest({'timestamp': t}, exceptionClass=ValueError)
  493. class DatestampFixerTests(FixerTestMixin, TestCase):
  494. def test_defaultYearThreshold(self):
  495. """
  496. The default year threshold is 1980.
  497. """
  498. self.assertEqual(self.adapter.yearThreshold, 1980)
  499. def test_beforeThreshold(self):
  500. """
  501. Dates before the threshold are interpreted as being in the century
  502. after the threshold. (Since the threshold is the earliest possible
  503. date.)
  504. """
  505. datestring, date = '010115', datetime.date(2015, 1, 1)
  506. self._fixerTest({'datestamp': datestring}, {'_date': date})
  507. def test_afterThreshold(self):
  508. """
  509. Dates after the threshold are interpreted as being in the same century
  510. as the threshold.
  511. """
  512. datestring, date = '010195', datetime.date(1995, 1, 1)
  513. self._fixerTest({'datestamp': datestring}, {'_date': date})
  514. def test_invalidMonth(self):
  515. """
  516. A datestring with an invalid month (> 12) raises C{ValueError}.
  517. """
  518. self._fixerTest({'datestamp': '011301'}, exceptionClass=ValueError)
  519. def test_invalidDay(self):
  520. """
  521. A datestring with an invalid day (more days than there are in that
  522. month) raises C{ValueError}.
  523. """
  524. self._fixerTest({'datestamp': '320101'}, exceptionClass=ValueError)
  525. self._fixerTest({'datestamp': '300201'}, exceptionClass=ValueError)
  526. def _nmeaFloat(degrees, minutes):
  527. """
  528. Builds an NMEA float representation for a given angle in degrees and
  529. decimal minutes.
  530. @param degrees: The integer degrees for this angle.
  531. @type degrees: C{int}
  532. @param minutes: The decimal minutes value for this angle.
  533. @type minutes: C{float}
  534. @return: The NMEA float representation for this angle.
  535. @rtype: C{str}
  536. """
  537. return "%i%0.3f" % (degrees, minutes)
  538. def _coordinateSign(hemisphere):
  539. """
  540. Return the sign of a coordinate.
  541. This is C{1} if the coordinate is in the northern or eastern hemispheres,
  542. C{-1} otherwise.
  543. @param hemisphere: NMEA shorthand for the hemisphere. One of "NESW".
  544. @type hemisphere: C{str}
  545. @return: The sign of the coordinate value.
  546. @rtype: C{int}
  547. """
  548. return 1 if hemisphere in "NE" else -1
  549. def _coordinateType(hemisphere):
  550. """
  551. Return the type of a coordinate.
  552. This is L{Angles.LATITUDE} if the coordinate is in the northern or
  553. southern hemispheres, L{Angles.LONGITUDE} otherwise.
  554. @param hemisphere: NMEA shorthand for the hemisphere. One of "NESW".
  555. @type hemisphere: C{str}
  556. @return: The type of the coordinate (L{Angles.LATITUDE} or
  557. L{Angles.LONGITUDE})
  558. """
  559. return Angles.LATITUDE if hemisphere in "NS" else Angles.LONGITUDE
  560. class CoordinateFixerTests(FixerTestMixin, TestCase):
  561. """
  562. Tests turning NMEA coordinate notations into something more pleasant.
  563. """
  564. def test_north(self):
  565. """
  566. NMEA coordinate representations in the northern hemisphere
  567. convert correctly.
  568. """
  569. sentenceData = {"latitudeFloat": "1030.000", "latitudeHemisphere": "N"}
  570. state = {"latitude": base.Coordinate(10.5, Angles.LATITUDE)}
  571. self._fixerTest(sentenceData, state)
  572. def test_south(self):
  573. """
  574. NMEA coordinate representations in the southern hemisphere
  575. convert correctly.
  576. """
  577. sentenceData = {"latitudeFloat": "1030.000", "latitudeHemisphere": "S"}
  578. state = {"latitude": base.Coordinate(-10.5, Angles.LATITUDE)}
  579. self._fixerTest(sentenceData, state)
  580. def test_east(self):
  581. """
  582. NMEA coordinate representations in the eastern hemisphere
  583. convert correctly.
  584. """
  585. sentenceData = {"longitudeFloat": "1030.000", "longitudeHemisphere": "E"}
  586. state = {"longitude": base.Coordinate(10.5, Angles.LONGITUDE)}
  587. self._fixerTest(sentenceData, state)
  588. def test_west(self):
  589. """
  590. NMEA coordinate representations in the western hemisphere
  591. convert correctly.
  592. """
  593. sentenceData = {"longitudeFloat": "1030.000", "longitudeHemisphere": "W"}
  594. state = {"longitude": base.Coordinate(-10.5, Angles.LONGITUDE)}
  595. self._fixerTest(sentenceData, state)
  596. def test_badHemisphere(self):
  597. """
  598. NMEA coordinate representations for nonexistent hemispheres
  599. raise C{ValueError} when you attempt to parse them.
  600. """
  601. sentenceData = {'longitudeHemisphere': 'Q'}
  602. self._fixerTest(sentenceData, exceptionClass=ValueError)
  603. def test_badHemisphereSign(self):
  604. """
  605. NMEA coordinate repesentation parsing fails predictably
  606. when you pass nonexistent coordinate types (not latitude or
  607. longitude).
  608. """
  609. getSign = lambda: self.adapter._getHemisphereSign("BOGUS_VALUE")
  610. self.assertRaises(ValueError, getSign)
  611. class AltitudeFixerTests(FixerTestMixin, TestCase):
  612. """
  613. Tests that NMEA representations of altitudes are correctly converted.
  614. """
  615. def test_fixAltitude(self):
  616. """
  617. The NMEA representation of an altitude (above mean sea level)
  618. is correctly converted.
  619. """
  620. key, value = 'altitude', '545.4'
  621. altitude = base.Altitude(float(value))
  622. self._fixerTest({key: value}, {key: altitude})
  623. def test_heightOfGeoidAboveWGS84(self):
  624. """
  625. The NMEA representation of an altitude of the geoid (above the
  626. WGS84 reference level) is correctly converted.
  627. """
  628. key, value = 'heightOfGeoidAboveWGS84', '46.9'
  629. altitude = base.Altitude(float(value))
  630. self._fixerTest({key: value}, {key: altitude})
  631. class SpeedFixerTests(FixerTestMixin, TestCase):
  632. """
  633. Tests that NMEA representations of speeds are correctly converted.
  634. """
  635. def test_speedInKnots(self):
  636. """
  637. Speeds reported in knots correctly get converted to meters per
  638. second.
  639. """
  640. key, value, targetKey = "speedInKnots", "10", "speed"
  641. speed = base.Speed(float(value) * base.MPS_PER_KNOT)
  642. self._fixerTest({key: value}, {targetKey: speed})
  643. class VariationFixerTests(FixerTestMixin, TestCase):
  644. """
  645. Tests if the absolute values of magnetic variations on the heading
  646. and their sign get combined correctly, and if that value gets
  647. combined with a heading correctly.
  648. """
  649. def test_west(self):
  650. """
  651. Tests westward (negative) magnetic variation.
  652. """
  653. variation, direction = "1.34", "W"
  654. heading = base.Heading.fromFloats(variationValue=-1*float(variation))
  655. sentenceData = {'magneticVariation': variation,
  656. 'magneticVariationDirection': direction}
  657. self._fixerTest(sentenceData, {'heading': heading})
  658. def test_east(self):
  659. """
  660. Tests eastward (positive) magnetic variation.
  661. """
  662. variation, direction = "1.34", "E"
  663. heading = base.Heading.fromFloats(variationValue=float(variation))
  664. sentenceData = {'magneticVariation': variation,
  665. 'magneticVariationDirection': direction}
  666. self._fixerTest(sentenceData, {'heading': heading})
  667. def test_withHeading(self):
  668. """
  669. Variation values get combined with headings correctly.
  670. """
  671. trueHeading, variation, direction = "123.12", "1.34", "E"
  672. sentenceData = {'trueHeading': trueHeading,
  673. 'magneticVariation': variation,
  674. 'magneticVariationDirection': direction}
  675. heading = base.Heading.fromFloats(float(trueHeading),
  676. variationValue=float(variation))
  677. self._fixerTest(sentenceData, {'heading': heading})
  678. class PositionErrorFixerTests(FixerTestMixin, TestCase):
  679. """
  680. Position errors in NMEA are passed as dilutions of precision (DOP). This
  681. is a measure relative to some specified value of the GPS device as its
  682. "reference" precision. Unfortunately, there are very few ways of figuring
  683. this out from just the device (sans manual).
  684. There are two basic DOP values: vertical and horizontal. HDOP tells you
  685. how precise your location is on the face of the earth (pretending it's
  686. flat, at least locally). VDOP tells you how precise your altitude is
  687. known. PDOP (position DOP) is a dependent value defined as the Euclidean
  688. norm of those two, and gives you a more generic "goodness of fix" value.
  689. """
  690. def test_simple(self):
  691. self._fixerTest(
  692. {'horizontalDilutionOfPrecision': '11'},
  693. {'positionError': base.PositionError(hdop=11.)})
  694. def test_mixing(self):
  695. pdop, hdop, vdop = "1", "1", "1"
  696. positionError = base.PositionError(pdop=float(pdop),
  697. hdop=float(hdop),
  698. vdop=float(vdop))
  699. sentenceData = {'positionDilutionOfPrecision': pdop,
  700. 'horizontalDilutionOfPrecision': hdop,
  701. 'verticalDilutionOfPrecision': vdop}
  702. self._fixerTest(sentenceData, {"positionError": positionError})
  703. class ValidFixTests(FixerTestMixin, TestCase):
  704. """
  705. Tests that data reported from a valid fix is used.
  706. """
  707. def test_GGA(self):
  708. """
  709. GGA data with a valid fix is used.
  710. """
  711. sentenceData = {'type': 'GPGGA',
  712. 'altitude': '545.4',
  713. 'fixQuality': nmea.GPGGAFixQualities.GPS_FIX}
  714. expectedState = {'altitude': base.Altitude(545.4)}
  715. self._fixerTest(sentenceData, expectedState)
  716. def test_GLL(self):
  717. """
  718. GLL data with a valid data mode is used.
  719. """
  720. sentenceData = {'type': 'GPGLL',
  721. 'altitude': '545.4',
  722. 'dataMode': nmea.GPGLLGPRMCFixQualities.ACTIVE}
  723. expectedState = {'altitude': base.Altitude(545.4)}
  724. self._fixerTest(sentenceData, expectedState)
  725. class InvalidFixTests(FixerTestMixin, TestCase):
  726. """
  727. Tests that data being reported from a bad or incomplete fix isn't
  728. used. Although the specification dictates that GPSes shouldn't produce
  729. NMEA sentences with real-looking values for altitude or position in them
  730. unless they have at least some semblance of a GPS fix, this is widely
  731. ignored.
  732. """
  733. def _invalidFixTest(self, sentenceData):
  734. """
  735. Sentences with an invalid fix or data mode result in empty
  736. state (ie, the data isn't used).
  737. """
  738. self._fixerTest(sentenceData, {})
  739. def test_GGA(self):
  740. """
  741. GGA sentence data is unused when there is no fix.
  742. """
  743. sentenceData = {'type': 'GPGGA',
  744. 'altitude': '545.4',
  745. 'fixQuality': nmea.GPGGAFixQualities.INVALID_FIX}
  746. self._invalidFixTest(sentenceData)
  747. def test_GLL(self):
  748. """
  749. GLL sentence data is unused when the data is flagged as void.
  750. """
  751. sentenceData = {'type': 'GPGLL',
  752. 'altitude': '545.4',
  753. 'dataMode': nmea.GPGLLGPRMCFixQualities.VOID}
  754. self._invalidFixTest(sentenceData)
  755. def test_badGSADataMode(self):
  756. """
  757. GSA sentence data is not used when there is no GPS fix, but
  758. the data mode claims the data is "active". Some GPSes do
  759. this, unfortunately, and that means you shouldn't use the
  760. data.
  761. """
  762. sentenceData = {'type': 'GPGSA',
  763. 'altitude': '545.4',
  764. 'dataMode': nmea.GPGLLGPRMCFixQualities.ACTIVE,
  765. 'fixType': nmea.GPGSAFixTypes.GSA_NO_FIX}
  766. self._invalidFixTest(sentenceData)
  767. def test_badGSAFixType(self):
  768. """
  769. GSA sentence data is not used when the fix claims to be valid
  770. (albeit only 2D), but the data mode says the data is void.
  771. Some GPSes do this, unfortunately, and that means you
  772. shouldn't use the data.
  773. """
  774. sentenceData = {'type': 'GPGSA',
  775. 'altitude': '545.4',
  776. 'dataMode': nmea.GPGLLGPRMCFixQualities.VOID,
  777. 'fixType': nmea.GPGSAFixTypes.GSA_2D_FIX}
  778. self._invalidFixTest(sentenceData)
  779. def test_badGSADataModeAndFixType(self):
  780. """
  781. GSA sentence data is not use when neither the fix nor the data
  782. mode is any good.
  783. """
  784. sentenceData = {'type': 'GPGSA',
  785. 'altitude': '545.4',
  786. 'dataMode': nmea.GPGLLGPRMCFixQualities.VOID,
  787. 'fixType': nmea.GPGSAFixTypes.GSA_NO_FIX}
  788. self._invalidFixTest(sentenceData)
  789. class NMEAReceiverTests(TestCase):
  790. """
  791. Tests for the NMEA receiver.
  792. """
  793. def setUp(self):
  794. self.receiver = MockPositioningReceiver()
  795. self.adapter = nmea.NMEAAdapter(self.receiver)
  796. self.protocol = nmea.NMEAProtocol(self.adapter)
  797. def test_onlyFireWhenCurrentSentenceHasNewInformation(self):
  798. """
  799. If the current sentence does not contain any new fields for a
  800. particular callback, that callback is not called; even if all
  801. necessary information is still in the state from one or more
  802. previous messages.
  803. """
  804. self.protocol.lineReceived(GPGGA)
  805. gpggaCallbacks = set(['positionReceived',
  806. 'positionErrorReceived',
  807. 'altitudeReceived'])
  808. self.assertEqual(set(self.receiver.called.keys()), gpggaCallbacks)
  809. self.receiver.clear()
  810. self.assertNotEqual(self.adapter._state, {})
  811. # GPHDT contains heading information but not position,
  812. # altitude or anything like that; but that information is
  813. # still in the state.
  814. self.protocol.lineReceived(GPHDT)
  815. gphdtCallbacks = set(['headingReceived'])
  816. self.assertEqual(set(self.receiver.called.keys()), gphdtCallbacks)
  817. def _receiverTest(self, sentences, expectedFired=(), extraTest=None):
  818. """
  819. A generic test for NMEA receiver behavior.
  820. @param sentences: The sequence of sentences to simulate receiving.
  821. @type sentences: iterable of C{str}
  822. @param expectedFired: The names of the callbacks expected to fire.
  823. @type expectedFired: iterable of C{str}
  824. @param extraTest: An optional extra test hook.
  825. @type extraTest: nullary callable
  826. """
  827. for sentence in sentences:
  828. self.protocol.lineReceived(sentence)
  829. actuallyFired = self.receiver.called.keys()
  830. self.assertEqual(set(actuallyFired), set(expectedFired))
  831. if extraTest is not None:
  832. extraTest()
  833. self.receiver.clear()
  834. self.adapter.clear()
  835. def test_positionErrorUpdateAcrossStates(self):
  836. """
  837. The positioning error is updated across multiple states.
  838. """
  839. sentences = [GPGSA] + GPGSV_SEQ
  840. callbacksFired = ['positionErrorReceived', 'beaconInformationReceived']
  841. def _getIdentifiers(beacons):
  842. return sorted(map(attrgetter("identifier"), beacons))
  843. def checkBeaconInformation():
  844. beaconInformation = self.adapter._state['beaconInformation']
  845. seenIdentifiers = _getIdentifiers(beaconInformation.seenBeacons)
  846. expected = [3, 4, 6, 13, 14, 16, 18, 19, 22, 24, 27]
  847. self.assertEqual(seenIdentifiers, expected)
  848. usedIdentifiers = _getIdentifiers(beaconInformation.usedBeacons)
  849. # These are not actually all the PRNs in the sample GPGSA:
  850. # only the ones also reported by the GPGSV sequence. This
  851. # is just because the sample data doesn't come from the
  852. # same reporting cycle of a GPS device.
  853. self.assertEqual(usedIdentifiers, [14, 18, 19, 22, 27])
  854. self._receiverTest(sentences, callbacksFired, checkBeaconInformation)
  855. def test_emptyMiddleGSV(self):
  856. """
  857. A GSV sentence with empty entries in any position does not mean that
  858. entries in subsequent positions of the same GSV sentence are ignored.
  859. """
  860. sentences = [GPGSV_EMPTY_MIDDLE]
  861. callbacksFired = ['beaconInformationReceived']
  862. def checkBeaconInformation():
  863. beaconInformation = self.adapter._state['beaconInformation']
  864. seenBeacons = beaconInformation.seenBeacons
  865. self.assertEqual(len(seenBeacons), 2)
  866. self.assertIn(13, [b.identifier for b in seenBeacons])
  867. self._receiverTest(sentences, callbacksFired, checkBeaconInformation)
  868. def test_GGASentences(self):
  869. """
  870. A sequence of GGA sentences fires C{positionReceived},
  871. C{positionErrorReceived} and C{altitudeReceived}.
  872. """
  873. sentences = [GPGGA]
  874. callbacksFired = ['positionReceived',
  875. 'positionErrorReceived',
  876. 'altitudeReceived']
  877. self._receiverTest(sentences, callbacksFired)
  878. def test_GGAWithDateInState(self):
  879. """
  880. When receiving a GPGGA sentence and a date was already in the
  881. state, the new time (from the GPGGA sentence) is combined with
  882. that date.
  883. """
  884. self.adapter._state["_date"] = datetime.date(2014, 1, 1)
  885. sentences = [GPGGA]
  886. callbacksFired = ['positionReceived',
  887. 'positionErrorReceived',
  888. 'altitudeReceived',
  889. 'timeReceived']
  890. self._receiverTest(sentences, callbacksFired)
  891. def test_RMCSentences(self):
  892. """
  893. A sequence of RMC sentences fires C{positionReceived},
  894. C{speedReceived}, C{headingReceived} and C{timeReceived}.
  895. """
  896. sentences = [GPRMC]
  897. callbacksFired = ['headingReceived',
  898. 'speedReceived',
  899. 'positionReceived',
  900. 'timeReceived']
  901. self._receiverTest(sentences, callbacksFired)
  902. def test_GSVSentences(self):
  903. """
  904. A complete sequence of GSV sentences fires
  905. C{beaconInformationReceived}.
  906. """
  907. sentences = [GPGSV_FIRST, GPGSV_MIDDLE, GPGSV_LAST]
  908. callbacksFired = ['beaconInformationReceived']
  909. def checkPartialInformation():
  910. self.assertNotIn('_partialBeaconInformation', self.adapter._state)
  911. self._receiverTest(sentences, callbacksFired, checkPartialInformation)
  912. def test_emptyMiddleEntriesGSVSequence(self):
  913. """
  914. A complete sequence of GSV sentences with empty entries in the
  915. middle still fires C{beaconInformationReceived}.
  916. """
  917. sentences = [GPGSV_EMPTY_MIDDLE]
  918. self._receiverTest(sentences, ["beaconInformationReceived"])
  919. def test_incompleteGSVSequence(self):
  920. """
  921. An incomplete sequence of GSV sentences does not fire any callbacks.
  922. """
  923. sentences = [GPGSV_FIRST]
  924. self._receiverTest(sentences)
  925. def test_singleSentenceGSVSequence(self):
  926. """
  927. The parser does not fail badly when the sequence consists of
  928. only one sentence (but is otherwise complete).
  929. """
  930. sentences = [GPGSV_SINGLE]
  931. self._receiverTest(sentences, ["beaconInformationReceived"])
  932. def test_GLLSentences(self):
  933. """
  934. GLL sentences fire C{positionReceived}.
  935. """
  936. sentences = [GPGLL_PARTIAL, GPGLL]
  937. self._receiverTest(sentences, ['positionReceived'])
  938. def test_HDTSentences(self):
  939. """
  940. HDT sentences fire C{headingReceived}.
  941. """
  942. sentences = [GPHDT]
  943. self._receiverTest(sentences, ['headingReceived'])
  944. def test_mixedSentences(self):
  945. """
  946. A mix of sentences fires the correct callbacks.
  947. """
  948. sentences = [GPRMC, GPGGA]
  949. callbacksFired = ['altitudeReceived',
  950. 'speedReceived',
  951. 'positionReceived',
  952. 'positionErrorReceived',
  953. 'timeReceived',
  954. 'headingReceived']
  955. def checkTime():
  956. expectedDateTime = datetime.datetime(1994, 3, 23, 12, 35, 19)
  957. self.assertEqual(self.adapter._state['time'], expectedDateTime)
  958. self._receiverTest(sentences, callbacksFired, checkTime)
  959. def test_lotsOfMixedSentences(self):
  960. """
  961. Sends an entire gamut of sentences and verifies the
  962. appropriate callbacks fire. These are more than you'd expect
  963. from your average consumer GPS device. They have most of the
  964. important information, including beacon information and
  965. visibility.
  966. """
  967. sentences = [GPGSA] + GPGSV_SEQ + [GPRMC, GPGGA, GPGLL]
  968. callbacksFired = ['headingReceived',
  969. 'beaconInformationReceived',
  970. 'speedReceived',
  971. 'positionReceived',
  972. 'timeReceived',
  973. 'altitudeReceived',
  974. 'positionErrorReceived']
  975. self._receiverTest(sentences, callbacksFired)