# tests/druid_tests.py
  1. # Licensed to the Apache Software Foundation (ASF) under one
  2. # or more contributor license agreements. See the NOTICE file
  3. # distributed with this work for additional information
  4. # regarding copyright ownership. The ASF licenses this file
  5. # to you under the Apache License, Version 2.0 (the
  6. # "License"); you may not use this file except in compliance
  7. # with the License. You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing,
  12. # software distributed under the License is distributed on an
  13. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. # KIND, either express or implied. See the License for the
  15. # specific language governing permissions and limitations
  16. # under the License.
  17. # isort:skip_file
  18. """Unit tests for Superset"""
  19. import json
  20. import unittest
  21. from datetime import datetime
  22. from unittest.mock import Mock, patch
  23. from tests.test_app import app
  24. from superset import db, security_manager
  25. from .base_tests import SupersetTestCase
  26. try:
  27. from superset.connectors.druid.models import (
  28. DruidCluster,
  29. DruidColumn,
  30. DruidDatasource,
  31. DruidMetric,
  32. )
  33. except ImportError:
  34. pass
class PickableMock(Mock):
    """A ``Mock`` that supports pickling.

    ``Mock`` objects normally cannot be pickled; reducing to ``(Mock, ())``
    makes ``pickle.loads(pickle.dumps(m))`` yield a fresh, plain ``Mock``
    (recorded state is intentionally not preserved).
    """

    def __reduce__(self):
        return (Mock, ())
# Canned response for PyDruid's ``segment_metadata`` call: a single segment
# exposing the ``__time`` column, two string dimensions (``dim1``, ``dim2``)
# and one float metric (``metric1``) with a ``longSum`` aggregator.
SEGMENT_METADATA = [
    {
        "id": "some_id",
        "intervals": ["2013-05-13T00:00:00.000Z/2013-05-14T00:00:00.000Z"],
        "columns": {
            "__time": {
                "type": "LONG",
                "hasMultipleValues": False,
                "size": 407240380,
                "cardinality": None,
                "errorMessage": None,
            },
            "dim1": {
                "type": "STRING",
                "hasMultipleValues": False,
                "size": 100000,
                "cardinality": 1944,
                "errorMessage": None,
            },
            "dim2": {
                "type": "STRING",
                "hasMultipleValues": True,
                "size": 100000,
                "cardinality": 1504,
                "errorMessage": None,
            },
            "metric1": {
                "type": "FLOAT",
                "hasMultipleValues": False,
                "size": 100000,
                "cardinality": None,
                "errorMessage": None,
            },
        },
        "aggregators": {
            "metric1": {"type": "longSum", "name": "metric1", "fieldName": "metric1"}
        },
        "size": 300000,
        "numRows": 5000000,
    }
]
# Canned Druid groupby result set: two events on the same timestamp,
# distinguished by (dim1, dim2), used as the mocked query output below.
GB_RESULT_SET = [
    {
        "version": "v1",
        "timestamp": "2012-01-01T00:00:00.000Z",
        "event": {"dim1": "Canada", "dim2": "boy", "count": 12345678},
    },
    {
        "version": "v1",
        "timestamp": "2012-01-01T00:00:00.000Z",
        "event": {"dim1": "USA", "dim2": "girl", "count": 12345678 / 2},
    },
]
# Pin the reported Druid version for all tests; avoids a network round-trip
# to a real broker when the models ask for the cluster version.
DruidCluster.get_druid_version = lambda _: "0.9.1"  # type: ignore
  92. class DruidTests(SupersetTestCase):
  93. """Testing interactions with Druid"""
  94. @classmethod
  95. def setUpClass(cls):
  96. cls.create_druid_test_objects()
  97. def get_test_cluster_obj(self):
  98. return DruidCluster(
  99. cluster_name="test_cluster",
  100. broker_host="localhost",
  101. broker_port=7980,
  102. broker_endpoint="druid/v2",
  103. metadata_last_refreshed=datetime.now(),
  104. )
  105. def get_cluster(self, PyDruid):
  106. instance = PyDruid.return_value
  107. instance.time_boundary.return_value = [{"result": {"maxTime": "2016-01-01"}}]
  108. instance.segment_metadata.return_value = SEGMENT_METADATA
  109. cluster = (
  110. db.session.query(DruidCluster)
  111. .filter_by(cluster_name="test_cluster")
  112. .first()
  113. )
  114. if cluster:
  115. for datasource in (
  116. db.session.query(DruidDatasource).filter_by(cluster_id=cluster.id).all()
  117. ):
  118. db.session.delete(datasource)
  119. db.session.delete(cluster)
  120. db.session.commit()
  121. cluster = self.get_test_cluster_obj()
  122. db.session.add(cluster)
  123. cluster.get_datasources = PickableMock(return_value=["test_datasource"])
  124. return cluster
  125. @unittest.skipUnless(
  126. SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
  127. )
  128. @patch("superset.connectors.druid.models.PyDruid")
  129. def test_client(self, PyDruid):
  130. self.login(username="admin")
  131. cluster = self.get_cluster(PyDruid)
  132. cluster.refresh_datasources()
  133. cluster.refresh_datasources(merge_flag=True)
  134. datasource_id = cluster.datasources[0].id
  135. db.session.commit()
  136. nres = [
  137. list(v["event"].items()) + [("timestamp", v["timestamp"])]
  138. for v in GB_RESULT_SET
  139. ]
  140. nres = [dict(v) for v in nres]
  141. import pandas as pd
  142. df = pd.DataFrame(nres)
  143. instance = PyDruid.return_value
  144. instance.export_pandas.return_value = df
  145. instance.query_dict = {}
  146. instance.query_builder.last_query.query_dict = {}
  147. resp = self.get_resp("/superset/explore/druid/{}/".format(datasource_id))
  148. self.assertIn("test_datasource", resp)
  149. form_data = {
  150. "viz_type": "table",
  151. "granularity": "one+day",
  152. "druid_time_origin": "",
  153. "since": "7+days+ago",
  154. "until": "now",
  155. "row_limit": 5000,
  156. "include_search": "false",
  157. "metrics": ["count"],
  158. "groupby": ["dim1"],
  159. "force": "true",
  160. }
  161. # One groupby
  162. url = "/superset/explore_json/druid/{}/".format(datasource_id)
  163. resp = self.get_json_resp(url, {"form_data": json.dumps(form_data)})
  164. self.assertEqual("Canada", resp["data"]["records"][0]["dim1"])
  165. form_data = {
  166. "viz_type": "table",
  167. "granularity": "one+day",
  168. "druid_time_origin": "",
  169. "since": "7+days+ago",
  170. "until": "now",
  171. "row_limit": 5000,
  172. "include_search": "false",
  173. "metrics": ["count"],
  174. "groupby": ["dim1", "dim2"],
  175. "force": "true",
  176. }
  177. # two groupby
  178. url = "/superset/explore_json/druid/{}/".format(datasource_id)
  179. resp = self.get_json_resp(url, {"form_data": json.dumps(form_data)})
  180. self.assertEqual("Canada", resp["data"]["records"][0]["dim1"])
  181. @unittest.skipUnless(
  182. SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
  183. )
  184. def test_druid_sync_from_config(self):
  185. CLUSTER_NAME = "new_druid"
  186. self.login()
  187. cluster = self.get_or_create(
  188. DruidCluster, {"cluster_name": CLUSTER_NAME}, db.session
  189. )
  190. db.session.merge(cluster)
  191. db.session.commit()
  192. ds = (
  193. db.session.query(DruidDatasource)
  194. .filter_by(datasource_name="test_click")
  195. .first()
  196. )
  197. if ds:
  198. db.session.delete(ds)
  199. db.session.commit()
  200. cfg = {
  201. "user": "admin",
  202. "cluster": CLUSTER_NAME,
  203. "config": {
  204. "name": "test_click",
  205. "dimensions": ["affiliate_id", "campaign", "first_seen"],
  206. "metrics_spec": [
  207. {"type": "count", "name": "count"},
  208. {"type": "sum", "name": "sum"},
  209. ],
  210. "batch_ingestion": {
  211. "sql": "SELECT * FROM clicks WHERE d='{{ ds }}'",
  212. "ts_column": "d",
  213. "sources": [{"table": "clicks", "partition": "d='{{ ds }}'"}],
  214. },
  215. },
  216. }
  217. def check():
  218. resp = self.client.post("/superset/sync_druid/", data=json.dumps(cfg))
  219. druid_ds = (
  220. db.session.query(DruidDatasource)
  221. .filter_by(datasource_name="test_click")
  222. .one()
  223. )
  224. col_names = set([c.column_name for c in druid_ds.columns])
  225. assert {"affiliate_id", "campaign", "first_seen"} == col_names
  226. metric_names = {m.metric_name for m in druid_ds.metrics}
  227. assert {"count", "sum"} == metric_names
  228. assert resp.status_code == 201
  229. check()
  230. # checking twice to make sure a second sync yields the same results
  231. check()
  232. # datasource exists, add new metrics and dimensions
  233. cfg = {
  234. "user": "admin",
  235. "cluster": CLUSTER_NAME,
  236. "config": {
  237. "name": "test_click",
  238. "dimensions": ["affiliate_id", "second_seen"],
  239. "metrics_spec": [
  240. {"type": "bla", "name": "sum"},
  241. {"type": "unique", "name": "unique"},
  242. ],
  243. },
  244. }
  245. resp = self.client.post("/superset/sync_druid/", data=json.dumps(cfg))
  246. druid_ds = (
  247. db.session.query(DruidDatasource)
  248. .filter_by(datasource_name="test_click")
  249. .one()
  250. )
  251. # columns and metrics are not deleted if config is changed as
  252. # user could define his own dimensions / metrics and want to keep them
  253. assert set([c.column_name for c in druid_ds.columns]) == set(
  254. ["affiliate_id", "campaign", "first_seen", "second_seen"]
  255. )
  256. assert set([m.metric_name for m in druid_ds.metrics]) == set(
  257. ["count", "sum", "unique"]
  258. )
  259. # metric type will not be overridden, sum stays instead of bla
  260. assert set([m.metric_type for m in druid_ds.metrics]) == set(
  261. ["longSum", "sum", "unique"]
  262. )
  263. assert resp.status_code == 201
  264. @unittest.skipUnless(
  265. SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
  266. )
  267. @unittest.skipUnless(app.config["DRUID_IS_ACTIVE"], "DRUID_IS_ACTIVE is false")
  268. def test_filter_druid_datasource(self):
  269. CLUSTER_NAME = "new_druid"
  270. cluster = self.get_or_create(
  271. DruidCluster, {"cluster_name": CLUSTER_NAME}, db.session
  272. )
  273. db.session.merge(cluster)
  274. gamma_ds = self.get_or_create(
  275. DruidDatasource,
  276. {"datasource_name": "datasource_for_gamma", "cluster": cluster},
  277. db.session,
  278. )
  279. gamma_ds.cluster = cluster
  280. db.session.merge(gamma_ds)
  281. no_gamma_ds = self.get_or_create(
  282. DruidDatasource,
  283. {"datasource_name": "datasource_not_for_gamma", "cluster": cluster},
  284. db.session,
  285. )
  286. no_gamma_ds.cluster = cluster
  287. db.session.merge(no_gamma_ds)
  288. db.session.commit()
  289. security_manager.add_permission_view_menu("datasource_access", gamma_ds.perm)
  290. security_manager.add_permission_view_menu("datasource_access", no_gamma_ds.perm)
  291. perm = security_manager.find_permission_view_menu(
  292. "datasource_access", gamma_ds.get_perm()
  293. )
  294. security_manager.add_permission_role(security_manager.find_role("Gamma"), perm)
  295. security_manager.get_session.commit()
  296. self.login(username="gamma")
  297. url = "/druiddatasourcemodelview/list/"
  298. resp = self.get_resp(url)
  299. self.assertIn("datasource_for_gamma", resp)
  300. self.assertNotIn("datasource_not_for_gamma", resp)
  301. @unittest.skipUnless(
  302. SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
  303. )
  304. @patch("superset.connectors.druid.models.PyDruid")
  305. def test_sync_druid_perm(self, PyDruid):
  306. self.login(username="admin")
  307. instance = PyDruid.return_value
  308. instance.time_boundary.return_value = [{"result": {"maxTime": "2016-01-01"}}]
  309. instance.segment_metadata.return_value = SEGMENT_METADATA
  310. cluster = (
  311. db.session.query(DruidCluster)
  312. .filter_by(cluster_name="test_cluster")
  313. .first()
  314. )
  315. if cluster:
  316. for datasource in (
  317. db.session.query(DruidDatasource).filter_by(cluster_id=cluster.id).all()
  318. ):
  319. db.session.delete(datasource)
  320. db.session.delete(cluster)
  321. db.session.commit()
  322. cluster = DruidCluster(
  323. cluster_name="test_cluster",
  324. broker_host="localhost",
  325. broker_port=7980,
  326. metadata_last_refreshed=datetime.now(),
  327. )
  328. db.session.add(cluster)
  329. cluster.get_datasources = PickableMock(return_value=["test_datasource"])
  330. cluster.refresh_datasources()
  331. cluster.datasources[0].merge_flag = True
  332. metadata = cluster.datasources[0].latest_metadata()
  333. self.assertEqual(len(metadata), 4)
  334. db.session.commit()
  335. view_menu_name = cluster.datasources[0].get_perm()
  336. view_menu = security_manager.find_view_menu(view_menu_name)
  337. permission = security_manager.find_permission("datasource_access")
  338. pv = (
  339. security_manager.get_session.query(security_manager.permissionview_model)
  340. .filter_by(permission=permission, view_menu=view_menu)
  341. .first()
  342. )
  343. assert pv is not None
  344. @unittest.skipUnless(
  345. SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
  346. )
  347. @patch("superset.connectors.druid.models.PyDruid")
  348. def test_refresh_metadata(self, PyDruid):
  349. self.login(username="admin")
  350. cluster = self.get_cluster(PyDruid)
  351. cluster.refresh_datasources()
  352. datasource = cluster.datasources[0]
  353. cols = db.session.query(DruidColumn).filter(
  354. DruidColumn.datasource_id == datasource.id
  355. )
  356. for col in cols:
  357. self.assertIn(col.column_name, SEGMENT_METADATA[0]["columns"].keys())
  358. metrics = (
  359. db.session.query(DruidMetric)
  360. .filter(DruidMetric.datasource_id == datasource.id)
  361. .filter(DruidMetric.metric_name.like("%__metric1"))
  362. )
  363. for metric in metrics:
  364. agg, _ = metric.metric_name.split("__")
  365. self.assertEqual(
  366. json.loads(metric.json)["type"], "double{}".format(agg.capitalize())
  367. )
  368. @unittest.skipUnless(
  369. SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
  370. )
  371. @patch("superset.connectors.druid.models.PyDruid")
  372. def test_refresh_metadata_augment_type(self, PyDruid):
  373. self.login(username="admin")
  374. cluster = self.get_cluster(PyDruid)
  375. cluster.refresh_datasources()
  376. metadata = SEGMENT_METADATA[:]
  377. metadata[0]["columns"]["metric1"]["type"] = "LONG"
  378. instance = PyDruid.return_value
  379. instance.segment_metadata.return_value = metadata
  380. cluster.refresh_datasources()
  381. datasource = cluster.datasources[0]
  382. column = (
  383. db.session.query(DruidColumn)
  384. .filter(DruidColumn.datasource_id == datasource.id)
  385. .filter(DruidColumn.column_name == "metric1")
  386. ).one()
  387. self.assertEqual(column.type, "LONG")
  388. metrics = (
  389. db.session.query(DruidMetric)
  390. .filter(DruidMetric.datasource_id == datasource.id)
  391. .filter(DruidMetric.metric_name.like("%__metric1"))
  392. )
  393. for metric in metrics:
  394. agg, _ = metric.metric_name.split("__")
  395. self.assertEqual(metric.json_obj["type"], "long{}".format(agg.capitalize()))
  396. @unittest.skipUnless(
  397. SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
  398. )
  399. @patch("superset.connectors.druid.models.PyDruid")
  400. def test_refresh_metadata_augment_verbose_name(self, PyDruid):
  401. self.login(username="admin")
  402. cluster = self.get_cluster(PyDruid)
  403. cluster.refresh_datasources()
  404. datasource = cluster.datasources[0]
  405. metrics = (
  406. db.session.query(DruidMetric)
  407. .filter(DruidMetric.datasource_id == datasource.id)
  408. .filter(DruidMetric.metric_name.like("%__metric1"))
  409. )
  410. for metric in metrics:
  411. metric.verbose_name = metric.metric_name
  412. db.session.commit()
  413. # The verbose name should not change during a refresh.
  414. cluster.refresh_datasources()
  415. datasource = cluster.datasources[0]
  416. metrics = (
  417. db.session.query(DruidMetric)
  418. .filter(DruidMetric.datasource_id == datasource.id)
  419. .filter(DruidMetric.metric_name.like("%__metric1"))
  420. )
  421. for metric in metrics:
  422. self.assertEqual(metric.verbose_name, metric.metric_name)
  423. @unittest.skipUnless(
  424. SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
  425. )
  426. def test_urls(self):
  427. cluster = self.get_test_cluster_obj()
  428. self.assertEqual(
  429. cluster.get_base_url("localhost", "9999"), "http://localhost:9999"
  430. )
  431. self.assertEqual(
  432. cluster.get_base_url("http://localhost", "9999"), "http://localhost:9999"
  433. )
  434. self.assertEqual(
  435. cluster.get_base_url("https://localhost", "9999"), "https://localhost:9999"
  436. )
  437. self.assertEqual(
  438. cluster.get_base_broker_url(), "http://localhost:7980/druid/v2"
  439. )
  440. @unittest.skipUnless(
  441. SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
  442. )
  443. @patch("superset.connectors.druid.models.PyDruid")
  444. def test_druid_time_granularities(self, PyDruid):
  445. self.login(username="admin")
  446. cluster = self.get_cluster(PyDruid)
  447. cluster.refresh_datasources()
  448. cluster.refresh_datasources(merge_flag=True)
  449. datasource_id = cluster.datasources[0].id
  450. db.session.commit()
  451. nres = [
  452. list(v["event"].items()) + [("timestamp", v["timestamp"])]
  453. for v in GB_RESULT_SET
  454. ]
  455. nres = [dict(v) for v in nres]
  456. import pandas as pd
  457. df = pd.DataFrame(nres)
  458. instance = PyDruid.return_value
  459. instance.export_pandas.return_value = df
  460. instance.query_dict = {}
  461. instance.query_builder.last_query.query_dict = {}
  462. form_data = {
  463. "viz_type": "table",
  464. "since": "7+days+ago",
  465. "until": "now",
  466. "metrics": ["count"],
  467. "groupby": [],
  468. "include_time": "true",
  469. }
  470. granularity_map = {
  471. "5 seconds": "PT5S",
  472. "30 seconds": "PT30S",
  473. "1 minute": "PT1M",
  474. "5 minutes": "PT5M",
  475. "1 hour": "PT1H",
  476. "6 hour": "PT6H",
  477. "one day": "P1D",
  478. "1 day": "P1D",
  479. "7 days": "P7D",
  480. "week": "P1W",
  481. "week_starting_sunday": "P1W",
  482. "week_ending_saturday": "P1W",
  483. "month": "P1M",
  484. "quarter": "P3M",
  485. "year": "P1Y",
  486. }
  487. url = "/superset/explore_json/druid/{}/".format(datasource_id)
  488. for granularity_mapping in granularity_map:
  489. form_data["granularity"] = granularity_mapping
  490. self.get_json_resp(url, {"form_data": json.dumps(form_data)})
  491. self.assertEqual(
  492. granularity_map[granularity_mapping],
  493. instance.timeseries.call_args[1]["granularity"]["period"],
  494. )
  495. @unittest.skipUnless(
  496. SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
  497. )
  498. @patch("superset.connectors.druid.models.PyDruid")
  499. def test_external_metadata(self, PyDruid):
  500. self.login(username="admin")
  501. self.login(username="admin")
  502. cluster = self.get_cluster(PyDruid)
  503. cluster.refresh_datasources()
  504. datasource = cluster.datasources[0]
  505. url = "/datasource/external_metadata/druid/{}/".format(datasource.id)
  506. resp = self.get_json_resp(url)
  507. col_names = {o.get("name") for o in resp}
  508. self.assertEqual(col_names, {"__time", "dim1", "dim2", "metric1"})
# Allow running this test module directly.
if __name__ == "__main__":
    unittest.main()