cache.py 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295
  1. # Licensed to the Apache Software Foundation (ASF) under one
  2. # or more contributor license agreements. See the NOTICE file
  3. # distributed with this work for additional information
  4. # regarding copyright ownership. The ASF licenses this file
  5. # to you under the Apache License, Version 2.0 (the
  6. # "License"); you may not use this file except in compliance
  7. # with the License. You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing,
  12. # software distributed under the License is distributed on an
  13. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. # KIND, either express or implied. See the License for the
  15. # specific language governing permissions and limitations
  16. # under the License.
  17. # pylint: disable=too-few-public-methods
  18. import json
  19. import logging
  20. from urllib import request
  21. from urllib.error import URLError
  22. from celery.utils.log import get_task_logger
  23. from sqlalchemy import and_, func
  24. from superset import app, db
  25. from superset.extensions import celery_app
  26. from superset.models.core import Log
  27. from superset.models.dashboard import Dashboard
  28. from superset.models.slice import Slice
  29. from superset.models.tags import Tag, TaggedObject
  30. from superset.utils.core import parse_human_datetime
# Celery worker-side logger; pin to INFO so cache warm up progress shows
# up in the worker logs regardless of the root logger configuration.
logger = get_task_logger(__name__)
logger.setLevel(logging.INFO)
  33. def get_form_data(chart_id, dashboard=None):
  34. """
  35. Build `form_data` for chart GET request from dashboard's `default_filters`.
  36. When a dashboard has `default_filters` they need to be added as extra
  37. filters in the GET request for charts.
  38. """
  39. form_data = {"slice_id": chart_id}
  40. if dashboard is None or not dashboard.json_metadata:
  41. return form_data
  42. json_metadata = json.loads(dashboard.json_metadata)
  43. # do not apply filters if chart is immune to them
  44. if chart_id in json_metadata.get("filter_immune_slices", []):
  45. return form_data
  46. default_filters = json.loads(json_metadata.get("default_filters", "null"))
  47. if not default_filters:
  48. return form_data
  49. # are some of the fields in the chart immune to filters?
  50. filter_immune_slice_fields = json_metadata.get("filter_immune_slice_fields", {})
  51. immune_fields = filter_immune_slice_fields.get(str(chart_id), [])
  52. extra_filters = []
  53. for filters in default_filters.values():
  54. for col, val in filters.items():
  55. if col not in immune_fields:
  56. extra_filters.append({"col": col, "op": "in", "val": val})
  57. if extra_filters:
  58. form_data["extra_filters"] = extra_filters
  59. return form_data
  60. def get_url(chart):
  61. """Return external URL for warming up a given chart/table cache."""
  62. with app.test_request_context():
  63. baseurl = (
  64. "{SUPERSET_WEBSERVER_PROTOCOL}://"
  65. "{SUPERSET_WEBSERVER_ADDRESS}:"
  66. "{SUPERSET_WEBSERVER_PORT}".format(**app.config)
  67. )
  68. return f"{baseurl}{chart.url}"
  69. class Strategy:
  70. """
  71. A cache warm up strategy.
  72. Each strategy defines a `get_urls` method that returns a list of URLs to
  73. be fetched from the `/superset/warm_up_cache/` endpoint.
  74. Strategies can be configured in `superset/config.py`:
  75. CELERYBEAT_SCHEDULE = {
  76. 'cache-warmup-hourly': {
  77. 'task': 'cache-warmup',
  78. 'schedule': crontab(minute=1, hour='*'), # @hourly
  79. 'kwargs': {
  80. 'strategy_name': 'top_n_dashboards',
  81. 'top_n': 10,
  82. 'since': '7 days ago',
  83. },
  84. },
  85. }
  86. """
  87. def __init__(self):
  88. pass
  89. def get_urls(self):
  90. raise NotImplementedError("Subclasses must implement get_urls!")
  91. class DummyStrategy(Strategy):
  92. """
  93. Warm up all charts.
  94. This is a dummy strategy that will fetch all charts. Can be configured by:
  95. CELERYBEAT_SCHEDULE = {
  96. 'cache-warmup-hourly': {
  97. 'task': 'cache-warmup',
  98. 'schedule': crontab(minute=1, hour='*'), # @hourly
  99. 'kwargs': {'strategy_name': 'dummy'},
  100. },
  101. }
  102. """
  103. name = "dummy"
  104. def get_urls(self):
  105. session = db.create_scoped_session()
  106. charts = session.query(Slice).all()
  107. return [get_url(chart) for chart in charts]
  108. class TopNDashboardsStrategy(Strategy):
  109. """
  110. Warm up charts in the top-n dashboards.
  111. CELERYBEAT_SCHEDULE = {
  112. 'cache-warmup-hourly': {
  113. 'task': 'cache-warmup',
  114. 'schedule': crontab(minute=1, hour='*'), # @hourly
  115. 'kwargs': {
  116. 'strategy_name': 'top_n_dashboards',
  117. 'top_n': 5,
  118. 'since': '7 days ago',
  119. },
  120. },
  121. }
  122. """
  123. name = "top_n_dashboards"
  124. def __init__(self, top_n=5, since="7 days ago"):
  125. super(TopNDashboardsStrategy, self).__init__()
  126. self.top_n = top_n
  127. self.since = parse_human_datetime(since)
  128. def get_urls(self):
  129. urls = []
  130. session = db.create_scoped_session()
  131. records = (
  132. session.query(Log.dashboard_id, func.count(Log.dashboard_id))
  133. .filter(and_(Log.dashboard_id.isnot(None), Log.dttm >= self.since))
  134. .group_by(Log.dashboard_id)
  135. .order_by(func.count(Log.dashboard_id).desc())
  136. .limit(self.top_n)
  137. .all()
  138. )
  139. dash_ids = [record.dashboard_id for record in records]
  140. dashboards = session.query(Dashboard).filter(Dashboard.id.in_(dash_ids)).all()
  141. for dashboard in dashboards:
  142. for chart in dashboard.slices:
  143. urls.append(get_url(chart))
  144. return urls
  145. class DashboardTagsStrategy(Strategy):
  146. """
  147. Warm up charts in dashboards with custom tags.
  148. CELERYBEAT_SCHEDULE = {
  149. 'cache-warmup-hourly': {
  150. 'task': 'cache-warmup',
  151. 'schedule': crontab(minute=1, hour='*'), # @hourly
  152. 'kwargs': {
  153. 'strategy_name': 'dashboard_tags',
  154. 'tags': ['core', 'warmup'],
  155. },
  156. },
  157. }
  158. """
  159. name = "dashboard_tags"
  160. def __init__(self, tags=None):
  161. super(DashboardTagsStrategy, self).__init__()
  162. self.tags = tags or []
  163. def get_urls(self):
  164. urls = []
  165. session = db.create_scoped_session()
  166. tags = session.query(Tag).filter(Tag.name.in_(self.tags)).all()
  167. tag_ids = [tag.id for tag in tags]
  168. # add dashboards that are tagged
  169. tagged_objects = (
  170. session.query(TaggedObject)
  171. .filter(
  172. and_(
  173. TaggedObject.object_type == "dashboard",
  174. TaggedObject.tag_id.in_(tag_ids),
  175. )
  176. )
  177. .all()
  178. )
  179. dash_ids = [tagged_object.object_id for tagged_object in tagged_objects]
  180. tagged_dashboards = session.query(Dashboard).filter(Dashboard.id.in_(dash_ids))
  181. for dashboard in tagged_dashboards:
  182. for chart in dashboard.slices:
  183. urls.append(get_url(chart))
  184. # add charts that are tagged
  185. tagged_objects = (
  186. session.query(TaggedObject)
  187. .filter(
  188. and_(
  189. TaggedObject.object_type == "chart",
  190. TaggedObject.tag_id.in_(tag_ids),
  191. )
  192. )
  193. .all()
  194. )
  195. chart_ids = [tagged_object.object_id for tagged_object in tagged_objects]
  196. tagged_charts = session.query(Slice).filter(Slice.id.in_(chart_ids))
  197. for chart in tagged_charts:
  198. urls.append(get_url(chart))
  199. return urls
# Registry of available warm up strategies, looked up by their `name` attribute.
strategies = [DummyStrategy, TopNDashboardsStrategy, DashboardTagsStrategy]
  201. @celery_app.task(name="cache-warmup")
  202. def cache_warmup(strategy_name, *args, **kwargs):
  203. """
  204. Warm up cache.
  205. This task periodically hits charts to warm up the cache.
  206. """
  207. logger.info("Loading strategy")
  208. class_ = None
  209. for class_ in strategies:
  210. if class_.name == strategy_name:
  211. break
  212. else:
  213. message = f"No strategy {strategy_name} found!"
  214. logger.error(message)
  215. return message
  216. logger.info(f"Loading {class_.__name__}")
  217. try:
  218. strategy = class_(*args, **kwargs)
  219. logger.info("Success!")
  220. except TypeError:
  221. message = "Error loading strategy!"
  222. logger.exception(message)
  223. return message
  224. results = {"success": [], "errors": []}
  225. for url in strategy.get_urls():
  226. try:
  227. logger.info(f"Fetching {url}")
  228. request.urlopen(url)
  229. results["success"].append(url)
  230. except URLError:
  231. logger.exception("Error warming up cache!")
  232. results["errors"].append(url)
  233. return results