123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818 |
- # Licensed to the Apache Software Foundation (ASF) under one
- # or more contributor license agreements. See the NOTICE file
- # distributed with this work for additional information
- # regarding copyright ownership. The ASF licenses this file
- # to you under the Apache License, Version 2.0 (the
- # "License"); you may not use this file except in compliance
- # with the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing,
- # software distributed under the License is distributed on an
- # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- # KIND, either express or implied. See the License for the
- # specific language governing permissions and limitations
- # under the License.
- # isort:skip_file
- import inspect
- import unittest
- from unittest.mock import Mock, patch
- import prison
- import tests.test_app
- from superset import app, appbuilder, db, security_manager, viz
- from superset.connectors.druid.models import DruidCluster, DruidDatasource
- from superset.connectors.sqla.models import SqlaTable
- from superset.exceptions import SupersetSecurityException
- from superset.models.core import Database
- from superset.models.slice import Slice
- from superset.utils.core import get_example_database
- from .base_tests import SupersetTestCase
- def get_perm_tuples(role_name):
- perm_set = set()
- for perm in security_manager.find_role(role_name).permissions:
- perm_set.add((perm.permission.name, perm.view_menu.name))
- return perm_set
- SCHEMA_ACCESS_ROLE = "schema_access_role"
- def create_schema_perm(view_menu_name: str) -> None:
- permission = "schema_access"
- security_manager.add_permission_view_menu(permission, view_menu_name)
- perm_view = security_manager.find_permission_view_menu(permission, view_menu_name)
- security_manager.add_permission_role(
- security_manager.find_role(SCHEMA_ACCESS_ROLE), perm_view
- )
- return None
- def delete_schema_perm(view_menu_name: str) -> None:
- pv = security_manager.find_permission_view_menu("schema_access", "[examples].[2]")
- security_manager.del_permission_role(
- security_manager.find_role(SCHEMA_ACCESS_ROLE), pv
- )
- security_manager.del_permission_view_menu("schema_access", "[examples].[2]")
- return None
- class RolePermissionTests(SupersetTestCase):
- """Testing export role permissions."""
- def setUp(self):
- session = db.session
- security_manager.add_role(SCHEMA_ACCESS_ROLE)
- session.commit()
- ds = (
- db.session.query(SqlaTable)
- .filter_by(table_name="wb_health_population")
- .first()
- )
- ds.schema = "temp_schema"
- ds.schema_perm = ds.get_schema_perm()
- ds_slices = (
- session.query(Slice)
- .filter_by(datasource_type="table")
- .filter_by(datasource_id=ds.id)
- .all()
- )
- for s in ds_slices:
- s.schema_perm = ds.schema_perm
- create_schema_perm("[examples].[temp_schema]")
- gamma_user = security_manager.find_user(username="gamma")
- gamma_user.roles.append(security_manager.find_role(SCHEMA_ACCESS_ROLE))
- session.commit()
- def tearDown(self):
- session = db.session
- ds = (
- session.query(SqlaTable)
- .filter_by(table_name="wb_health_population")
- .first()
- )
- schema_perm = ds.schema_perm
- ds.schema = None
- ds.schema_perm = None
- ds_slices = (
- session.query(Slice)
- .filter_by(datasource_type="table")
- .filter_by(datasource_id=ds.id)
- .all()
- )
- for s in ds_slices:
- s.schema_perm = None
- delete_schema_perm(schema_perm)
- session.delete(security_manager.find_role(SCHEMA_ACCESS_ROLE))
- session.commit()
- def test_set_perm_sqla_table(self):
- session = db.session
- table = SqlaTable(
- schema="tmp_schema",
- table_name="tmp_perm_table",
- database=get_example_database(),
- )
- session.add(table)
- session.commit()
- stored_table = (
- session.query(SqlaTable).filter_by(table_name="tmp_perm_table").one()
- )
- self.assertEquals(
- stored_table.perm, f"[examples].[tmp_perm_table](id:{stored_table.id})"
- )
- self.assertIsNotNone(
- security_manager.find_permission_view_menu(
- "datasource_access", stored_table.perm
- )
- )
- self.assertEquals(stored_table.schema_perm, "[examples].[tmp_schema]")
- self.assertIsNotNone(
- security_manager.find_permission_view_menu(
- "schema_access", stored_table.schema_perm
- )
- )
- # table name change
- stored_table.table_name = "tmp_perm_table_v2"
- session.commit()
- stored_table = (
- session.query(SqlaTable).filter_by(table_name="tmp_perm_table_v2").one()
- )
- self.assertEquals(
- stored_table.perm, f"[examples].[tmp_perm_table_v2](id:{stored_table.id})"
- )
- self.assertIsNotNone(
- security_manager.find_permission_view_menu(
- "datasource_access", stored_table.perm
- )
- )
- # no changes in schema
- self.assertEquals(stored_table.schema_perm, "[examples].[tmp_schema]")
- self.assertIsNotNone(
- security_manager.find_permission_view_menu(
- "schema_access", stored_table.schema_perm
- )
- )
- # schema name change
- stored_table.schema = "tmp_schema_v2"
- session.commit()
- stored_table = (
- session.query(SqlaTable).filter_by(table_name="tmp_perm_table_v2").one()
- )
- self.assertEquals(
- stored_table.perm, f"[examples].[tmp_perm_table_v2](id:{stored_table.id})"
- )
- self.assertIsNotNone(
- security_manager.find_permission_view_menu(
- "datasource_access", stored_table.perm
- )
- )
- # no changes in schema
- self.assertEquals(stored_table.schema_perm, "[examples].[tmp_schema_v2]")
- self.assertIsNotNone(
- security_manager.find_permission_view_menu(
- "schema_access", stored_table.schema_perm
- )
- )
- # database change
- new_db = Database(sqlalchemy_uri="some_uri", database_name="tmp_db")
- session.add(new_db)
- stored_table.database = (
- session.query(Database).filter_by(database_name="tmp_db").one()
- )
- session.commit()
- stored_table = (
- session.query(SqlaTable).filter_by(table_name="tmp_perm_table_v2").one()
- )
- self.assertEquals(
- stored_table.perm, f"[tmp_db].[tmp_perm_table_v2](id:{stored_table.id})"
- )
- self.assertIsNotNone(
- security_manager.find_permission_view_menu(
- "datasource_access", stored_table.perm
- )
- )
- # no changes in schema
- self.assertEquals(stored_table.schema_perm, "[tmp_db].[tmp_schema_v2]")
- self.assertIsNotNone(
- security_manager.find_permission_view_menu(
- "schema_access", stored_table.schema_perm
- )
- )
- # no schema
- stored_table.schema = None
- session.commit()
- stored_table = (
- session.query(SqlaTable).filter_by(table_name="tmp_perm_table_v2").one()
- )
- self.assertEquals(
- stored_table.perm, f"[tmp_db].[tmp_perm_table_v2](id:{stored_table.id})"
- )
- self.assertIsNotNone(
- security_manager.find_permission_view_menu(
- "datasource_access", stored_table.perm
- )
- )
- self.assertIsNone(stored_table.schema_perm)
- session.delete(new_db)
- session.delete(stored_table)
- session.commit()
- def test_set_perm_druid_datasource(self):
- session = db.session
- druid_cluster = (
- session.query(DruidCluster).filter_by(cluster_name="druid_test").one()
- )
- datasource = DruidDatasource(
- datasource_name="tmp_datasource",
- cluster=druid_cluster,
- cluster_id=druid_cluster.id,
- )
- session.add(datasource)
- session.commit()
- # store without a schema
- stored_datasource = (
- session.query(DruidDatasource)
- .filter_by(datasource_name="tmp_datasource")
- .one()
- )
- self.assertEquals(
- stored_datasource.perm,
- f"[druid_test].[tmp_datasource](id:{stored_datasource.id})",
- )
- self.assertIsNotNone(
- security_manager.find_permission_view_menu(
- "datasource_access", stored_datasource.perm
- )
- )
- self.assertIsNone(stored_datasource.schema_perm)
- # store with a schema
- stored_datasource.datasource_name = "tmp_schema.tmp_datasource"
- session.commit()
- self.assertEquals(
- stored_datasource.perm,
- f"[druid_test].[tmp_schema.tmp_datasource](id:{stored_datasource.id})",
- )
- self.assertIsNotNone(
- security_manager.find_permission_view_menu(
- "datasource_access", stored_datasource.perm
- )
- )
- self.assertIsNotNone(stored_datasource.schema_perm, "[druid_test].[tmp_schema]")
- self.assertIsNotNone(
- security_manager.find_permission_view_menu(
- "schema_access", stored_datasource.schema_perm
- )
- )
- session.delete(stored_datasource)
- session.commit()
- def test_set_perm_druid_cluster(self):
- session = db.session
- cluster = DruidCluster(cluster_name="tmp_druid_cluster")
- session.add(cluster)
- stored_cluster = (
- session.query(DruidCluster)
- .filter_by(cluster_name="tmp_druid_cluster")
- .one()
- )
- self.assertEquals(
- stored_cluster.perm, f"[tmp_druid_cluster].(id:{stored_cluster.id})"
- )
- self.assertIsNotNone(
- security_manager.find_permission_view_menu(
- "database_access", stored_cluster.perm
- )
- )
- stored_cluster.cluster_name = "tmp_druid_cluster2"
- session.commit()
- self.assertEquals(
- stored_cluster.perm, f"[tmp_druid_cluster2].(id:{stored_cluster.id})"
- )
- self.assertIsNotNone(
- security_manager.find_permission_view_menu(
- "database_access", stored_cluster.perm
- )
- )
- session.delete(stored_cluster)
- session.commit()
- def test_set_perm_database(self):
- session = db.session
- database = Database(
- database_name="tmp_database", sqlalchemy_uri="sqlite://test"
- )
- session.add(database)
- stored_db = (
- session.query(Database).filter_by(database_name="tmp_database").one()
- )
- self.assertEquals(stored_db.perm, f"[tmp_database].(id:{stored_db.id})")
- self.assertIsNotNone(
- security_manager.find_permission_view_menu(
- "database_access", stored_db.perm
- )
- )
- stored_db.database_name = "tmp_database2"
- session.commit()
- stored_db = (
- session.query(Database).filter_by(database_name="tmp_database2").one()
- )
- self.assertEquals(stored_db.perm, f"[tmp_database2].(id:{stored_db.id})")
- self.assertIsNotNone(
- security_manager.find_permission_view_menu(
- "database_access", stored_db.perm
- )
- )
- session.delete(stored_db)
- session.commit()
- def test_set_perm_slice(self):
- session = db.session
- database = Database(
- database_name="tmp_database", sqlalchemy_uri="sqlite://test"
- )
- table = SqlaTable(table_name="tmp_perm_table", database=database)
- session.add(database)
- session.add(table)
- session.commit()
- # no schema permission
- slice = Slice(
- datasource_id=table.id,
- datasource_type="table",
- datasource_name="tmp_perm_table",
- slice_name="slice_name",
- )
- session.add(slice)
- session.commit()
- slice = session.query(Slice).filter_by(slice_name="slice_name").one()
- self.assertEquals(slice.perm, table.perm)
- self.assertEquals(slice.perm, f"[tmp_database].[tmp_perm_table](id:{table.id})")
- self.assertEquals(slice.schema_perm, table.schema_perm)
- self.assertIsNone(slice.schema_perm)
- table.schema = "tmp_perm_schema"
- table.table_name = "tmp_perm_table_v2"
- session.commit()
- # TODO(bogdan): modify slice permissions on the table update.
- self.assertNotEquals(slice.perm, table.perm)
- self.assertEquals(slice.perm, f"[tmp_database].[tmp_perm_table](id:{table.id})")
- self.assertEquals(
- table.perm, f"[tmp_database].[tmp_perm_table_v2](id:{table.id})"
- )
- # TODO(bogdan): modify slice schema permissions on the table update.
- self.assertNotEquals(slice.schema_perm, table.schema_perm)
- self.assertIsNone(slice.schema_perm)
- # updating slice refreshes the permissions
- slice.slice_name = "slice_name_v2"
- session.commit()
- self.assertEquals(slice.perm, table.perm)
- self.assertEquals(
- slice.perm, f"[tmp_database].[tmp_perm_table_v2](id:{table.id})"
- )
- self.assertEquals(slice.schema_perm, table.schema_perm)
- self.assertEquals(slice.schema_perm, "[tmp_database].[tmp_perm_schema]")
- session.delete(slice)
- session.delete(table)
- session.delete(database)
- session.commit()
- # TODO test slice permission
- @patch("superset.security.manager.g")
- def test_schemas_accessible_by_user_admin(self, mock_g):
- mock_g.user = security_manager.find_user("admin")
- with self.client.application.test_request_context():
- database = get_example_database()
- schemas = security_manager.schemas_accessible_by_user(
- database, ["1", "2", "3"]
- )
- self.assertEquals(schemas, ["1", "2", "3"]) # no changes
- @patch("superset.security.manager.g")
- def test_schemas_accessible_by_user_schema_access(self, mock_g):
- # User has schema access to the schema 1
- create_schema_perm("[examples].[1]")
- mock_g.user = security_manager.find_user("gamma")
- with self.client.application.test_request_context():
- database = get_example_database()
- schemas = security_manager.schemas_accessible_by_user(
- database, ["1", "2", "3"]
- )
- # temp_schema is not passed in the params
- self.assertEquals(schemas, ["1"])
- delete_schema_perm("[examples].[1]")
- @patch("superset.security.manager.g")
- def test_schemas_accessible_by_user_datasource_access(self, mock_g):
- # User has schema access to the datasource temp_schema.wb_health_population in examples DB.
- mock_g.user = security_manager.find_user("gamma")
- with self.client.application.test_request_context():
- database = get_example_database()
- schemas = security_manager.schemas_accessible_by_user(
- database, ["temp_schema", "2", "3"]
- )
- self.assertEquals(schemas, ["temp_schema"])
- @patch("superset.security.manager.g")
- def test_schemas_accessible_by_user_datasource_and_schema_access(self, mock_g):
- # User has schema access to the datasource temp_schema.wb_health_population in examples DB.
- create_schema_perm("[examples].[2]")
- mock_g.user = security_manager.find_user("gamma")
- with self.client.application.test_request_context():
- database = get_example_database()
- schemas = security_manager.schemas_accessible_by_user(
- database, ["temp_schema", "2", "3"]
- )
- self.assertEquals(schemas, ["temp_schema", "2"])
- vm = security_manager.find_permission_view_menu(
- "schema_access", "[examples].[2]"
- )
- self.assertIsNotNone(vm)
- delete_schema_perm("[examples].[2]")
- def test_gamma_user_schema_access_to_dashboards(self):
- self.login(username="gamma")
- data = str(self.client.get("api/v1/dashboard/").data)
- self.assertIn("/superset/dashboard/world_health/", data)
- self.assertNotIn("/superset/dashboard/births/", data)
- def test_gamma_user_schema_access_to_tables(self):
- self.login(username="gamma")
- data = str(self.client.get("tablemodelview/list/").data)
- self.assertIn("wb_health_population", data)
- self.assertNotIn("birth_names", data)
- def test_gamma_user_schema_access_to_charts(self):
- self.login(username="gamma")
- data = str(self.client.get("api/v1/chart/").data)
- self.assertIn(
- "Life Expectancy VS Rural %", data
- ) # wb_health_population slice, has access
- self.assertIn(
- "Parallel Coordinates", data
- ) # wb_health_population slice, has access
- self.assertNotIn("Girl Name Cloud", data) # birth_names slice, no access
- def test_sqllab_gamma_user_schema_access_to_sqllab(self):
- session = db.session
- example_db = session.query(Database).filter_by(database_name="examples").one()
- example_db.expose_in_sqllab = True
- session.commit()
- arguments = {
- "keys": ["none"],
- "filters": [{"col": "expose_in_sqllab", "opr": "eq", "value": True}],
- "order_columns": "database_name",
- "order_direction": "asc",
- "page": 0,
- "page_size": -1,
- }
- NEW_FLASK_GET_SQL_DBS_REQUEST = f"/api/v1/database/?q={prison.dumps(arguments)}"
- self.login(username="gamma")
- databases_json = self.client.get(NEW_FLASK_GET_SQL_DBS_REQUEST).json
- self.assertEquals(databases_json["count"], 1)
- self.logout()
- def assert_can_read(self, view_menu, permissions_set):
- self.assertIn(("can_list", view_menu), permissions_set)
- def assert_can_write(self, view_menu, permissions_set):
- self.assertIn(("can_add", view_menu), permissions_set)
- self.assertIn(("can_delete", view_menu), permissions_set)
- self.assertIn(("can_edit", view_menu), permissions_set)
- def assert_cannot_write(self, view_menu, permissions_set):
- self.assertNotIn(("can_add", view_menu), permissions_set)
- self.assertNotIn(("can_delete", view_menu), permissions_set)
- self.assertNotIn(("can_edit", view_menu), permissions_set)
- self.assertNotIn(("can_save", view_menu), permissions_set)
- def assert_can_all(self, view_menu, permissions_set):
- self.assert_can_read(view_menu, permissions_set)
- self.assert_can_write(view_menu, permissions_set)
- def assert_can_gamma(self, perm_set):
- self.assert_can_read("TableModelView", perm_set)
- # make sure that user can create slices and dashboards
- self.assert_can_all("SliceModelView", perm_set)
- self.assert_can_all("DashboardModelView", perm_set)
- self.assertIn(("can_add_slices", "Superset"), perm_set)
- self.assertIn(("can_copy_dash", "Superset"), perm_set)
- self.assertIn(("can_created_dashboards", "Superset"), perm_set)
- self.assertIn(("can_created_slices", "Superset"), perm_set)
- self.assertIn(("can_csv", "Superset"), perm_set)
- self.assertIn(("can_dashboard", "Superset"), perm_set)
- self.assertIn(("can_explore", "Superset"), perm_set)
- self.assertIn(("can_explore_json", "Superset"), perm_set)
- self.assertIn(("can_fave_dashboards", "Superset"), perm_set)
- self.assertIn(("can_fave_slices", "Superset"), perm_set)
- self.assertIn(("can_save_dash", "Superset"), perm_set)
- self.assertIn(("can_slice", "Superset"), perm_set)
- self.assertIn(("can_explore", "Superset"), perm_set)
- self.assertIn(("can_explore_json", "Superset"), perm_set)
- self.assertIn(("can_userinfo", "UserDBModelView"), perm_set)
- def assert_can_alpha(self, perm_set):
- self.assert_can_all("TableModelView", perm_set)
- self.assertIn(("all_datasource_access", "all_datasource_access"), perm_set)
- def assert_cannot_alpha(self, perm_set):
- if app.config["ENABLE_ACCESS_REQUEST"]:
- self.assert_cannot_write("AccessRequestsModelView", perm_set)
- self.assert_can_all("AccessRequestsModelView", perm_set)
- self.assert_cannot_write("Queries", perm_set)
- self.assert_cannot_write("RoleModelView", perm_set)
- self.assert_cannot_write("UserDBModelView", perm_set)
- def assert_can_admin(self, perm_set):
- self.assert_can_all("DatabaseView", perm_set)
- self.assert_can_all("RoleModelView", perm_set)
- self.assert_can_all("UserDBModelView", perm_set)
- self.assertIn(("all_database_access", "all_database_access"), perm_set)
- self.assertIn(("can_override_role_permissions", "Superset"), perm_set)
- self.assertIn(("can_sync_druid_source", "Superset"), perm_set)
- self.assertIn(("can_override_role_permissions", "Superset"), perm_set)
- self.assertIn(("can_approve", "Superset"), perm_set)
- def test_is_admin_only(self):
- self.assertFalse(
- security_manager._is_admin_only(
- security_manager.find_permission_view_menu("can_list", "TableModelView")
- )
- )
- self.assertFalse(
- security_manager._is_admin_only(
- security_manager.find_permission_view_menu(
- "all_datasource_access", "all_datasource_access"
- )
- )
- )
- log_permissions = ["can_list", "can_show"]
- for log_permission in log_permissions:
- self.assertTrue(
- security_manager._is_admin_only(
- security_manager.find_permission_view_menu(
- log_permission, "LogModelView"
- )
- )
- )
- if app.config["ENABLE_ACCESS_REQUEST"]:
- self.assertTrue(
- security_manager._is_admin_only(
- security_manager.find_permission_view_menu(
- "can_list", "AccessRequestsModelView"
- )
- )
- )
- self.assertTrue(
- security_manager._is_admin_only(
- security_manager.find_permission_view_menu(
- "can_edit", "UserDBModelView"
- )
- )
- )
- self.assertTrue(
- security_manager._is_admin_only(
- security_manager.find_permission_view_menu("can_approve", "Superset")
- )
- )
- @unittest.skipUnless(
- SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
- )
- def test_is_alpha_only(self):
- self.assertFalse(
- security_manager._is_alpha_only(
- security_manager.find_permission_view_menu("can_list", "TableModelView")
- )
- )
- self.assertTrue(
- security_manager._is_alpha_only(
- security_manager.find_permission_view_menu(
- "muldelete", "TableModelView"
- )
- )
- )
- self.assertTrue(
- security_manager._is_alpha_only(
- security_manager.find_permission_view_menu(
- "all_datasource_access", "all_datasource_access"
- )
- )
- )
- self.assertTrue(
- security_manager._is_alpha_only(
- security_manager.find_permission_view_menu(
- "all_database_access", "all_database_access"
- )
- )
- )
- def test_is_gamma_pvm(self):
- self.assertTrue(
- security_manager._is_gamma_pvm(
- security_manager.find_permission_view_menu("can_list", "TableModelView")
- )
- )
- def test_gamma_permissions_basic(self):
- self.assert_can_gamma(get_perm_tuples("Gamma"))
- self.assert_cannot_alpha(get_perm_tuples("Alpha"))
- @unittest.skipUnless(
- SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
- )
- def test_alpha_permissions(self):
- alpha_perm_tuples = get_perm_tuples("Alpha")
- self.assert_can_gamma(alpha_perm_tuples)
- self.assert_can_alpha(alpha_perm_tuples)
- self.assert_cannot_alpha(alpha_perm_tuples)
- @unittest.skipUnless(
- SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
- )
- def test_admin_permissions(self):
- self.assert_can_gamma(get_perm_tuples("Admin"))
- self.assert_can_alpha(get_perm_tuples("Admin"))
- self.assert_can_admin(get_perm_tuples("Admin"))
- def test_sql_lab_permissions(self):
- sql_lab_set = get_perm_tuples("sql_lab")
- self.assertIn(("can_sql_json", "Superset"), sql_lab_set)
- self.assertIn(("can_csv", "Superset"), sql_lab_set)
- self.assertIn(("can_search_queries", "Superset"), sql_lab_set)
- self.assert_cannot_alpha(sql_lab_set)
- def test_granter_permissions(self):
- granter_set = get_perm_tuples("granter")
- self.assertIn(("can_override_role_permissions", "Superset"), granter_set)
- self.assertIn(("can_approve", "Superset"), granter_set)
- self.assert_cannot_alpha(granter_set)
- def test_gamma_permissions(self):
- def assert_can_read(view_menu):
- self.assertIn(("can_list", view_menu), gamma_perm_set)
- def assert_can_write(view_menu):
- self.assertIn(("can_add", view_menu), gamma_perm_set)
- self.assertIn(("can_delete", view_menu), gamma_perm_set)
- self.assertIn(("can_edit", view_menu), gamma_perm_set)
- def assert_cannot_write(view_menu):
- self.assertNotIn(("can_add", view_menu), gamma_perm_set)
- self.assertNotIn(("can_delete", view_menu), gamma_perm_set)
- self.assertNotIn(("can_edit", view_menu), gamma_perm_set)
- self.assertNotIn(("can_save", view_menu), gamma_perm_set)
- def assert_can_all(view_menu):
- assert_can_read(view_menu)
- assert_can_write(view_menu)
- gamma_perm_set = set()
- for perm in security_manager.find_role("Gamma").permissions:
- gamma_perm_set.add((perm.permission.name, perm.view_menu.name))
- # check read only perms
- assert_can_read("TableModelView")
- # make sure that user can create slices and dashboards
- assert_can_all("SliceModelView")
- assert_can_all("DashboardModelView")
- self.assertIn(("can_add_slices", "Superset"), gamma_perm_set)
- self.assertIn(("can_copy_dash", "Superset"), gamma_perm_set)
- self.assertIn(("can_created_dashboards", "Superset"), gamma_perm_set)
- self.assertIn(("can_created_slices", "Superset"), gamma_perm_set)
- self.assertIn(("can_csv", "Superset"), gamma_perm_set)
- self.assertIn(("can_dashboard", "Superset"), gamma_perm_set)
- self.assertIn(("can_explore", "Superset"), gamma_perm_set)
- self.assertIn(("can_explore_json", "Superset"), gamma_perm_set)
- self.assertIn(("can_fave_dashboards", "Superset"), gamma_perm_set)
- self.assertIn(("can_fave_slices", "Superset"), gamma_perm_set)
- self.assertIn(("can_save_dash", "Superset"), gamma_perm_set)
- self.assertIn(("can_slice", "Superset"), gamma_perm_set)
- self.assertIn(("can_userinfo", "UserDBModelView"), gamma_perm_set)
- def test_views_are_secured(self):
- """Preventing the addition of unsecured views without has_access decorator"""
- # These FAB views are secured in their body as opposed to by decorators
- method_whitelist = ("action", "action_post")
- # List of redirect & other benign views
- views_whitelist = [
- ["MyIndexView", "index"],
- ["UtilView", "back"],
- ["LocaleView", "index"],
- ["AuthDBView", "login"],
- ["AuthDBView", "logout"],
- ["R", "index"],
- ["Superset", "log"],
- ["Superset", "theme"],
- ["Superset", "welcome"],
- ["SecurityApi", "login"],
- ["SecurityApi", "refresh"],
- ["SupersetIndexView", "index"],
- ]
- unsecured_views = []
- for view_class in appbuilder.baseviews:
- class_name = view_class.__class__.__name__
- for name, value in inspect.getmembers(
- view_class, predicate=inspect.ismethod
- ):
- if (
- name not in method_whitelist
- and [class_name, name] not in views_whitelist
- and hasattr(value, "_urls")
- and not hasattr(value, "_permission_name")
- ):
- unsecured_views.append((class_name, name))
- if unsecured_views:
- view_str = "\n".join([str(v) for v in unsecured_views])
- raise Exception(f"Some views are not secured:\n{view_str}")
- class SecurityManagerTests(SupersetTestCase):
- """
- Testing the Security Manager.
- """
- @patch("superset.security.SupersetSecurityManager.datasource_access")
- def test_assert_datasource_permission(self, mock_datasource_access):
- datasource = self.get_datasource_mock()
- # Datasource with the "datasource_access" permission.
- mock_datasource_access.return_value = True
- security_manager.assert_datasource_permission(datasource)
- # Datasource without the "datasource_access" permission.
- mock_datasource_access.return_value = False
- with self.assertRaises(SupersetSecurityException):
- security_manager.assert_datasource_permission(datasource)
- @patch("superset.security.SupersetSecurityManager.datasource_access")
- def test_assert_query_context_permission(self, mock_datasource_access):
- query_context = Mock()
- query_context.datasource = self.get_datasource_mock()
- # Query context with the "datasource_access" permission.
- mock_datasource_access.return_value = True
- security_manager.assert_query_context_permission(query_context)
- # Query context without the "datasource_access" permission.
- mock_datasource_access.return_value = False
- with self.assertRaises(SupersetSecurityException):
- security_manager.assert_query_context_permission(query_context)
- @patch("superset.security.SupersetSecurityManager.datasource_access")
- def test_assert_viz_permission(self, mock_datasource_access):
- test_viz = viz.TableViz(self.get_datasource_mock(), form_data={})
- # Visualization with the "datasource_access" permission.
- mock_datasource_access.return_value = True
- security_manager.assert_viz_permission(test_viz)
- # Visualization without the "datasource_access" permission.
- mock_datasource_access.return_value = False
- with self.assertRaises(SupersetSecurityException):
- security_manager.assert_viz_permission(test_viz)
|