base.py 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882
  1. # Licensed to the Apache Software Foundation (ASF) under one
  2. # or more contributor license agreements. See the NOTICE file
  3. # distributed with this work for additional information
  4. # regarding copyright ownership. The ASF licenses this file
  5. # to you under the Apache License, Version 2.0 (the
  6. # "License"); you may not use this file except in compliance
  7. # with the License. You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing,
  12. # software distributed under the License is distributed on an
  13. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. # KIND, either express or implied. See the License for the
  15. # specific language governing permissions and limitations
  16. # under the License.
  17. # pylint: disable=unused-argument
  18. import hashlib
  19. import os
  20. import re
  21. from contextlib import closing
  22. from datetime import datetime
  23. from typing import Any, Dict, List, NamedTuple, Optional, Tuple, TYPE_CHECKING, Union
  24. import pandas as pd
  25. import sqlparse
  26. from flask import g
  27. from flask_babel import lazy_gettext as _
  28. from sqlalchemy import column, DateTime, select
  29. from sqlalchemy.engine.base import Engine
  30. from sqlalchemy.engine.interfaces import Compiled, Dialect
  31. from sqlalchemy.engine.reflection import Inspector
  32. from sqlalchemy.engine.url import URL
  33. from sqlalchemy.ext.compiler import compiles
  34. from sqlalchemy.orm import Session
  35. from sqlalchemy.sql import quoted_name, text
  36. from sqlalchemy.sql.expression import ColumnClause, ColumnElement, Select, TextAsFrom
  37. from sqlalchemy.types import TypeEngine
  38. from wtforms.form import Form
  39. from superset import app, sql_parse
  40. from superset.models.sql_lab import Query
  41. from superset.utils import core as utils
  42. if TYPE_CHECKING:
  43. # prevent circular imports
  44. from superset.connectors.sqla.models import ( # pylint: disable=unused-import
  45. TableColumn,
  46. )
  47. from superset.models.core import Database # pylint: disable=unused-import
class TimeGrain(NamedTuple):  # pylint: disable=too-few-public-methods
    """One time grain offered by an engine spec.

    ``BaseEngineSpec.get_time_grains`` builds instances as
    ``TimeGrain(name, _(name), func, duration)``.
    """

    name: str  # TODO: redundant field, remove
    label: str  # ``name`` passed through lazy_gettext
    function: str  # SQL expression template of the grain ('{col}' marks the column)
    duration: Optional[str]  # grain key, e.g. "P1D"; None for the plain time column
# Local alias for utils.QueryStatus
QueryStatus = utils.QueryStatus
# Application config; the TIME_GRAIN_* keys are read by BaseEngineSpec below
config = app.config

# Grain key -> grain name for the grains known out of the box. The None key
# stands for the unmodified time column. get_time_grains() works on a copy of
# this mapping updated with config["TIME_GRAIN_ADDONS"].
builtin_time_grains: Dict[Optional[str], str] = {
    None: "Time Column",
    "PT1S": "second",
    "PT1M": "minute",
    "PT5M": "5 minute",
    "PT10M": "10 minute",
    "PT15M": "15 minute",
    "PT0.5H": "half hour",
    "PT1H": "hour",
    "P1D": "day",
    "P1W": "week",
    "P1M": "month",
    "P0.25Y": "quarter",
    "P1Y": "year",
    "1969-12-28T00:00:00Z/P1W": "week_start_sunday",
    "1969-12-29T00:00:00Z/P1W": "week_start_monday",
    "P1W/1970-01-03T00:00:00Z": "week_ending_saturday",
    "P1W/1970-01-04T00:00:00Z": "week_ending_sunday",
}
class TimestampExpression(
    ColumnClause
):  # pylint: disable=abstract-method,too-many-ancestors,too-few-public-methods
    """String-based SQL expression whose '{col}' placeholder is filled in at
    compile time with the rendered target column."""

    def __init__(self, expr: str, col: ColumnClause, **kwargs: Any) -> None:
        """Sqlalchemy class that can be used to render native column elements
        respecting engine-specific quoting rules as part of a string-based
        expression.

        :param expr: Sql expression with '{col}' denoting the locations where the col
            object will be rendered.
        :param col: the target column
        :param kwargs: extra keyword arguments handed to ``ColumnClause.__init__``
        """
        super().__init__(expr, **kwargs)
        self.col = col

    @property
    def _constructor(self) -> ColumnClause:
        # Needed to ensure that the column label is rendered correctly when
        # proxied to the outer query.
        # See https://github.com/sqlalchemy/sqlalchemy/issues/4730
        return ColumnClause
  92. @compiles(TimestampExpression)
  93. def compile_timegrain_expression(
  94. element: TimestampExpression, compiler: Compiled, **kwargs: Any
  95. ) -> str:
  96. return element.name.replace("{col}", compiler.process(element.col, **kwargs))
class LimitMethod:  # pylint: disable=too-few-public-methods
    """Enum the ways that limits can be applied"""

    # BaseEngineSpec.fetch_data() reads at most `limit` rows with cursor.fetchmany()
    FETCH_MANY = "fetch_many"
    # BaseEngineSpec.apply_limit_to_sql() wraps the statement in an outer
    # SELECT that carries the LIMIT
    WRAP_SQL = "wrap_sql"
    # BaseEngineSpec.apply_limit_to_sql() rewrites the statement's limit via sql_parse
    FORCE_LIMIT = "force_limit"
class BaseEngineSpec:  # pylint: disable=too-many-public-methods
    """Abstract class for database engine specific configurations"""

    # Engine name; used in error messages and as the lookup key into
    # config["TIME_GRAIN_ADDON_FUNCTIONS"]
    engine = "base"  # str as defined in sqlalchemy.engine.engine
    # Grain key -> SQL expression template; copied by get_time_grain_functions()
    _time_grain_functions: Dict[Optional[str], str] = {}
    # NOTE(review): not read inside this class; presumably consumed by callers
    time_groupby_inline = False
    # Consulted by fetch_data() and apply_limit_to_sql()
    limit_method = LimitMethod.FORCE_LIMIT
    # NOTE(review): the four flags below are not read inside this class;
    # presumably consumed by query-building code elsewhere - confirm
    time_secondary_columns = False
    allows_joins = True
    allows_subqueries = True
    allows_column_aliases = True
    # When True, make_label_compatible() returns labels as quoted_name objects
    force_column_alias_quotes = False
    # When non-zero, assigned to cursor.arraysize in fetch_data() and execute()
    arraysize = 0
    # When non-zero, longer labels are replaced by a (truncated) md5 hash
    max_column_name_length = 0
    # When True, get_table_names()/get_view_names() strip a leading "<schema>."
    try_remove_schema_from_table_name = True  # pylint: disable=invalid-name
  116. @classmethod
  117. def get_allow_cost_estimate(cls, version: Optional[str] = None) -> bool:
  118. return False
  119. @classmethod
  120. def get_engine(
  121. cls,
  122. database: "Database",
  123. schema: Optional[str] = None,
  124. source: Optional[str] = None,
  125. ) -> Engine:
  126. user_name = utils.get_username()
  127. return database.get_sqla_engine(
  128. schema=schema, nullpool=True, user_name=user_name, source=source
  129. )
  130. @classmethod
  131. def get_timestamp_expr(
  132. cls, col: ColumnClause, pdf: Optional[str], time_grain: Optional[str]
  133. ) -> TimestampExpression:
  134. """
  135. Construct a TimestampExpression to be used in a SQLAlchemy query.
  136. :param col: Target column for the TimestampExpression
  137. :param pdf: date format (seconds or milliseconds)
  138. :param time_grain: time grain, e.g. P1Y for 1 year
  139. :return: TimestampExpression object
  140. """
  141. if time_grain:
  142. time_expr = cls.get_time_grain_functions().get(time_grain)
  143. if not time_expr:
  144. raise NotImplementedError(
  145. f"No grain spec for {time_grain} for database {cls.engine}"
  146. )
  147. else:
  148. time_expr = "{col}"
  149. # if epoch, translate to DATE using db specific conf
  150. if pdf == "epoch_s":
  151. time_expr = time_expr.replace("{col}", cls.epoch_to_dttm())
  152. elif pdf == "epoch_ms":
  153. time_expr = time_expr.replace("{col}", cls.epoch_ms_to_dttm())
  154. return TimestampExpression(time_expr, col, type_=DateTime)
  155. @classmethod
  156. def get_time_grains(cls) -> Tuple[TimeGrain, ...]:
  157. """
  158. Generate a tuple of supported time grains.
  159. :return: All time grains supported by the engine
  160. """
  161. ret_list = []
  162. time_grain_functions = cls.get_time_grain_functions()
  163. time_grains = builtin_time_grains.copy()
  164. time_grains.update(config["TIME_GRAIN_ADDONS"])
  165. for duration, func in time_grain_functions.items():
  166. if duration in time_grains:
  167. name = time_grains[duration]
  168. ret_list.append(TimeGrain(name, _(name), func, duration))
  169. return tuple(ret_list)
  170. @classmethod
  171. def get_time_grain_functions(cls) -> Dict[Optional[str], str]:
  172. """
  173. Return a dict of all supported time grains including any potential added grains
  174. but excluding any potentially blacklisted grains in the config file.
  175. :return: All time grain functions supported by the engine
  176. """
  177. # TODO: use @memoize decorator or similar to avoid recomputation on every call
  178. time_grain_functions = cls._time_grain_functions.copy()
  179. grain_addon_functions = config["TIME_GRAIN_ADDON_FUNCTIONS"]
  180. time_grain_functions.update(grain_addon_functions.get(cls.engine, {}))
  181. blacklist: List[str] = config["TIME_GRAIN_BLACKLIST"]
  182. for key in blacklist:
  183. time_grain_functions.pop(key)
  184. return time_grain_functions
  185. @classmethod
  186. def make_select_compatible(
  187. cls, groupby_exprs: Dict[str, ColumnElement], select_exprs: List[ColumnElement]
  188. ) -> List[ColumnElement]:
  189. """
  190. Some databases will just return the group-by field into the select, but don't
  191. allow the group-by field to be put into the select list.
  192. :param groupby_exprs: mapping between column name and column object
  193. :param select_exprs: all columns in the select clause
  194. :return: columns to be included in the final select clause
  195. """
  196. return select_exprs
  197. @classmethod
  198. def fetch_data(cls, cursor: Any, limit: int) -> List[Tuple]:
  199. """
  200. :param cursor: Cursor instance
  201. :param limit: Maximum number of rows to be returned by the cursor
  202. :return: Result of query
  203. """
  204. if cls.arraysize:
  205. cursor.arraysize = cls.arraysize
  206. if cls.limit_method == LimitMethod.FETCH_MANY:
  207. return cursor.fetchmany(limit)
  208. return cursor.fetchall()
  209. @classmethod
  210. def expand_data(
  211. cls, columns: List[dict], data: List[dict]
  212. ) -> Tuple[List[dict], List[dict], List[dict]]:
  213. """
  214. Some engines support expanding nested fields. See implementation in Presto
  215. spec for details.
  216. :param columns: columns selected in the query
  217. :param data: original data set
  218. :return: list of all columns(selected columns and their nested fields),
  219. expanded data set, listed of nested fields
  220. """
  221. return columns, data, []
  222. @classmethod
  223. def alter_new_orm_column(cls, orm_col: "TableColumn") -> None:
  224. """Allow altering default column attributes when first detected/added
  225. For instance special column like `__time` for Druid can be
  226. set to is_dttm=True. Note that this only gets called when new
  227. columns are detected/created"""
  228. # TODO: Fix circular import caused by importing TableColumn
  229. pass
  230. @classmethod
  231. def epoch_to_dttm(cls) -> str:
  232. """
  233. SQL expression that converts epoch (seconds) to datetime that can be used in a
  234. query. The reference column should be denoted as `{col}` in the return
  235. expression, e.g. "FROM_UNIXTIME({col})"
  236. :return: SQL Expression
  237. """
  238. raise NotImplementedError()
  239. @classmethod
  240. def epoch_ms_to_dttm(cls) -> str:
  241. """
  242. SQL expression that converts epoch (milliseconds) to datetime that can be used
  243. in a query.
  244. :return: SQL Expression
  245. """
  246. return cls.epoch_to_dttm().replace("{col}", "({col}/1000)")
  247. @classmethod
  248. def get_datatype(cls, type_code: Any) -> Optional[str]:
  249. """
  250. Change column type code from cursor description to string representation.
  251. :param type_code: Type code from cursor description
  252. :return: String representation of type code
  253. """
  254. if isinstance(type_code, str) and type_code != "":
  255. return type_code.upper()
  256. return None
  257. @classmethod
  258. def extra_table_metadata(
  259. cls, database: "Database", table_name: str, schema_name: str
  260. ) -> Dict[str, Any]:
  261. """
  262. Returns engine-specific table metadata
  263. :param database: Database instance
  264. :param table_name: Table name
  265. :param schema_name: Schema name
  266. :return: Engine-specific table metadata
  267. """
  268. # TODO: Fix circular import caused by importing Database
  269. return {}
  270. @classmethod
  271. def apply_limit_to_sql(cls, sql: str, limit: int, database: "Database") -> str:
  272. """
  273. Alters the SQL statement to apply a LIMIT clause
  274. :param sql: SQL query
  275. :param limit: Maximum number of rows to be returned by the query
  276. :param database: Database instance
  277. :return: SQL query with limit clause
  278. """
  279. # TODO: Fix circular import caused by importing Database
  280. if cls.limit_method == LimitMethod.WRAP_SQL:
  281. sql = sql.strip("\t\n ;")
  282. qry = (
  283. select("*")
  284. .select_from(TextAsFrom(text(sql), ["*"]).alias("inner_qry"))
  285. .limit(limit)
  286. )
  287. return database.compile_sqla_query(qry)
  288. elif LimitMethod.FORCE_LIMIT:
  289. parsed_query = sql_parse.ParsedQuery(sql)
  290. sql = parsed_query.get_query_with_new_limit(limit)
  291. return sql
  292. @classmethod
  293. def get_limit_from_sql(cls, sql: str) -> Optional[int]:
  294. """
  295. Extract limit from SQL query
  296. :param sql: SQL query
  297. :return: Value of limit clause in query
  298. """
  299. parsed_query = sql_parse.ParsedQuery(sql)
  300. return parsed_query.limit
  301. @classmethod
  302. def get_query_with_new_limit(cls, sql: str, limit: int) -> str:
  303. """
  304. Create a query based on original query but with new limit clause
  305. :param sql: SQL query
  306. :param limit: New limit to insert/replace into query
  307. :return: Query with new limit
  308. """
  309. parsed_query = sql_parse.ParsedQuery(sql)
  310. return parsed_query.get_query_with_new_limit(limit)
  311. @staticmethod
  312. def csv_to_df(**kwargs: Any) -> pd.DataFrame:
  313. """ Read csv into Pandas DataFrame
  314. :param kwargs: params to be passed to DataFrame.read_csv
  315. :return: Pandas DataFrame containing data from csv
  316. """
  317. kwargs["encoding"] = "utf-8"
  318. kwargs["iterator"] = True
  319. chunks = pd.read_csv(**kwargs)
  320. df = pd.concat(chunk for chunk in chunks)
  321. return df
  322. @classmethod
  323. def df_to_sql( # pylint: disable=invalid-name
  324. cls, df: pd.DataFrame, **kwargs: Any
  325. ) -> None:
  326. """ Upload data from a Pandas DataFrame to a database. For
  327. regular engines this calls the DataFrame.to_sql() method. Can be
  328. overridden for engines that don't work well with to_sql(), e.g.
  329. BigQuery.
  330. :param df: Dataframe with data to be uploaded
  331. :param kwargs: kwargs to be passed to to_sql() method
  332. """
  333. df.to_sql(**kwargs)
  334. @classmethod
  335. def create_table_from_csv(cls, form: Form, database: "Database") -> None:
  336. """
  337. Create table from contents of a csv. Note: this method does not create
  338. metadata for the table.
  339. :param form: Parameters defining how to process data
  340. :param database: Database model object for the target database
  341. """
  342. def _allowed_file(filename: str) -> bool:
  343. # Only allow specific file extensions as specified in the config
  344. extension = os.path.splitext(filename)[1].lower()
  345. return (
  346. extension is not None and extension[1:] in config["ALLOWED_EXTENSIONS"]
  347. )
  348. filename = form.csv_file.data.filename
  349. if not _allowed_file(filename):
  350. raise Exception("Invalid file type selected")
  351. csv_to_df_kwargs = {
  352. "filepath_or_buffer": filename,
  353. "sep": form.sep.data,
  354. "header": form.header.data if form.header.data else 0,
  355. "index_col": form.index_col.data,
  356. "mangle_dupe_cols": form.mangle_dupe_cols.data,
  357. "skipinitialspace": form.skipinitialspace.data,
  358. "skiprows": form.skiprows.data,
  359. "nrows": form.nrows.data,
  360. "skip_blank_lines": form.skip_blank_lines.data,
  361. "parse_dates": form.parse_dates.data,
  362. "infer_datetime_format": form.infer_datetime_format.data,
  363. "chunksize": 10000,
  364. }
  365. df = cls.csv_to_df(**csv_to_df_kwargs)
  366. engine = cls.get_engine(database)
  367. df_to_sql_kwargs = {
  368. "df": df,
  369. "name": form.name.data,
  370. "con": engine,
  371. "schema": form.schema.data,
  372. "if_exists": form.if_exists.data,
  373. "index": form.index.data,
  374. "index_label": form.index_label.data,
  375. "chunksize": 10000,
  376. }
  377. cls.df_to_sql(**df_to_sql_kwargs)
  378. @classmethod
  379. def convert_dttm(cls, target_type: str, dttm: datetime) -> Optional[str]:
  380. """
  381. Convert Python datetime object to a SQL expression
  382. :param target_type: The target type of expression
  383. :param dttm: The datetime object
  384. :return: The SQL expression
  385. """
  386. return None
  387. @classmethod
  388. def get_all_datasource_names(
  389. cls, database: "Database", datasource_type: str
  390. ) -> List[utils.DatasourceName]:
  391. """Returns a list of all tables or views in database.
  392. :param database: Database instance
  393. :param datasource_type: Datasource_type can be 'table' or 'view'
  394. :return: List of all datasources in database or schema
  395. """
  396. # TODO: Fix circular import caused by importing Database
  397. schemas = database.get_all_schema_names(
  398. cache=database.schema_cache_enabled,
  399. cache_timeout=database.schema_cache_timeout,
  400. force=True,
  401. )
  402. all_datasources: List[utils.DatasourceName] = []
  403. for schema in schemas:
  404. if datasource_type == "table":
  405. all_datasources += database.get_all_table_names_in_schema(
  406. schema=schema,
  407. force=True,
  408. cache=database.table_cache_enabled,
  409. cache_timeout=database.table_cache_timeout,
  410. )
  411. elif datasource_type == "view":
  412. all_datasources += database.get_all_view_names_in_schema(
  413. schema=schema,
  414. force=True,
  415. cache=database.table_cache_enabled,
  416. cache_timeout=database.table_cache_timeout,
  417. )
  418. else:
  419. raise Exception(f"Unsupported datasource_type: {datasource_type}")
  420. return all_datasources
  421. @classmethod
  422. def handle_cursor(cls, cursor: Any, query: Query, session: Session) -> None:
  423. """Handle a live cursor between the execute and fetchall calls
  424. The flow works without this method doing anything, but it allows
  425. for handling the cursor and updating progress information in the
  426. query object"""
  427. # TODO: Fix circular import error caused by importing sql_lab.Query
  428. pass
  429. @classmethod
  430. def extract_error_message(cls, e: Exception) -> str:
  431. return f"{cls.engine} error: {cls._extract_error_message(e)}"
  432. @classmethod
  433. def _extract_error_message(cls, e: Exception) -> Optional[str]:
  434. """Extract error message for queries"""
  435. return utils.error_msg_from_exception(e)
  436. @classmethod
  437. def adjust_database_uri(cls, uri: URL, selected_schema: Optional[str]) -> None:
  438. """
  439. Mutate the database component of the SQLAlchemy URI.
  440. The URI here represents the URI as entered when saving the database,
  441. ``selected_schema`` is the schema currently active presumably in
  442. the SQL Lab dropdown. Based on that, for some database engine,
  443. we can return a new altered URI that connects straight to the
  444. active schema, meaning the users won't have to prefix the object
  445. names by the schema name.
  446. Some databases engines have 2 level of namespacing: database and
  447. schema (postgres, oracle, mssql, ...)
  448. For those it's probably better to not alter the database
  449. component of the URI with the schema name, it won't work.
  450. Some database drivers like presto accept '{catalog}/{schema}' in
  451. the database component of the URL, that can be handled here.
  452. """
  453. pass
  454. @classmethod
  455. def patch(cls) -> None:
  456. """
  457. TODO: Improve docstring and refactor implementation in Hive
  458. """
  459. pass
  460. @classmethod
  461. def get_schema_names(cls, inspector: Inspector) -> List[str]:
  462. """
  463. Get all schemas from database
  464. :param inspector: SqlAlchemy inspector
  465. :return: All schemas in the database
  466. """
  467. return sorted(inspector.get_schema_names())
  468. @classmethod
  469. def get_table_names(
  470. cls, database: "Database", inspector: Inspector, schema: Optional[str]
  471. ) -> List[str]:
  472. """
  473. Get all tables from schema
  474. :param inspector: SqlAlchemy inspector
  475. :param schema: Schema to inspect. If omitted, uses default schema for database
  476. :return: All tables in schema
  477. """
  478. tables = inspector.get_table_names(schema)
  479. if schema and cls.try_remove_schema_from_table_name:
  480. tables = [re.sub(f"^{schema}\\.", "", table) for table in tables]
  481. return sorted(tables)
  482. @classmethod
  483. def get_view_names(
  484. cls, database: "Database", inspector: Inspector, schema: Optional[str]
  485. ) -> List[str]:
  486. """
  487. Get all views from schema
  488. :param inspector: SqlAlchemy inspector
  489. :param schema: Schema name. If omitted, uses default schema for database
  490. :return: All views in schema
  491. """
  492. views = inspector.get_view_names(schema)
  493. if schema and cls.try_remove_schema_from_table_name:
  494. views = [re.sub(f"^{schema}\\.", "", view) for view in views]
  495. return sorted(views)
  496. @classmethod
  497. def get_columns(
  498. cls, inspector: Inspector, table_name: str, schema: Optional[str]
  499. ) -> List[Dict[str, Any]]:
  500. """
  501. Get all columns from a given schema and table
  502. :param inspector: SqlAlchemy Inspector instance
  503. :param table_name: Table name
  504. :param schema: Schema name. If omitted, uses default schema for database
  505. :return: All columns in table
  506. """
  507. return inspector.get_columns(table_name, schema)
  508. @classmethod
  509. def where_latest_partition( # pylint: disable=too-many-arguments
  510. cls,
  511. table_name: str,
  512. schema: Optional[str],
  513. database: "Database",
  514. query: Select,
  515. columns: Optional[List] = None,
  516. ) -> Optional[Select]:
  517. """
  518. Add a where clause to a query to reference only the most recent partition
  519. :param table_name: Table name
  520. :param schema: Schema name
  521. :param database: Database instance
  522. :param query: SqlAlchemy query
  523. :param columns: List of TableColumns
  524. :return: SqlAlchemy query with additional where clause referencing latest
  525. partition
  526. """
  527. # TODO: Fix circular import caused by importing Database, TableColumn
  528. return None
  529. @classmethod
  530. def _get_fields(cls, cols: List[Dict[str, Any]]) -> List[Any]:
  531. return [column(c["name"]) for c in cols]
  532. @classmethod
  533. def select_star( # pylint: disable=too-many-arguments,too-many-locals
  534. cls,
  535. database: "Database",
  536. table_name: str,
  537. engine: Engine,
  538. schema: Optional[str] = None,
  539. limit: int = 100,
  540. show_cols: bool = False,
  541. indent: bool = True,
  542. latest_partition: bool = True,
  543. cols: Optional[List[Dict[str, Any]]] = None,
  544. ) -> str:
  545. """
  546. Generate a "SELECT * from [schema.]table_name" query with appropriate limit.
  547. :param database: Database instance
  548. :param table_name: Table name
  549. :param engine: SqlALchemy Engine instance
  550. :param schema: Schema
  551. :param limit: limit to impose on query
  552. :param show_cols: Show columns in query; otherwise use "*"
  553. :param indent: Add indentation to query
  554. :param latest_partition: Only query latest partition
  555. :param cols: Columns to include in query
  556. :return: SQL query
  557. """
  558. fields: Union[str, List[Any]] = "*"
  559. cols = cols or []
  560. if (show_cols or latest_partition) and not cols:
  561. cols = database.get_columns(table_name, schema)
  562. if show_cols:
  563. fields = cls._get_fields(cols)
  564. quote = engine.dialect.identifier_preparer.quote
  565. if schema:
  566. full_table_name = quote(schema) + "." + quote(table_name)
  567. else:
  568. full_table_name = quote(table_name)
  569. qry = select(fields).select_from(text(full_table_name))
  570. if limit:
  571. qry = qry.limit(limit)
  572. if latest_partition:
  573. partition_query = cls.where_latest_partition(
  574. table_name, schema, database, qry, columns=cols
  575. )
  576. if partition_query is not None:
  577. qry = partition_query
  578. sql = database.compile_sqla_query(qry)
  579. if indent:
  580. sql = sqlparse.format(sql, reindent=True)
  581. return sql
  582. @classmethod
  583. def estimate_statement_cost(
  584. cls, statement: str, database: "Database", cursor: Any, user_name: str
  585. ) -> Dict[str, Any]:
  586. """
  587. Generate a SQL query that estimates the cost of a given statement.
  588. :param statement: A single SQL statement
  589. :param database: Database instance
  590. :param cursor: Cursor instance
  591. :param username: Effective username
  592. :return: Dictionary with different costs
  593. """
  594. raise Exception("Database does not support cost estimation")
  595. @classmethod
  596. def query_cost_formatter(
  597. cls, raw_cost: List[Dict[str, Any]]
  598. ) -> List[Dict[str, str]]:
  599. """
  600. Format cost estimate.
  601. :param raw_cost: Raw estimate from `estimate_query_cost`
  602. :return: Human readable cost estimate
  603. """
  604. raise Exception("Database does not support cost estimation")
  605. @classmethod
  606. def estimate_query_cost(
  607. cls, database: "Database", schema: str, sql: str, source: Optional[str] = None
  608. ) -> List[Dict[str, str]]:
  609. """
  610. Estimate the cost of a multiple statement SQL query.
  611. :param database: Database instance
  612. :param schema: Database schema
  613. :param sql: SQL query with possibly multiple statements
  614. :param source: Source of the query (eg, "sql_lab")
  615. """
  616. database_version = database.get_extra().get("version")
  617. if not cls.get_allow_cost_estimate(database_version):
  618. raise Exception("Database does not support cost estimation")
  619. user_name = g.user.username if g.user else None
  620. parsed_query = sql_parse.ParsedQuery(sql)
  621. statements = parsed_query.get_statements()
  622. engine = cls.get_engine(database, schema=schema, source=source)
  623. costs = []
  624. with closing(engine.raw_connection()) as conn:
  625. with closing(conn.cursor()) as cursor:
  626. for statement in statements:
  627. costs.append(
  628. cls.estimate_statement_cost(
  629. statement, database, cursor, user_name
  630. )
  631. )
  632. return costs
  633. @classmethod
  634. def modify_url_for_impersonation(
  635. cls, url: URL, impersonate_user: bool, username: Optional[str]
  636. ) -> None:
  637. """
  638. Modify the SQL Alchemy URL object with the user to impersonate if applicable.
  639. :param url: SQLAlchemy URL object
  640. :param impersonate_user: Flag indicating if impersonation is enabled
  641. :param username: Effective username
  642. """
  643. if impersonate_user and username is not None:
  644. url.username = username
  645. @classmethod
  646. def get_configuration_for_impersonation( # pylint: disable=invalid-name
  647. cls, uri: str, impersonate_user: bool, username: Optional[str]
  648. ) -> Dict[str, str]:
  649. """
  650. Return a configuration dictionary that can be merged with other configs
  651. that can set the correct properties for impersonating users
  652. :param uri: URI
  653. :param impersonate_user: Flag indicating if impersonation is enabled
  654. :param username: Effective username
  655. :return: Configs required for impersonation
  656. """
  657. return {}
  658. @classmethod
  659. def execute(cls, cursor: Any, query: str, **kwargs: Any) -> None:
  660. """
  661. Execute a SQL query
  662. :param cursor: Cursor instance
  663. :param query: Query to execute
  664. :param kwargs: kwargs to be passed to cursor.execute()
  665. :return:
  666. """
  667. if cls.arraysize:
  668. cursor.arraysize = cls.arraysize
  669. cursor.execute(query)
  670. @classmethod
  671. def make_label_compatible(cls, label: str) -> Union[str, quoted_name]:
  672. """
  673. Conditionally mutate and/or quote a sqlalchemy expression label. If
  674. force_column_alias_quotes is set to True, return the label as a
  675. sqlalchemy.sql.elements.quoted_name object to ensure that the select query
  676. and query results have same case. Otherwise return the mutated label as a
  677. regular string. If maxmimum supported column name length is exceeded,
  678. generate a truncated label by calling truncate_label().
  679. :param label: expected expression label/alias
  680. :return: conditionally mutated label supported by the db engine
  681. """
  682. label_mutated = cls._mutate_label(label)
  683. if (
  684. cls.max_column_name_length
  685. and len(label_mutated) > cls.max_column_name_length
  686. ):
  687. label_mutated = cls._truncate_label(label)
  688. if cls.force_column_alias_quotes:
  689. label_mutated = quoted_name(label_mutated, True)
  690. return label_mutated
  691. @classmethod
  692. def get_sqla_column_type(cls, type_: str) -> Optional[TypeEngine]:
  693. """
  694. Return a sqlalchemy native column type that corresponds to the column type
  695. defined in the data source (return None to use default type inferred by
  696. SQLAlchemy). Needs to be overridden if column requires special handling
  697. (see MSSQL for example of NCHAR/NVARCHAR handling).
  698. :param type_: Column type returned by inspector
  699. :return: SqlAlchemy column type
  700. """
  701. return None
  702. @staticmethod
  703. def _mutate_label(label: str) -> str:
  704. """
  705. Most engines support mixed case aliases that can include numbers
  706. and special characters, like commas, parentheses etc. For engines that
  707. have restrictions on what types of aliases are supported, this method
  708. can be overridden to ensure that labels conform to the engine's
  709. limitations. Mutated labels should be deterministic (input label A always
  710. yields output label X) and unique (input labels A and B don't yield the same
  711. output label X).
  712. :param label: Preferred expression label
  713. :return: Conditionally mutated label
  714. """
  715. return label
  716. @classmethod
  717. def _truncate_label(cls, label: str) -> str:
  718. """
  719. In the case that a label exceeds the max length supported by the engine,
  720. this method is used to construct a deterministic and unique label based on
  721. the original label. By default this returns an md5 hash of the original label,
  722. conditionally truncated if the length of the hash exceeds the max column length
  723. of the engine.
  724. :param label: Expected expression label
  725. :return: Truncated label
  726. """
  727. label = hashlib.md5(label.encode("utf-8")).hexdigest()
  728. # truncate hash if it exceeds max length
  729. if cls.max_column_name_length and len(label) > cls.max_column_name_length:
  730. label = label[: cls.max_column_name_length]
  731. return label
  732. @classmethod
  733. def column_datatype_to_string(
  734. cls, sqla_column_type: TypeEngine, dialect: Dialect
  735. ) -> str:
  736. """
  737. Convert sqlalchemy column type to string representation.
  738. Can be overridden to remove unnecessary details, especially
  739. collation info (see mysql, mssql).
  740. :param sqla_column_type: SqlAlchemy column type
  741. :param dialect: Sqlalchemy dialect
  742. :return: Compiled column type
  743. """
  744. return sqla_column_type.compile(dialect=dialect).upper()
  745. @classmethod
  746. def get_function_names(cls, database: "Database") -> List[str]:
  747. """
  748. Get a list of function names that are able to be called on the database.
  749. Used for SQL Lab autocomplete.
  750. :param database: The database to get functions for
  751. :return: A list of function names useable in the database
  752. """
  753. return []
  754. @staticmethod
  755. def pyodbc_rows_to_tuples(data: List[Any]) -> List[Tuple]:
  756. """
  757. Convert pyodbc.Row objects from `fetch_data` to tuples.
  758. :param data: List of tuples or pyodbc.Row objects
  759. :return: List of tuples
  760. """
  761. if data and type(data[0]).__name__ == "Row":
  762. data = [tuple(row) for row in data]
  763. return data