query_context.py

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import logging
import pickle as pkl
from datetime import datetime, timedelta
from typing import Any, ClassVar, Dict, List, Optional

import numpy as np
import pandas as pd

from superset import app, cache, db
from superset.connectors.base.models import BaseDatasource
from superset.connectors.connector_registry import ConnectorRegistry
from superset.stats_logger import BaseStatsLogger
from superset.utils import core as utils
from superset.utils.core import DTTM_ALIAS

from .query_object import QueryObject

config = app.config
stats_logger: BaseStatsLogger = config["STATS_LOGGER"]
logger = logging.getLogger(__name__)


class QueryContext:
    """
    The query context contains the query object and additional fields necessary
    to retrieve the data payload for a given viz.
    """

    cache_type: ClassVar[str] = "df"
    enforce_numerical_metrics: ClassVar[bool] = True

    datasource: BaseDatasource
    queries: List[QueryObject]
    force: bool
    custom_cache_timeout: Optional[int]

    # TODO: Type datasource and query_object dictionary with TypedDict when it becomes
    # a vanilla python type https://github.com/python/mypy/issues/5288
    def __init__(
        self,
        datasource: Dict[str, Any],
        queries: List[Dict[str, Any]],
        force: bool = False,
        custom_cache_timeout: Optional[int] = None,
    ) -> None:
        self.datasource = ConnectorRegistry.get_datasource(
            str(datasource["type"]), int(datasource["id"]), db.session
        )
        self.queries = [QueryObject(**query_obj) for query_obj in queries]
        self.force = force
        self.custom_cache_timeout = custom_cache_timeout
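
    # Illustrative only (identifiers and values are hypothetical): a QueryContext
    # is typically built from the deserialized JSON body of a chart-data request,
    # e.g.
    #
    #   query_context = QueryContext(
    #       datasource={"type": "table", "id": 1},
    #       queries=[{"granularity": "ds", "metrics": ["count"]}],
    #   )
    #
    # The datasource dict is resolved to a concrete BaseDatasource through the
    # ConnectorRegistry, and each query dict becomes a QueryObject.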

    def get_query_result(self, query_object: QueryObject) -> Dict[str, Any]:
        """Returns a pandas dataframe based on the query object"""

        # Here, we assume that all the queries will use the same datasource, which
        # is a valid assumption for the current setting. In the long term, we may
        # or may not support multiple queries from different data sources.
        timestamp_format = None
        if self.datasource.type == "table":
            dttm_col = self.datasource.get_column(query_object.granularity)
            if dttm_col:
                timestamp_format = dttm_col.python_date_format

        # The datasource here can be a different backend, but the interface is common
        result = self.datasource.query(query_object.to_dict())
        df = result.df

        # Transform the timestamp we received from the database to a pandas-supported
        # datetime format. If no python_date_format is specified, the pattern is
        # treated as the default ISO date format.
        # If the datetime format is unix, the parse will use the corresponding
        # parsing logic.
        if not df.empty:
            if DTTM_ALIAS in df.columns:
                if timestamp_format in ("epoch_s", "epoch_ms"):
                    # Column has already been formatted as a timestamp.
                    df[DTTM_ALIAS] = df[DTTM_ALIAS].apply(pd.Timestamp)
                else:
                    df[DTTM_ALIAS] = pd.to_datetime(
                        df[DTTM_ALIAS], utc=False, format=timestamp_format
                    )
                if self.datasource.offset:
                    df[DTTM_ALIAS] += timedelta(hours=self.datasource.offset)
                df[DTTM_ALIAS] += query_object.time_shift

            if self.enforce_numerical_metrics:
                self.df_metrics_to_num(df, query_object)

            # Replace infinities with NaN in place so they serialize cleanly.
            df.replace([np.inf, -np.inf], np.nan, inplace=True)

        return {
            "query": result.query,
            "status": result.status,
            "error_message": result.error_message,
            "df": df,
        }
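
    # Note on the time arithmetic above (illustrative values): datasource.offset is
    # interpreted in hours and query_object.time_shift is a timedelta, so with an
    # offset of -8 and a one-day time_shift, a timestamp of 2019-01-02 00:00:00
    # becomes 2019-01-01 16:00:00 and then 2019-01-02 16:00:00.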

    @staticmethod
    def df_metrics_to_num(  # pylint: disable=invalid-name,no-self-use
        df: pd.DataFrame, query_object: QueryObject
    ) -> None:
        """Converting metrics to numeric when pandas.read_sql cannot"""
        for col, dtype in df.dtypes.items():
            if dtype.type == np.object_ and col in query_object.metrics:
                df[col] = pd.to_numeric(df[col], errors="coerce")
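
    # Illustrative only: for an object-typed metric column, pd.to_numeric with
    # errors="coerce" turns unparsable values into NaN rather than raising, e.g.
    #
    #   >>> pd.to_numeric(pd.Series(["1", "2", "n/a"], dtype=object), errors="coerce")
    #   0    1.0
    #   1    2.0
    #   2    NaN
    #   dtype: float64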

    @staticmethod
    def get_data(  # pylint: disable=invalid-name,no-self-use
        df: pd.DataFrame
    ) -> List[Dict]:
        return df.to_dict(orient="records")
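
    # Illustrative only: orient="records" yields one dict per row, so a dataframe
    # with columns "name" and "count" becomes
    # [{"name": "a", "count": 10}, {"name": "b", "count": 20}].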

    def get_single_payload(self, query_obj: QueryObject) -> Dict[str, Any]:
        """Returns a payload of metadata and data"""
        payload = self.get_df_payload(query_obj)
        df = payload["df"]
        status = payload["status"]
        if status != utils.QueryStatus.FAILED:
            if df.empty:
                payload["error"] = "No data"
            else:
                payload["data"] = self.get_data(df)
        del payload["df"]
        return payload

    def get_payload(self) -> List[Dict[str, Any]]:
        """Get a list of payloads, one for each query object"""
        return [self.get_single_payload(query_object) for query_object in self.queries]

    @property
    def cache_timeout(self) -> int:
        if self.custom_cache_timeout is not None:
            return self.custom_cache_timeout
        if self.datasource.cache_timeout is not None:
            return self.datasource.cache_timeout
        if (
            hasattr(self.datasource, "database")
            and self.datasource.database.cache_timeout is not None
        ):
            return self.datasource.database.cache_timeout
        return config["CACHE_DEFAULT_TIMEOUT"]
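
    # Resolution order, most specific first: the timeout passed with the request,
    # then the datasource-level timeout, then the database-level timeout (when the
    # datasource has a database), and finally config["CACHE_DEFAULT_TIMEOUT"].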

    def cache_key(self, query_obj: QueryObject, **kwargs) -> Optional[str]:
        if not query_obj:
            return None
        extra_cache_keys = self.datasource.get_extra_cache_keys(query_obj.to_dict())
        return query_obj.cache_key(
            datasource=self.datasource.uid,
            extra_cache_keys=extra_cache_keys,
            changed_on=self.datasource.changed_on,
            **kwargs
        )
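
    # The key is derived from the query definition plus the datasource uid and its
    # changed_on timestamp, so editing the datasource produces a new key and old
    # cache entries simply stop being hit.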

    def get_df_payload(  # pylint: disable=too-many-locals,too-many-statements
        self, query_obj: QueryObject, **kwargs
    ) -> Dict[str, Any]:
        """Handles caching around the df payload retrieval"""
        cache_key = self.cache_key(query_obj, **kwargs)
        logger.info("Cache key: %s", cache_key)
        is_loaded = False
        stacktrace = None
        df = pd.DataFrame()
        cached_dttm = datetime.utcnow().isoformat().split(".")[0]
        cache_value = None
        status = None
        query = ""
        error_message = None
        if cache_key and cache and not self.force:
            cache_value = cache.get(cache_key)
            if cache_value:
                stats_logger.incr("loading_from_cache")
                try:
                    cache_value = pkl.loads(cache_value)
                    df = cache_value["df"]
                    query = cache_value["query"]
                    status = utils.QueryStatus.SUCCESS
                    is_loaded = True
                    stats_logger.incr("loaded_from_cache")
                except Exception as e:  # pylint: disable=broad-except
                    logger.exception(e)
                    logger.error(
                        "Error reading cache: %s", utils.error_msg_from_exception(e)
                    )
                logger.info("Serving from cache")

        if query_obj and not is_loaded:
            try:
                query_result = self.get_query_result(query_obj)
                status = query_result["status"]
                query = query_result["query"]
                error_message = query_result["error_message"]
                df = query_result["df"]
                if status != utils.QueryStatus.FAILED:
                    stats_logger.incr("loaded_from_source")
                    is_loaded = True
            except Exception as e:  # pylint: disable=broad-except
                logger.exception(e)
                if not error_message:
                    error_message = "{}".format(e)
                status = utils.QueryStatus.FAILED
                stacktrace = utils.get_stacktrace()

            if is_loaded and cache_key and cache and status != utils.QueryStatus.FAILED:
                try:
                    cache_value = dict(dttm=cached_dttm, df=df, query=query)
                    cache_binary = pkl.dumps(cache_value, protocol=pkl.HIGHEST_PROTOCOL)
                    logger.info(
                        "Caching %d bytes at key %s", len(cache_binary), cache_key
                    )
                    stats_logger.incr("set_cache_key")
                    cache.set(cache_key, cache_binary, timeout=self.cache_timeout)
                except Exception as e:  # pylint: disable=broad-except
                    # The cache.set call can fail if the backend is down, if the
                    # key is too large, or for various other reasons.
                    logger.warning("Could not cache key %s", cache_key)
                    logger.exception(e)
                    cache.delete(cache_key)

        return {
            "cache_key": cache_key,
            "cached_dttm": cache_value["dttm"] if cache_value is not None else None,
            "cache_timeout": self.cache_timeout,
            "df": df,
            "error": error_message,
            "is_cached": cache_key is not None,
            "query": query,
            "status": status,
            "stacktrace": stacktrace,
            "rowcount": len(df.index),
        }
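

# Illustrative end-to-end usage (requires an initialized Superset application
# context and an existing datasource; the identifiers below are hypothetical):
#
#   query_context = QueryContext(
#       datasource={"type": "table", "id": 1},
#       queries=[{"granularity": "ds", "metrics": ["count"], "is_timeseries": True}],
#   )
#   for payload in query_context.get_payload():
#       print(payload["status"], payload["rowcount"])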