# Copyright (c) 2020, 2022, Oracle and/or its affiliates.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License, version 2.0, as
# published by the Free Software Foundation.
#
# This program is also distributed with certain software (including
# but not limited to OpenSSL) that is licensed under separate terms,
# as designated in a particular file or component or in included license
# documentation.  The authors of MySQL hereby grant you an
# additional permission to link the program and your derivative works
# with the separately licensed software that they have included with
# MySQL.
#
# Without limiting anything contained in the foregoing, this file,
# which is part of MySQL Connector/Python, is also subject to the
# Universal FOSS Exception, version 1.0, a copy of which can be found at
# http://oss.oracle.com/licenses/universal-foss-exception.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License, version 2.0, for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA

# mypy: disable-error-code="override"

"""Django database Backend using MySQL Connector/Python.

This Django database backend is heavily based on the MySQL backend from Django.

Changes include:
* Support for microseconds (MySQL 5.6.3 and later)
* Using INFORMATION_SCHEMA where possible
* Using new defaults for, for example SQL_AUTO_IS_NULL

Requires and comes with MySQL Connector/Python v8.0.22 and later:
    http://dev.mysql.com/downloads/connector/python/
"""

import warnings

from datetime import datetime, time
from typing import Any, Dict, Generator, Iterator, List, Optional, Set, Tuple, Union

from django.conf import settings
from django.core.exceptions import ImproperlyConfigured
from django.db import IntegrityError
from django.db.backends.base.base import BaseDatabaseWrapper
from django.utils import dateparse, timezone
from django.utils.functional import cached_property

try:
    import mysql.connector

    from mysql.connector.connection import MySQLConnection
    from mysql.connector.connection_cext import CMySQLConnection
    from mysql.connector.conversion import MySQLConverter
    from mysql.connector.cursor import MySQLCursor
    from mysql.connector.cursor_cext import CMySQLCursor
    from mysql.connector.custom_types import HexLiteral
    from mysql.connector.pooling import PooledMySQLConnection
    from mysql.connector.types import (
        ParamsDictType,
        ParamsSequenceOrDictType,
        ParamsSequenceType,
        RowType,
        StrOrBytes,
    )
except ImportError as err:
    raise ImproperlyConfigured(f"Error loading mysql.connector module: {err}") from err

try:
    from _mysql_connector import datetime_to_mysql
except ImportError:
    HAVE_CEXT = False
else:
    HAVE_CEXT = True

from .client import DatabaseClient
from .creation import DatabaseCreation
from .features import DatabaseFeatures
from .introspection import DatabaseIntrospection
from .operations import DatabaseOperations
from .schema import DatabaseSchemaEditor
from .validation import DatabaseValidation

Error = mysql.connector.Error
DatabaseError = mysql.connector.DatabaseError
NotSupportedError = mysql.connector.NotSupportedError
OperationalError = mysql.connector.OperationalError
ProgrammingError = mysql.connector.ProgrammingError


def adapt_datetime_with_timezone_support(value: datetime) -> StrOrBytes:
    """Equivalent to DateTimeField.get_db_prep_value. Used only by raw SQL."""
    if settings.USE_TZ:
        if timezone.is_naive(value):
            warnings.warn(
                f"MySQL received a naive datetime ({value})"
                " while time zone support is active.",
                RuntimeWarning,
            )
            default_timezone = timezone.get_default_timezone()
            value = timezone.make_aware(value, default_timezone)
        value = value.astimezone(timezone.utc).replace(tzinfo=None)
    if HAVE_CEXT:
        mysql_datetime: bytes = datetime_to_mysql(value)
        return mysql_datetime
    return value.strftime("%Y-%m-%d %H:%M:%S.%f")


class CursorWrapper:
    """Wrapper around MySQL Connector/Python's cursor class.

    The cursor class is defined by the options passed to MySQL
    Connector/Python. If buffered option is True in those options,
    MySQLCursorBuffered will be used.
    """

    codes_for_integrityerror = (
        1048,  # Column cannot be null
        1690,  # BIGINT UNSIGNED value is out of range
        3819,  # CHECK constraint is violated
        4025,  # CHECK constraint failed
    )

    def __init__(self, cursor: Union[MySQLCursor, CMySQLCursor]) -> None:
        self.cursor: Union[MySQLCursor, CMySQLCursor] = cursor

    @staticmethod
    def _adapt_execute_args_dict(args: ParamsDictType) -> ParamsDictType:
        if not args:
            return args
        new_args = dict(args)
        for key, value in args.items():
            if isinstance(value, datetime):
                new_args[key] = adapt_datetime_with_timezone_support(value)

        return new_args

    @staticmethod
    def _adapt_execute_args(
        args: Optional[ParamsSequenceType],
    ) -> Optional[ParamsSequenceType]:
        if not args:
            return args
        new_args = list(args)
        for i, arg in enumerate(args):
            if isinstance(arg, datetime):
                new_args[i] = adapt_datetime_with_timezone_support(arg)

        return tuple(new_args)

    def execute(
        self, query: str, args: Optional[ParamsSequenceOrDictType] = None
    ) -> Optional[Generator[Union[MySQLCursor, CMySQLCursor], None, None]]:
        """Executes the given operation

        This wrapper method around the execute()-method of the cursor is
        mainly needed to re-raise using different exceptions.
        """
        new_args: Optional[ParamsSequenceOrDictType] = None
        if isinstance(args, dict):
            new_args = self._adapt_execute_args_dict(args)
        else:
            new_args = self._adapt_execute_args(args)
        try:
            return self.cursor.execute(query, new_args)
        except mysql.connector.OperationalError as exc:
            if exc.args[0] in self.codes_for_integrityerror:
                raise IntegrityError(*tuple(exc.args)) from None
            raise

    def executemany(
        self,
        query: str,
        args: Union[
            Tuple[ParamsSequenceOrDictType, ...],
            List[ParamsSequenceOrDictType],
        ],
    ) -> Optional[Generator[Union[MySQLCursor, CMySQLCursor], None, None]]:
        """Executes the given operation

        This wrapper method around the executemany()-method of the cursor is
        mainly needed to re-raise using different exceptions.
        """
        try:
            return self.cursor.executemany(query, args)
        except mysql.connector.OperationalError as exc:
            if exc.args[0] in self.codes_for_integrityerror:
                raise IntegrityError(*tuple(exc.args)) from None
            raise

    def __getattr__(self, attr: Any) -> Any:
        """Return an attribute of wrapped cursor"""
        return getattr(self.cursor, attr)

    def __iter__(self) -> Iterator[RowType]:
        """Return an iterator over wrapped cursor"""
        return iter(self.cursor)


class DatabaseWrapper(BaseDatabaseWrapper):  # pylint: disable=abstract-method
    """Represent a database connection."""

    vendor = "mysql"
    # This dictionary maps Field objects to their associated MySQL column
    # types, as strings. Column-type strings can contain format strings; they'll
    # be interpolated against the values of Field.__dict__ before being output.
    # If a column type is set to None, it won't be included in the output.
    data_types = {
        "AutoField": "integer AUTO_INCREMENT",
        "BigAutoField": "bigint AUTO_INCREMENT",
        "BinaryField": "longblob",
        "BooleanField": "bool",
        "CharField": "varchar(%(max_length)s)",
        "DateField": "date",
        "DateTimeField": "datetime(6)",
        "DecimalField": "numeric(%(max_digits)s, %(decimal_places)s)",
        "DurationField": "bigint",
        "FileField": "varchar(%(max_length)s)",
        "FilePathField": "varchar(%(max_length)s)",
        "FloatField": "double precision",
        "IntegerField": "integer",
        "BigIntegerField": "bigint",
        "IPAddressField": "char(15)",
        "GenericIPAddressField": "char(39)",
        "JSONField": "json",
        "NullBooleanField": "bool",
        "OneToOneField": "integer",
        "PositiveBigIntegerField": "bigint UNSIGNED",
        "PositiveIntegerField": "integer UNSIGNED",
        "PositiveSmallIntegerField": "smallint UNSIGNED",
        "SlugField": "varchar(%(max_length)s)",
        "SmallAutoField": "smallint AUTO_INCREMENT",
        "SmallIntegerField": "smallint",
        "TextField": "longtext",
        "TimeField": "time(6)",
        "UUIDField": "char(32)",
    }

    # For these data types:
    # - MySQL < 8.0.13 doesn't accept default values and
    #   implicitly treat them as nullable
    # - all versions of MySQL doesn't support full width database
    #   indexes
    _limited_data_types = (
        "tinyblob",
        "blob",
        "mediumblob",
        "longblob",
        "tinytext",
        "text",
        "mediumtext",
        "longtext",
        "json",
    )

    operators = {
        "exact": "= %s",
        "iexact": "LIKE %s",
        "contains": "LIKE BINARY %s",
        "icontains": "LIKE %s",
        "regex": "REGEXP BINARY %s",
        "iregex": "REGEXP %s",
        "gt": "> %s",
        "gte": ">= %s",
        "lt": "< %s",
        "lte": "<= %s",
        "startswith": "LIKE BINARY %s",
        "endswith": "LIKE BINARY %s",
        "istartswith": "LIKE %s",
        "iendswith": "LIKE %s",
    }

    # The patterns below are used to generate SQL pattern lookup clauses when
    # the right-hand side of the lookup isn't a raw string (it might be an expression
    # or the result of a bilateral transformation).
    # In those cases, special characters for LIKE operators (e.g. \, *, _) should be
    # escaped on database side.
    #
    # Note: we use str.format() here for readability as '%' is used as a wildcard for
    # the LIKE operator.
    pattern_esc = r"REPLACE(REPLACE(REPLACE({}, '\\', '\\\\'), '%%', '\%%'), '_', '\_')"
    pattern_ops = {
        "contains": "LIKE BINARY CONCAT('%%', {}, '%%')",
        "icontains": "LIKE CONCAT('%%', {}, '%%')",
        "startswith": "LIKE BINARY CONCAT({}, '%%')",
        "istartswith": "LIKE CONCAT({}, '%%')",
        "endswith": "LIKE BINARY CONCAT('%%', {})",
        "iendswith": "LIKE CONCAT('%%', {})",
    }

    isolation_level: Optional[str] = None
    isolation_levels = {
        "read uncommitted",
        "read committed",
        "repeatable read",
        "serializable",
    }

    Database = mysql.connector
    SchemaEditorClass = DatabaseSchemaEditor
    # Classes instantiated in __init__().
    client_class = DatabaseClient
    creation_class = DatabaseCreation
    features_class = DatabaseFeatures
    introspection_class = DatabaseIntrospection
    ops_class = DatabaseOperations
    validation_class = DatabaseValidation

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)

        options = self.settings_dict.get("OPTIONS")
        if options:
            self._use_pure = options.get("use_pure", not HAVE_CEXT)
            converter_class = options.get(
                "converter_class",
                DjangoMySQLConverter,
            )
            if not issubclass(converter_class, DjangoMySQLConverter):
                raise ProgrammingError(
                    "Converter class should be a subclass of "
                    "mysql.connector.django.base.DjangoMySQLConverter"
                )
            self.converter = converter_class()
        else:
            self.converter = DjangoMySQLConverter()
            self._use_pure = not HAVE_CEXT

    def __getattr__(self, attr: str) -> bool:
        if attr.startswith("mysql_is"):
            return False
        raise AttributeError

    def get_connection_params(self) -> Dict[str, Any]:
        kwargs = {
            "charset": "utf8",
            "use_unicode": True,
            "buffered": False,
            "consume_results": True,
        }

        settings_dict = self.settings_dict

        if settings_dict["USER"]:
            kwargs["user"] = settings_dict["USER"]
        if settings_dict["NAME"]:
            kwargs["database"] = settings_dict["NAME"]
        if settings_dict["PASSWORD"]:
            kwargs["passwd"] = settings_dict["PASSWORD"]
        if settings_dict["HOST"].startswith("/"):
            kwargs["unix_socket"] = settings_dict["HOST"]
        elif settings_dict["HOST"]:
            kwargs["host"] = settings_dict["HOST"]
        if settings_dict["PORT"]:
            kwargs["port"] = int(settings_dict["PORT"])
        if settings_dict.get("OPTIONS", {}).get("init_command"):
            kwargs["init_command"] = settings_dict["OPTIONS"]["init_command"]

        # Raise exceptions for database warnings if DEBUG is on
        kwargs["raise_on_warnings"] = settings.DEBUG

        kwargs["client_flags"] = [
            # Need potentially affected rows on UPDATE
            mysql.connector.constants.ClientFlag.FOUND_ROWS,
        ]

        try:
            options = settings_dict["OPTIONS"].copy()
            isolation_level = options.pop("isolation_level")
            if isolation_level:
                isolation_level = isolation_level.lower()
                if isolation_level not in self.isolation_levels:
                    valid_levels = ", ".join(
                        f"'{level}'" for level in sorted(self.isolation_levels)
                    )
                    raise ImproperlyConfigured(
                        f"Invalid transaction isolation level '{isolation_level}' "
                        f"specified.\nUse one of {valid_levels}, or None."
                    )
            self.isolation_level = isolation_level
            kwargs.update(options)
        except KeyError:
            # OPTIONS missing is OK
            pass
        return kwargs

    def get_new_connection(
        self, conn_params: Dict[str, Any]
    ) -> Union[PooledMySQLConnection, MySQLConnection, CMySQLConnection]:
        if "converter_class" not in conn_params:
            conn_params["converter_class"] = DjangoMySQLConverter
        cnx = mysql.connector.connect(**conn_params)

        return cnx

    def init_connection_state(self) -> None:
        assignments = []
        if self.features.is_sql_auto_is_null_enabled:  # type: ignore[attr-defined]
            # SQL_AUTO_IS_NULL controls whether an AUTO_INCREMENT column on
            # a recently inserted row will return when the field is tested
            # for NULL. Disabling this brings this aspect of MySQL in line
            # with SQL standards.
            assignments.append("SET SQL_AUTO_IS_NULL = 0")

        if self.isolation_level:
            assignments.append(
                "SET SESSION TRANSACTION ISOLATION LEVEL "
                f"{self.isolation_level.upper()}"
            )

        if assignments:
            with self.cursor() as cursor:
                cursor.execute("; ".join(assignments))

        if "AUTOCOMMIT" in self.settings_dict:
            try:
                self.set_autocommit(self.settings_dict["AUTOCOMMIT"])
            except AttributeError:
                self._set_autocommit(self.settings_dict["AUTOCOMMIT"])

    def create_cursor(self, name: Any = None) -> CursorWrapper:
        cursor = self.connection.cursor()
        return CursorWrapper(cursor)

    def _rollback(self) -> None:
        try:
            BaseDatabaseWrapper._rollback(self)  # type: ignore[attr-defined]
        except NotSupportedError:
            pass

    def _set_autocommit(self, autocommit: bool) -> None:
        with self.wrap_database_errors:
            self.connection.autocommit = autocommit

    def disable_constraint_checking(self) -> bool:
        """
        Disable foreign key checks, primarily for use in adding rows with
        forward references. Always return True to indicate constraint checks
        need to be re-enabled.
        """
        with self.cursor() as cursor:
            cursor.execute("SET foreign_key_checks=0")
        return True

    def enable_constraint_checking(self) -> None:
        """
        Re-enable foreign key checks after they have been disabled.
        """
        # Override needs_rollback in case constraint_checks_disabled is
        # nested inside transaction.atomic.
        self.needs_rollback, needs_rollback = False, self.needs_rollback
        try:
            with self.cursor() as cursor:
                cursor.execute("SET foreign_key_checks=1")
        finally:
            self.needs_rollback = needs_rollback

    def check_constraints(self, table_names: Optional[List[str]] = None) -> None:
        """
        Check each table name in `table_names` for rows with invalid foreign
        key references. This method is intended to be used in conjunction with
        `disable_constraint_checking()` and `enable_constraint_checking()`, to
        determine if rows with invalid references were entered while constraint
        checks were off.
        """
        with self.cursor() as cursor:
            if table_names is None:
                table_names = self.introspection.table_names(cursor)
            for table_name in table_names:
                primary_key_column_name = self.introspection.get_primary_key_column(
                    cursor, table_name
                )
                if not primary_key_column_name:
                    continue
                key_columns = self.introspection.get_key_columns(cursor, table_name)
                for (
                    column_name,
                    referenced_table_name,
                    referenced_column_name,
                ) in key_columns:
                    cursor.execute(
                        f"""
                        SELECT REFERRING.`{primary_key_column_name}`,
                        REFERRING.`{column_name}`
                        FROM `{table_name}` as REFERRING
                        LEFT JOIN `{referenced_table_name}` as REFERRED
                        ON (
                            REFERRING.`{column_name}` =
                            REFERRED.`{referenced_column_name}`
                        )
                        WHERE REFERRING.`{column_name}` IS NOT NULL
                        AND REFERRED.`{referenced_column_name}` IS NULL
                        """
                    )
                    for bad_row in cursor.fetchall():
                        raise IntegrityError(
                            f"The row in table '{table_name}' with primary "
                            f"key '{bad_row[0]}' has an invalid foreign key: "
                            f"{table_name}.{column_name} contains a value "
                            f"'{bad_row[1]}' that does not have a "
                            f"corresponding value in "
                            f"{referenced_table_name}."
                            f"{referenced_column_name}."
                        )

    def is_usable(self) -> bool:
        try:
            self.connection.ping()
        except Error:
            return False
        else:
            return True

    @cached_property
    @staticmethod
    def display_name() -> str:
        """Display name."""
        return "MySQL"

    @cached_property
    def data_type_check_constraints(self) -> Dict[str, str]:
        """Mapping of Field objects to their SQL for CHECK constraints."""
        if self.features.supports_column_check_constraints:
            check_constraints = {
                "PositiveBigIntegerField": "`%(column)s` >= 0",
                "PositiveIntegerField": "`%(column)s` >= 0",
                "PositiveSmallIntegerField": "`%(column)s` >= 0",
            }
            return check_constraints
        return {}

    @cached_property
    def mysql_server_data(self) -> Dict[str, Any]:
        """Return MySQL server data."""
        with self.temporary_connection() as cursor:
            # Select some server variables and test if the time zone
            # definitions are installed. CONVERT_TZ returns NULL if 'UTC'
            # timezone isn't loaded into the mysql.time_zone table.
            cursor.execute(
                """
                SELECT VERSION(),
                       @@sql_mode,
                       @@default_storage_engine,
                       @@sql_auto_is_null,
                       @@lower_case_table_names,
                       CONVERT_TZ('2001-01-01 01:00:00', 'UTC', 'UTC') IS NOT NULL
            """
            )
            row = cursor.fetchone()
        return {
            "version": row[0],
            "sql_mode": row[1],
            "default_storage_engine": row[2],
            "sql_auto_is_null": bool(row[3]),
            "lower_case_table_names": bool(row[4]),
            "has_zoneinfo_database": bool(row[5]),
        }

    @cached_property
    def mysql_server_info(self) -> Any:
        """Return MySQL version."""
        with self.temporary_connection() as cursor:
            cursor.execute("SELECT VERSION()")
            return cursor.fetchone()[0]

    @cached_property
    def mysql_version(self) -> Tuple[int, ...]:
        """Return MySQL version."""
        config = self.get_connection_params()
        with mysql.connector.connect(**config) as conn:
            server_version: Tuple[int, ...] = conn.get_server_version()
        return server_version

    @cached_property
    def sql_mode(self) -> Set[str]:
        """Return SQL mode."""
        with self.cursor() as cursor:
            cursor.execute("SELECT @@sql_mode")
            sql_mode = cursor.fetchone()
        return set(sql_mode[0].split(",") if sql_mode else ())

    @property
    def use_pure(self) -> bool:
        """Return True if pure Python version is being used."""
        ans: bool = self._use_pure
        return ans


class DjangoMySQLConverter(MySQLConverter):
    """Custom converter for Django."""

    # pylint: disable=unused-argument

    @staticmethod
    def _time_to_python(value: bytes, dsc: Any = None) -> Optional[time]:
        """Return MySQL TIME data type as datetime.time()

        Returns datetime.time()
        """
        return dateparse.parse_time(value.decode("utf-8"))

    @staticmethod
    def _datetime_to_python(value: bytes, dsc: Any = None) -> Optional[datetime]:
        """Connector/Python always returns naive datetime.datetime

        Connector/Python always returns naive timestamps since MySQL has
        no time zone support.

        - A naive datetime is a datetime that doesn't know its own timezone.

        Django needs a non-naive datetime, but in this method we don't need
        to make a datetime value time zone aware since Django itself at some
        point will make it aware (at least in versions 3.2.16 and 4.1.2) when
        USE_TZ=True. This may change in a future release, we need to keep an
        eye on this behaviour.

        Returns datetime.datetime()
        """
        return MySQLConverter._datetime_to_python(value) if value else None

    # pylint: enable=unused-argument

    def _safestring_to_mysql(self, value: str) -> Union[bytes, HexLiteral]:
        return self._str_to_mysql(value)

    def _safetext_to_mysql(self, value: str) -> Union[bytes, HexLiteral]:
        return self._str_to_mysql(value)

    def _safebytes_to_mysql(self, value: bytes) -> bytes:
        return self._bytes_to_mysql(value)
