123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590 |
- # Licensed to the Apache Software Foundation (ASF) under one
- # or more contributor license agreements. See the NOTICE file
- # distributed with this work for additional information
- # regarding copyright ownership. The ASF licenses this file
- # to you under the Apache License, Version 2.0 (the
- # "License"); you may not use this file except in compliance
- # with the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing,
- # software distributed under the License is distributed on an
- # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- # KIND, either express or implied. See the License for the
- # specific language governing permissions and limitations
- # under the License.
- # isort:skip_file
- """Unit tests for Superset"""
- import json
- import unittest
- from datetime import datetime
- from unittest.mock import Mock, patch
- from tests.test_app import app
- from superset import db, security_manager
- from .base_tests import SupersetTestCase
- try:
- from superset.connectors.druid.models import (
- DruidCluster,
- DruidColumn,
- DruidDatasource,
- DruidMetric,
- )
- except ImportError:
- pass
class PickableMock(Mock):
    """A ``Mock`` whose pickle reduction is a plain, argument-less ``Mock``."""

    def __reduce__(self):
        """Return the ``(callable, args)`` pair pickle uses to rebuild the object."""
        reconstructor, ctor_args = Mock, ()
        return reconstructor, ctor_args
def _segment_column(
    column_type, has_multiple_values=False, size=100000, cardinality=None
):
    """Build one entry of the ``columns`` mapping of a segment-metadata reply."""
    return {
        "type": column_type,
        "hasMultipleValues": has_multiple_values,
        "size": size,
        "cardinality": cardinality,
        "errorMessage": None,
    }


# Canned segment-metadata reply; the tests install it as the return value of
# the mocked client's ``segment_metadata`` call.
SEGMENT_METADATA = [
    {
        "id": "some_id",
        "intervals": ["2013-05-13T00:00:00.000Z/2013-05-14T00:00:00.000Z"],
        "columns": {
            "__time": _segment_column("LONG", size=407240380),
            "dim1": _segment_column("STRING", cardinality=1944),
            "dim2": _segment_column(
                "STRING", has_multiple_values=True, cardinality=1504
            ),
            "metric1": _segment_column("FLOAT"),
        },
        "aggregators": {
            "metric1": {"type": "longSum", "name": "metric1", "fieldName": "metric1"}
        },
        "size": 300000,
        "numRows": 5000000,
    }
]
# Two canned group-by result rows; they differ only in their ``event`` payload,
# so the shared envelope is written once.
GB_RESULT_SET = [
    {"version": "v1", "timestamp": "2012-01-01T00:00:00.000Z", "event": event}
    for event in (
        {"dim1": "Canada", "dim2": "boy", "count": 12345678},
        {"dim1": "USA", "dim2": "girl", "count": 12345678 / 2},
    )
]
# Pin the Druid version reported by every cluster so no test has to reach a
# real broker for it.  The Druid model import above is allowed to fail
# silently, in which case ``DruidCluster`` is undefined; patching it
# unconditionally would raise NameError while this module is being imported.
try:
    DruidCluster.get_druid_version = lambda _: "0.9.1"  # type: ignore
except NameError:
    # Druid connector unavailable: nothing to patch.
    pass
class DruidTests(SupersetTestCase):

    """Testing interactions with Druid"""

    @classmethod
    def setUpClass(cls):
        """Run ``create_druid_test_objects`` once before the tests in this class."""
        cls.create_druid_test_objects()

    def get_test_cluster_obj(self):
        """Return a new, unsaved ``DruidCluster`` named ``test_cluster``."""
        return DruidCluster(
            cluster_name="test_cluster",
            broker_host="localhost",
            broker_port=7980,
            broker_endpoint="druid/v2",
            metadata_last_refreshed=datetime.now(),
        )

    def get_cluster(self, PyDruid):
        """Return a fresh ``test_cluster`` wired to the patched PyDruid client.

        The mocked client is primed with canned ``time_boundary`` and
        ``segment_metadata`` replies.  Any ``test_cluster`` already stored in
        the database is deleted together with its datasources, then a new
        cluster is added to the session with ``get_datasources`` stubbed to
        report a single ``test_datasource``.

        :param PyDruid: the mock that ``@patch`` substitutes for ``PyDruid``
        :returns: the new ``DruidCluster`` (added to the session, not committed)
        """
        instance = PyDruid.return_value
        instance.time_boundary.return_value = [{"result": {"maxTime": "2016-01-01"}}]
        instance.segment_metadata.return_value = SEGMENT_METADATA

        cluster = (
            db.session.query(DruidCluster)
            .filter_by(cluster_name="test_cluster")
            .first()
        )
        if cluster:
            # Remove the leftovers of a previous test so each test starts clean.
            for datasource in (
                db.session.query(DruidDatasource).filter_by(cluster_id=cluster.id).all()
            ):
                db.session.delete(datasource)

            db.session.delete(cluster)
        db.session.commit()

        cluster = self.get_test_cluster_obj()

        db.session.add(cluster)
        cluster.get_datasources = PickableMock(return_value=["test_datasource"])

        return cluster

    def _mock_query_results(self, PyDruid):
        """Make the patched PyDruid client answer queries with ``GB_RESULT_SET``.

        Each result row is flattened to its ``event`` fields plus ``timestamp``
        and the rows are served as a dataframe through ``export_pandas``.

        :param PyDruid: the mock that ``@patch`` substitutes for ``PyDruid``
        :returns: the mocked client instance (``PyDruid.return_value``)
        """
        import pandas as pd

        records = [
            dict(row["event"], timestamp=row["timestamp"]) for row in GB_RESULT_SET
        ]
        instance = PyDruid.return_value
        instance.export_pandas.return_value = pd.DataFrame(records)
        instance.query_dict = {}
        instance.query_builder.last_query.query_dict = {}
        return instance

    @unittest.skipUnless(
        SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
    )
    @patch("superset.connectors.druid.models.PyDruid")
    def test_client(self, PyDruid):
        """Explore pages and table queries work against the mocked client."""
        self.login(username="admin")
        cluster = self.get_cluster(PyDruid)
        cluster.refresh_datasources()
        cluster.refresh_datasources(merge_flag=True)
        datasource_id = cluster.datasources[0].id
        db.session.commit()

        self._mock_query_results(PyDruid)

        resp = self.get_resp("/superset/explore/druid/{}/".format(datasource_id))
        self.assertIn("test_datasource", resp)

        base_form_data = {
            "viz_type": "table",
            "granularity": "one+day",
            "druid_time_origin": "",
            "since": "7+days+ago",
            "until": "now",
            "row_limit": 5000,
            "include_search": "false",
            "metrics": ["count"],
            "force": "true",
        }
        url = "/superset/explore_json/druid/{}/".format(datasource_id)
        # Same query with one, then two, group-by dimensions.
        for groupby in (["dim1"], ["dim1", "dim2"]):
            form_data = dict(base_form_data, groupby=groupby)
            resp = self.get_json_resp(url, {"form_data": json.dumps(form_data)})
            self.assertEqual("Canada", resp["data"]["records"][0]["dim1"])

    @unittest.skipUnless(
        SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
    )
    def test_druid_sync_from_config(self):
        """Posting a config to ``/superset/sync_druid/`` creates, then extends,
        the ``test_click`` datasource without dropping existing columns/metrics.
        """
        CLUSTER_NAME = "new_druid"
        self.login()
        cluster = self.get_or_create(
            DruidCluster, {"cluster_name": CLUSTER_NAME}, db.session
        )

        db.session.merge(cluster)
        db.session.commit()

        # Start from a state where the datasource does not exist yet.
        ds = (
            db.session.query(DruidDatasource)
            .filter_by(datasource_name="test_click")
            .first()
        )
        if ds:
            db.session.delete(ds)
        db.session.commit()

        cfg = {
            "user": "admin",
            "cluster": CLUSTER_NAME,
            "config": {
                "name": "test_click",
                "dimensions": ["affiliate_id", "campaign", "first_seen"],
                "metrics_spec": [
                    {"type": "count", "name": "count"},
                    {"type": "sum", "name": "sum"},
                ],
                "batch_ingestion": {
                    "sql": "SELECT * FROM clicks WHERE d='{{ ds }}'",
                    "ts_column": "d",
                    "sources": [{"table": "clicks", "partition": "d='{{ ds }}'"}],
                },
            },
        }

        def check():
            resp = self.client.post("/superset/sync_druid/", data=json.dumps(cfg))
            # Check the status first so a failed sync is reported as such
            # instead of as a missing-row error from ``.one()`` below.
            self.assertEqual(resp.status_code, 201)
            druid_ds = (
                db.session.query(DruidDatasource)
                .filter_by(datasource_name="test_click")
                .one()
            )
            col_names = {c.column_name for c in druid_ds.columns}
            self.assertEqual({"affiliate_id", "campaign", "first_seen"}, col_names)
            metric_names = {m.metric_name for m in druid_ds.metrics}
            self.assertEqual({"count", "sum"}, metric_names)

        check()
        # checking twice to make sure a second sync yields the same results
        check()

        # datasource exists, add new metrics and dimensions
        cfg = {
            "user": "admin",
            "cluster": CLUSTER_NAME,
            "config": {
                "name": "test_click",
                "dimensions": ["affiliate_id", "second_seen"],
                "metrics_spec": [
                    {"type": "bla", "name": "sum"},
                    {"type": "unique", "name": "unique"},
                ],
            },
        }
        resp = self.client.post("/superset/sync_druid/", data=json.dumps(cfg))
        self.assertEqual(resp.status_code, 201)
        druid_ds = (
            db.session.query(DruidDatasource)
            .filter_by(datasource_name="test_click")
            .one()
        )
        # columns and metrics are not deleted if config is changed as
        # user could define his own dimensions / metrics and want to keep them
        self.assertEqual(
            {c.column_name for c in druid_ds.columns},
            {"affiliate_id", "campaign", "first_seen", "second_seen"},
        )
        self.assertEqual(
            {m.metric_name for m in druid_ds.metrics}, {"count", "sum", "unique"}
        )
        # metric type will not be overridden, sum stays instead of bla
        self.assertEqual(
            {m.metric_type for m in druid_ds.metrics}, {"longSum", "sum", "unique"}
        )

    @unittest.skipUnless(
        SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
    )
    @unittest.skipUnless(app.config["DRUID_IS_ACTIVE"], "DRUID_IS_ACTIVE is false")
    def test_filter_druid_datasource(self):
        """The datasource list shown to ``gamma`` contains only the datasource
        whose ``datasource_access`` permission was granted to the Gamma role.
        """
        CLUSTER_NAME = "new_druid"
        cluster = self.get_or_create(
            DruidCluster, {"cluster_name": CLUSTER_NAME}, db.session
        )
        db.session.merge(cluster)

        gamma_ds = self.get_or_create(
            DruidDatasource,
            {"datasource_name": "datasource_for_gamma", "cluster": cluster},
            db.session,
        )
        gamma_ds.cluster = cluster
        db.session.merge(gamma_ds)

        no_gamma_ds = self.get_or_create(
            DruidDatasource,
            {"datasource_name": "datasource_not_for_gamma", "cluster": cluster},
            db.session,
        )
        no_gamma_ds.cluster = cluster
        db.session.merge(no_gamma_ds)
        db.session.commit()

        security_manager.add_permission_view_menu("datasource_access", gamma_ds.perm)
        security_manager.add_permission_view_menu("datasource_access", no_gamma_ds.perm)

        # Grant the Gamma role access to the first datasource only.
        perm = security_manager.find_permission_view_menu(
            "datasource_access", gamma_ds.get_perm()
        )
        security_manager.add_permission_role(security_manager.find_role("Gamma"), perm)
        security_manager.get_session.commit()

        self.login(username="gamma")
        url = "/druiddatasourcemodelview/list/"
        resp = self.get_resp(url)
        self.assertIn("datasource_for_gamma", resp)
        self.assertNotIn("datasource_not_for_gamma", resp)

    @unittest.skipUnless(
        SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
    )
    @patch("superset.connectors.druid.models.PyDruid")
    def test_sync_druid_perm(self, PyDruid):
        """After a refresh, a ``datasource_access`` permission-view exists for
        the refreshed datasource.
        """
        self.login(username="admin")
        # Shared helper: mocked client replies plus a clean ``test_cluster``.
        cluster = self.get_cluster(PyDruid)

        cluster.refresh_datasources()
        cluster.datasources[0].merge_flag = True
        metadata = cluster.datasources[0].latest_metadata()
        self.assertEqual(len(metadata), 4)
        db.session.commit()

        view_menu_name = cluster.datasources[0].get_perm()
        view_menu = security_manager.find_view_menu(view_menu_name)
        permission = security_manager.find_permission("datasource_access")

        pv = (
            security_manager.get_session.query(security_manager.permissionview_model)
            .filter_by(permission=permission, view_menu=view_menu)
            .first()
        )
        self.assertIsNotNone(pv)

    @unittest.skipUnless(
        SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
    )
    @patch("superset.connectors.druid.models.PyDruid")
    def test_refresh_metadata(self, PyDruid):
        """Refreshed columns come from ``SEGMENT_METADATA`` and the
        ``*__metric1`` metrics carry a ``double<Agg>`` JSON type.
        """
        self.login(username="admin")
        cluster = self.get_cluster(PyDruid)
        cluster.refresh_datasources()
        datasource = cluster.datasources[0]

        cols = db.session.query(DruidColumn).filter(
            DruidColumn.datasource_id == datasource.id
        )

        for col in cols:
            self.assertIn(col.column_name, SEGMENT_METADATA[0]["columns"])

        metrics = (
            db.session.query(DruidMetric)
            .filter(DruidMetric.datasource_id == datasource.id)
            .filter(DruidMetric.metric_name.like("%__metric1"))
        )

        for metric in metrics:
            # Metric names look like ``<agg>__metric1``.
            agg, _ = metric.metric_name.split("__")

            self.assertEqual(
                json.loads(metric.json)["type"], "double{}".format(agg.capitalize())
            )

    @unittest.skipUnless(
        SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
    )
    @patch("superset.connectors.druid.models.PyDruid")
    def test_refresh_metadata_augment_type(self, PyDruid):
        """A second refresh picks up a changed column type (FLOAT -> LONG) for
        both the column and its ``*__metric1`` metrics.
        """
        from copy import deepcopy

        self.login(username="admin")
        cluster = self.get_cluster(PyDruid)
        cluster.refresh_datasources()

        # Deep copy: a slice copy would share the nested dicts, so changing
        # the type would also alter the module-level fixture that the other
        # tests rely on.
        metadata = deepcopy(SEGMENT_METADATA)
        metadata[0]["columns"]["metric1"]["type"] = "LONG"
        instance = PyDruid.return_value
        instance.segment_metadata.return_value = metadata
        cluster.refresh_datasources()
        datasource = cluster.datasources[0]

        column = (
            db.session.query(DruidColumn)
            .filter(DruidColumn.datasource_id == datasource.id)
            .filter(DruidColumn.column_name == "metric1")
        ).one()

        self.assertEqual(column.type, "LONG")

        metrics = (
            db.session.query(DruidMetric)
            .filter(DruidMetric.datasource_id == datasource.id)
            .filter(DruidMetric.metric_name.like("%__metric1"))
        )

        for metric in metrics:
            agg, _ = metric.metric_name.split("__")

            self.assertEqual(metric.json_obj["type"], "long{}".format(agg.capitalize()))

    @unittest.skipUnless(
        SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
    )
    @patch("superset.connectors.druid.models.PyDruid")
    def test_refresh_metadata_augment_verbose_name(self, PyDruid):
        """A verbose name set on a metric is still there after a refresh."""
        self.login(username="admin")
        cluster = self.get_cluster(PyDruid)
        cluster.refresh_datasources()
        datasource = cluster.datasources[0]

        metrics = (
            db.session.query(DruidMetric)
            .filter(DruidMetric.datasource_id == datasource.id)
            .filter(DruidMetric.metric_name.like("%__metric1"))
        )

        for metric in metrics:
            metric.verbose_name = metric.metric_name

        db.session.commit()

        # The verbose name should not change during a refresh.
        cluster.refresh_datasources()
        datasource = cluster.datasources[0]

        metrics = (
            db.session.query(DruidMetric)
            .filter(DruidMetric.datasource_id == datasource.id)
            .filter(DruidMetric.metric_name.like("%__metric1"))
        )

        for metric in metrics:
            self.assertEqual(metric.verbose_name, metric.metric_name)

    @unittest.skipUnless(
        SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
    )
    def test_urls(self):
        """``get_base_url`` and ``get_base_broker_url`` return the expected URLs."""
        cluster = self.get_test_cluster_obj()
        self.assertEqual(
            cluster.get_base_url("localhost", "9999"), "http://localhost:9999"
        )
        self.assertEqual(
            cluster.get_base_url("http://localhost", "9999"), "http://localhost:9999"
        )
        self.assertEqual(
            cluster.get_base_url("https://localhost", "9999"), "https://localhost:9999"
        )

        self.assertEqual(
            cluster.get_base_broker_url(), "http://localhost:7980/druid/v2"
        )

    @unittest.skipUnless(
        SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
    )
    @patch("superset.connectors.druid.models.PyDruid")
    def test_druid_time_granularities(self, PyDruid):
        """Each granularity label reaches the ``timeseries`` call as the
        expected ``period`` string.
        """
        self.login(username="admin")
        cluster = self.get_cluster(PyDruid)
        cluster.refresh_datasources()
        cluster.refresh_datasources(merge_flag=True)
        datasource_id = cluster.datasources[0].id
        db.session.commit()

        instance = self._mock_query_results(PyDruid)

        form_data = {
            "viz_type": "table",
            "since": "7+days+ago",
            "until": "now",
            "metrics": ["count"],
            "groupby": [],
            "include_time": "true",
        }

        granularity_map = {
            "5 seconds": "PT5S",
            "30 seconds": "PT30S",
            "1 minute": "PT1M",
            "5 minutes": "PT5M",
            "1 hour": "PT1H",
            "6 hour": "PT6H",
            "one day": "P1D",
            "1 day": "P1D",
            "7 days": "P7D",
            "week": "P1W",
            "week_starting_sunday": "P1W",
            "week_ending_saturday": "P1W",
            "month": "P1M",
            "quarter": "P3M",
            "year": "P1Y",
        }
        url = "/superset/explore_json/druid/{}/".format(datasource_id)
        for granularity, expected_period in granularity_map.items():
            form_data["granularity"] = granularity
            self.get_json_resp(url, {"form_data": json.dumps(form_data)})
            # ``call_args[1]`` holds the keyword arguments of the latest call.
            self.assertEqual(
                expected_period,
                instance.timeseries.call_args[1]["granularity"]["period"],
            )

    @unittest.skipUnless(
        SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
    )
    @patch("superset.connectors.druid.models.PyDruid")
    def test_external_metadata(self, PyDruid):
        """The external-metadata endpoint lists the columns of the mocked
        segment metadata.
        """
        self.login(username="admin")
        cluster = self.get_cluster(PyDruid)
        cluster.refresh_datasources()
        datasource = cluster.datasources[0]
        url = "/datasource/external_metadata/druid/{}/".format(datasource.id)
        resp = self.get_json_resp(url)
        col_names = {o.get("name") for o in resp}
        self.assertEqual(col_names, {"__time", "dim1", "dim2", "metric1"})
- if __name__ == "__main__":
- unittest.main()
|