- """SQL io tests
- The SQL tests are broken down into different classes:
- - `PandasSQLTest`: base class with common methods for all test classes
- - Tests for the public API (only tests with sqlite3)
- - `_TestSQLApi` base class
- - `TestSQLApi`: test the public API with sqlalchemy engine
- - `TestSQLiteFallbackApi`: test the public API with a sqlite DBAPI
- connection
- - Tests for the different SQL flavors (flavor specific type conversions)
- - Tests for the sqlalchemy mode: `_TestSQLAlchemy` is the base class with
- common methods, `_TestSQLAlchemyConn` tests the API with a SQLAlchemy
- Connection object. The different tested flavors (sqlite3, MySQL,
- PostgreSQL) derive from the base class
- - Tests for the fallback mode (`TestSQLiteFallback`)
- """
- from __future__ import print_function
- import csv
- from datetime import date, datetime, time
- import sqlite3
- import warnings
- import numpy as np
- import pytest
- import pandas.compat as compat
- from pandas.compat import PY36, lrange, range, string_types
- from pandas.core.dtypes.common import (
- is_datetime64_dtype, is_datetime64tz_dtype)
- import pandas as pd
- from pandas import (
- DataFrame, Index, MultiIndex, Series, Timestamp, concat, date_range, isna,
- to_datetime, to_timedelta)
- import pandas.util.testing as tm
- import pandas.io.sql as sql
- from pandas.io.sql import read_sql_query, read_sql_table
- try:
- import sqlalchemy
- import sqlalchemy.schema
- import sqlalchemy.sql.sqltypes as sqltypes
- from sqlalchemy.ext import declarative
- from sqlalchemy.orm import session as sa_session
- SQLALCHEMY_INSTALLED = True
- except ImportError:
- SQLALCHEMY_INSTALLED = False
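- # The write/read round trip that most tests below exercise, as a minimal
- # self-contained sketch against the stdlib sqlite3 driver (illustrative
- # only; the helper and table names are hypothetical and unused by the
- # test suite):
- def _example_roundtrip():
-     conn = sqlite3.connect(':memory:')
-     df = DataFrame({'a': [1, 2, 3]})
-     sql.to_sql(df, 'example', conn, index=False)  # write via public API
-     out = sql.read_sql_query('SELECT * FROM example', conn)
-     tm.assert_frame_equal(out, df)
-     conn.close()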
- SQL_STRINGS = {
- 'create_iris': {
- 'sqlite': """CREATE TABLE iris (
- "SepalLength" REAL,
- "SepalWidth" REAL,
- "PetalLength" REAL,
- "PetalWidth" REAL,
- "Name" TEXT
- )""",
- 'mysql': """CREATE TABLE iris (
- `SepalLength` DOUBLE,
- `SepalWidth` DOUBLE,
- `PetalLength` DOUBLE,
- `PetalWidth` DOUBLE,
- `Name` VARCHAR(200)
- )""",
- 'postgresql': """CREATE TABLE iris (
- "SepalLength" DOUBLE PRECISION,
- "SepalWidth" DOUBLE PRECISION,
- "PetalLength" DOUBLE PRECISION,
- "PetalWidth" DOUBLE PRECISION,
- "Name" VARCHAR(200)
- )"""
- },
- 'insert_iris': {
- 'sqlite': """INSERT INTO iris VALUES(?, ?, ?, ?, ?)""",
- 'mysql': """INSERT INTO iris VALUES(%s, %s, %s, %s, "%s");""",
- 'postgresql': """INSERT INTO iris VALUES(%s, %s, %s, %s, %s);"""
- },
- 'create_test_types': {
- 'sqlite': """CREATE TABLE types_test_data (
- "TextCol" TEXT,
- "DateCol" TEXT,
- "IntDateCol" INTEGER,
- "IntDateOnlyCol" INTEGER,
- "FloatCol" REAL,
- "IntCol" INTEGER,
- "BoolCol" INTEGER,
- "IntColWithNull" INTEGER,
- "BoolColWithNull" INTEGER
- )""",
- 'mysql': """CREATE TABLE types_test_data (
- `TextCol` TEXT,
- `DateCol` DATETIME,
- `IntDateCol` INTEGER,
- `IntDateOnlyCol` INTEGER,
- `FloatCol` DOUBLE,
- `IntCol` INTEGER,
- `BoolCol` BOOLEAN,
- `IntColWithNull` INTEGER,
- `BoolColWithNull` BOOLEAN
- )""",
- 'postgresql': """CREATE TABLE types_test_data (
- "TextCol" TEXT,
- "DateCol" TIMESTAMP,
- "DateColWithTz" TIMESTAMP WITH TIME ZONE,
- "IntDateCol" INTEGER,
- "IntDateOnlyCol" INTEGER,
- "FloatCol" DOUBLE PRECISION,
- "IntCol" INTEGER,
- "BoolCol" BOOLEAN,
- "IntColWithNull" INTEGER,
- "BoolColWithNull" BOOLEAN
- )"""
- },
- 'insert_test_types': {
- 'sqlite': {
- 'query': """
- INSERT INTO types_test_data
- VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?)
- """,
- 'fields': (
- 'TextCol', 'DateCol', 'IntDateCol', 'IntDateOnlyCol',
- 'FloatCol', 'IntCol', 'BoolCol', 'IntColWithNull',
- 'BoolColWithNull'
- )
- },
- 'mysql': {
- 'query': """
- INSERT INTO types_test_data
- VALUES("%s", %s, %s, %s, %s, %s, %s, %s, %s)
- """,
- 'fields': (
- 'TextCol', 'DateCol', 'IntDateCol', 'IntDateOnlyCol',
- 'FloatCol', 'IntCol', 'BoolCol', 'IntColWithNull',
- 'BoolColWithNull'
- )
- },
- 'postgresql': {
- 'query': """
- INSERT INTO types_test_data
- VALUES(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
- """,
- 'fields': (
- 'TextCol', 'DateCol', 'DateColWithTz',
- 'IntDateCol', 'IntDateOnlyCol', 'FloatCol',
- 'IntCol', 'BoolCol', 'IntColWithNull', 'BoolColWithNull'
- )
- },
- },
- 'read_parameters': {
- 'sqlite': "SELECT * FROM iris WHERE Name=? AND SepalLength=?",
- 'mysql': 'SELECT * FROM iris WHERE `Name`="%s" AND `SepalLength`=%s',
- 'postgresql': 'SELECT * FROM iris WHERE "Name"=%s AND "SepalLength"=%s'
- },
- 'read_named_parameters': {
- 'sqlite': """
- SELECT * FROM iris WHERE Name=:name AND SepalLength=:length
- """,
- 'mysql': """
- SELECT * FROM iris WHERE
- `Name`="%(name)s" AND `SepalLength`=%(length)s
- """,
- 'postgresql': """
- SELECT * FROM iris WHERE
- "Name"=%(name)s AND "SepalLength"=%(length)s
- """
- },
- 'create_view': {
- 'sqlite': """
- CREATE VIEW iris_view AS
- SELECT * FROM iris
- """
- }
- }
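- # Note on the per-flavor statements above: the DBAPI paramstyle differs by
- # driver (the stdlib sqlite3 module uses qmark '?', while pymysql and
- # psycopg2 use format '%s'), which is why each statement is stored once per
- # flavor. A small illustration with the sqlite variant (hypothetical
- # helper, not used by the tests):
- def _example_parametrized_query(conn):
-     # assumes a sqlite3 connection whose iris table is already loaded
-     query = SQL_STRINGS['read_parameters']['sqlite']
-     return sql.read_sql_query(query, conn, params=('Iris-setosa', 5.1))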
- class MixInBase(object):
- def teardown_method(self, method):
- # if setup fails, there may not be a connection to close.
- if hasattr(self, 'conn'):
- for tbl in self._get_all_tables():
- self.drop_table(tbl)
- self._close_conn()
- class MySQLMixIn(MixInBase):
- def drop_table(self, table_name):
- cur = self.conn.cursor()
- cur.execute("DROP TABLE IF EXISTS %s" %
- sql._get_valid_mysql_name(table_name))
- self.conn.commit()
- def _get_all_tables(self):
- cur = self.conn.cursor()
- cur.execute('SHOW TABLES')
- return [table[0] for table in cur.fetchall()]
- def _close_conn(self):
- from pymysql.err import Error
- try:
- self.conn.close()
- except Error:
- pass
- class SQLiteMixIn(MixInBase):
- def drop_table(self, table_name):
- self.conn.execute("DROP TABLE IF EXISTS %s" %
- sql._get_valid_sqlite_name(table_name))
- self.conn.commit()
- def _get_all_tables(self):
- c = self.conn.execute(
- "SELECT name FROM sqlite_master WHERE type='table'")
- return [table[0] for table in c.fetchall()]
- def _close_conn(self):
- self.conn.close()
- class SQLAlchemyMixIn(MixInBase):
- def drop_table(self, table_name):
- sql.SQLDatabase(self.conn).drop_table(table_name)
- def _get_all_tables(self):
- meta = sqlalchemy.schema.MetaData(bind=self.conn)
- meta.reflect()
- table_list = meta.tables.keys()
- return table_list
- def _close_conn(self):
- pass
- class PandasSQLTest(object):
- """
- Base class with common private methods for SQLAlchemy and fallback cases.
- """
- def _get_exec(self):
- if hasattr(self.conn, 'execute'):
- return self.conn
- else:
- return self.conn.cursor()
- @pytest.fixture(params=[('io', 'data', 'iris.csv')])
- def load_iris_data(self, datapath, request):
- import io
- iris_csv_file = datapath(*request.param)
- if not hasattr(self, 'conn'):
- self.setup_connect()
- self.drop_table('iris')
- self._get_exec().execute(SQL_STRINGS['create_iris'][self.flavor])
- with io.open(iris_csv_file, mode='r', newline=None) as iris_csv:
- r = csv.reader(iris_csv)
- next(r) # skip header row
- ins = SQL_STRINGS['insert_iris'][self.flavor]
- for row in r:
- self._get_exec().execute(ins, row)
- def _load_iris_view(self):
- self.drop_table('iris_view')
- self._get_exec().execute(SQL_STRINGS['create_view'][self.flavor])
- def _check_iris_loaded_frame(self, iris_frame):
- pytype = iris_frame.dtypes[0].type
- row = iris_frame.iloc[0]
- assert issubclass(pytype, np.floating)
- assert tm.equalContents(row.values, [5.1, 3.5, 1.4, 0.2, 'Iris-setosa'])
- def _load_test1_data(self):
- columns = ['index', 'A', 'B', 'C', 'D']
- data = [(
- '2000-01-03 00:00:00', 0.980268513777, 3.68573087906,
- -0.364216805298, -1.15973806169),
- ('2000-01-04 00:00:00', 1.04791624281, -0.0412318367011,
- -0.16181208307, 0.212549316967),
- ('2000-01-05 00:00:00', 0.498580885705,
- 0.731167677815, -0.537677223318, 1.34627041952),
- ('2000-01-06 00:00:00', 1.12020151869, 1.56762092543,
- 0.00364077397681, 0.67525259227)]
- self.test_frame1 = DataFrame(data, columns=columns)
- def _load_test2_data(self):
- df = DataFrame(dict(A=[4, 1, 3, 6],
- B=['asd', 'gsq', 'ylt', 'jkl'],
- C=[1.1, 3.1, 6.9, 5.3],
- D=[False, True, True, False],
- E=['1990-11-22', '1991-10-26',
- '1993-11-26', '1995-12-12']))
- df['E'] = to_datetime(df['E'])
- self.test_frame2 = df
- def _load_test3_data(self):
- columns = ['index', 'A', 'B']
- data = [(
- '2000-01-03 00:00:00', 2 ** 31 - 1, -1.987670),
- ('2000-01-04 00:00:00', -29, -0.0412318367011),
- ('2000-01-05 00:00:00', 20000, 0.731167677815),
- ('2000-01-06 00:00:00', -290867, 1.56762092543)]
- self.test_frame3 = DataFrame(data, columns=columns)
- def _load_raw_sql(self):
- self.drop_table('types_test_data')
- self._get_exec().execute(SQL_STRINGS['create_test_types'][self.flavor])
- ins = SQL_STRINGS['insert_test_types'][self.flavor]
- data = [
- {
- 'TextCol': 'first',
- 'DateCol': '2000-01-03 00:00:00',
- 'DateColWithTz': '2000-01-01 00:00:00-08:00',
- 'IntDateCol': 535852800,
- 'IntDateOnlyCol': 20101010,
- 'FloatCol': 10.10,
- 'IntCol': 1,
- 'BoolCol': False,
- 'IntColWithNull': 1,
- 'BoolColWithNull': False,
- },
- {
- 'TextCol': 'first',
- 'DateCol': '2000-01-04 00:00:00',
- 'DateColWithTz': '2000-06-01 00:00:00-07:00',
- 'IntDateCol': 1356998400,
- 'IntDateOnlyCol': 20101212,
- 'FloatCol': 10.10,
- 'IntCol': 1,
- 'BoolCol': False,
- 'IntColWithNull': None,
- 'BoolColWithNull': None,
- },
- ]
- for d in data:
- self._get_exec().execute(
- ins['query'],
- [d[field] for field in ins['fields']]
- )
- def _count_rows(self, table_name):
- result = self._get_exec().execute(
- "SELECT count(*) AS count_1 FROM %s" % table_name).fetchone()
- return result[0]
- def _read_sql_iris(self):
- iris_frame = self.pandasSQL.read_query("SELECT * FROM iris")
- self._check_iris_loaded_frame(iris_frame)
- def _read_sql_iris_parameter(self):
- query = SQL_STRINGS['read_parameters'][self.flavor]
- params = ['Iris-setosa', 5.1]
- iris_frame = self.pandasSQL.read_query(query, params=params)
- self._check_iris_loaded_frame(iris_frame)
- def _read_sql_iris_named_parameter(self):
- query = SQL_STRINGS['read_named_parameters'][self.flavor]
- params = {'name': 'Iris-setosa', 'length': 5.1}
- iris_frame = self.pandasSQL.read_query(query, params=params)
- self._check_iris_loaded_frame(iris_frame)
- def _to_sql(self, method=None):
- self.drop_table('test_frame1')
- self.pandasSQL.to_sql(self.test_frame1, 'test_frame1', method=method)
- assert self.pandasSQL.has_table('test_frame1')
- num_entries = len(self.test_frame1)
- num_rows = self._count_rows('test_frame1')
- assert num_rows == num_entries
- # Nuke table
- self.drop_table('test_frame1')
- def _to_sql_empty(self):
- self.drop_table('test_frame1')
- self.pandasSQL.to_sql(self.test_frame1.iloc[:0], 'test_frame1')
- def _to_sql_fail(self):
- self.drop_table('test_frame1')
- self.pandasSQL.to_sql(
- self.test_frame1, 'test_frame1', if_exists='fail')
- assert self.pandasSQL.has_table('test_frame1')
- pytest.raises(ValueError, self.pandasSQL.to_sql,
- self.test_frame1, 'test_frame1', if_exists='fail')
- self.drop_table('test_frame1')
- def _to_sql_replace(self):
- self.drop_table('test_frame1')
- self.pandasSQL.to_sql(
- self.test_frame1, 'test_frame1', if_exists='fail')
- # Add to table again
- self.pandasSQL.to_sql(
- self.test_frame1, 'test_frame1', if_exists='replace')
- assert self.pandasSQL.has_table('test_frame1')
- num_entries = len(self.test_frame1)
- num_rows = self._count_rows('test_frame1')
- assert num_rows == num_entries
- self.drop_table('test_frame1')
- def _to_sql_append(self):
- # Nuke table just in case
- self.drop_table('test_frame1')
- self.pandasSQL.to_sql(
- self.test_frame1, 'test_frame1', if_exists='fail')
- # Add to table again
- self.pandasSQL.to_sql(
- self.test_frame1, 'test_frame1', if_exists='append')
- assert self.pandasSQL.has_table('test_frame1')
- num_entries = 2 * len(self.test_frame1)
- num_rows = self._count_rows('test_frame1')
- assert num_rows == num_entries
- self.drop_table('test_frame1')
- def _to_sql_method_callable(self):
- check = [] # used to double check function below is really being used
- def sample(pd_table, conn, keys, data_iter):
- check.append(1)
- data = [dict(zip(keys, row)) for row in data_iter]
- conn.execute(pd_table.table.insert(), data)
- self.drop_table('test_frame1')
- self.pandasSQL.to_sql(self.test_frame1, 'test_frame1', method=sample)
- assert self.pandasSQL.has_table('test_frame1')
- assert check == [1]
- num_entries = len(self.test_frame1)
- num_rows = self._count_rows('test_frame1')
- assert num_rows == num_entries
- # Nuke table
- self.drop_table('test_frame1')
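- # The ``sample`` callable above mirrors the public ``method=`` hook of
- # ``to_sql``: any callable with the signature
- # ``(pd_table, conn, keys, data_iter)`` can take over row insertion.
- # A hedged sketch of how a user would plug one in (hypothetical names):
- #
- #     def insert_rows(pd_table, conn, keys, data_iter):
- #         rows = [dict(zip(keys, row)) for row in data_iter]
- #         conn.execute(pd_table.table.insert(), rows)
- #
- #     df.to_sql('some_table', engine, method=insert_rows)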
- def _roundtrip(self):
- self.drop_table('test_frame_roundtrip')
- self.pandasSQL.to_sql(self.test_frame1, 'test_frame_roundtrip')
- result = self.pandasSQL.read_query(
- 'SELECT * FROM test_frame_roundtrip')
- result.set_index('level_0', inplace=True)
- # result.index.astype(int)
- result.index.name = None
- tm.assert_frame_equal(result, self.test_frame1)
- def _execute_sql(self):
- # drop_sql = "DROP TABLE IF EXISTS test" # should already be done
- iris_results = self.pandasSQL.execute("SELECT * FROM iris")
- row = iris_results.fetchone()
- assert tm.equalContents(row, [5.1, 3.5, 1.4, 0.2, 'Iris-setosa'])
- def _to_sql_save_index(self):
- df = DataFrame.from_records([(1, 2.1, 'line1'), (2, 1.5, 'line2')],
- columns=['A', 'B', 'C'], index=['A'])
- self.pandasSQL.to_sql(df, 'test_to_sql_saves_index')
- ix_cols = self._get_index_columns('test_to_sql_saves_index')
- assert ix_cols == [['A', ], ]
- def _transaction_test(self):
- self.pandasSQL.execute("CREATE TABLE test_trans (A INT, B TEXT)")
- ins_sql = "INSERT INTO test_trans (A,B) VALUES (1, 'blah')"
- # Make sure when transaction is rolled back, no rows get inserted
- try:
- with self.pandasSQL.run_transaction() as trans:
- trans.execute(ins_sql)
- raise Exception('error')
- except Exception:
- # ignore raised exception
- pass
- res = self.pandasSQL.read_query('SELECT * FROM test_trans')
- assert len(res) == 0
- # Make sure when transaction is committed, rows do get inserted
- with self.pandasSQL.run_transaction() as trans:
- trans.execute(ins_sql)
- res2 = self.pandasSQL.read_query('SELECT * FROM test_trans')
- assert len(res2) == 1
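- # The same rollback guarantee can be seen directly at the DBAPI level;
- # a minimal sqlite3 sketch (illustrative, not part of the suite):
- #
- #     conn = sqlite3.connect(':memory:')
- #     conn.execute("CREATE TABLE t (a INT)")
- #     conn.commit()
- #     conn.execute("INSERT INTO t VALUES (1)")  # opens a transaction
- #     conn.rollback()  # undoes the uncommitted INSERT
- #     assert conn.execute("SELECT count(*) FROM t").fetchone()[0] == 0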
- # -----------------------------------------------------------------------------
- # -- Testing the public API
- class _TestSQLApi(PandasSQLTest):
- """
- Base class to test the public API.
- From this, two classes are derived to run these tests for both the
- sqlalchemy mode (`TestSQLApi`) and the fallback mode
- (`TestSQLiteFallbackApi`). These tests are run with sqlite3. Specific
- tests for the different SQL flavors are included in `_TestSQLAlchemy`.
- Notes:
- flavor can always be passed even in SQLAlchemy mode,
- where it should be correctly ignored.
- we don't use drop_table because that isn't part of the public API
- """
- flavor = 'sqlite'
- mode = None
- def setup_connect(self):
- self.conn = self.connect()
- @pytest.fixture(autouse=True)
- def setup_method(self, load_iris_data):
- self.load_test_data_and_sql()
- def load_test_data_and_sql(self):
- self._load_iris_view()
- self._load_test1_data()
- self._load_test2_data()
- self._load_test3_data()
- self._load_raw_sql()
- def test_read_sql_iris(self):
- iris_frame = sql.read_sql_query(
- "SELECT * FROM iris", self.conn)
- self._check_iris_loaded_frame(iris_frame)
- def test_read_sql_view(self):
- iris_frame = sql.read_sql_query(
- "SELECT * FROM iris_view", self.conn)
- self._check_iris_loaded_frame(iris_frame)
- def test_to_sql(self):
- sql.to_sql(self.test_frame1, 'test_frame1', self.conn)
- assert sql.has_table('test_frame1', self.conn)
- def test_to_sql_fail(self):
- sql.to_sql(self.test_frame1, 'test_frame2',
- self.conn, if_exists='fail')
- assert sql.has_table('test_frame2', self.conn)
- pytest.raises(ValueError, sql.to_sql, self.test_frame1,
- 'test_frame2', self.conn, if_exists='fail')
- def test_to_sql_replace(self):
- sql.to_sql(self.test_frame1, 'test_frame3',
- self.conn, if_exists='fail')
- # Add to table again
- sql.to_sql(self.test_frame1, 'test_frame3',
- self.conn, if_exists='replace')
- assert sql.has_table('test_frame3', self.conn)
- num_entries = len(self.test_frame1)
- num_rows = self._count_rows('test_frame3')
- assert num_rows == num_entries
- def test_to_sql_append(self):
- sql.to_sql(self.test_frame1, 'test_frame4',
- self.conn, if_exists='fail')
- # Add to table again
- sql.to_sql(self.test_frame1, 'test_frame4',
- self.conn, if_exists='append')
- assert sql.has_table('test_frame4', self.conn)
- num_entries = 2 * len(self.test_frame1)
- num_rows = self._count_rows('test_frame4')
- assert num_rows == num_entries
- def test_to_sql_type_mapping(self):
- sql.to_sql(self.test_frame3, 'test_frame5', self.conn, index=False)
- result = sql.read_sql("SELECT * FROM test_frame5", self.conn)
- tm.assert_frame_equal(self.test_frame3, result)
- def test_to_sql_series(self):
- s = Series(np.arange(5, dtype='int64'), name='series')
- sql.to_sql(s, "test_series", self.conn, index=False)
- s2 = sql.read_sql_query("SELECT * FROM test_series", self.conn)
- tm.assert_frame_equal(s.to_frame(), s2)
- @pytest.mark.filterwarnings("ignore:\\nPanel:FutureWarning")
- def test_to_sql_panel(self):
- panel = tm.makePanel()
- pytest.raises(NotImplementedError, sql.to_sql, panel,
- 'test_panel', self.conn)
- def test_roundtrip(self):
- sql.to_sql(self.test_frame1, 'test_frame_roundtrip',
- con=self.conn)
- result = sql.read_sql_query(
- 'SELECT * FROM test_frame_roundtrip',
- con=self.conn)
- # HACK!
- result.index = self.test_frame1.index
- result.set_index('level_0', inplace=True)
- result.index = result.index.astype(int)
- result.index.name = None
- tm.assert_frame_equal(result, self.test_frame1)
- def test_roundtrip_chunksize(self):
- sql.to_sql(self.test_frame1, 'test_frame_roundtrip', con=self.conn,
- index=False, chunksize=2)
- result = sql.read_sql_query(
- 'SELECT * FROM test_frame_roundtrip',
- con=self.conn)
- tm.assert_frame_equal(result, self.test_frame1)
- def test_execute_sql(self):
- # drop_sql = "DROP TABLE IF EXISTS test" # should already be done
- iris_results = sql.execute("SELECT * FROM iris", con=self.conn)
- row = iris_results.fetchone()
- assert tm.equalContents(row, [5.1, 3.5, 1.4, 0.2, 'Iris-setosa'])
- def test_date_parsing(self):
- # Test date parsing in read_sql
- # No Parsing
- df = sql.read_sql_query("SELECT * FROM types_test_data", self.conn)
- assert not issubclass(df.DateCol.dtype.type, np.datetime64)
- df = sql.read_sql_query("SELECT * FROM types_test_data", self.conn,
- parse_dates=['DateCol'])
- assert issubclass(df.DateCol.dtype.type, np.datetime64)
- assert df.DateCol.tolist() == [
- pd.Timestamp(2000, 1, 3, 0, 0, 0),
- pd.Timestamp(2000, 1, 4, 0, 0, 0)
- ]
- df = sql.read_sql_query("SELECT * FROM types_test_data", self.conn,
- parse_dates={'DateCol': '%Y-%m-%d %H:%M:%S'})
- assert issubclass(df.DateCol.dtype.type, np.datetime64)
- assert df.DateCol.tolist() == [
- pd.Timestamp(2000, 1, 3, 0, 0, 0),
- pd.Timestamp(2000, 1, 4, 0, 0, 0)
- ]
- df = sql.read_sql_query("SELECT * FROM types_test_data", self.conn,
- parse_dates=['IntDateCol'])
- assert issubclass(df.IntDateCol.dtype.type, np.datetime64)
- assert df.IntDateCol.tolist() == [
- pd.Timestamp(1986, 12, 25, 0, 0, 0),
- pd.Timestamp(2013, 1, 1, 0, 0, 0)
- ]
- df = sql.read_sql_query("SELECT * FROM types_test_data", self.conn,
- parse_dates={'IntDateCol': 's'})
- assert issubclass(df.IntDateCol.dtype.type, np.datetime64)
- assert df.IntDateCol.tolist() == [
- pd.Timestamp(1986, 12, 25, 0, 0, 0),
- pd.Timestamp(2013, 1, 1, 0, 0, 0)
- ]
- df = sql.read_sql_query("SELECT * FROM types_test_data", self.conn,
- parse_dates={'IntDateOnlyCol': '%Y%m%d'})
- assert issubclass(df.IntDateOnlyCol.dtype.type, np.datetime64)
- assert df.IntDateOnlyCol.tolist() == [
- pd.Timestamp('2010-10-10'),
- pd.Timestamp('2010-12-12')
- ]
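- # Summary of the ``parse_dates`` forms exercised above (illustrative):
- #   parse_dates=['DateCol']                      -> default to_datetime
- #   parse_dates={'DateCol': '%Y-%m-%d %H:%M:%S'} -> explicit format string
- #   parse_dates={'IntDateCol': 's'}              -> integer epoch, unit 's'
- #   parse_dates={'IntDateOnlyCol': '%Y%m%d'}     -> integer dates via format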
- def test_date_and_index(self):
- # Test case where same column appears in parse_date and index_col
- df = sql.read_sql_query("SELECT * FROM types_test_data", self.conn,
- index_col='DateCol',
- parse_dates=['DateCol', 'IntDateCol'])
- assert issubclass(df.index.dtype.type, np.datetime64)
- assert issubclass(df.IntDateCol.dtype.type, np.datetime64)
- def test_timedelta(self):
- # see #6921
- df = to_timedelta(
- Series(['00:00:01', '00:00:03'], name='foo')).to_frame()
- with tm.assert_produces_warning(UserWarning):
- df.to_sql('test_timedelta', self.conn)
- result = sql.read_sql_query('SELECT * FROM test_timedelta', self.conn)
- tm.assert_series_equal(result['foo'], df['foo'].astype('int64'))
- def test_complex(self):
- df = DataFrame({'a': [1 + 1j, 2j]})
- # Complex data type should raise error
- pytest.raises(ValueError, df.to_sql, 'test_complex', self.conn)
- def test_to_sql_index_label(self):
- temp_frame = DataFrame({'col1': range(4)})
- # no index name, defaults to 'index'
- sql.to_sql(temp_frame, 'test_index_label', self.conn)
- frame = sql.read_sql_query('SELECT * FROM test_index_label', self.conn)
- assert frame.columns[0] == 'index'
- # specifying index_label
- sql.to_sql(temp_frame, 'test_index_label', self.conn,
- if_exists='replace', index_label='other_label')
- frame = sql.read_sql_query('SELECT * FROM test_index_label', self.conn)
- assert frame.columns[0] == "other_label"
- # using the index name
- temp_frame.index.name = 'index_name'
- sql.to_sql(temp_frame, 'test_index_label', self.conn,
- if_exists='replace')
- frame = sql.read_sql_query('SELECT * FROM test_index_label', self.conn)
- assert frame.columns[0] == "index_name"
- # has index name, but specifying index_label
- sql.to_sql(temp_frame, 'test_index_label', self.conn,
- if_exists='replace', index_label='other_label')
- frame = sql.read_sql_query('SELECT * FROM test_index_label', self.conn)
- assert frame.columns[0] == "other_label"
- # index name is integer
- temp_frame.index.name = 0
- sql.to_sql(temp_frame, 'test_index_label', self.conn,
- if_exists='replace')
- frame = sql.read_sql_query('SELECT * FROM test_index_label', self.conn)
- assert frame.columns[0] == "0"
- temp_frame.index.name = None
- sql.to_sql(temp_frame, 'test_index_label', self.conn,
- if_exists='replace', index_label=0)
- frame = sql.read_sql_query('SELECT * FROM test_index_label', self.conn)
- assert frame.columns[0] == "0"
- def test_to_sql_index_label_multiindex(self):
- temp_frame = DataFrame({'col1': range(4)},
- index=MultiIndex.from_product(
- [('A0', 'A1'), ('B0', 'B1')]))
- # no index name, defaults to 'level_0' and 'level_1'
- sql.to_sql(temp_frame, 'test_index_label', self.conn)
- frame = sql.read_sql_query('SELECT * FROM test_index_label', self.conn)
- assert frame.columns[0] == 'level_0'
- assert frame.columns[1] == 'level_1'
- # specifying index_label
- sql.to_sql(temp_frame, 'test_index_label', self.conn,
- if_exists='replace', index_label=['A', 'B'])
- frame = sql.read_sql_query('SELECT * FROM test_index_label', self.conn)
- assert frame.columns[:2].tolist() == ['A', 'B']
- # using the index name
- temp_frame.index.names = ['A', 'B']
- sql.to_sql(temp_frame, 'test_index_label', self.conn,
- if_exists='replace')
- frame = sql.read_sql_query('SELECT * FROM test_index_label', self.conn)
- assert frame.columns[:2].tolist() == ['A', 'B']
- # has index name, but specifying index_label
- sql.to_sql(temp_frame, 'test_index_label', self.conn,
- if_exists='replace', index_label=['C', 'D'])
- frame = sql.read_sql_query('SELECT * FROM test_index_label', self.conn)
- assert frame.columns[:2].tolist() == ['C', 'D']
- # wrong length of index_label
- pytest.raises(ValueError, sql.to_sql, temp_frame,
- 'test_index_label', self.conn, if_exists='replace',
- index_label='C')
- def test_multiindex_roundtrip(self):
- df = DataFrame.from_records([(1, 2.1, 'line1'), (2, 1.5, 'line2')],
- columns=['A', 'B', 'C'], index=['A', 'B'])
- df.to_sql('test_multiindex_roundtrip', self.conn)
- result = sql.read_sql_query('SELECT * FROM test_multiindex_roundtrip',
- self.conn, index_col=['A', 'B'])
- tm.assert_frame_equal(df, result, check_index_type=True)
- def test_integer_col_names(self):
- df = DataFrame([[1, 2], [3, 4]], columns=[0, 1])
- sql.to_sql(df, "test_frame_integer_col_names", self.conn,
- if_exists='replace')
- def test_get_schema(self):
- create_sql = sql.get_schema(self.test_frame1, 'test', con=self.conn)
- assert 'CREATE' in create_sql
- def test_get_schema_dtypes(self):
- float_frame = DataFrame({'a': [1.1, 1.2], 'b': [2.1, 2.2]})
- dtype = sqlalchemy.Integer if self.mode == 'sqlalchemy' else 'INTEGER'
- create_sql = sql.get_schema(float_frame, 'test',
- con=self.conn, dtype={'b': dtype})
- assert 'CREATE' in create_sql
- assert 'INTEGER' in create_sql
- def test_get_schema_keys(self):
- frame = DataFrame({'Col1': [1.1, 1.2], 'Col2': [2.1, 2.2]})
- create_sql = sql.get_schema(frame, 'test', con=self.conn, keys='Col1')
- constraint_sentence = 'CONSTRAINT test_pk PRIMARY KEY ("Col1")'
- assert constraint_sentence in create_sql
- # multiple columns as key (GH10385)
- create_sql = sql.get_schema(self.test_frame1, 'test',
- con=self.conn, keys=['A', 'B'])
- constraint_sentence = 'CONSTRAINT test_pk PRIMARY KEY ("A", "B")'
- assert constraint_sentence in create_sql
- def test_chunksize_read(self):
- df = DataFrame(np.random.randn(22, 5), columns=list('abcde'))
- df.to_sql('test_chunksize', self.conn, index=False)
- # reading the query all at once
- res1 = sql.read_sql_query("select * from test_chunksize", self.conn)
- # reading the query in chunks with read_sql_query
- res2 = DataFrame()
- i = 0
- sizes = [5, 5, 5, 5, 2]
- for chunk in sql.read_sql_query("select * from test_chunksize",
- self.conn, chunksize=5):
- res2 = concat([res2, chunk], ignore_index=True)
- assert len(chunk) == sizes[i]
- i += 1
- tm.assert_frame_equal(res1, res2)
- # reading the table in chunks with read_sql_table
- if self.mode == 'sqlalchemy':
- res3 = DataFrame()
- i = 0
- sizes = [5, 5, 5, 5, 2]
- for chunk in sql.read_sql_table("test_chunksize", self.conn,
- chunksize=5):
- res3 = concat([res3, chunk], ignore_index=True)
- assert len(chunk) == sizes[i]
- i += 1
- tm.assert_frame_equal(res1, res3)
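- # ``chunksize`` turns both readers into iterators of DataFrames; in user
- # code the streaming pattern verified above looks like this (hedged
- # sketch, hypothetical table name and connection):
- #
- #     total = 0
- #     for chunk in sql.read_sql_query("SELECT * FROM big_table", conn,
- #                                     chunksize=5):
- #         total += len(chunk)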
- def test_categorical(self):
- # GH8624
- # test that categorical gets written correctly as dense column
- df = DataFrame(
- {'person_id': [1, 2, 3],
- 'person_name': ['John P. Doe', 'Jane Dove', 'John P. Doe']})
- df2 = df.copy()
- df2['person_name'] = df2['person_name'].astype('category')
- df2.to_sql('test_categorical', self.conn, index=False)
- res = sql.read_sql_query('SELECT * FROM test_categorical', self.conn)
- tm.assert_frame_equal(res, df)
- def test_unicode_column_name(self):
- # GH 11431
- df = DataFrame([[1, 2], [3, 4]], columns=[u'\xe9', u'b'])
- df.to_sql('test_unicode', self.conn, index=False)
- def test_escaped_table_name(self):
- # GH 13206
- df = DataFrame({'A': [0, 1, 2], 'B': [0.2, np.nan, 5.6]})
- df.to_sql('d1187b08-4943-4c8d-a7f6', self.conn, index=False)
- res = sql.read_sql_query('SELECT * FROM `d1187b08-4943-4c8d-a7f6`',
- self.conn)
- tm.assert_frame_equal(res, df)
- @pytest.mark.single
- class TestSQLApi(SQLAlchemyMixIn, _TestSQLApi):
- """
- Test the public API as it would be used directly
- Tests for `read_sql_table` are included here, as this is specific for the
- sqlalchemy mode.
- """
- flavor = 'sqlite'
- mode = 'sqlalchemy'
- def connect(self):
- if SQLALCHEMY_INSTALLED:
- return sqlalchemy.create_engine('sqlite:///:memory:')
- else:
- pytest.skip('SQLAlchemy not installed')
- def test_read_table_columns(self):
- # test columns argument in read_table
- sql.to_sql(self.test_frame1, 'test_frame', self.conn)
- cols = ['A', 'B']
- result = sql.read_sql_table('test_frame', self.conn, columns=cols)
- assert result.columns.tolist() == cols
- def test_read_table_index_col(self):
- # test columns argument in read_table
- sql.to_sql(self.test_frame1, 'test_frame', self.conn)
- result = sql.read_sql_table('test_frame', self.conn, index_col="index")
- assert result.index.names == ["index"]
- result = sql.read_sql_table(
- 'test_frame', self.conn, index_col=["A", "B"])
- assert result.index.names == ["A", "B"]
- result = sql.read_sql_table('test_frame', self.conn,
- index_col=["A", "B"],
- columns=["C", "D"])
- assert result.index.names == ["A", "B"]
- assert result.columns.tolist() == ["C", "D"]
- def test_read_sql_delegate(self):
- iris_frame1 = sql.read_sql_query(
- "SELECT * FROM iris", self.conn)
- iris_frame2 = sql.read_sql(
- "SELECT * FROM iris", self.conn)
- tm.assert_frame_equal(iris_frame1, iris_frame2)
- iris_frame1 = sql.read_sql_table('iris', self.conn)
- iris_frame2 = sql.read_sql('iris', self.conn)
- tm.assert_frame_equal(iris_frame1, iris_frame2)
- def test_not_reflect_all_tables(self):
- # create invalid table
- qry = """CREATE TABLE invalid (x INTEGER, y UNKNOWN);"""
- self.conn.execute(qry)
- qry = """CREATE TABLE other_table (x INTEGER, y INTEGER);"""
- self.conn.execute(qry)
- with warnings.catch_warnings(record=True) as w:
- # Cause all warnings to always be triggered.
- warnings.simplefilter("always")
- # Trigger a warning.
- sql.read_sql_table('other_table', self.conn)
- sql.read_sql_query('SELECT * FROM other_table', self.conn)
- # Verify some things
- assert len(w) == 0
- def test_warning_case_insensitive_table_name(self):
- # see gh-7815
- #
- # We can't test that this warning is triggered, as the database
- # configuration would have to be altered. But here we test that
- # the warning is certainly NOT triggered in a normal case.
- with warnings.catch_warnings(record=True) as w:
- # Cause all warnings to always be triggered.
- warnings.simplefilter("always")
- # This should not trigger a Warning
- self.test_frame1.to_sql('CaseSensitive', self.conn)
- # Verify some things
- assert len(w) == 0
- def _get_index_columns(self, tbl_name):
- from sqlalchemy.engine import reflection
- insp = reflection.Inspector.from_engine(self.conn)
- ixs = insp.get_indexes(tbl_name)
- ixs = [i['column_names'] for i in ixs]
- return ixs
- def test_sqlalchemy_type_mapping(self):
- # Test Timestamp objects (no datetime64 because of timezone) (GH9085)
- df = DataFrame({'time': to_datetime(['201412120154', '201412110254'],
- utc=True)})
- db = sql.SQLDatabase(self.conn)
- table = sql.SQLTable("test_type", db, frame=df)
- # GH 9086: TIMESTAMP is the suggested type for datetimes with timezones
- assert isinstance(table.table.c['time'].type, sqltypes.TIMESTAMP)
- def test_database_uri_string(self):
- # Test read_sql and .to_sql method with a database URI (GH10654)
- test_frame1 = self.test_frame1
- # db_uri = 'sqlite:///:memory:' # raises
- # sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) near
- # "iris": syntax error [SQL: 'iris']
- with tm.ensure_clean() as name:
- db_uri = 'sqlite:///' + name
- table = 'iris'
- test_frame1.to_sql(table, db_uri, if_exists='replace', index=False)
- test_frame2 = sql.read_sql(table, db_uri)
- test_frame3 = sql.read_sql_table(table, db_uri)
- query = 'SELECT * FROM iris'
- test_frame4 = sql.read_sql_query(query, db_uri)
- tm.assert_frame_equal(test_frame1, test_frame2)
- tm.assert_frame_equal(test_frame1, test_frame3)
- tm.assert_frame_equal(test_frame1, test_frame4)
- # using a driver that will not be installed on Travis to trigger an error
- # in sqlalchemy.create_engine -> test passing of this error to user
- try:
- # the rest of this test depends on pg8000's being absent
- import pg8000 # noqa
- pytest.skip("pg8000 is installed")
- except ImportError:
- pass
- db_uri = "postgresql+pg8000://user:pass@host/dbname"
- with pytest.raises(ImportError, match="pg8000"):
- sql.read_sql("select * from table", db_uri)
- def _make_iris_table_metadata(self):
- sa = sqlalchemy
- metadata = sa.MetaData()
- iris = sa.Table('iris', metadata,
- sa.Column('SepalLength', sa.REAL),
- sa.Column('SepalWidth', sa.REAL),
- sa.Column('PetalLength', sa.REAL),
- sa.Column('PetalWidth', sa.REAL),
- sa.Column('Name', sa.TEXT)
- )
- return iris
- def test_query_by_text_obj(self):
- # WIP : GH10846
- name_text = sqlalchemy.text('select * from iris where name=:name')
- iris_df = sql.read_sql(name_text, self.conn, params={
- 'name': 'Iris-versicolor'})
- all_names = set(iris_df['Name'])
- assert all_names == {'Iris-versicolor'}
- def test_query_by_select_obj(self):
- # WIP : GH10846
- iris = self._make_iris_table_metadata()
- name_select = sqlalchemy.select([iris]).where(
- iris.c.Name == sqlalchemy.bindparam('name'))
- iris_df = sql.read_sql(name_select, self.conn,
- params={'name': 'Iris-setosa'})
- all_names = set(iris_df['Name'])
- assert all_names == {'Iris-setosa'}
- class _EngineToConnMixin(object):
- """
- A mixin that causes setup_connect to create a conn rather than an engine.
- """
- @pytest.fixture(autouse=True)
- def setup_method(self, load_iris_data):
- super(_EngineToConnMixin, self).load_test_data_and_sql()
- engine = self.conn
- conn = engine.connect()
- self.__tx = conn.begin()
- self.pandasSQL = sql.SQLDatabase(conn)
- self.__engine = engine
- self.conn = conn
- yield
- self.__tx.rollback()
- self.conn.close()
- self.conn = self.__engine
- self.pandasSQL = sql.SQLDatabase(self.__engine)
- # XXX:
- # super(_EngineToConnMixin, self).teardown_method(method)
- @pytest.mark.single
- class TestSQLApiConn(_EngineToConnMixin, TestSQLApi):
- pass
- @pytest.mark.single
- class TestSQLiteFallbackApi(SQLiteMixIn, _TestSQLApi):
- """
- Test the public sqlite connection fallback API
- """
- flavor = 'sqlite'
- mode = 'fallback'
- def connect(self, database=":memory:"):
- return sqlite3.connect(database)
- def test_sql_open_close(self):
- # Test that the IO in the database still works if the connection is
- # closed between the writing and reading (as in many real situations).
- with tm.ensure_clean() as name:
- conn = self.connect(name)
- sql.to_sql(self.test_frame3, "test_frame3_legacy",
- conn, index=False)
- conn.close()
- conn = self.connect(name)
- result = sql.read_sql_query("SELECT * FROM test_frame3_legacy;",
- conn)
- conn.close()
- tm.assert_frame_equal(self.test_frame3, result)
- def test_con_string_import_error(self):
- if not SQLALCHEMY_INSTALLED:
- conn = 'mysql://root@localhost/pandas_nosetest'
- pytest.raises(ImportError, sql.read_sql, "SELECT * FROM iris",
- conn)
- else:
- pytest.skip('SQLAlchemy is installed')
- def test_read_sql_delegate(self):
- iris_frame1 = sql.read_sql_query("SELECT * FROM iris", self.conn)
- iris_frame2 = sql.read_sql("SELECT * FROM iris", self.conn)
- tm.assert_frame_equal(iris_frame1, iris_frame2)
- pytest.raises(sql.DatabaseError, sql.read_sql, 'iris', self.conn)
- def test_safe_names_warning(self):
- # GH 6798
- df = DataFrame([[1, 2], [3, 4]], columns=['a', 'b ']) # has a space
- # warns on create table with spaces in names
- with tm.assert_produces_warning():
- sql.to_sql(df, "test_frame3_legacy", self.conn, index=False)
- def test_get_schema2(self):
- # without providing a connection object (available for backwards compat)
- create_sql = sql.get_schema(self.test_frame1, 'test')
- assert 'CREATE' in create_sql
- def _get_sqlite_column_type(self, schema, column):
- for col in schema.split('\n'):
- if col.split()[0].strip('"') == column:
- return col.split()[1]
- raise ValueError('Column %s not found' % column)
- def test_sqlite_type_mapping(self):
- # Test Timestamp objects (no datetime64 because of timezone) (GH9085)
- df = DataFrame({'time': to_datetime(['201412120154', '201412110254'],
- utc=True)})
- db = sql.SQLiteDatabase(self.conn)
- table = sql.SQLiteTable("test_type", db, frame=df)
- schema = table.sql_schema()
- assert self._get_sqlite_column_type(schema, 'time') == "TIMESTAMP"
- # -----------------------------------------------------------------------------
- # -- Database flavor specific tests
- class _TestSQLAlchemy(SQLAlchemyMixIn, PandasSQLTest):
- """
- Base class for testing the sqlalchemy backend.
- Subclasses for specific database types are created below. Tests that
- deviate for each flavor are overwritten there.
- """
- flavor = None
- @pytest.fixture(autouse=True, scope='class')
- def setup_class(cls):
- cls.setup_import()
- cls.setup_driver()
- conn = cls.connect()
- conn.connect()
- def load_test_data_and_sql(self):
- self._load_raw_sql()
- self._load_test1_data()
- @pytest.fixture(autouse=True)
- def setup_method(self, load_iris_data):
- self.load_test_data_and_sql()
- @classmethod
- def setup_import(cls):
- # Skip this test if SQLAlchemy not available
- if not SQLALCHEMY_INSTALLED:
- pytest.skip('SQLAlchemy not installed')
- @classmethod
- def setup_driver(cls):
- raise NotImplementedError()
- @classmethod
- def connect(cls):
- raise NotImplementedError()
- def setup_connect(self):
- try:
- self.conn = self.connect()
- self.pandasSQL = sql.SQLDatabase(self.conn)
- # to test if connection can be made:
- self.conn.connect()
- except sqlalchemy.exc.OperationalError:
- pytest.skip(
- "Can't connect to {0} server".format(self.flavor))
- def test_read_sql(self):
- self._read_sql_iris()
- def test_read_sql_parameter(self):
- self._read_sql_iris_parameter()
- def test_read_sql_named_parameter(self):
- self._read_sql_iris_named_parameter()
- def test_to_sql(self):
- self._to_sql()
- def test_to_sql_empty(self):
- self._to_sql_empty()
- def test_to_sql_fail(self):
- self._to_sql_fail()
- def test_to_sql_replace(self):
- self._to_sql_replace()
- def test_to_sql_append(self):
- self._to_sql_append()
- def test_to_sql_method_multi(self):
- self._to_sql(method='multi')
- def test_to_sql_method_callable(self):
- self._to_sql_method_callable()
- def test_create_table(self):
- temp_conn = self.connect()
- temp_frame = DataFrame(
- {'one': [1., 2., 3., 4.], 'two': [4., 3., 2., 1.]})
- pandasSQL = sql.SQLDatabase(temp_conn)
- pandasSQL.to_sql(temp_frame, 'temp_frame')
- assert temp_conn.has_table('temp_frame')
- def test_drop_table(self):
- temp_conn = self.connect()
- temp_frame = DataFrame(
- {'one': [1., 2., 3., 4.], 'two': [4., 3., 2., 1.]})
- pandasSQL = sql.SQLDatabase(temp_conn)
- pandasSQL.to_sql(temp_frame, 'temp_frame')
- assert temp_conn.has_table('temp_frame')
- pandasSQL.drop_table('temp_frame')
- assert not temp_conn.has_table('temp_frame')
- def test_roundtrip(self):
- self._roundtrip()
- def test_execute_sql(self):
- self._execute_sql()
- def test_read_table(self):
- iris_frame = sql.read_sql_table("iris", con=self.conn)
- self._check_iris_loaded_frame(iris_frame)
- def test_read_table_columns(self):
- iris_frame = sql.read_sql_table(
- "iris", con=self.conn, columns=['SepalLength', 'SepalLength'])
- assert tm.equalContents(
- iris_frame.columns.values, ['SepalLength', 'SepalLength'])
- def test_read_table_absent(self):
- pytest.raises(
- ValueError, sql.read_sql_table, "this_doesnt_exist", con=self.conn)
- def test_default_type_conversion(self):
- df = sql.read_sql_table("types_test_data", self.conn)
- assert issubclass(df.FloatCol.dtype.type, np.floating)
- assert issubclass(df.IntCol.dtype.type, np.integer)
- assert issubclass(df.BoolCol.dtype.type, np.bool_)
- # Int column with NA values stays as float
- assert issubclass(df.IntColWithNull.dtype.type, np.floating)
- # Bool column with NA values becomes object
- assert issubclass(df.BoolColWithNull.dtype.type, np.object)
- def test_bigint(self):
- # int64 should be converted to BigInteger, GH7433
- df = DataFrame(data={'i64': [2**62]})
- df.to_sql('test_bigint', self.conn, index=False)
- result = sql.read_sql_table('test_bigint', self.conn)
- tm.assert_frame_equal(df, result)
- def test_default_date_load(self):
- df = sql.read_sql_table("types_test_data", self.conn)
- # IMPORTANT - sqlite has no native date type, so it shouldn't be
- # parsed here, but MySQL dates SHOULD be converted.
- assert issubclass(df.DateCol.dtype.type, np.datetime64)
- def test_datetime_with_timezone(self):
- # edge case that converts postgresql datetime with time zone types
- # to datetime64[ns,psycopg2.tz.FixedOffsetTimezone..], which is ok
- # but should be more natural, so coerce to datetime64[ns] for now
- def check(col):
- # check that a column is either datetime64[ns]
- # or datetime64[ns, UTC]
- if is_datetime64_dtype(col.dtype):
- # "2000-01-01 00:00:00-08:00" should convert to
- # "2000-01-01 08:00:00"
- assert col[0] == Timestamp('2000-01-01 08:00:00')
- # "2000-06-01 00:00:00-07:00" should convert to
- # "2000-06-01 07:00:00"
- assert col[1] == Timestamp('2000-06-01 07:00:00')
- elif is_datetime64tz_dtype(col.dtype):
- assert str(col.dt.tz) == 'UTC'
- # "2000-01-01 00:00:00-08:00" should convert to
- # "2000-01-01 08:00:00"
- # "2000-06-01 00:00:00-07:00" should convert to
- # "2000-06-01 07:00:00"
- # GH 6415
- expected_data = [Timestamp('2000-01-01 08:00:00', tz='UTC'),
- Timestamp('2000-06-01 07:00:00', tz='UTC')]
- expected = Series(expected_data, name=col.name)
- tm.assert_series_equal(col, expected)
- else:
- raise AssertionError("DateCol loaded with incorrect type "
- "-> {0}".format(col.dtype))
- # GH11216
- df = pd.read_sql_query("select * from types_test_data", self.conn)
- if not hasattr(df, 'DateColWithTz'):
- pytest.skip("no column with datetime with time zone")
- # this is parsed on Travis (linux), but not on macOS for some reason
- # even with the same versions of psycopg2 & sqlalchemy, possibly a
- # PostgreSQL server version difference
- col = df.DateColWithTz
- assert is_datetime64tz_dtype(col.dtype)
- df = pd.read_sql_query("select * from types_test_data",
- self.conn, parse_dates=['DateColWithTz'])
- if not hasattr(df, 'DateColWithTz'):
- pytest.skip("no column with datetime with time zone")
- col = df.DateColWithTz
- assert is_datetime64tz_dtype(col.dtype)
- assert str(col.dt.tz) == 'UTC'
- check(df.DateColWithTz)
- df = pd.concat(list(pd.read_sql_query("select * from types_test_data",
- self.conn, chunksize=1)),
- ignore_index=True)
- col = df.DateColWithTz
- assert is_datetime64tz_dtype(col.dtype)
- assert str(col.dt.tz) == 'UTC'
- expected = sql.read_sql_table("types_test_data", self.conn)
- col = expected.DateColWithTz
- assert is_datetime64tz_dtype(col.dtype)
- tm.assert_series_equal(df.DateColWithTz, expected.DateColWithTz)
- # xref #7139
- # this might or might not be converted depending on the postgres driver
- df = sql.read_sql_table("types_test_data", self.conn)
- check(df.DateColWithTz)
- def test_datetime_with_timezone_roundtrip(self):
- # GH 9086
- # Write datetimetz data to a db and read it back
- # For dbs that support timestamps with timezones, should get back UTC
- # otherwise naive data should be returned
- expected = DataFrame({'A': date_range(
- '2013-01-01 09:00:00', periods=3, tz='US/Pacific'
- )})
- expected.to_sql('test_datetime_tz', self.conn, index=False)
- if self.flavor == 'postgresql':
- # SQLAlchemy "timezones" (i.e. offsets) are coerced to UTC
- expected['A'] = expected['A'].dt.tz_convert('UTC')
- else:
- # Otherwise, timestamps are returned as local, naive
- expected['A'] = expected['A'].dt.tz_localize(None)
- result = sql.read_sql_table('test_datetime_tz', self.conn)
- tm.assert_frame_equal(result, expected)
- result = sql.read_sql_query(
- 'SELECT * FROM test_datetime_tz', self.conn
- )
- if self.flavor == 'sqlite':
- # read_sql_query does not return datetime type like read_sql_table
- assert isinstance(result.loc[0, 'A'], string_types)
- result['A'] = to_datetime(result['A'])
- tm.assert_frame_equal(result, expected)
- def test_naive_datetimeindex_roundtrip(self):
- # GH 23510
- # Ensure that a naive DatetimeIndex isn't converted to UTC
- dates = date_range('2018-01-01', periods=5, freq='6H')
- expected = DataFrame({'nums': range(5)}, index=dates)
- expected.to_sql('foo_table', self.conn, index_label='info_date')
- result = sql.read_sql_table('foo_table', self.conn,
- index_col='info_date')
- # the result index will gain a name from the set_index operation; the
- # expected index has none, so ignore names in the comparison
- tm.assert_frame_equal(result, expected, check_names=False)
- def test_date_parsing(self):
- # No Parsing
- df = sql.read_sql_table("types_test_data", self.conn)
- expected_type = object if self.flavor == 'sqlite' else np.datetime64
- assert issubclass(df.DateCol.dtype.type, expected_type)
- df = sql.read_sql_table("types_test_data", self.conn,
- parse_dates=['DateCol'])
- assert issubclass(df.DateCol.dtype.type, np.datetime64)
- df = sql.read_sql_table("types_test_data", self.conn,
- parse_dates={'DateCol': '%Y-%m-%d %H:%M:%S'})
- assert issubclass(df.DateCol.dtype.type, np.datetime64)
- df = sql.read_sql_table("types_test_data", self.conn, parse_dates={
- 'DateCol': {'format': '%Y-%m-%d %H:%M:%S'}})
- assert issubclass(df.DateCol.dtype.type, np.datetime64)
- df = sql.read_sql_table(
- "types_test_data", self.conn, parse_dates=['IntDateCol'])
- assert issubclass(df.IntDateCol.dtype.type, np.datetime64)
- df = sql.read_sql_table(
- "types_test_data", self.conn, parse_dates={'IntDateCol': 's'})
- assert issubclass(df.IntDateCol.dtype.type, np.datetime64)
- df = sql.read_sql_table("types_test_data", self.conn,
- parse_dates={'IntDateCol': {'unit': 's'}})
- assert issubclass(df.IntDateCol.dtype.type, np.datetime64)
- def test_datetime(self):
- df = DataFrame({'A': date_range('2013-01-01 09:00:00', periods=3),
- 'B': np.arange(3.0)})
- df.to_sql('test_datetime', self.conn)
- # with read_table -> type information from schema used
- result = sql.read_sql_table('test_datetime', self.conn)
- result = result.drop('index', axis=1)
- tm.assert_frame_equal(result, df)
- # with read_sql -> no type information -> sqlite has no native datetime type
- result = sql.read_sql_query('SELECT * FROM test_datetime', self.conn)
- result = result.drop('index', axis=1)
- if self.flavor == 'sqlite':
- assert isinstance(result.loc[0, 'A'], string_types)
- result['A'] = to_datetime(result['A'])
- tm.assert_frame_equal(result, df)
- else:
- tm.assert_frame_equal(result, df)
- def test_datetime_NaT(self):
- df = DataFrame({'A': date_range('2013-01-01 09:00:00', periods=3),
- 'B': np.arange(3.0)})
- df.loc[1, 'A'] = np.nan
- df.to_sql('test_datetime', self.conn, index=False)
- # with read_table -> type information from schema used
- result = sql.read_sql_table('test_datetime', self.conn)
- tm.assert_frame_equal(result, df)
- # with read_sql -> no type information -> sqlite has no native datetime type
- result = sql.read_sql_query('SELECT * FROM test_datetime', self.conn)
- if self.flavor == 'sqlite':
- assert isinstance(result.loc[0, 'A'], string_types)
- result['A'] = to_datetime(result['A'], errors='coerce')
- tm.assert_frame_equal(result, df)
- else:
- tm.assert_frame_equal(result, df)
- def test_datetime_date(self):
- # test support for datetime.date
- df = DataFrame([date(2014, 1, 1), date(2014, 1, 2)], columns=["a"])
- df.to_sql('test_date', self.conn, index=False)
- res = read_sql_table('test_date', self.conn)
- result = res['a']
- expected = to_datetime(df['a'])
- # comes back as datetime64
- tm.assert_series_equal(result, expected)
- def test_datetime_time(self):
- # test support for datetime.time
- df = DataFrame([time(9, 0, 0), time(9, 1, 30)], columns=["a"])
- df.to_sql('test_time', self.conn, index=False)
- res = read_sql_table('test_time', self.conn)
- tm.assert_frame_equal(res, df)
- # GH8341
- # first, use the fallback to have the sqlite adapter put in place
- sqlite_conn = TestSQLiteFallback.connect()
- sql.to_sql(df, "test_time2", sqlite_conn, index=False)
- res = sql.read_sql_query("SELECT * FROM test_time2", sqlite_conn)
- ref = df.applymap(lambda _: _.strftime("%H:%M:%S.%f"))
- tm.assert_frame_equal(ref, res) # check if adapter is in place
- # then test if sqlalchemy is unaffected by the sqlite adapter
- sql.to_sql(df, "test_time3", self.conn, index=False)
- if self.flavor == 'sqlite':
- res = sql.read_sql_query("SELECT * FROM test_time3", self.conn)
- ref = df.applymap(lambda x: x.strftime("%H:%M:%S.%f"))
- tm.assert_frame_equal(ref, res)
- res = sql.read_sql_table("test_time3", self.conn)
- tm.assert_frame_equal(df, res)
- def test_mixed_dtype_insert(self):
- # see GH6509
- s1 = Series(2**25 + 1, dtype=np.int32)
- s2 = Series(0.0, dtype=np.float32)
- df = DataFrame({'s1': s1, 's2': s2})
- # write and read again
- df.to_sql("test_read_write", self.conn, index=False)
- df2 = sql.read_sql_table("test_read_write", self.conn)
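- # check_dtype=False because the dtypes may legitimately change on the
- # round trip; check_exact=True ensures 2**25 + 1 survives, which it
- # would not if the values had been squeezed through float32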
- tm.assert_frame_equal(df, df2, check_dtype=False, check_exact=True)
- def test_nan_numeric(self):
- # NaNs in numeric float column
- df = DataFrame({'A': [0, 1, 2], 'B': [0.2, np.nan, 5.6]})
- df.to_sql('test_nan', self.conn, index=False)
- # with read_table
- result = sql.read_sql_table('test_nan', self.conn)
- tm.assert_frame_equal(result, df)
- # with read_sql
- result = sql.read_sql_query('SELECT * FROM test_nan', self.conn)
- tm.assert_frame_equal(result, df)
- def test_nan_fullcolumn(self):
- # full NaN column (numeric float column)
- df = DataFrame({'A': [0, 1, 2], 'B': [np.nan, np.nan, np.nan]})
- df.to_sql('test_nan', self.conn, index=False)
- # with read_table
- result = sql.read_sql_table('test_nan', self.conn)
- tm.assert_frame_equal(result, df)
- # with read_sql -> no type info from table -> stays None
- df['B'] = df['B'].astype('object')
- df['B'] = None
- result = sql.read_sql_query('SELECT * FROM test_nan', self.conn)
- tm.assert_frame_equal(result, df)
- def test_nan_string(self):
- # NaNs in string column
- df = DataFrame({'A': [0, 1, 2], 'B': ['a', 'b', np.nan]})
- df.to_sql('test_nan', self.conn, index=False)
- # NaNs are coming back as None
- df.loc[2, 'B'] = None
- # with read_table
- result = sql.read_sql_table('test_nan', self.conn)
- tm.assert_frame_equal(result, df)
- # with read_sql
- result = sql.read_sql_query('SELECT * FROM test_nan', self.conn)
- tm.assert_frame_equal(result, df)
- def _get_index_columns(self, tbl_name):
- from sqlalchemy.engine import reflection
- insp = reflection.Inspector.from_engine(self.conn)
- ixs = insp.get_indexes(tbl_name)
- ixs = [i['column_names'] for i in ixs]
- return ixs
- def test_to_sql_save_index(self):
- self._to_sql_save_index()
- def test_transactions(self):
- self._transaction_test()
- def test_get_schema_create_table(self):
- # Use a dataframe without a bool column, since MySQL converts bool to
- # TINYINT (which read_sql_table returns as an int and causes a dtype
- # mismatch)
- self._load_test3_data()
- tbl = 'test_get_schema_create_table'
- create_sql = sql.get_schema(self.test_frame3, tbl, con=self.conn)
- blank_test_df = self.test_frame3.iloc[:0]
- self.drop_table(tbl)
- self.conn.execute(create_sql)
- returned_df = sql.read_sql_table(tbl, self.conn)
- tm.assert_frame_equal(returned_df, blank_test_df,
- check_index_type=False)
- self.drop_table(tbl)
- def test_dtype(self):
- cols = ['A', 'B']
- data = [(0.8, True),
- (0.9, None)]
- df = DataFrame(data, columns=cols)
- df.to_sql('dtype_test', self.conn)
- df.to_sql('dtype_test2', self.conn, dtype={'B': sqlalchemy.TEXT})
- meta = sqlalchemy.schema.MetaData(bind=self.conn)
- meta.reflect()
- sqltype = meta.tables['dtype_test2'].columns['B'].type
- assert isinstance(sqltype, sqlalchemy.TEXT)
- pytest.raises(ValueError, df.to_sql,
- 'error', self.conn, dtype={'B': str})
- # GH9083
- df.to_sql('dtype_test3', self.conn, dtype={'B': sqlalchemy.String(10)})
- meta.reflect()
- sqltype = meta.tables['dtype_test3'].columns['B'].type
- assert isinstance(sqltype, sqlalchemy.String)
- assert sqltype.length == 10
- # single dtype
- df.to_sql('single_dtype_test', self.conn, dtype=sqlalchemy.TEXT)
- meta = sqlalchemy.schema.MetaData(bind=self.conn)
- meta.reflect()
- sqltypea = meta.tables['single_dtype_test'].columns['A'].type
- sqltypeb = meta.tables['single_dtype_test'].columns['B'].type
- assert isinstance(sqltypea, sqlalchemy.TEXT)
- assert isinstance(sqltypeb, sqlalchemy.TEXT)
- def test_notna_dtype(self):
- cols = {'Bool': Series([True, None]),
- 'Date': Series([datetime(2012, 5, 1), None]),
- 'Int': Series([1, None], dtype='object'),
- 'Float': Series([1.1, None])
- }
- df = DataFrame(cols)
- tbl = 'notna_dtype_test'
- df.to_sql(tbl, self.conn)
- returned_df = sql.read_sql_table(tbl, self.conn) # noqa
- meta = sqlalchemy.schema.MetaData(bind=self.conn)
- meta.reflect()
- if self.flavor == 'mysql':
- my_type = sqltypes.Integer
- else:
- my_type = sqltypes.Boolean
- col_dict = meta.tables[tbl].columns
- assert isinstance(col_dict['Bool'].type, my_type)
- assert isinstance(col_dict['Date'].type, sqltypes.DateTime)
- assert isinstance(col_dict['Int'].type, sqltypes.Integer)
- assert isinstance(col_dict['Float'].type, sqltypes.Float)
- def test_double_precision(self):
- V = 1.23456789101112131415
- df = DataFrame({'f32': Series([V, ], dtype='float32'),
- 'f64': Series([V, ], dtype='float64'),
- 'f64_as_f32': Series([V, ], dtype='float64'),
- 'i32': Series([5, ], dtype='int32'),
- 'i64': Series([5, ], dtype='int64'),
- })
- df.to_sql('test_dtypes', self.conn, index=False, if_exists='replace',
- dtype={'f64_as_f32': sqlalchemy.Float(precision=23)})
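- # Float(precision=23) should map to a single-precision column on
- # backends that distinguish FLOAT from DOUBLE (e.g. MySQL), so
- # f64_as_f32 is expected to reflect with the same SQL type as f32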
- res = sql.read_sql_table('test_dtypes', self.conn)
- # check precision of float64
- assert (np.round(df['f64'].iloc[0], 14) ==
- np.round(res['f64'].iloc[0], 14))
- # check sql types
- meta = sqlalchemy.schema.MetaData(bind=self.conn)
- meta.reflect()
- col_dict = meta.tables['test_dtypes'].columns
- assert str(col_dict['f32'].type) == str(col_dict['f64_as_f32'].type)
- assert isinstance(col_dict['f32'].type, sqltypes.Float)
- assert isinstance(col_dict['f64'].type, sqltypes.Float)
- assert isinstance(col_dict['i32'].type, sqltypes.Integer)
- assert isinstance(col_dict['i64'].type, sqltypes.BigInteger)
- def test_connectable_issue_example(self):
- # This tests the example raised in issue
- # https://github.com/pandas-dev/pandas/issues/10104
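- # The point: pandas accepts a plain Connection (not only an Engine),
- # so reads and writes can take part in a transaction that the caller
- # manages explicitly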
- def foo(connection):
- query = 'SELECT test_foo_data FROM test_foo_data'
- return sql.read_sql_query(query, con=connection)
- def bar(connection, data):
- data.to_sql(name='test_foo_data',
- con=connection, if_exists='append')
- def main(connectable):
- with connectable.connect() as conn:
- with conn.begin():
- foo_data = conn.run_callable(foo)
- conn.run_callable(bar, foo_data)
- DataFrame({'test_foo_data': [0, 1, 2]}).to_sql(
- 'test_foo_data', self.conn)
- main(self.conn)
- def test_temporary_table(self):
- test_data = u'Hello, World!'
- expected = DataFrame({'spam': [test_data]})
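- # TEMPORARY tables are visible only to the connection that created
- # them, so the read below must go through the same session connection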
- Base = declarative.declarative_base()
- class Temporary(Base):
- __tablename__ = 'temp_test'
- __table_args__ = {'prefixes': ['TEMPORARY']}
- id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True)
- spam = sqlalchemy.Column(sqlalchemy.Unicode(30), nullable=False)
- Session = sa_session.sessionmaker(bind=self.conn)
- session = Session()
- with session.transaction:
- conn = session.connection()
- Temporary.__table__.create(conn)
- session.add(Temporary(spam=test_data))
- session.flush()
- df = sql.read_sql_query(
- sql=sqlalchemy.select([Temporary.spam]),
- con=conn,
- )
- tm.assert_frame_equal(df, expected)
- class _TestSQLAlchemyConn(_EngineToConnMixin, _TestSQLAlchemy):
- def test_transactions(self):
- pytest.skip(
- "Nested transactions rollbacks don't work with Pandas")
- class _TestSQLiteAlchemy(object):
- """
- Test the sqlalchemy backend against an in-memory sqlite database.
- """
- flavor = 'sqlite'
- @classmethod
- def connect(cls):
- return sqlalchemy.create_engine('sqlite:///:memory:')
- @classmethod
- def setup_driver(cls):
- # sqlite3 is built-in
- cls.driver = None
- def test_default_type_conversion(self):
- df = sql.read_sql_table("types_test_data", self.conn)
- assert issubclass(df.FloatCol.dtype.type, np.floating)
- assert issubclass(df.IntCol.dtype.type, np.integer)
- # sqlite has no boolean type, so integer type is returned
- assert issubclass(df.BoolCol.dtype.type, np.integer)
- # Int column with NA values stays as float
- assert issubclass(df.IntColWithNull.dtype.type, np.floating)
- # Non-native Bool column with NA values stays as float
- assert issubclass(df.BoolColWithNull.dtype.type, np.floating)
- def test_default_date_load(self):
- df = sql.read_sql_table("types_test_data", self.conn)
- # IMPORTANT - sqlite has no native date type, so this column should
- # not be parsed to datetime64 by default
- assert not issubclass(df.DateCol.dtype.type, np.datetime64)
- def test_bigint_warning(self):
- # test that no warning is raised for BIGINT (int64 support, GH7433)
- df = DataFrame({'a': [1, 2]}, dtype='int64')
- df.to_sql('test_bigintwarning', self.conn, index=False)
- with warnings.catch_warnings(record=True) as w:
- warnings.simplefilter("always")
- sql.read_sql_table('test_bigintwarning', self.conn)
- assert len(w) == 0
- class _TestMySQLAlchemy(object):
- """
- Test the sqlalchemy backend against a MySQL database.
- """
- flavor = 'mysql'
- @classmethod
- def connect(cls):
- url = 'mysql+{driver}://root@localhost/pandas_nosetest'
- return sqlalchemy.create_engine(url.format(driver=cls.driver),
- connect_args=cls.connect_args)
- @classmethod
- def setup_driver(cls):
- pymysql = pytest.importorskip('pymysql')
- cls.driver = 'pymysql'
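- # CLIENT.MULTI_STATEMENTS lets a single execute() run several
- # semicolon-separated statements, which test_read_procedure relies on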
- cls.connect_args = {
- 'client_flag': pymysql.constants.CLIENT.MULTI_STATEMENTS}
- def test_default_type_conversion(self):
- df = sql.read_sql_table("types_test_data", self.conn)
- assert issubclass(df.FloatCol.dtype.type, np.floating)
- assert issubclass(df.IntCol.dtype.type, np.integer)
- # MySQL has no real BOOL type (it's an alias for TINYINT)
- assert issubclass(df.BoolCol.dtype.type, np.integer)
- # Int column with NA values stays as float
- assert issubclass(df.IntColWithNull.dtype.type, np.floating)
- # Bool column with NA behaves like an int column with NA => becomes float
- assert issubclass(df.BoolColWithNull.dtype.type, np.floating)
- def test_read_procedure(self):
- import pymysql
- # see GH7324. Although it is more of an API test, it is added to the
- # MySQL tests, as sqlite does not have stored procedures
- df = DataFrame({'a': [1, 2, 3], 'b': [0.1, 0.2, 0.3]})
- df.to_sql('test_procedure', self.conn, index=False)
- proc = """DROP PROCEDURE IF EXISTS get_testdb;
- CREATE PROCEDURE get_testdb ()
- BEGIN
- SELECT * FROM test_procedure;
- END"""
- connection = self.conn.connect()
- trans = connection.begin()
- try:
- r1 = connection.execute(proc) # noqa
- trans.commit()
- except pymysql.Error:
- trans.rollback()
- raise
- res1 = sql.read_sql_query("CALL get_testdb();", self.conn)
- tm.assert_frame_equal(df, res1)
- # test delegation to read_sql_query
- res2 = sql.read_sql("CALL get_testdb();", self.conn)
- tm.assert_frame_equal(df, res2)
- class _TestPostgreSQLAlchemy(object):
- """
- Test the sqlalchemy backend against a PostgreSQL database.
- """
- flavor = 'postgresql'
- @classmethod
- def connect(cls):
- url = 'postgresql+{driver}://postgres@localhost/pandas_nosetest'
- return sqlalchemy.create_engine(url.format(driver=cls.driver))
- @classmethod
- def setup_driver(cls):
- pytest.importorskip('psycopg2')
- cls.driver = 'psycopg2'
- def test_schema_support(self):
- # only test this for postgresql (schemas are not supported in
- # mysql/sqlite)
- df = DataFrame({'col1': [1, 2], 'col2': [
- 0.1, 0.2], 'col3': ['a', 'n']})
- # create a schema
- self.conn.execute("DROP SCHEMA IF EXISTS other CASCADE;")
- self.conn.execute("CREATE SCHEMA other;")
- # write dataframe to different schemas
- df.to_sql('test_schema_public', self.conn, index=False)
- df.to_sql('test_schema_public_explicit', self.conn, index=False,
- schema='public')
- df.to_sql('test_schema_other', self.conn, index=False, schema='other')
- # read dataframes back in
- res1 = sql.read_sql_table('test_schema_public', self.conn)
- tm.assert_frame_equal(df, res1)
- res2 = sql.read_sql_table('test_schema_public_explicit', self.conn)
- tm.assert_frame_equal(df, res2)
- res3 = sql.read_sql_table('test_schema_public_explicit', self.conn,
- schema='public')
- tm.assert_frame_equal(df, res3)
- res4 = sql.read_sql_table('test_schema_other', self.conn,
- schema='other')
- tm.assert_frame_equal(df, res4)
- pytest.raises(ValueError, sql.read_sql_table, 'test_schema_other',
- self.conn, schema='public')
- # different if_exists options
- # create a schema
- self.conn.execute("DROP SCHEMA IF EXISTS other CASCADE;")
- self.conn.execute("CREATE SCHEMA other;")
- # write dataframe with different if_exists options
- df.to_sql('test_schema_other', self.conn, schema='other', index=False)
- df.to_sql('test_schema_other', self.conn, schema='other', index=False,
- if_exists='replace')
- df.to_sql('test_schema_other', self.conn, schema='other', index=False,
- if_exists='append')
- res = sql.read_sql_table(
- 'test_schema_other', self.conn, schema='other')
- tm.assert_frame_equal(concat([df, df], ignore_index=True), res)
- # specifying schema in user-provided meta
- # The schema won't be applied on another Connection
- # because of transactional schemas
- if isinstance(self.conn, sqlalchemy.engine.Engine):
- engine2 = self.connect()
- meta = sqlalchemy.MetaData(engine2, schema='other')
- pdsql = sql.SQLDatabase(engine2, meta=meta)
- pdsql.to_sql(df, 'test_schema_other2', index=False)
- pdsql.to_sql(df, 'test_schema_other2',
- index=False, if_exists='replace')
- pdsql.to_sql(df, 'test_schema_other2',
- index=False, if_exists='append')
- res1 = sql.read_sql_table(
- 'test_schema_other2', self.conn, schema='other')
- res2 = pdsql.read_table('test_schema_other2')
- tm.assert_frame_equal(res1, res2)
- def test_copy_from_callable_insertion_method(self):
- # GH 8953
- # Example in io.rst found under _io.sql.method
- # not available in sqlite, mysql
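- # A callable passed via ``method`` receives the pandas SQLTable, the
- # SQLAlchemy connection, the list of column names, and an iterator of
- # row tuples, and performs the insert itself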
- def psql_insert_copy(table, conn, keys, data_iter):
- # gets a DBAPI connection that can provide a cursor
- dbapi_conn = conn.connection
- with dbapi_conn.cursor() as cur:
- s_buf = compat.StringIO()
- writer = csv.writer(s_buf)
- writer.writerows(data_iter)
- s_buf.seek(0)
- columns = ', '.join('"{}"'.format(k) for k in keys)
- if table.schema:
- table_name = '{}.{}'.format(table.schema, table.name)
- else:
- table_name = table.name
- sql_query = 'COPY {} ({}) FROM STDIN WITH CSV'.format(
- table_name, columns)
- cur.copy_expert(sql=sql_query, file=s_buf)
- expected = DataFrame({'col1': [1, 2], 'col2': [0.1, 0.2],
- 'col3': ['a', 'n']})
- expected.to_sql('test_copy_insert', self.conn, index=False,
- method=psql_insert_copy)
- result = sql.read_sql_table('test_copy_insert', self.conn)
- tm.assert_frame_equal(result, expected)
- @pytest.mark.single
- @pytest.mark.db
- class TestMySQLAlchemy(_TestMySQLAlchemy, _TestSQLAlchemy):
- pass
- @pytest.mark.single
- @pytest.mark.db
- class TestMySQLAlchemyConn(_TestMySQLAlchemy, _TestSQLAlchemyConn):
- pass
- @pytest.mark.single
- @pytest.mark.db
- class TestPostgreSQLAlchemy(_TestPostgreSQLAlchemy, _TestSQLAlchemy):
- pass
- @pytest.mark.single
- @pytest.mark.db
- class TestPostgreSQLAlchemyConn(_TestPostgreSQLAlchemy, _TestSQLAlchemyConn):
- pass
- @pytest.mark.single
- class TestSQLiteAlchemy(_TestSQLiteAlchemy, _TestSQLAlchemy):
- pass
- @pytest.mark.single
- class TestSQLiteAlchemyConn(_TestSQLiteAlchemy, _TestSQLAlchemyConn):
- pass
- # -----------------------------------------------------------------------------
- # -- Test Sqlite / MySQL fallback
- @pytest.mark.single
- class TestSQLiteFallback(SQLiteMixIn, PandasSQLTest):
- """
- Test the fallback mode against an in-memory sqlite database.
- """
- flavor = 'sqlite'
- @classmethod
- def connect(cls):
- return sqlite3.connect(':memory:')
- def setup_connect(self):
- self.conn = self.connect()
- def load_test_data_and_sql(self):
- self.pandasSQL = sql.SQLiteDatabase(self.conn)
- self._load_test1_data()
- @pytest.fixture(autouse=True)
- def setup_method(self, load_iris_data):
- self.load_test_data_and_sql()
- def test_read_sql(self):
- self._read_sql_iris()
- def test_read_sql_parameter(self):
- self._read_sql_iris_parameter()
- def test_read_sql_named_parameter(self):
- self._read_sql_iris_named_parameter()
- def test_to_sql(self):
- self._to_sql()
- def test_to_sql_empty(self):
- self._to_sql_empty()
- def test_to_sql_fail(self):
- self._to_sql_fail()
- def test_to_sql_replace(self):
- self._to_sql_replace()
- def test_to_sql_append(self):
- self._to_sql_append()
- def test_create_and_drop_table(self):
- temp_frame = DataFrame(
- {'one': [1., 2., 3., 4.], 'two': [4., 3., 2., 1.]})
- self.pandasSQL.to_sql(temp_frame, 'drop_test_frame')
- assert self.pandasSQL.has_table('drop_test_frame')
- self.pandasSQL.drop_table('drop_test_frame')
- assert not self.pandasSQL.has_table('drop_test_frame')
- def test_roundtrip(self):
- self._roundtrip()
- def test_execute_sql(self):
- self._execute_sql()
- def test_datetime_date(self):
- # test support for datetime.date
- df = DataFrame([date(2014, 1, 1), date(2014, 1, 2)], columns=["a"])
- df.to_sql('test_date', self.conn, index=False)
- res = read_sql_query('SELECT * FROM test_date', self.conn)
- if self.flavor == 'sqlite':
- # comes back as strings
- tm.assert_frame_equal(res, df.astype(str))
- elif self.flavor == 'mysql':
- tm.assert_frame_equal(res, df)
- def test_datetime_time(self):
- # test support for datetime.time, GH #8341
- df = DataFrame([time(9, 0, 0), time(9, 1, 30)], columns=["a"])
- df.to_sql('test_time', self.conn, index=False)
- res = read_sql_query('SELECT * FROM test_time', self.conn)
- if self.flavor == 'sqlite':
- # comes back as strings
- expected = df.applymap(lambda x: x.strftime("%H:%M:%S.%f"))
- tm.assert_frame_equal(res, expected)
- def _get_index_columns(self, tbl_name):
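- # Reflect index columns without SQLAlchemy: list the table's indexes
- # from sqlite_master, then ask PRAGMA index_info for each index's
- # member columns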
- ixs = sql.read_sql_query(
- "SELECT * FROM sqlite_master WHERE type = 'index' " +
- "AND tbl_name = '%s'" % tbl_name, self.conn)
- ix_cols = []
- for ix_name in ixs.name:
- ix_info = sql.read_sql_query(
- "PRAGMA index_info(%s)" % ix_name, self.conn)
- ix_cols.append(ix_info.name.tolist())
- return ix_cols
- def test_to_sql_save_index(self):
- self._to_sql_save_index()
- def test_transactions(self):
- if PY36:
- pytest.skip("not working on python > 3.5")
- self._transaction_test()
- def _get_sqlite_column_type(self, table, column):
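- # PRAGMA table_info yields one row per column:
- # (cid, name, type, notnull, dflt_value, pk)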
- recs = self.conn.execute('PRAGMA table_info(%s)' % table)
- for cid, name, ctype, not_null, default, pk in recs:
- if name == column:
- return ctype
- raise ValueError('Table %s, column %s not found' % (table, column))
- def test_dtype(self):
- if self.flavor == 'mysql':
- pytest.skip('Not applicable to MySQL legacy')
- cols = ['A', 'B']
- data = [(0.8, True),
- (0.9, None)]
- df = DataFrame(data, columns=cols)
- df.to_sql('dtype_test', self.conn)
- df.to_sql('dtype_test2', self.conn, dtype={'B': 'STRING'})
- # sqlite stores Boolean values as INTEGER
- assert self._get_sqlite_column_type(
- 'dtype_test', 'B') == 'INTEGER'
- assert self._get_sqlite_column_type(
- 'dtype_test2', 'B') == 'STRING'
- pytest.raises(ValueError, df.to_sql,
- 'error', self.conn, dtype={'B': bool})
- # single dtype
- df.to_sql('single_dtype_test', self.conn, dtype='STRING')
- assert self._get_sqlite_column_type(
- 'single_dtype_test', 'A') == 'STRING'
- assert self._get_sqlite_column_type(
- 'single_dtype_test', 'B') == 'STRING'
- def test_notna_dtype(self):
- if self.flavor == 'mysql':
- pytest.skip('Not applicable to MySQL legacy')
- cols = {'Bool': Series([True, None]),
- 'Date': Series([datetime(2012, 5, 1), None]),
- 'Int': Series([1, None], dtype='object'),
- 'Float': Series([1.1, None])
- }
- df = DataFrame(cols)
- tbl = 'notna_dtype_test'
- df.to_sql(tbl, self.conn)
- assert self._get_sqlite_column_type(tbl, 'Bool') == 'INTEGER'
- assert self._get_sqlite_column_type(tbl, 'Date') == 'TIMESTAMP'
- assert self._get_sqlite_column_type(tbl, 'Int') == 'INTEGER'
- assert self._get_sqlite_column_type(tbl, 'Float') == 'REAL'
- def test_illegal_names(self):
- # For sqlite, these should work fine
- df = DataFrame([[1, 2], [3, 4]], columns=['a', 'b'])
- # Raise an error for a blank table name
- pytest.raises(ValueError, df.to_sql, "", self.conn)
- for ndx, weird_name in enumerate(
- ['test_weird_name]', 'test_weird_name[',
- 'test_weird_name`', 'test_weird_name"', 'test_weird_name\'',
- '_b.test_weird_name_01-30', '"_b.test_weird_name_01-30"',
- '99beginswithnumber', '12345', u'\xe9']):
- df.to_sql(weird_name, self.conn)
- sql.table_exists(weird_name, self.conn)
- df2 = DataFrame([[1, 2], [3, 4]], columns=['a', weird_name])
- c_tbl = 'test_weird_col_name%d' % ndx
- df2.to_sql(c_tbl, self.conn)
- sql.table_exists(c_tbl, self.conn)
- # -----------------------------------------------------------------------------
- # -- Old tests from 0.13.1 (before refactor using sqlalchemy)
- def date_format(dt):
- """Returns date in YYYYMMDD format."""
- return dt.strftime('%Y%m%d')
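- # Map each Python type to a function that renders a value of that
- # type as a SQL literal; used by format_query below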
- _formatters = {
- datetime: lambda dt: "'%s'" % date_format(dt),
- str: lambda x: "'%s'" % x,
- np.str_: lambda x: "'%s'" % x,
- compat.text_type: lambda x: "'%s'" % x,
- compat.binary_type: lambda x: "'%s'" % x,
- float: lambda x: "%.8f" % x,
- int: lambda x: "%s" % x,
- type(None): lambda x: "NULL",
- np.float64: lambda x: "%.10f" % x,
- bool: lambda x: "'%s'" % x,
- }
- def format_query(sql, *args):
- """
- """
- processed_args = []
- for arg in args:
- if isinstance(arg, float) and isna(arg):
- arg = None
- formatter = _formatters[type(arg)]
- processed_args.append(formatter(arg))
- return sql % tuple(processed_args)
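- # Illustrative example of the formatting above:
- # format_query("INSERT INTO t VALUES (%s, %s)", 1.5, None)
- # -> "INSERT INTO t VALUES (1.50000000, NULL)"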
- def tquery(query, con=None, cur=None):
- """Replace removed sql.tquery function"""
- res = sql.execute(query, con=con, cur=cur).fetchall()
- if res is None:
- return None
- else:
- return list(res)
- @pytest.mark.single
- class TestXSQLite(SQLiteMixIn):
- @pytest.fixture(autouse=True)
- def setup_method(self, request, datapath):
- self.method = request.function
- self.conn = sqlite3.connect(':memory:')
- # Some test cases close the db connection, so re-open it after the
- # test runs to allow cleanup in teardown
- yield
- self.method = request.function
- self.conn = sqlite3.connect(':memory:')
- def test_basic(self):
- frame = tm.makeTimeDataFrame()
- self._check_roundtrip(frame)
- def test_write_row_by_row(self):
- frame = tm.makeTimeDataFrame()
- frame.iloc[0, 0] = np.nan
- create_sql = sql.get_schema(frame, 'test')
- cur = self.conn.cursor()
- cur.execute(create_sql)
- cur = self.conn.cursor()
- ins = "INSERT INTO test VALUES (%s, %s, %s, %s)"
- for idx, row in frame.iterrows():
- fmt_sql = format_query(ins, *row)
- tquery(fmt_sql, cur=cur)
- self.conn.commit()
- result = sql.read_sql("select * from test", con=self.conn)
- result.index = frame.index
- tm.assert_frame_equal(result, frame, check_less_precise=True)
- def test_execute(self):
- frame = tm.makeTimeDataFrame()
- create_sql = sql.get_schema(frame, 'test')
- cur = self.conn.cursor()
- cur.execute(create_sql)
- ins = "INSERT INTO test VALUES (?, ?, ?, ?)"
- row = frame.iloc[0]
- sql.execute(ins, self.conn, params=tuple(row))
- self.conn.commit()
- result = sql.read_sql("select * from test", self.conn)
- result.index = frame.index[:1]
- tm.assert_frame_equal(result, frame[:1])
- def test_schema(self):
- frame = tm.makeTimeDataFrame()
- create_sql = sql.get_schema(frame, 'test')
- lines = create_sql.splitlines()
- for line in lines:
- tokens = line.split(' ')
- if len(tokens) == 2 and tokens[0] == 'A':
- assert tokens[1] == 'DATETIME'
- frame = tm.makeTimeDataFrame()
- create_sql = sql.get_schema(frame, 'test', keys=['A', 'B'])
- lines = create_sql.splitlines()
- assert 'PRIMARY KEY ("A", "B")' in create_sql
- cur = self.conn.cursor()
- cur.execute(create_sql)
- def test_execute_fail(self):
- create_sql = """
- CREATE TABLE test
- (
- a TEXT,
- b TEXT,
- c REAL,
- PRIMARY KEY (a, b)
- );
- """
- cur = self.conn.cursor()
- cur.execute(create_sql)
- sql.execute('INSERT INTO test VALUES("foo", "bar", 1.234)', self.conn)
- sql.execute('INSERT INTO test VALUES("foo", "baz", 2.567)', self.conn)
- with pytest.raises(Exception):
- sql.execute('INSERT INTO test VALUES("foo", "bar", 7)', self.conn)
- def test_execute_closed_connection(self):
- create_sql = """
- CREATE TABLE test
- (
- a TEXT,
- b TEXT,
- c REAL,
- PRIMARY KEY (a, b)
- );
- """
- cur = self.conn.cursor()
- cur.execute(create_sql)
- sql.execute('INSERT INTO test VALUES("foo", "bar", 1.234)', self.conn)
- self.conn.close()
- with pytest.raises(Exception):
- tquery("select * from test", con=self.conn)
- def test_na_roundtrip(self):
- pass
- def _check_roundtrip(self, frame):
- sql.to_sql(frame, name='test_table', con=self.conn, index=False)
- result = sql.read_sql("select * from test_table", self.conn)
- # HACK! Change this once indexes are handled properly.
- result.index = frame.index
- expected = frame
- tm.assert_frame_equal(result, expected)
- frame['txt'] = ['a'] * len(frame)
- frame2 = frame.copy()
- frame2['Idx'] = Index(lrange(len(frame2))) + 10
- sql.to_sql(frame2, name='test_table2', con=self.conn, index=False)
- result = sql.read_sql("select * from test_table2", self.conn,
- index_col='Idx')
- expected = frame.copy()
- expected.index = Index(lrange(len(frame2))) + 10
- expected.index.name = 'Idx'
- tm.assert_frame_equal(expected, result)
- def test_keyword_as_column_names(self):
- df = DataFrame({'From': np.ones(5)})
- sql.to_sql(df, con=self.conn, name='testkeywords', index=False)
- def test_onecolumn_of_integer(self):
- # GH 3628
- # a single-column integer DataFrame should round-trip to SQL cleanly
- mono_df = DataFrame([1, 2], columns=['c0'])
- sql.to_sql(mono_df, con=self.conn, name='mono_df', index=False)
- # computing the sum via sql
- con_x = self.conn
- the_sum = sum(my_c0[0]
- for my_c0 in con_x.execute("select * from mono_df"))
- # it should not fail, and the sum should be 3 (GH 3628)
- assert the_sum == 3
- result = sql.read_sql("select * from mono_df", con_x)
- tm.assert_frame_equal(result, mono_df)
- def test_if_exists(self):
- df_if_exists_1 = DataFrame({'col1': [1, 2], 'col2': ['A', 'B']})
- df_if_exists_2 = DataFrame(
- {'col1': [3, 4, 5], 'col2': ['C', 'D', 'E']})
- table_name = 'table_if_exists'
- sql_select = "SELECT * FROM %s" % table_name
- def clean_up(test_table_to_drop):
- """
- Drop tables created by individual tests so that no
- dependencies arise between sequential tests
- """
- self.drop_table(test_table_to_drop)
- # test if invalid value for if_exists raises appropriate error
- pytest.raises(ValueError,
- sql.to_sql,
- frame=df_if_exists_1,
- con=self.conn,
- name=table_name,
- if_exists='notvalidvalue')
- clean_up(table_name)
- # test if_exists='fail'
- sql.to_sql(frame=df_if_exists_1, con=self.conn,
- name=table_name, if_exists='fail')
- pytest.raises(ValueError,
- sql.to_sql,
- frame=df_if_exists_1,
- con=self.conn,
- name=table_name,
- if_exists='fail')
- # test if_exists='replace'
- sql.to_sql(frame=df_if_exists_1, con=self.conn, name=table_name,
- if_exists='replace', index=False)
- assert tquery(sql_select, con=self.conn) == [(1, 'A'), (2, 'B')]
- sql.to_sql(frame=df_if_exists_2, con=self.conn, name=table_name,
- if_exists='replace', index=False)
- assert (tquery(sql_select, con=self.conn) ==
- [(3, 'C'), (4, 'D'), (5, 'E')])
- clean_up(table_name)
- # test if_exists='append'
- sql.to_sql(frame=df_if_exists_1, con=self.conn, name=table_name,
- if_exists='fail', index=False)
- assert tquery(sql_select, con=self.conn) == [(1, 'A'), (2, 'B')]
- sql.to_sql(frame=df_if_exists_2, con=self.conn, name=table_name,
- if_exists='append', index=False)
- assert (tquery(sql_select, con=self.conn) ==
- [(1, 'A'), (2, 'B'), (3, 'C'), (4, 'D'), (5, 'E')])
- clean_up(table_name)
- @pytest.mark.single
- @pytest.mark.db
- @pytest.mark.skip(reason="gh-13611: there is no support for MySQL "
- "if SQLAlchemy is not installed")
- class TestXMySQL(MySQLMixIn):
- @pytest.fixture(autouse=True, scope='class')
- def setup_class(cls):
- pymysql = pytest.importorskip('pymysql')
- pymysql.connect(host='localhost', user='root', passwd='',
- db='pandas_nosetest')
- try:
- pymysql.connect(read_default_group='pandas')
- except pymysql.ProgrammingError:
- raise RuntimeError(
- "Create a group of connection parameters under the heading "
- "[pandas] in your system's mysql default file, "
- "typically located at ~/.my.cnf or /etc/.my.cnf.")
- except pymysql.Error:
- raise RuntimeError(
- "Cannot connect to database. "
- "Create a group of connection parameters under the heading "
- "[pandas] in your system's mysql default file, "
- "typically located at ~/.my.cnf or /etc/.my.cnf.")
- @pytest.fixture(autouse=True)
- def setup_method(self, request, datapath):
- pymysql = pytest.importorskip('pymysql')
- pymysql.connect(host='localhost', user='root', passwd='',
- db='pandas_nosetest')
- try:
- pymysql.connect(read_default_group='pandas')
- except pymysql.ProgrammingError:
- raise RuntimeError(
- "Create a group of connection parameters under the heading "
- "[pandas] in your system's mysql default file, "
- "typically located at ~/.my.cnf or /etc/.my.cnf.")
- except pymysql.Error:
- raise RuntimeError(
- "Cannot connect to database. "
- "Create a group of connection parameters under the heading "
- "[pandas] in your system's mysql default file, "
- "typically located at ~/.my.cnf or /etc/.my.cnf.")
- self.method = request.function
- def test_basic(self):
- frame = tm.makeTimeDataFrame()
- self._check_roundtrip(frame)
- def test_write_row_by_row(self):
- frame = tm.makeTimeDataFrame()
- frame.iloc[0, 0] = np.nan
- drop_sql = "DROP TABLE IF EXISTS test"
- create_sql = sql.get_schema(frame, 'test')
- cur = self.conn.cursor()
- cur.execute(drop_sql)
- cur.execute(create_sql)
- ins = "INSERT INTO test VALUES (%s, %s, %s, %s)"
- for idx, row in frame.iterrows():
- fmt_sql = format_query(ins, *row)
- tquery(fmt_sql, cur=cur)
- self.conn.commit()
- result = sql.read_sql("select * from test", con=self.conn)
- result.index = frame.index
- tm.assert_frame_equal(result, frame, check_less_precise=True)
- def test_chunksize_read_type(self):
- frame = tm.makeTimeDataFrame()
- frame.index.name = "index"
- drop_sql = "DROP TABLE IF EXISTS test"
- cur = self.conn.cursor()
- cur.execute(drop_sql)
- sql.to_sql(frame, name='test', con=self.conn)
- query = "select * from test"
- chunksize = 5
- chunk_gen = pd.read_sql_query(sql=query, con=self.conn,
- chunksize=chunksize, index_col="index")
- chunk_df = next(chunk_gen)
- tm.assert_frame_equal(frame[:chunksize], chunk_df)
- def test_execute(self):
- frame = tm.makeTimeDataFrame()
- drop_sql = "DROP TABLE IF EXISTS test"
- create_sql = sql.get_schema(frame, 'test')
- cur = self.conn.cursor()
- with warnings.catch_warnings():
- warnings.filterwarnings("ignore", "Unknown table.*")
- cur.execute(drop_sql)
- cur.execute(create_sql)
- ins = "INSERT INTO test VALUES (%s, %s, %s, %s)"
- row = frame.iloc[0].values.tolist()
- sql.execute(ins, self.conn, params=tuple(row))
- self.conn.commit()
- result = sql.read_sql("select * from test", self.conn)
- result.index = frame.index[:1]
- tm.assert_frame_equal(result, frame[:1])
- def test_schema(self):
- frame = tm.makeTimeDataFrame()
- create_sql = sql.get_schema(frame, 'test')
- lines = create_sql.splitlines()
- for line in lines:
- tokens = line.split(' ')
- if len(tokens) == 2 and tokens[0] == 'A':
- assert tokens[1] == 'DATETIME'
- frame = tm.makeTimeDataFrame()
- drop_sql = "DROP TABLE IF EXISTS test"
- create_sql = sql.get_schema(frame, 'test', keys=['A', 'B'])
- lines = create_sql.splitlines()
- assert 'PRIMARY KEY (`A`, `B`)' in create_sql
- cur = self.conn.cursor()
- cur.execute(drop_sql)
- cur.execute(create_sql)
- def test_execute_fail(self):
- drop_sql = "DROP TABLE IF EXISTS test"
- create_sql = """
- CREATE TABLE test
- (
- a TEXT,
- b TEXT,
- c REAL,
- PRIMARY KEY (a(5), b(5))
- );
- """
- cur = self.conn.cursor()
- cur.execute(drop_sql)
- cur.execute(create_sql)
- sql.execute('INSERT INTO test VALUES("foo", "bar", 1.234)', self.conn)
- sql.execute('INSERT INTO test VALUES("foo", "baz", 2.567)', self.conn)
- with pytest.raises(Exception):
- sql.execute('INSERT INTO test VALUES("foo", "bar", 7)', self.conn)
- def test_execute_closed_connection(self, request, datapath):
- drop_sql = "DROP TABLE IF EXISTS test"
- create_sql = """
- CREATE TABLE test
- (
- a TEXT,
- b TEXT,
- c REAL,
- PRIMARY KEY (a(5), b(5))
- );
- """
- cur = self.conn.cursor()
- cur.execute(drop_sql)
- cur.execute(create_sql)
- sql.execute('INSERT INTO test VALUES("foo", "bar", 1.234)', self.conn)
- self.conn.close()
- with pytest.raises(Exception):
- tquery("select * from test", con=self.conn)
- # Initialize connection again (needed for tearDown)
- self.setup_method(request, datapath)
- def test_na_roundtrip(self):
- pass
- def _check_roundtrip(self, frame):
- drop_sql = "DROP TABLE IF EXISTS test_table"
- cur = self.conn.cursor()
- with warnings.catch_warnings():
- warnings.filterwarnings("ignore", "Unknown table.*")
- cur.execute(drop_sql)
- sql.to_sql(frame, name='test_table', con=self.conn, index=False)
- result = sql.read_sql("select * from test_table", self.conn)
- # HACK! Change this once indexes are handled properly.
- result.index = frame.index
- result.index.name = frame.index.name
- expected = frame
- tm.assert_frame_equal(result, expected)
- frame['txt'] = ['a'] * len(frame)
- frame2 = frame.copy()
- index = Index(lrange(len(frame2))) + 10
- frame2['Idx'] = index
- drop_sql = "DROP TABLE IF EXISTS test_table2"
- cur = self.conn.cursor()
- with warnings.catch_warnings():
- warnings.filterwarnings("ignore", "Unknown table.*")
- cur.execute(drop_sql)
- sql.to_sql(frame2, name='test_table2',
- con=self.conn, index=False)
- result = sql.read_sql("select * from test_table2", self.conn,
- index_col='Idx')
- expected = frame.copy()
- # HACK! Change this once indexes are handled properly.
- expected.index = index
- expected.index.names = result.index.names
- tm.assert_frame_equal(expected, result)
- def test_keyword_as_column_names(self):
- df = DataFrame({'From': np.ones(5)})
- sql.to_sql(df, con=self.conn, name='testkeywords',
- if_exists='replace', index=False)
- def test_if_exists(self):
- df_if_exists_1 = DataFrame({'col1': [1, 2], 'col2': ['A', 'B']})
- df_if_exists_2 = DataFrame(
- {'col1': [3, 4, 5], 'col2': ['C', 'D', 'E']})
- table_name = 'table_if_exists'
- sql_select = "SELECT * FROM %s" % table_name
- def clean_up(test_table_to_drop):
- """
- Drop tables created by individual tests so that no
- dependencies arise between sequential tests
- """
- self.drop_table(test_table_to_drop)
- # test if invalid value for if_exists raises appropriate error
- pytest.raises(ValueError,
- sql.to_sql,
- frame=df_if_exists_1,
- con=self.conn,
- name=table_name,
- if_exists='notvalidvalue')
- clean_up(table_name)
- # test if_exists='fail'
- sql.to_sql(frame=df_if_exists_1, con=self.conn, name=table_name,
- if_exists='fail', index=False)
- pytest.raises(ValueError,
- sql.to_sql,
- frame=df_if_exists_1,
- con=self.conn,
- name=table_name,
- if_exists='fail')
- # test if_exists='replace'
- sql.to_sql(frame=df_if_exists_1, con=self.conn, name=table_name,
- if_exists='replace', index=False)
- assert tquery(sql_select, con=self.conn) == [(1, 'A'), (2, 'B')]
- sql.to_sql(frame=df_if_exists_2, con=self.conn, name=table_name,
- if_exists='replace', index=False)
- assert (tquery(sql_select, con=self.conn) ==
- [(3, 'C'), (4, 'D'), (5, 'E')])
- clean_up(table_name)
- # test if_exists='append'
- sql.to_sql(frame=df_if_exists_1, con=self.conn, name=table_name,
- if_exists='fail', index=False)
- assert tquery(sql_select, con=self.conn) == [(1, 'A'), (2, 'B')]
- sql.to_sql(frame=df_if_exists_2, con=self.conn, name=table_name,
- if_exists='append', index=False)
- assert (tquery(sql_select, con=self.conn) ==
- [(1, 'A'), (2, 'B'), (3, 'C'), (4, 'D'), (5, 'E')])
- clean_up(table_name)