123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335 |
- """Microsoft SQL Server database backend for Django."""
- from __future__ import absolute_import, unicode_literals
- import warnings
- from django.core.exceptions import ImproperlyConfigured, ValidationError
- from django.core.validators import validate_ipv46_address as ip_validator
- from django.db.backends.base.base import BaseDatabaseWrapper
- from django.db.backends.base.client import BaseDatabaseClient
- from django.db.backends.base.validation import BaseDatabaseValidation
- from django.db.utils import IntegrityError
- from . import dbapi as Database
- from .introspection import DatabaseIntrospection
- from .creation import DatabaseCreation
- from .features import DatabaseFeatures
- from .operations import DatabaseOperations
- from .schema import DatabaseSchemaEditor
def is_ip_address(value):
    """
    Report whether ``value`` is accepted by the IPv4/IPv6 address validator.

    Returns True when ``ip_validator`` accepts the value and False when it
    raises ``ValidationError``.
    """
    try:
        ip_validator(value)
    except ValidationError:
        accepted = False
    else:
        accepted = True
    return accepted
def connection_string_from_settings():
    """
    Build the connection string for the project's ``default`` database.

    Looks up ``settings.DATABASES['default']`` and hands it to
    ``make_connection_string``.

    Raises ImproperlyConfigured when no ``default`` database is configured,
    instead of failing later with a TypeError on a ``None`` settings dict.
    """
    # Function-level import, as in the original implementation.
    from django.conf import settings

    db_settings = getattr(settings, 'DATABASES', {}).get('default')
    if db_settings is None:
        raise ImproperlyConfigured(
            "settings.DATABASES is improperly configured. "
            "Please supply the 'default' database.")
    return make_connection_string(db_settings)
def make_connection_string(settings):
    """
    Build an ADO connection string from a Django database settings dict.

    ``settings`` must provide the ``NAME``, ``HOST``, ``PORT``, ``USER`` and
    ``PASSWORD`` keys and may provide an ``OPTIONS`` dict. Recognised options:

    * ``provider`` -- OLE DB provider name; defaults to ``sqlncli10``.
    * ``use_mars`` -- add ``MARS Connection=True`` unless ``extra_params``
      already sets it; defaults to True.
    * ``extra_params`` -- text appended verbatim as the last part.

    Returns the ``;``-joined connection string.

    Raises ImproperlyConfigured if NAME is empty, if PORT is given while HOST
    is not an IP address, or if PORT is not a number.

    Neither ``settings`` nor its ``OPTIONS`` dict is modified: the default
    provider is resolved into a local variable rather than written back into
    the caller's (possibly shared) options.
    """
    # ``or`` fallbacks tolerate values explicitly set to None.
    db_name = (settings['NAME'] or '').strip()
    db_host = settings['HOST'] or '127.0.0.1'
    db_port = settings['PORT']
    db_user = settings['USER']
    db_password = settings['PASSWORD']
    options = settings.get('OPTIONS') or {}

    if not db_name:
        raise ImproperlyConfigured("You need to specify a DATABASE NAME in your Django settings file.")

    # Connection strings courtesy of:
    # http://www.connectionstrings.com/?carrier=sqlserver

    # If a port is given, force a TCP/IP connection. The host should be an IP address in this case.
    if db_port:
        if not is_ip_address(db_host):
            raise ImproperlyConfigured("When using DATABASE PORT, DATABASE HOST must be an IP address.")
        try:
            db_port = int(db_port)
        except (TypeError, ValueError):
            raise ImproperlyConfigured("DATABASE PORT must be a number.")
        db_host = '{0},{1};Network Library=DBMSSOCN'.format(db_host, db_port)

    # If no user is specified (empty or None), use integrated security.
    if db_user:
        auth_string = 'UID={0};PWD={1}'.format(db_user, db_password)
    else:
        auth_string = 'Integrated Security=SSPI'

    provider = options.get('provider') or 'sqlncli10'
    extra_params = options.get('extra_params') or ''
    extra_params_lower = extra_params.lower()

    parts = [
        'DATA SOURCE={0};Initial Catalog={1}'.format(db_host, db_name),
        auth_string,
        'PROVIDER={0}'.format(provider),
    ]

    if 'sqlncli' in provider.lower() and 'datatypecompatibility=' not in extra_params_lower:
        # native client needs a compatibility mode that behaves like OLEDB
        parts.append('DataTypeCompatibility=80')

    if options.get('use_mars', True) and 'mars connection=' not in extra_params_lower:
        parts.append('MARS Connection=True')

    if extra_params:
        parts.append(extra_params)

    return ";".join(parts)
# Major version number that init_connection_state() treats as the oldest
# supported release (SQL Server 2012).
VERSION_SQL2012 = 11


class DatabaseWrapper(BaseDatabaseWrapper):
    """
    Django database wrapper for Microsoft SQL Server, connecting through the
    ADO-based ``dbapi`` module of this package.
    """
    vendor = 'microsoft'

    Database = Database
    SchemaEditorClass = DatabaseSchemaEditor

    operators = {
        "exact": "= %s",
        "iexact": "LIKE %s ESCAPE '\\'",
        "contains": "LIKE %s ESCAPE '\\'",
        "icontains": "LIKE %s ESCAPE '\\'",
        "gt": "> %s",
        "gte": ">= %s",
        "lt": "< %s",
        "lte": "<= %s",
        "startswith": "LIKE %s ESCAPE '\\'",
        "endswith": "LIKE %s ESCAPE '\\'",
        "istartswith": "LIKE %s ESCAPE '\\'",
        "iendswith": "LIKE %s ESCAPE '\\'",
    }

    # The patterns below are used to generate SQL pattern lookup clauses when
    # the right-hand side of the lookup isn't a raw string (it might be an expression
    # or the result of a bilateral transformation).
    # In those cases, special characters for LIKE operators (e.g. \, %, _) should be
    # escaped on database side.
    #
    # Note: we use str.format() here for readability as '%' is used as a wildcard for
    # the LIKE operator.
    pattern_esc = r"REPLACE(REPLACE(REPLACE({}, '\', '\\'), '%%', '\%%'), '_', '\_')"
    pattern_ops = {
        'contains': r"LIKE CONCAT('%%', {}, '%%') ESCAPE '\'",
        'icontains': r"LIKE CONCAT('%%', {}, '%%') ESCAPE '\'",
        'startswith': r"LIKE CONCAT({}, '%%') ESCAPE '\'",
        'istartswith': r"LIKE CONCAT({}, '%%') ESCAPE '\'",
        'endswith': r"LIKE CONCAT('%%', {}) ESCAPE '\'",
        'iendswith': r"LIKE CONCAT('%%', {}) ESCAPE '\'",
    }

    # This dictionary maps Field objects to their associated SQL Server column
    # types, as strings. Column-type strings can contain format strings; they'll
    # be interpolated against the values of Field.__dict__.
    data_types = {
        'AutoField': 'int IDENTITY (1, 1)',
        'BigAutoField': 'bigint IDENTITY (1, 1)',
        'BigIntegerField': 'bigint',
        'BinaryField': 'varbinary(max)',
        'BooleanField': 'bit',
        'CharField': 'nvarchar(%(max_length)s)',
        'CommaSeparatedIntegerField': 'nvarchar(%(max_length)s)',
        'DateField': 'date',
        'DateTimeField': 'datetime2',
        'DateTimeOffsetField': 'datetimeoffset',
        'DecimalField': 'decimal(%(max_digits)s, %(decimal_places)s)',
        'DurationField': 'bigint',
        'FileField': 'nvarchar(%(max_length)s)',
        'FilePathField': 'nvarchar(%(max_length)s)',
        'FloatField': 'double precision',
        'GenericIPAddressField': 'nvarchar(39)',
        'IntegerField': 'int',
        'IPAddressField': 'nvarchar(15)',
        'LegacyDateField': 'datetime',
        'LegacyDateTimeField': 'datetime',
        'LegacyTimeField': 'time',
        'NewDateField': 'date',
        'NewDateTimeField': 'datetime2',
        'NewTimeField': 'time',
        'NullBooleanField': 'bit',
        'OneToOneField': 'int',
        'PositiveIntegerField': 'int',
        'PositiveSmallIntegerField': 'smallint',
        'SlugField': 'nvarchar(%(max_length)s)',
        'SmallIntegerField': 'smallint',
        'TextField': 'nvarchar(max)',
        'TimeField': 'time',
        'URLField': 'nvarchar(%(max_length)s)',
        'UUIDField': 'uniqueidentifier',
    }

    data_type_check_constraints = {
        'PositiveIntegerField': '%(qn_column)s >= 0',
        'PositiveSmallIntegerField': '%(qn_column)s >= 0',
    }

    def __init__(self, *args, **kwargs):
        """
        Set up the wrapper and its helper objects.

        Accepts an optional ``use_transactions`` keyword argument, which is
        removed from ``kwargs`` and stored before the base class is
        initialised. Reads ``COMMAND_TIMEOUT`` from the settings dict and
        ``disable_avg_cast`` / ``use_legacy_date_fields`` from ``OPTIONS``.
        """
        self.use_transactions = kwargs.pop('use_transactions', None)

        super(DatabaseWrapper, self).__init__(*args, **kwargs)

        # Fall back to 30 when the setting is not convertible to int
        # (including an explicit None).
        try:
            self.command_timeout = int(self.settings_dict.get('COMMAND_TIMEOUT', 30))
        except (TypeError, ValueError):
            self.command_timeout = 30

        options = self.settings_dict.get('OPTIONS') or {}
        self.cast_avg_to_float = not options.get('disable_avg_cast', False)

        if 'use_legacy_date_fields' in options:
            warnings.warn(
                "The `use_legacy_date_fields` setting is no longer supported. "
                "If you need to use the legacy SQL 'datetime' datatype, "
                "you must replace them with the provided model fields.",
                DeprecationWarning)

        self.features = DatabaseFeatures(self)
        self.ops = DatabaseOperations(self)
        self.client = BaseDatabaseClient(self)
        self.creation = DatabaseCreation(self)
        self.introspection = DatabaseIntrospection(self)
        self.validation = BaseDatabaseValidation(self)

    def get_connection_params(self):
        """
        Returns a dict of parameters suitable for get_new_connection.

        The dict holds the ``connection_string``, the command ``timeout`` and
        ``use_transactions`` (the inverse of the ``autocommit`` option).
        Raises ImproperlyConfigured when NAME is the empty string.
        """
        settings_dict = self.settings_dict.copy()
        if settings_dict['NAME'] == '':
            raise ImproperlyConfigured(
                "settings.DATABASES is improperly configured. "
                "Please supply the NAME value.")
        if not settings_dict['NAME']:
            # if _nodb_connection, connect to master
            settings_dict['NAME'] = 'master'

        autocommit = (settings_dict.get('OPTIONS') or {}).get('autocommit', False)
        return {
            'connection_string': make_connection_string(settings_dict),
            'timeout': self.command_timeout,
            'use_transactions': not autocommit,
        }

    def get_new_connection(self, conn_params):
        """
        Opens a connection to the database.

        The connection string is remembered on the instance; every entry of
        ``conn_params`` is passed to ``Database.connect`` as a keyword
        argument.
        """
        self.__connection_string = conn_params.get('connection_string', '')
        return Database.connect(**conn_params)

    def init_connection_state(self):
        """
        Initializes the database connection settings.

        Caches the ADO connection properties on the connection object and
        warns when the server version cannot be determined or is older than
        SQL Server 2012.
        """
        # NOTE: disabling savepoints under MARS is currently switched off:
        # if 'mars connection=true' in self.__connection_string.lower():
        #     # Issue #41 - Cannot use MARS with savepoints
        #     self.features.uses_savepoints = False

        # cache the properties on the connection
        self.connection.adoConnProperties = {
            prop.Name: prop.Value for prop in self.connection.adoConn.Properties
        }

        try:
            # An empty version string makes int() raise ValueError.
            sql_version = int(self.__get_dbms_version().split('.', 2)[0])
        except (IndexError, ValueError):
            warnings.warn(
                "Unable to determine MS SQL server version. Only SQL 2012 or "
                "newer is supported.", DeprecationWarning)
        else:
            if sql_version < VERSION_SQL2012:
                warnings.warn(
                    "This version of MS SQL server is no longer tested with "
                    "django-mssql and not officially supported/maintained.",
                    DeprecationWarning)

    def create_cursor(self, name=None):
        """
        Creates a cursor. Assumes that a connection is established.

        ``name`` is accepted for compatibility with Django versions that pass
        a cursor name to this method; it is ignored here.
        """
        return self.connection.cursor()

    def _set_autocommit(self, value):
        """Forward the autocommit flag to the underlying connection."""
        self.connection.set_autocommit(value)

    def __get_dbms_version(self, make_connection=True):
        """
        Returns the 'DBMS Version' string, or ''. If a connection to the database has not already
        been established, a connection will be made when `make_connection` is True.
        """
        if not self.connection and make_connection:
            self.connect()
        if not self.connection:
            return ''
        return self.connection.adoConnProperties.get('DBMS Version', '')

    def _constraint_cursor(self):
        """
        Return a cursor for the constraint maintenance statements: a raw
        cursor when a connection already exists, otherwise one obtained
        through ``self._cursor()``.
        """
        if self.connection:
            return self.connection.cursor()
        return self._cursor()

    def disable_constraint_checking(self):
        """
        Turn off constraint checking for every table. Always returns True.
        """
        cursor = self._constraint_cursor()
        cursor.execute('EXEC sp_MSforeachtable "ALTER TABLE ? NOCHECK CONSTRAINT all"')
        return True

    def enable_constraint_checking(self):
        """
        Turn on constraint checking for every table
        """
        cursor = self._constraint_cursor()
        # don't check the data, just turn them on
        cursor.execute('EXEC sp_MSforeachtable "ALTER TABLE ? WITH NOCHECK CHECK CONSTRAINT all"')

    def check_constraints(self, table_names=None):
        """
        Check the table constraints.

        Runs DBCC CHECKCONSTRAINTS for the whole database when ``table_names``
        is empty, otherwise once per named table. Raises IntegrityError with
        the fetched rows as soon as a statement produces a result set.
        """
        cursor = self._constraint_cursor()
        if not table_names:
            statements = ['DBCC CHECKCONSTRAINTS WITH ALL_CONSTRAINTS']
        else:
            qn = self.ops.quote_name
            # Generator: each name is quoted right before its statement runs.
            statements = (
                'DBCC CHECKCONSTRAINTS({0}) WITH ALL_CONSTRAINTS'.format(qn(name))
                for name in table_names
            )
        for sql in statements:
            cursor.execute(sql)
            if cursor.description:
                raise IntegrityError(cursor.fetchall())

    # MS SQL Server doesn't support explicit savepoint commits; savepoints are
    # implicitly committed with the transaction.
    # Ignore them.
    def _savepoint_commit(self, sid):
        """
        Record a placeholder entry in the query log instead of executing SQL.

        No statement is sent to the server; the entry exists so that query
        counts (assertNumQueries) include the savepoint release.
        """
        # NOTE(review): this tests whether the log is non-empty, not whether
        # query logging is enabled -- confirm that is the intended condition.
        if self.queries_log:
            self.queries_log.append({
                'sql': '-- RELEASE SAVEPOINT %s -- (because assertNumQueries)' % self.ops.quote_name(sid),
                'time': '0.000',
            })

    def is_usable(self):
        """
        Return True if ``SELECT 1`` runs on the current connection, False if
        it raises ``Database.Error``.
        """
        try:
            # Use a mssql cursor directly, bypassing Django's utilities.
            with self.connection.cursor() as cursor:
                cursor.execute("SELECT 1")
        except self.Database.Error:
            return False
        else:
            return True
|