schedules.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434
  1. # Licensed to the Apache Software Foundation (ASF) under one
  2. # or more contributor license agreements. See the NOTICE file
  3. # distributed with this work for additional information
  4. # regarding copyright ownership. The ASF licenses this file
  5. # to you under the Apache License, Version 2.0 (the
  6. # "License"); you may not use this file except in compliance
  7. # with the License. You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing,
  12. # software distributed under the License is distributed on an
  13. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. # KIND, either express or implied. See the License for the
  15. # specific language governing permissions and limitations
  16. # under the License.
"""Celery tasks and helpers that render and deliver scheduled email reports"""
  18. import logging
  19. import time
  20. import urllib.request
  21. from collections import namedtuple
  22. from datetime import datetime, timedelta
  23. from email.utils import make_msgid, parseaddr
  24. from urllib.error import URLError # pylint: disable=ungrouped-imports
  25. import croniter
  26. import simplejson as json
  27. from dateutil.tz import tzlocal
  28. from flask import render_template, Response, session, url_for
  29. from flask_babel import gettext as __
  30. from flask_login import login_user
  31. from retry.api import retry_call
  32. from selenium.common.exceptions import WebDriverException
  33. from selenium.webdriver import chrome, firefox
  34. from werkzeug.http import parse_cookie
  35. # Superset framework imports
  36. from superset import app, db, security_manager
  37. from superset.extensions import celery_app
  38. from superset.models.schedules import (
  39. EmailDeliveryType,
  40. get_scheduler_model,
  41. ScheduleType,
  42. SliceEmailReportFormat,
  43. )
  44. from superset.utils.core import get_email_address_list, send_email_smtp
  45. # Globals
  46. config = app.config
  47. logger = logging.getLogger("tasks.email_reports")
  48. logger.setLevel(logging.INFO)
  49. # Time in seconds, we will wait for the page to load and render
  50. PAGE_RENDER_WAIT = 30
  51. EmailContent = namedtuple("EmailContent", ["body", "data", "images"])
  52. def _get_recipients(schedule):
  53. bcc = config["EMAIL_REPORT_BCC_ADDRESS"]
  54. if schedule.deliver_as_group:
  55. to = schedule.recipients
  56. yield (to, bcc)
  57. else:
  58. for to in get_email_address_list(schedule.recipients):
  59. yield (to, bcc)
  60. def _deliver_email(schedule, subject, email):
  61. for (to, bcc) in _get_recipients(schedule):
  62. send_email_smtp(
  63. to,
  64. subject,
  65. email.body,
  66. config,
  67. data=email.data,
  68. images=email.images,
  69. bcc=bcc,
  70. mime_subtype="related",
  71. dryrun=config["SCHEDULED_EMAIL_DEBUG_MODE"],
  72. )
  73. def _generate_mail_content(schedule, screenshot, name, url):
  74. if schedule.delivery_type == EmailDeliveryType.attachment:
  75. images = None
  76. data = {"screenshot.png": screenshot}
  77. body = __(
  78. '<b><a href="%(url)s">Explore in Superset</a></b><p></p>',
  79. name=name,
  80. url=url,
  81. )
  82. elif schedule.delivery_type == EmailDeliveryType.inline:
  83. # Get the domain from the 'From' address ..
  84. # and make a message id without the < > in the ends
  85. domain = parseaddr(config["SMTP_MAIL_FROM"])[1].split("@")[1]
  86. msgid = make_msgid(domain)[1:-1]
  87. images = {msgid: screenshot}
  88. data = None
  89. body = __(
  90. """
  91. <b><a href="%(url)s">Explore in Superset</a></b><p></p>
  92. <img src="cid:%(msgid)s">
  93. """,
  94. name=name,
  95. url=url,
  96. msgid=msgid,
  97. )
  98. return EmailContent(body, data, images)
  99. def _get_auth_cookies():
  100. # Login with the user specified to get the reports
  101. with app.test_request_context():
  102. user = security_manager.find_user(config["EMAIL_REPORTS_USER"])
  103. login_user(user)
  104. # A mock response object to get the cookie information from
  105. response = Response()
  106. app.session_interface.save_session(app, session, response)
  107. cookies = []
  108. # Set the cookies in the driver
  109. for name, value in response.headers:
  110. if name.lower() == "set-cookie":
  111. cookie = parse_cookie(value)
  112. cookies.append(cookie["session"])
  113. return cookies
  114. def _get_url_path(view, **kwargs):
  115. with app.test_request_context():
  116. return urllib.parse.urljoin(
  117. str(config["WEBDRIVER_BASEURL"]), url_for(view, **kwargs)
  118. )
  119. def create_webdriver():
  120. # Create a webdriver for use in fetching reports
  121. if config["EMAIL_REPORTS_WEBDRIVER"] == "firefox":
  122. driver_class = firefox.webdriver.WebDriver
  123. options = firefox.options.Options()
  124. elif config["EMAIL_REPORTS_WEBDRIVER"] == "chrome":
  125. driver_class = chrome.webdriver.WebDriver
  126. options = chrome.options.Options()
  127. options.add_argument("--headless")
  128. # Prepare args for the webdriver init
  129. kwargs = dict(options=options)
  130. kwargs.update(config["WEBDRIVER_CONFIGURATION"])
  131. # Initialize the driver
  132. driver = driver_class(**kwargs)
  133. # Some webdrivers need an initial hit to the welcome URL
  134. # before we set the cookie
  135. welcome_url = _get_url_path("Superset.welcome")
  136. # Hit the welcome URL and check if we were asked to login
  137. driver.get(welcome_url)
  138. elements = driver.find_elements_by_id("loginbox")
  139. # This indicates that we were not prompted for a login box.
  140. if not elements:
  141. return driver
  142. # Set the cookies in the driver
  143. for cookie in _get_auth_cookies():
  144. info = dict(name="session", value=cookie)
  145. driver.add_cookie(info)
  146. return driver
  147. def destroy_webdriver(driver):
  148. """
  149. Destroy a driver
  150. """
  151. # This is some very flaky code in selenium. Hence the retries
  152. # and catch-all exceptions
  153. try:
  154. retry_call(driver.close, tries=2)
  155. except Exception: # pylint: disable=broad-except
  156. pass
  157. try:
  158. driver.quit()
  159. except Exception: # pylint: disable=broad-except
  160. pass
  161. def deliver_dashboard(schedule):
  162. """
  163. Given a schedule, delivery the dashboard as an email report
  164. """
  165. dashboard = schedule.dashboard
  166. dashboard_url = _get_url_path("Superset.dashboard", dashboard_id=dashboard.id)
  167. # Create a driver, fetch the page, wait for the page to render
  168. driver = create_webdriver()
  169. window = config["WEBDRIVER_WINDOW"]["dashboard"]
  170. driver.set_window_size(*window)
  171. driver.get(dashboard_url)
  172. time.sleep(PAGE_RENDER_WAIT)
  173. # Set up a function to retry once for the element.
  174. # This is buggy in certain selenium versions with firefox driver
  175. get_element = getattr(driver, "find_element_by_class_name")
  176. element = retry_call(
  177. get_element, fargs=["grid-container"], tries=2, delay=PAGE_RENDER_WAIT
  178. )
  179. try:
  180. screenshot = element.screenshot_as_png
  181. except WebDriverException:
  182. # Some webdrivers do not support screenshots for elements.
  183. # In such cases, take a screenshot of the entire page.
  184. screenshot = driver.screenshot() # pylint: disable=no-member
  185. finally:
  186. destroy_webdriver(driver)
  187. # Generate the email body and attachments
  188. email = _generate_mail_content(
  189. schedule, screenshot, dashboard.dashboard_title, dashboard_url
  190. )
  191. subject = __(
  192. "%(prefix)s %(title)s",
  193. prefix=config["EMAIL_REPORTS_SUBJECT_PREFIX"],
  194. title=dashboard.dashboard_title,
  195. )
  196. _deliver_email(schedule, subject, email)
  197. def _get_slice_data(schedule):
  198. slc = schedule.slice
  199. slice_url = _get_url_path(
  200. "Superset.explore_json", csv="true", form_data=json.dumps({"slice_id": slc.id})
  201. )
  202. # URL to include in the email
  203. url = _get_url_path("Superset.slice", slice_id=slc.id)
  204. cookies = {}
  205. for cookie in _get_auth_cookies():
  206. cookies["session"] = cookie
  207. opener = urllib.request.build_opener()
  208. opener.addheaders.append(("Cookie", f"session={cookies['session']}"))
  209. response = opener.open(slice_url)
  210. if response.getcode() != 200:
  211. raise URLError(response.getcode())
  212. # TODO: Move to the csv module
  213. content = response.read()
  214. rows = [r.split(b",") for r in content.splitlines()]
  215. if schedule.delivery_type == EmailDeliveryType.inline:
  216. data = None
  217. # Parse the csv file and generate HTML
  218. columns = rows.pop(0)
  219. with app.app_context():
  220. body = render_template(
  221. "superset/reports/slice_data.html",
  222. columns=columns,
  223. rows=rows,
  224. name=slc.slice_name,
  225. link=url,
  226. )
  227. elif schedule.delivery_type == EmailDeliveryType.attachment:
  228. data = {__("%(name)s.csv", name=slc.slice_name): content}
  229. body = __(
  230. '<b><a href="%(url)s">Explore in Superset</a></b><p></p>',
  231. name=slc.slice_name,
  232. url=url,
  233. )
  234. return EmailContent(body, data, None)
  235. def _get_slice_visualization(schedule):
  236. slc = schedule.slice
  237. # Create a driver, fetch the page, wait for the page to render
  238. driver = create_webdriver()
  239. window = config["WEBDRIVER_WINDOW"]["slice"]
  240. driver.set_window_size(*window)
  241. slice_url = _get_url_path("Superset.slice", slice_id=slc.id)
  242. driver.get(slice_url)
  243. time.sleep(PAGE_RENDER_WAIT)
  244. # Set up a function to retry once for the element.
  245. # This is buggy in certain selenium versions with firefox driver
  246. element = retry_call(
  247. driver.find_element_by_class_name,
  248. fargs=["chart-container"],
  249. tries=2,
  250. delay=PAGE_RENDER_WAIT,
  251. )
  252. try:
  253. screenshot = element.screenshot_as_png
  254. except WebDriverException:
  255. # Some webdrivers do not support screenshots for elements.
  256. # In such cases, take a screenshot of the entire page.
  257. screenshot = driver.screenshot() # pylint: disable=no-member
  258. finally:
  259. destroy_webdriver(driver)
  260. # Generate the email body and attachments
  261. return _generate_mail_content(schedule, screenshot, slc.slice_name, slice_url)
  262. def deliver_slice(schedule):
  263. """
  264. Given a schedule, delivery the slice as an email report
  265. """
  266. if schedule.email_format == SliceEmailReportFormat.data:
  267. email = _get_slice_data(schedule)
  268. elif schedule.email_format == SliceEmailReportFormat.visualization:
  269. email = _get_slice_visualization(schedule)
  270. else:
  271. raise RuntimeError("Unknown email report format")
  272. subject = __(
  273. "%(prefix)s %(title)s",
  274. prefix=config["EMAIL_REPORTS_SUBJECT_PREFIX"],
  275. title=schedule.slice.slice_name,
  276. )
  277. _deliver_email(schedule, subject, email)
  278. @celery_app.task(
  279. name="email_reports.send",
  280. bind=True,
  281. soft_time_limit=config["EMAIL_ASYNC_TIME_LIMIT_SEC"],
  282. )
  283. def schedule_email_report(
  284. task, report_type, schedule_id, recipients=None
  285. ): # pylint: disable=unused-argument
  286. model_cls = get_scheduler_model(report_type)
  287. schedule = db.create_scoped_session().query(model_cls).get(schedule_id)
  288. # The user may have disabled the schedule. If so, ignore this
  289. if not schedule or not schedule.active:
  290. logger.info("Ignoring deactivated schedule")
  291. return
  292. # TODO: Detach the schedule object from the db session
  293. if recipients is not None:
  294. schedule.id = schedule_id
  295. schedule.recipients = recipients
  296. if report_type == ScheduleType.dashboard.value:
  297. deliver_dashboard(schedule)
  298. elif report_type == ScheduleType.slice.value:
  299. deliver_slice(schedule)
  300. else:
  301. raise RuntimeError("Unknown report type")
  302. def next_schedules(crontab, start_at, stop_at, resolution=0):
  303. crons = croniter.croniter(crontab, start_at - timedelta(seconds=1))
  304. previous = start_at - timedelta(days=1)
  305. for eta in crons.all_next(datetime):
  306. # Do not cross the time boundary
  307. if eta >= stop_at:
  308. break
  309. if eta < start_at:
  310. continue
  311. # Do not allow very frequent tasks
  312. if eta - previous < timedelta(seconds=resolution):
  313. continue
  314. yield eta
  315. previous = eta
  316. def schedule_window(report_type, start_at, stop_at, resolution):
  317. """
  318. Find all active schedules and schedule celery tasks for
  319. each of them with a specific ETA (determined by parsing
  320. the cron schedule for the schedule)
  321. """
  322. model_cls = get_scheduler_model(report_type)
  323. dbsession = db.create_scoped_session()
  324. schedules = dbsession.query(model_cls).filter(model_cls.active.is_(True))
  325. for schedule in schedules:
  326. args = (report_type, schedule.id)
  327. # Schedule the job for the specified time window
  328. for eta in next_schedules(
  329. schedule.crontab, start_at, stop_at, resolution=resolution
  330. ):
  331. schedule_email_report.apply_async(args, eta=eta)
  332. @celery_app.task(name="email_reports.schedule_hourly")
  333. def schedule_hourly():
  334. """ Celery beat job meant to be invoked hourly """
  335. if not config["ENABLE_SCHEDULED_EMAIL_REPORTS"]:
  336. logger.info("Scheduled email reports not enabled in config")
  337. return
  338. resolution = config["EMAIL_REPORTS_CRON_RESOLUTION"] * 60
  339. # Get the top of the hour
  340. start_at = datetime.now(tzlocal()).replace(microsecond=0, second=0, minute=0)
  341. stop_at = start_at + timedelta(seconds=3600)
  342. schedule_window(ScheduleType.dashboard.value, start_at, stop_at, resolution)
  343. schedule_window(ScheduleType.slice.value, start_at, stop_at, resolution)