access_tests.py 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
# isort:skip_file
  18. """Unit tests for Superset"""
  19. import json
  20. import unittest
  21. from unittest import mock
  22. from tests.test_app import app # isort:skip
  23. from superset import db, security_manager
  24. from superset.connectors.connector_registry import ConnectorRegistry
  25. from superset.connectors.druid.models import DruidDatasource
  26. from superset.connectors.sqla.models import SqlaTable
  27. from superset.models import core as models
  28. from superset.models.datasource_access_request import DatasourceAccessRequest
  29. from .base_tests import SupersetTestCase
  30. ROLE_TABLES_PERM_DATA = {
  31. "role_name": "override_me",
  32. "database": [
  33. {
  34. "datasource_type": "table",
  35. "name": "examples",
  36. "schema": [{"name": "", "datasources": ["birth_names"]}],
  37. }
  38. ],
  39. }
  40. ROLE_ALL_PERM_DATA = {
  41. "role_name": "override_me",
  42. "database": [
  43. {
  44. "datasource_type": "table",
  45. "name": "examples",
  46. "schema": [{"name": "", "datasources": ["birth_names"]}],
  47. },
  48. {
  49. "datasource_type": "druid",
  50. "name": "druid_test",
  51. "schema": [{"name": "", "datasources": ["druid_ds_1", "druid_ds_2"]}],
  52. },
  53. ],
  54. }
  55. EXTEND_ROLE_REQUEST = (
  56. "/superset/approve?datasource_type={}&datasource_id={}&"
  57. "created_by={}&role_to_extend={}"
  58. )
  59. GRANT_ROLE_REQUEST = (
  60. "/superset/approve?datasource_type={}&datasource_id={}&"
  61. "created_by={}&role_to_grant={}"
  62. )
  63. TEST_ROLE_1 = "test_role1"
  64. TEST_ROLE_2 = "test_role2"
  65. DB_ACCESS_ROLE = "db_access_role"
  66. SCHEMA_ACCESS_ROLE = "schema_access_role"
  67. def create_access_request(session, ds_type, ds_name, role_name, user_name):
  68. ds_class = ConnectorRegistry.sources[ds_type]
  69. # TODO: generalize datasource names
  70. if ds_type == "table":
  71. ds = session.query(ds_class).filter(ds_class.table_name == ds_name).first()
  72. else:
  73. ds = session.query(ds_class).filter(ds_class.datasource_name == ds_name).first()
  74. ds_perm_view = security_manager.find_permission_view_menu(
  75. "datasource_access", ds.perm
  76. )
  77. security_manager.add_permission_role(
  78. security_manager.find_role(role_name), ds_perm_view
  79. )
  80. access_request = DatasourceAccessRequest(
  81. datasource_id=ds.id,
  82. datasource_type=ds_type,
  83. created_by_fk=security_manager.find_user(username=user_name).id,
  84. )
  85. session.add(access_request)
  86. session.commit()
  87. return access_request
  88. class RequestAccessTests(SupersetTestCase):
  89. @classmethod
  90. def setUpClass(cls):
  91. with app.app_context():
  92. security_manager.add_role("override_me")
  93. security_manager.add_role(TEST_ROLE_1)
  94. security_manager.add_role(TEST_ROLE_2)
  95. security_manager.add_role(DB_ACCESS_ROLE)
  96. security_manager.add_role(SCHEMA_ACCESS_ROLE)
  97. db.session.commit()
  98. @classmethod
  99. def tearDownClass(cls):
  100. with app.app_context():
  101. override_me = security_manager.find_role("override_me")
  102. db.session.delete(override_me)
  103. db.session.delete(security_manager.find_role(TEST_ROLE_1))
  104. db.session.delete(security_manager.find_role(TEST_ROLE_2))
  105. db.session.delete(security_manager.find_role(DB_ACCESS_ROLE))
  106. db.session.delete(security_manager.find_role(SCHEMA_ACCESS_ROLE))
  107. db.session.commit()
  108. def setUp(self):
  109. self.login("admin")
  110. def tearDown(self):
  111. self.logout()
  112. override_me = security_manager.find_role("override_me")
  113. override_me.permissions = []
  114. db.session.commit()
  115. db.session.close()
  116. def test_override_role_permissions_is_admin_only(self):
  117. self.logout()
  118. self.login("alpha")
  119. response = self.client.post(
  120. "/superset/override_role_permissions/",
  121. data=json.dumps(ROLE_TABLES_PERM_DATA),
  122. content_type="application/json",
  123. follow_redirects=True,
  124. )
  125. self.assertNotEqual(405, response.status_code)
  126. def test_override_role_permissions_1_table(self):
  127. response = self.client.post(
  128. "/superset/override_role_permissions/",
  129. data=json.dumps(ROLE_TABLES_PERM_DATA),
  130. content_type="application/json",
  131. )
  132. self.assertEqual(201, response.status_code)
  133. updated_override_me = security_manager.find_role("override_me")
  134. self.assertEqual(1, len(updated_override_me.permissions))
  135. birth_names = self.get_table_by_name("birth_names")
  136. self.assertEqual(
  137. birth_names.perm, updated_override_me.permissions[0].view_menu.name
  138. )
  139. self.assertEqual(
  140. "datasource_access", updated_override_me.permissions[0].permission.name
  141. )
  142. def test_override_role_permissions_druid_and_table(self):
  143. response = self.client.post(
  144. "/superset/override_role_permissions/",
  145. data=json.dumps(ROLE_ALL_PERM_DATA),
  146. content_type="application/json",
  147. )
  148. self.assertEqual(201, response.status_code)
  149. updated_role = security_manager.find_role("override_me")
  150. perms = sorted(updated_role.permissions, key=lambda p: p.view_menu.name)
  151. druid_ds_1 = self.get_druid_ds_by_name("druid_ds_1")
  152. self.assertEqual(druid_ds_1.perm, perms[0].view_menu.name)
  153. self.assertEqual("datasource_access", perms[0].permission.name)
  154. druid_ds_2 = self.get_druid_ds_by_name("druid_ds_2")
  155. self.assertEqual(druid_ds_2.perm, perms[1].view_menu.name)
  156. self.assertEqual(
  157. "datasource_access", updated_role.permissions[1].permission.name
  158. )
  159. birth_names = self.get_table_by_name("birth_names")
  160. self.assertEqual(birth_names.perm, perms[2].view_menu.name)
  161. self.assertEqual(
  162. "datasource_access", updated_role.permissions[2].permission.name
  163. )
  164. self.assertEqual(3, len(perms))
  165. def test_override_role_permissions_drops_absent_perms(self):
  166. override_me = security_manager.find_role("override_me")
  167. override_me.permissions.append(
  168. security_manager.find_permission_view_menu(
  169. view_menu_name=self.get_table_by_name("energy_usage").perm,
  170. permission_name="datasource_access",
  171. )
  172. )
  173. db.session.flush()
  174. response = self.client.post(
  175. "/superset/override_role_permissions/",
  176. data=json.dumps(ROLE_TABLES_PERM_DATA),
  177. content_type="application/json",
  178. )
  179. self.assertEqual(201, response.status_code)
  180. updated_override_me = security_manager.find_role("override_me")
  181. self.assertEqual(1, len(updated_override_me.permissions))
  182. birth_names = self.get_table_by_name("birth_names")
  183. self.assertEqual(
  184. birth_names.perm, updated_override_me.permissions[0].view_menu.name
  185. )
  186. self.assertEqual(
  187. "datasource_access", updated_override_me.permissions[0].permission.name
  188. )
  189. def test_clean_requests_after_role_extend(self):
  190. session = db.session
  191. # Case 1. Gamma and gamma2 requested test_role1 on energy_usage access
  192. # Gamma already has role test_role1
  193. # Extend test_role1 with energy_usage access for gamma2
  194. # Check if access request for gamma at energy_usage was deleted
  195. # gamma2 and gamma request table_role on energy usage
  196. if app.config["ENABLE_ACCESS_REQUEST"]:
  197. access_request1 = create_access_request(
  198. session, "table", "random_time_series", TEST_ROLE_1, "gamma2"
  199. )
  200. ds_1_id = access_request1.datasource_id
  201. create_access_request(
  202. session, "table", "random_time_series", TEST_ROLE_1, "gamma"
  203. )
  204. access_requests = self.get_access_requests("gamma", "table", ds_1_id)
  205. self.assertTrue(access_requests)
  206. # gamma gets test_role1
  207. self.get_resp(
  208. GRANT_ROLE_REQUEST.format("table", ds_1_id, "gamma", TEST_ROLE_1)
  209. )
  210. # extend test_role1 with access on energy usage
  211. self.client.get(
  212. EXTEND_ROLE_REQUEST.format("table", ds_1_id, "gamma2", TEST_ROLE_1)
  213. )
  214. access_requests = self.get_access_requests("gamma", "table", ds_1_id)
  215. self.assertFalse(access_requests)
  216. gamma_user = security_manager.find_user(username="gamma")
  217. gamma_user.roles.remove(security_manager.find_role("test_role1"))
  218. def test_clean_requests_after_alpha_grant(self):
  219. session = db.session
  220. # Case 2. Two access requests from gamma and gamma2
  221. # Gamma becomes alpha, gamma2 gets granted
  222. # Check if request by gamma has been deleted
  223. access_request1 = create_access_request(
  224. session, "table", "birth_names", TEST_ROLE_1, "gamma"
  225. )
  226. create_access_request(session, "table", "birth_names", TEST_ROLE_2, "gamma2")
  227. ds_1_id = access_request1.datasource_id
  228. # gamma becomes alpha
  229. alpha_role = security_manager.find_role("Alpha")
  230. gamma_user = security_manager.find_user(username="gamma")
  231. gamma_user.roles.append(alpha_role)
  232. session.commit()
  233. access_requests = self.get_access_requests("gamma", "table", ds_1_id)
  234. self.assertTrue(access_requests)
  235. self.client.get(
  236. EXTEND_ROLE_REQUEST.format("table", ds_1_id, "gamma2", TEST_ROLE_2)
  237. )
  238. access_requests = self.get_access_requests("gamma", "table", ds_1_id)
  239. self.assertFalse(access_requests)
  240. gamma_user = security_manager.find_user(username="gamma")
  241. gamma_user.roles.remove(security_manager.find_role("Alpha"))
  242. session.commit()
  243. def test_clean_requests_after_db_grant(self):
  244. session = db.session
  245. # Case 3. Two access requests from gamma and gamma2
  246. # Gamma gets database access, gamma2 access request granted
  247. # Check if request by gamma has been deleted
  248. gamma_user = security_manager.find_user(username="gamma")
  249. access_request1 = create_access_request(
  250. session, "table", "energy_usage", TEST_ROLE_1, "gamma"
  251. )
  252. create_access_request(session, "table", "energy_usage", TEST_ROLE_2, "gamma2")
  253. ds_1_id = access_request1.datasource_id
  254. # gamma gets granted database access
  255. database = session.query(models.Database).first()
  256. security_manager.add_permission_view_menu("database_access", database.perm)
  257. ds_perm_view = security_manager.find_permission_view_menu(
  258. "database_access", database.perm
  259. )
  260. security_manager.add_permission_role(
  261. security_manager.find_role(DB_ACCESS_ROLE), ds_perm_view
  262. )
  263. gamma_user.roles.append(security_manager.find_role(DB_ACCESS_ROLE))
  264. session.commit()
  265. access_requests = self.get_access_requests("gamma", "table", ds_1_id)
  266. self.assertTrue(access_requests)
  267. # gamma2 request gets fulfilled
  268. self.client.get(
  269. EXTEND_ROLE_REQUEST.format("table", ds_1_id, "gamma2", TEST_ROLE_2)
  270. )
  271. access_requests = self.get_access_requests("gamma", "table", ds_1_id)
  272. self.assertFalse(access_requests)
  273. gamma_user = security_manager.find_user(username="gamma")
  274. gamma_user.roles.remove(security_manager.find_role(DB_ACCESS_ROLE))
  275. session.commit()
  276. def test_clean_requests_after_schema_grant(self):
  277. session = db.session
  278. # Case 4. Two access requests from gamma and gamma2
  279. # Gamma gets schema access, gamma2 access request granted
  280. # Check if request by gamma has been deleted
  281. gamma_user = security_manager.find_user(username="gamma")
  282. access_request1 = create_access_request(
  283. session, "table", "wb_health_population", TEST_ROLE_1, "gamma"
  284. )
  285. create_access_request(
  286. session, "table", "wb_health_population", TEST_ROLE_2, "gamma2"
  287. )
  288. ds_1_id = access_request1.datasource_id
  289. ds = (
  290. session.query(SqlaTable)
  291. .filter_by(table_name="wb_health_population")
  292. .first()
  293. )
  294. ds.schema = "temp_schema"
  295. security_manager.add_permission_view_menu("schema_access", ds.schema_perm)
  296. schema_perm_view = security_manager.find_permission_view_menu(
  297. "schema_access", ds.schema_perm
  298. )
  299. security_manager.add_permission_role(
  300. security_manager.find_role(SCHEMA_ACCESS_ROLE), schema_perm_view
  301. )
  302. gamma_user.roles.append(security_manager.find_role(SCHEMA_ACCESS_ROLE))
  303. session.commit()
  304. # gamma2 request gets fulfilled
  305. self.client.get(
  306. EXTEND_ROLE_REQUEST.format("table", ds_1_id, "gamma2", TEST_ROLE_2)
  307. )
  308. access_requests = self.get_access_requests("gamma", "table", ds_1_id)
  309. self.assertFalse(access_requests)
  310. gamma_user = security_manager.find_user(username="gamma")
  311. gamma_user.roles.remove(security_manager.find_role(SCHEMA_ACCESS_ROLE))
  312. ds = (
  313. session.query(SqlaTable)
  314. .filter_by(table_name="wb_health_population")
  315. .first()
  316. )
  317. ds.schema = None
  318. session.commit()
  319. @mock.patch("superset.utils.core.send_MIME_email")
  320. def test_approve(self, mock_send_mime):
  321. if app.config["ENABLE_ACCESS_REQUEST"]:
  322. session = db.session
  323. TEST_ROLE_NAME = "table_role"
  324. security_manager.add_role(TEST_ROLE_NAME)
  325. # Case 1. Grant new role to the user.
  326. access_request1 = create_access_request(
  327. session, "table", "unicode_test", TEST_ROLE_NAME, "gamma"
  328. )
  329. ds_1_id = access_request1.datasource_id
  330. self.get_resp(
  331. GRANT_ROLE_REQUEST.format("table", ds_1_id, "gamma", TEST_ROLE_NAME)
  332. )
  333. # Test email content.
  334. self.assertTrue(mock_send_mime.called)
  335. call_args = mock_send_mime.call_args[0]
  336. self.assertEqual(
  337. [
  338. security_manager.find_user(username="gamma").email,
  339. security_manager.find_user(username="admin").email,
  340. ],
  341. call_args[1],
  342. )
  343. self.assertEqual(
  344. "[Superset] Access to the datasource {} was granted".format(
  345. self.get_table(ds_1_id).full_name
  346. ),
  347. call_args[2]["Subject"],
  348. )
  349. self.assertIn(TEST_ROLE_NAME, call_args[2].as_string())
  350. self.assertIn("unicode_test", call_args[2].as_string())
  351. access_requests = self.get_access_requests("gamma", "table", ds_1_id)
  352. # request was removed
  353. self.assertFalse(access_requests)
  354. # user was granted table_role
  355. user_roles = [r.name for r in security_manager.find_user("gamma").roles]
  356. self.assertIn(TEST_ROLE_NAME, user_roles)
  357. # Case 2. Extend the role to have access to the table
  358. access_request2 = create_access_request(
  359. session, "table", "energy_usage", TEST_ROLE_NAME, "gamma"
  360. )
  361. ds_2_id = access_request2.datasource_id
  362. energy_usage_perm = access_request2.datasource.perm
  363. self.client.get(
  364. EXTEND_ROLE_REQUEST.format(
  365. "table", access_request2.datasource_id, "gamma", TEST_ROLE_NAME
  366. )
  367. )
  368. access_requests = self.get_access_requests("gamma", "table", ds_2_id)
  369. # Test email content.
  370. self.assertTrue(mock_send_mime.called)
  371. call_args = mock_send_mime.call_args[0]
  372. self.assertEqual(
  373. [
  374. security_manager.find_user(username="gamma").email,
  375. security_manager.find_user(username="admin").email,
  376. ],
  377. call_args[1],
  378. )
  379. self.assertEqual(
  380. "[Superset] Access to the datasource {} was granted".format(
  381. self.get_table(ds_2_id).full_name
  382. ),
  383. call_args[2]["Subject"],
  384. )
  385. self.assertIn(TEST_ROLE_NAME, call_args[2].as_string())
  386. self.assertIn("energy_usage", call_args[2].as_string())
  387. # request was removed
  388. self.assertFalse(access_requests)
  389. # table_role was extended to grant access to the energy_usage table/
  390. perm_view = security_manager.find_permission_view_menu(
  391. "datasource_access", energy_usage_perm
  392. )
  393. TEST_ROLE = security_manager.find_role(TEST_ROLE_NAME)
  394. self.assertIn(perm_view, TEST_ROLE.permissions)
  395. # Case 3. Grant new role to the user to access the druid datasource.
  396. security_manager.add_role("druid_role")
  397. access_request3 = create_access_request(
  398. session, "druid", "druid_ds_1", "druid_role", "gamma"
  399. )
  400. self.get_resp(
  401. GRANT_ROLE_REQUEST.format(
  402. "druid", access_request3.datasource_id, "gamma", "druid_role"
  403. )
  404. )
  405. # user was granted table_role
  406. user_roles = [r.name for r in security_manager.find_user("gamma").roles]
  407. self.assertIn("druid_role", user_roles)
  408. # Case 4. Extend the role to have access to the druid datasource
  409. access_request4 = create_access_request(
  410. session, "druid", "druid_ds_2", "druid_role", "gamma"
  411. )
  412. druid_ds_2_perm = access_request4.datasource.perm
  413. self.client.get(
  414. EXTEND_ROLE_REQUEST.format(
  415. "druid", access_request4.datasource_id, "gamma", "druid_role"
  416. )
  417. )
  418. # druid_role was extended to grant access to the druid_access_ds_2
  419. druid_role = security_manager.find_role("druid_role")
  420. perm_view = security_manager.find_permission_view_menu(
  421. "datasource_access", druid_ds_2_perm
  422. )
  423. self.assertIn(perm_view, druid_role.permissions)
  424. # cleanup
  425. gamma_user = security_manager.find_user(username="gamma")
  426. gamma_user.roles.remove(security_manager.find_role("druid_role"))
  427. gamma_user.roles.remove(security_manager.find_role(TEST_ROLE_NAME))
  428. session.delete(security_manager.find_role("druid_role"))
  429. session.delete(security_manager.find_role(TEST_ROLE_NAME))
  430. session.commit()
  431. def test_request_access(self):
  432. if app.config["ENABLE_ACCESS_REQUEST"]:
  433. session = db.session
  434. self.logout()
  435. self.login(username="gamma")
  436. gamma_user = security_manager.find_user(username="gamma")
  437. security_manager.add_role("dummy_role")
  438. gamma_user.roles.append(security_manager.find_role("dummy_role"))
  439. session.commit()
  440. ACCESS_REQUEST = (
  441. "/superset/request_access?"
  442. "datasource_type={}&"
  443. "datasource_id={}&"
  444. "action={}&"
  445. )
  446. ROLE_GRANT_LINK = (
  447. '<a href="/superset/approve?datasource_type={}&datasource_id={}&'
  448. 'created_by={}&role_to_grant={}">Grant {} Role</a>'
  449. )
  450. # Request table access, there are no roles have this table.
  451. table1 = (
  452. session.query(SqlaTable)
  453. .filter_by(table_name="random_time_series")
  454. .first()
  455. )
  456. table_1_id = table1.id
  457. # request access to the table
  458. resp = self.get_resp(ACCESS_REQUEST.format("table", table_1_id, "go"))
  459. assert "Access was requested" in resp
  460. access_request1 = self.get_access_requests("gamma", "table", table_1_id)
  461. assert access_request1 is not None
  462. # Request access, roles exist that contains the table.
  463. # add table to the existing roles
  464. table3 = (
  465. session.query(SqlaTable).filter_by(table_name="energy_usage").first()
  466. )
  467. table_3_id = table3.id
  468. table3_perm = table3.perm
  469. security_manager.add_role("energy_usage_role")
  470. alpha_role = security_manager.find_role("Alpha")
  471. security_manager.add_permission_role(
  472. alpha_role,
  473. security_manager.find_permission_view_menu(
  474. "datasource_access", table3_perm
  475. ),
  476. )
  477. security_manager.add_permission_role(
  478. security_manager.find_role("energy_usage_role"),
  479. security_manager.find_permission_view_menu(
  480. "datasource_access", table3_perm
  481. ),
  482. )
  483. session.commit()
  484. self.get_resp(ACCESS_REQUEST.format("table", table_3_id, "go"))
  485. access_request3 = self.get_access_requests("gamma", "table", table_3_id)
  486. approve_link_3 = ROLE_GRANT_LINK.format(
  487. "table", table_3_id, "gamma", "energy_usage_role", "energy_usage_role"
  488. )
  489. self.assertEqual(
  490. access_request3.roles_with_datasource,
  491. "<ul><li>{}</li></ul>".format(approve_link_3),
  492. )
  493. # Request druid access, there are no roles have this table.
  494. druid_ds_4 = (
  495. session.query(DruidDatasource)
  496. .filter_by(datasource_name="druid_ds_1")
  497. .first()
  498. )
  499. druid_ds_4_id = druid_ds_4.id
  500. # request access to the table
  501. self.get_resp(ACCESS_REQUEST.format("druid", druid_ds_4_id, "go"))
  502. access_request4 = self.get_access_requests("gamma", "druid", druid_ds_4_id)
  503. self.assertEqual(
  504. access_request4.roles_with_datasource,
  505. "<ul></ul>".format(access_request4.id),
  506. )
  507. # Case 5. Roles exist that contains the druid datasource.
  508. # add druid ds to the existing roles
  509. druid_ds_5 = (
  510. session.query(DruidDatasource)
  511. .filter_by(datasource_name="druid_ds_2")
  512. .first()
  513. )
  514. druid_ds_5_id = druid_ds_5.id
  515. druid_ds_5_perm = druid_ds_5.perm
  516. druid_ds_2_role = security_manager.add_role("druid_ds_2_role")
  517. admin_role = security_manager.find_role("Admin")
  518. security_manager.add_permission_role(
  519. admin_role,
  520. security_manager.find_permission_view_menu(
  521. "datasource_access", druid_ds_5_perm
  522. ),
  523. )
  524. security_manager.add_permission_role(
  525. druid_ds_2_role,
  526. security_manager.find_permission_view_menu(
  527. "datasource_access", druid_ds_5_perm
  528. ),
  529. )
  530. session.commit()
  531. self.get_resp(ACCESS_REQUEST.format("druid", druid_ds_5_id, "go"))
  532. access_request5 = self.get_access_requests("gamma", "druid", druid_ds_5_id)
  533. approve_link_5 = ROLE_GRANT_LINK.format(
  534. "druid", druid_ds_5_id, "gamma", "druid_ds_2_role", "druid_ds_2_role"
  535. )
  536. self.assertEqual(
  537. access_request5.roles_with_datasource,
  538. "<ul><li>{}</li></ul>".format(approve_link_5),
  539. )
  540. # cleanup
  541. gamma_user = security_manager.find_user(username="gamma")
  542. gamma_user.roles.remove(security_manager.find_role("dummy_role"))
  543. session.commit()
  544. if __name__ == "__main__":
  545. unittest.main()