# pylint: disable=C,R,W
# pylint: disable=invalid-unary-operand-type
from collections import OrderedDict
from copy import deepcopy
from datetime import datetime, timedelta
from distutils.version import LooseVersion
import json
import logging
from multiprocessing.pool import ThreadPool
import re

from dateutil.parser import parse as dparse
from flask import escape, Markup
from flask_appbuilder import Model
from flask_appbuilder.models.decorators import renders
from flask_babel import lazy_gettext as _
import pandas
from pydruid.client import PyDruid
from pydruid.utils.aggregators import count
from pydruid.utils.dimensions import MapLookupExtraction, RegexExtraction
from pydruid.utils.filters import Dimension, Filter
from pydruid.utils.having import Aggregation
from pydruid.utils.postaggregator import (
    Const, Field, HyperUniqueCardinality, Postaggregator, Quantile, Quantiles,
)
import requests
import sqlalchemy as sa
from sqlalchemy import (
    Boolean, Column, DateTime, ForeignKey, Integer, String, Text, UniqueConstraint,
)
from sqlalchemy.orm import backref, relationship

from superset import conf, db, import_util, security_manager, utils
from superset.connectors.base.models import BaseColumn, BaseDatasource, BaseMetric
from superset.exceptions import MetricPermException, SupersetException
from superset.models.helpers import (
    AuditMixinNullable, ImportMixin, QueryResult,
)
from superset.utils import (
    DimSelector, DTTM_ALIAS, flasher,
)

DRUID_TZ = conf.get('DRUID_TZ')
POST_AGG_TYPE = 'postagg'


# Function wrapper because bound methods cannot
# be passed to processes
def _fetch_metadata_for(datasource):
    return datasource.latest_metadata()


class JavascriptPostAggregator(Postaggregator):
    def __init__(self, name, field_names, function):
        self.post_aggregator = {
            'type': 'javascript',
            'fieldNames': field_names,
            'name': name,
            'function': function,
        }
        self.name = name


class CustomPostAggregator(Postaggregator):
    """A way to allow users to specify completely custom PostAggregators"""

    def __init__(self, name, post_aggregator):
        self.name = name
        self.post_aggregator = post_aggregator


class DruidCluster(Model, AuditMixinNullable, ImportMixin):
    """ORM object referencing the Druid clusters"""

    __tablename__ = 'clusters'
    type = 'druid'

    id = Column(Integer, primary_key=True)
    verbose_name = Column(String(250), unique=True)
    # short unique name, used in permissions
    cluster_name = Column(String(250), unique=True)
    coordinator_host = Column(String(255))
    coordinator_port = Column(Integer, default=8081)
    coordinator_endpoint = Column(
        String(255), default='druid/coordinator/v1/metadata')
    broker_host = Column(String(255))
    broker_port = Column(Integer, default=8082)
    broker_endpoint = Column(String(255), default='druid/v2')
    metadata_last_refreshed = Column(DateTime)
    cache_timeout = Column(Integer)

    export_fields = ('cluster_name', 'coordinator_host', 'coordinator_port',
                     'coordinator_endpoint', 'broker_host', 'broker_port',
                     'broker_endpoint', 'cache_timeout')
    update_from_object_fields = export_fields
    export_children = ['datasources']

    def __repr__(self):
        return self.verbose_name if self.verbose_name else self.cluster_name

    def __html__(self):
        return self.__repr__()

    @property
    def data(self):
        return {
            'id': self.id,
            'name': self.cluster_name,
            'backend': 'druid',
        }

    @staticmethod
    def get_base_url(host, port):
        if not re.match('http(s)?://', host):
            host = 'http://' + host
        url = '{0}:{1}'.format(host, port) if port else host
        return url
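
    # Illustrative sketch (not in the original source): with the defaults above,
    # get_base_url('localhost', 8082) returns 'http://localhost:8082', while a
    # host that already carries a scheme, e.g. the hypothetical
    # 'https://druid.example.com', is left untouched.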

    def get_base_broker_url(self):
        base_url = self.get_base_url(
            self.broker_host, self.broker_port)
        return '{base_url}/{self.broker_endpoint}'.format(**locals())

    def get_pydruid_client(self):
        cli = PyDruid(
            self.get_base_url(self.broker_host, self.broker_port),
            self.broker_endpoint)
        return cli

    def get_datasources(self):
        endpoint = self.get_base_broker_url() + '/datasources'
        return json.loads(requests.get(endpoint).text)

    def get_druid_version(self):
        endpoint = self.get_base_url(
            self.coordinator_host, self.coordinator_port) + '/status'
        return json.loads(requests.get(endpoint).text)['version']

    @property
    @utils.memoized
    def druid_version(self):
        return self.get_druid_version()

    def refresh_datasources(
            self,
            datasource_name=None,
            merge_flag=True,
            refreshAll=True):
        """Refresh metadata of all datasources in the cluster.

        If ``datasource_name`` is specified, only that datasource is updated.
        """
        ds_list = self.get_datasources()
        blacklist = conf.get('DRUID_DATA_SOURCE_BLACKLIST', [])
        ds_refresh = []
        if not datasource_name:
            ds_refresh = list(filter(lambda ds: ds not in blacklist, ds_list))
        elif datasource_name not in blacklist and datasource_name in ds_list:
            ds_refresh.append(datasource_name)
        else:
            return
        self.refresh(ds_refresh, merge_flag, refreshAll)

    def refresh(self, datasource_names, merge_flag, refreshAll):
        """
        Fetches metadata for the specified datasources and
        merges it into the Superset database
        """
        session = db.session
        ds_list = (
            session.query(DruidDatasource)
            .filter(DruidDatasource.cluster_name == self.cluster_name)
            .filter(DruidDatasource.datasource_name.in_(datasource_names))
        )
        ds_map = {ds.name: ds for ds in ds_list}
        for ds_name in datasource_names:
            datasource = ds_map.get(ds_name, None)
            if not datasource:
                datasource = DruidDatasource(datasource_name=ds_name)
                with session.no_autoflush:
                    session.add(datasource)
                flasher(
                    _('Adding new datasource [{}]').format(ds_name), 'success')
                ds_map[ds_name] = datasource
            elif refreshAll:
                flasher(
                    _('Refreshing datasource [{}]').format(ds_name), 'info')
            else:
                del ds_map[ds_name]
                continue
            datasource.cluster = self
            datasource.merge_flag = merge_flag
        session.flush()

        # Prepare multithreaded execution
        pool = ThreadPool()
        ds_refresh = list(ds_map.values())
        metadata = pool.map(_fetch_metadata_for, ds_refresh)
        pool.close()
        pool.join()

        for i in range(0, len(ds_refresh)):
            datasource = ds_refresh[i]
            cols = metadata[i]
            if cols:
                col_objs_list = (
                    session.query(DruidColumn)
                    .filter(DruidColumn.datasource_id == datasource.id)
                    .filter(DruidColumn.column_name.in_(cols.keys()))
                )
                col_objs = {col.column_name: col for col in col_objs_list}
                for col in cols:
                    if col == '__time':  # skip the time column
                        continue
                    col_obj = col_objs.get(col)
                    if not col_obj:
                        col_obj = DruidColumn(
                            datasource_id=datasource.id,
                            column_name=col)
                        with session.no_autoflush:
                            session.add(col_obj)
                    col_obj.type = cols[col]['type']
                    col_obj.datasource = datasource
                    if col_obj.type == 'STRING':
                        col_obj.groupby = True
                        col_obj.filterable = True
                    if col_obj.type == 'hyperUnique' or col_obj.type == 'thetaSketch':
                        col_obj.count_distinct = True
                    if col_obj.is_num:
                        col_obj.sum = True
                        col_obj.min = True
                        col_obj.max = True
                datasource.refresh_metrics()
        session.commit()

    @property
    def perm(self):
        return '[{obj.cluster_name}].(id:{obj.id})'.format(obj=self)

    def get_perm(self):
        return self.perm

    @property
    def name(self):
        return self.verbose_name if self.verbose_name else self.cluster_name

    @property
    def unique_name(self):
        return self.verbose_name if self.verbose_name else self.cluster_name


class DruidColumn(Model, BaseColumn):
    """ORM model for storing Druid datasource column metadata"""

    __tablename__ = 'columns'
    __table_args__ = (UniqueConstraint('column_name', 'datasource_id'),)

    datasource_id = Column(
        Integer,
        ForeignKey('datasources.id'))
    # Setting enable_typechecks=False disables polymorphic inheritance.
    datasource = relationship(
        'DruidDatasource',
        backref=backref('columns', cascade='all, delete-orphan'),
        enable_typechecks=False)
    dimension_spec_json = Column(Text)

    export_fields = (
        'datasource_id', 'column_name', 'is_active', 'type', 'groupby',
        'count_distinct', 'sum', 'avg', 'max', 'min', 'filterable',
        'description', 'dimension_spec_json', 'verbose_name',
    )
    update_from_object_fields = export_fields
    export_parent = 'datasource'

    def __repr__(self):
        return self.column_name

    @property
    def expression(self):
        return self.dimension_spec_json

    @property
    def dimension_spec(self):
        if self.dimension_spec_json:
            return json.loads(self.dimension_spec_json)
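
    # Illustrative sketch (hypothetical column and output names): dimension_spec_json
    # typically holds a Druid dimensionSpec such as
    # {"type": "extraction", "dimension": "user_id", "outputName": "user",
    #  "extractionFn": {"type": "regex", "expr": "(\\d+)"}}
    # which the dimension_spec property returns as a parsed dict.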

    def get_metrics(self):
        metrics = {}
        metrics['count'] = DruidMetric(
            metric_name='count',
            verbose_name='COUNT(*)',
            metric_type='count',
            json=json.dumps({'type': 'count', 'name': 'count'}),
        )
        # Somehow we need to reassign this for UDAFs
        if self.type in ('DOUBLE', 'FLOAT'):
            corrected_type = 'DOUBLE'
        else:
            corrected_type = self.type

        if self.sum and self.is_num:
            mt = corrected_type.lower() + 'Sum'
            name = 'sum__' + self.column_name
            metrics[name] = DruidMetric(
                metric_name=name,
                metric_type='sum',
                verbose_name='SUM({})'.format(self.column_name),
                json=json.dumps({
                    'type': mt, 'name': name, 'fieldName': self.column_name}),
            )

        if self.avg and self.is_num:
            mt = corrected_type.lower() + 'Avg'
            name = 'avg__' + self.column_name
            metrics[name] = DruidMetric(
                metric_name=name,
                metric_type='avg',
                verbose_name='AVG({})'.format(self.column_name),
                json=json.dumps({
                    'type': mt, 'name': name, 'fieldName': self.column_name}),
            )

        if self.min and self.is_num:
            mt = corrected_type.lower() + 'Min'
            name = 'min__' + self.column_name
            metrics[name] = DruidMetric(
                metric_name=name,
                metric_type='min',
                verbose_name='MIN({})'.format(self.column_name),
                json=json.dumps({
                    'type': mt, 'name': name, 'fieldName': self.column_name}),
            )

        if self.max and self.is_num:
            mt = corrected_type.lower() + 'Max'
            name = 'max__' + self.column_name
            metrics[name] = DruidMetric(
                metric_name=name,
                metric_type='max',
                verbose_name='MAX({})'.format(self.column_name),
                json=json.dumps({
                    'type': mt, 'name': name, 'fieldName': self.column_name}),
            )

        if self.count_distinct:
            name = 'count_distinct__' + self.column_name
            if self.type == 'hyperUnique' or self.type == 'thetaSketch':
                metrics[name] = DruidMetric(
                    metric_name=name,
                    verbose_name='COUNT(DISTINCT {})'.format(self.column_name),
                    metric_type=self.type,
                    json=json.dumps({
                        'type': self.type,
                        'name': name,
                        'fieldName': self.column_name,
                    }),
                )
            else:
                metrics[name] = DruidMetric(
                    metric_name=name,
                    verbose_name='COUNT(DISTINCT {})'.format(self.column_name),
                    metric_type='count_distinct',
                    json=json.dumps({
                        'type': 'cardinality',
                        'name': name,
                        'fieldNames': [self.column_name]}),
                )
        return metrics
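
    # Illustrative sketch (hypothetical column name): for a DOUBLE column 'price'
    # with sum enabled, metrics['sum__price'].json generated above is
    # {"type": "doubleSum", "name": "sum__price", "fieldName": "price"}.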

    def refresh_metrics(self):
        """Refresh metrics based on the column metadata"""
        metrics = self.get_metrics()
        dbmetrics = (
            db.session.query(DruidMetric)
            .filter(DruidMetric.datasource_id == self.datasource_id)
            .filter(DruidMetric.metric_name.in_(metrics.keys()))
        )
        dbmetrics = {metric.metric_name: metric for metric in dbmetrics}
        for metric in metrics.values():
            dbmetric = dbmetrics.get(metric.metric_name)
            if dbmetric:
                for attr in ['json', 'metric_type']:
                    setattr(dbmetric, attr, getattr(metric, attr))
            else:
                with db.session.no_autoflush:
                    metric.datasource_id = self.datasource_id
                    db.session.add(metric)

    @classmethod
    def import_obj(cls, i_column):
        def lookup_obj(lookup_column):
            return db.session.query(DruidColumn).filter(
                DruidColumn.datasource_id == lookup_column.datasource_id,
                DruidColumn.column_name == lookup_column.column_name).first()
        return import_util.import_simple_obj(db.session, i_column, lookup_obj)


class DruidMetric(Model, BaseMetric):
    """ORM object referencing Druid metrics for a datasource"""

    __tablename__ = 'metrics'
    __table_args__ = (UniqueConstraint('metric_name', 'datasource_id'),)

    datasource_id = Column(
        Integer,
        ForeignKey('datasources.id'))
    # Setting enable_typechecks=False disables polymorphic inheritance.
    datasource = relationship(
        'DruidDatasource',
        backref=backref('metrics', cascade='all, delete-orphan'),
        enable_typechecks=False)
    json = Column(Text)

    export_fields = (
        'metric_name', 'verbose_name', 'metric_type', 'datasource_id',
        'json', 'description', 'is_restricted', 'd3format', 'warning_text',
    )
    update_from_object_fields = export_fields
    export_parent = 'datasource'

    @property
    def expression(self):
        return self.json

    @property
    def json_obj(self):
        try:
            obj = json.loads(self.json)
        except Exception:
            obj = {}
        return obj

    @property
    def perm(self):
        return (
            '{parent_name}.[{obj.metric_name}](id:{obj.id})'
        ).format(obj=self,
                 parent_name=self.datasource.full_name,
                 ) if self.datasource else None

    @classmethod
    def import_obj(cls, i_metric):
        def lookup_obj(lookup_metric):
            return db.session.query(DruidMetric).filter(
                DruidMetric.datasource_id == lookup_metric.datasource_id,
                DruidMetric.metric_name == lookup_metric.metric_name).first()
        return import_util.import_simple_obj(db.session, i_metric, lookup_obj)


class DruidDatasource(Model, BaseDatasource):
    """ORM object referencing Druid datasources (tables)"""

    __tablename__ = 'datasources'
    __table_args__ = (UniqueConstraint('datasource_name', 'cluster_name'),)

    type = 'druid'
    query_language = 'json'
    cluster_class = DruidCluster
    metric_class = DruidMetric
    column_class = DruidColumn

    baselink = 'druiddatasourcemodelview'

    # Columns
    datasource_name = Column(String(255))
    is_hidden = Column(Boolean, default=False)
    filter_select_enabled = Column(Boolean, default=True)  # override default
    fetch_values_from = Column(String(100))
    cluster_name = Column(
        String(250), ForeignKey('clusters.cluster_name'))
    cluster = relationship(
        'DruidCluster', backref='datasources', foreign_keys=[cluster_name])
    user_id = Column(Integer, ForeignKey('ab_user.id'))
    owner = relationship(
        security_manager.user_model,
        backref=backref('datasources', cascade='all, delete-orphan'),
        foreign_keys=[user_id])
    UniqueConstraint('cluster_name', 'datasource_name')

    export_fields = (
        'datasource_name', 'is_hidden', 'description', 'default_endpoint',
        'cluster_name', 'offset', 'cache_timeout', 'params',
        'filter_select_enabled',
    )
    update_from_object_fields = export_fields
    export_parent = 'cluster'
    export_children = ['columns', 'metrics']

    @property
    def database(self):
        return self.cluster

    @property
    def connection(self):
        return str(self.database)

    @property
    def num_cols(self):
        return [c.column_name for c in self.columns if c.is_num]

    @property
    def name(self):
        return self.datasource_name

    @property
    def schema(self):
        ds_name = self.datasource_name or ''
        name_pieces = ds_name.split('.')
        if len(name_pieces) > 1:
            return name_pieces[0]
        else:
            return None

    @property
    def schema_perm(self):
        """Returns schema permission if present, cluster one otherwise."""
        return security_manager.get_schema_perm(self.cluster, self.schema)

    def get_perm(self):
        return (
            '[{obj.cluster_name}].[{obj.datasource_name}]'
            '(id:{obj.id})').format(obj=self)

    def update_from_object(self, obj):
        return NotImplementedError()

    @property
    def link(self):
        name = escape(self.datasource_name)
        return Markup('<a href="{self.url}">{name}</a>').format(**locals())

    @property
    def full_name(self):
        return utils.get_datasource_full_name(
            self.cluster_name, self.datasource_name)

    @property
    def time_column_grains(self):
        return {
            'time_columns': [
                'all', '5 seconds', '30 seconds', '1 minute', '5 minutes',
                '30 minutes', '1 hour', '6 hour', '1 day', '7 days',
                'week', 'week_starting_sunday', 'week_ending_saturday',
                'month', 'quarter', 'year',
            ],
            'time_grains': ['now'],
        }

    def __repr__(self):
        return self.datasource_name

    @renders('datasource_name')
    def datasource_link(self):
        url = '/superset/explore/{obj.type}/{obj.id}/'.format(obj=self)
        name = escape(self.datasource_name)
        return Markup('<a href="{url}">{name}</a>'.format(**locals()))

    def get_metric_obj(self, metric_name):
        return [
            m.json_obj for m in self.metrics
            if m.metric_name == metric_name
        ][0]

    @classmethod
    def import_obj(cls, i_datasource, import_time=None):
        """Imports the datasource from the object to the database.

        Metrics, columns and the datasource will be overridden if they exist.
        This function can be used to import/export dashboards between multiple
        superset instances. Audit metadata isn't copied over.
        """
        def lookup_datasource(d):
            return db.session.query(DruidDatasource).filter(
                DruidDatasource.datasource_name == d.datasource_name,
                DruidCluster.cluster_name == d.cluster_name,
            ).first()

        def lookup_cluster(d):
            return db.session.query(DruidCluster).filter_by(
                cluster_name=d.cluster_name).one()
        return import_util.import_datasource(
            db.session, i_datasource, lookup_cluster, lookup_datasource,
            import_time)

    def latest_metadata(self):
        """Returns segment metadata from the latest segment"""
        logging.info('Syncing datasource [{}]'.format(self.datasource_name))
        client = self.cluster.get_pydruid_client()
        try:
            results = client.time_boundary(datasource=self.datasource_name)
        except IOError:
            results = None
        if results:
            max_time = results[0]['result']['maxTime']
            max_time = dparse(max_time)
        else:
            max_time = datetime.now()
        # Query segmentMetadata for 7 days back. However, due to a bug,
        # we need to set this interval to more than 1 day ago to exclude
        # realtime segments, which triggered a bug (fixed in druid 0.8.2).
        # https://groups.google.com/forum/#!topic/druid-user/gVCqqspHqOQ
        lbound = (max_time - timedelta(days=7)).isoformat()
        if LooseVersion(self.cluster.druid_version) < LooseVersion('0.8.2'):
            rbound = (max_time - timedelta(1)).isoformat()
        else:
            rbound = max_time.isoformat()
        segment_metadata = None
        try:
            segment_metadata = client.segment_metadata(
                datasource=self.datasource_name,
                intervals=lbound + '/' + rbound,
                merge=self.merge_flag,
                analysisTypes=[])
        except Exception as e:
            logging.warning('Failed first attempt to get latest segment')
            logging.exception(e)
        if not segment_metadata:
            # if no segments in the past 7 days, look at all segments
            lbound = datetime(1901, 1, 1).isoformat()[:10]
            if LooseVersion(self.cluster.druid_version) < LooseVersion('0.8.2'):
                rbound = datetime.now().isoformat()
            else:
                rbound = datetime(2050, 1, 1).isoformat()[:10]
            try:
                segment_metadata = client.segment_metadata(
                    datasource=self.datasource_name,
                    intervals=lbound + '/' + rbound,
                    merge=self.merge_flag,
                    analysisTypes=[])
            except Exception as e:
                logging.warning('Failed 2nd attempt to get latest segment')
                logging.exception(e)
        if segment_metadata:
            return segment_metadata[-1]['columns']

    def refresh_metrics(self):
        for col in self.columns:
            col.refresh_metrics()

    @classmethod
    def sync_to_db_from_config(
            cls,
            druid_config,
            user,
            cluster,
            refresh=True):
        """Merges the ds config from druid_config into one stored in the db."""
        session = db.session
        datasource = (
            session.query(cls)
            .filter_by(datasource_name=druid_config['name'])
            .first()
        )
        # Create a new datasource.
        if not datasource:
            datasource = cls(
                datasource_name=druid_config['name'],
                cluster=cluster,
                owner=user,
                changed_by_fk=user.id,
                created_by_fk=user.id,
            )
            session.add(datasource)
        elif not refresh:
            return

        dimensions = druid_config['dimensions']
        col_objs = (
            session.query(DruidColumn)
            .filter(DruidColumn.datasource_id == datasource.id)
            .filter(DruidColumn.column_name.in_(dimensions))
        )
        col_objs = {col.column_name: col for col in col_objs}
        for dim in dimensions:
            col_obj = col_objs.get(dim, None)
            if not col_obj:
                col_obj = DruidColumn(
                    datasource_id=datasource.id,
                    column_name=dim,
                    groupby=True,
                    filterable=True,
                    # TODO: fetch type from Hive.
                    type='STRING',
                    datasource=datasource,
                )
                session.add(col_obj)
        # Import Druid metrics
        metric_objs = (
            session.query(DruidMetric)
            .filter(DruidMetric.datasource_id == datasource.id)
            .filter(DruidMetric.metric_name.in_(
                spec['name'] for spec in druid_config['metrics_spec']
            ))
        )
        metric_objs = {metric.metric_name: metric for metric in metric_objs}
        for metric_spec in druid_config['metrics_spec']:
            metric_name = metric_spec['name']
            metric_type = metric_spec['type']
            metric_json = json.dumps(metric_spec)

            if metric_type == 'count':
                metric_type = 'longSum'
                metric_json = json.dumps({
                    'type': 'longSum',
                    'name': metric_name,
                    'fieldName': metric_name,
                })

            metric_obj = metric_objs.get(metric_name, None)
            if not metric_obj:
                metric_obj = DruidMetric(
                    metric_name=metric_name,
                    metric_type=metric_type,
                    verbose_name='%s(%s)' % (metric_type, metric_name),
                    datasource=datasource,
                    json=metric_json,
                    description=(
                        'Imported from the airolap config dir for %s' %
                        druid_config['name']),
                )
                session.add(metric_obj)
        session.commit()

    @staticmethod
    def time_offset(granularity):
        if granularity == 'week_ending_saturday':
            return 6 * 24 * 3600 * 1000  # 6 days
        return 0
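
    # Illustrative note (not in the original source): time_offset('week_ending_saturday')
    # yields 518400000 ms (6 days), shifting timestamps from the week-starting-Sunday
    # bucket that Druid returns to the Saturday that ends it; every other granularity
    # yields 0.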

    # uses https://en.wikipedia.org/wiki/ISO_8601
    # http://druid.io/docs/0.8.0/querying/granularities.html
    # TODO: pass origin from the UI
    @staticmethod
    def granularity(period_name, timezone=None, origin=None):
        if not period_name or period_name == 'all':
            return 'all'
        iso_8601_dict = {
            '5 seconds': 'PT5S',
            '30 seconds': 'PT30S',
            '1 minute': 'PT1M',
            '5 minutes': 'PT5M',
            '30 minutes': 'PT30M',
            '1 hour': 'PT1H',
            '6 hour': 'PT6H',
            'one day': 'P1D',
            '1 day': 'P1D',
            '7 days': 'P7D',
            'week': 'P1W',
            'week_starting_sunday': 'P1W',
            'week_ending_saturday': 'P1W',
            'month': 'P1M',
            'quarter': 'P3M',
            'year': 'P1Y',
        }

        granularity = {'type': 'period'}
        if timezone:
            granularity['timeZone'] = timezone

        if origin:
            dttm = utils.parse_human_datetime(origin)
            granularity['origin'] = dttm.isoformat()

        if period_name in iso_8601_dict:
            granularity['period'] = iso_8601_dict[period_name]
            if period_name in ('week_ending_saturday', 'week_starting_sunday'):
                # use Sunday as start of the week
                granularity['origin'] = '2016-01-03T00:00:00'
        elif not isinstance(period_name, str):
            granularity['type'] = 'duration'
            granularity['duration'] = period_name
        elif period_name.startswith('P'):
            # identify if the string is the iso_8601 period
            granularity['period'] = period_name
        else:
            granularity['type'] = 'duration'
            granularity['duration'] = utils.parse_human_timedelta(
                period_name).total_seconds() * 1000
        return granularity
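
    # Illustrative sketch (not in the original source):
    # DruidDatasource.granularity('1 hour', timezone='UTC') returns
    # {'type': 'period', 'timeZone': 'UTC', 'period': 'PT1H'}, while a non-string
    # value such as 30000 falls through to {'type': 'duration', 'duration': 30000}.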

    @staticmethod
    def get_post_agg(mconf):
        """
        For a metric specified as `postagg` returns the
        kind of post aggregation for pydruid.
        """
        if mconf.get('type') == 'javascript':
            return JavascriptPostAggregator(
                name=mconf.get('name', ''),
                field_names=mconf.get('fieldNames', []),
                function=mconf.get('function', ''))
        elif mconf.get('type') == 'quantile':
            return Quantile(
                mconf.get('name', ''),
                mconf.get('probability', ''),
            )
        elif mconf.get('type') == 'quantiles':
            return Quantiles(
                mconf.get('name', ''),
                mconf.get('probabilities', ''),
            )
        elif mconf.get('type') == 'fieldAccess':
            return Field(mconf.get('name'))
        elif mconf.get('type') == 'constant':
            return Const(
                mconf.get('value'),
                output_name=mconf.get('name', ''),
            )
        elif mconf.get('type') == 'hyperUniqueCardinality':
            return HyperUniqueCardinality(
                mconf.get('name'),
            )
        elif mconf.get('type') == 'arithmetic':
            return Postaggregator(
                mconf.get('fn', '/'),
                mconf.get('fields', []),
                mconf.get('name', ''))
        else:
            return CustomPostAggregator(
                mconf.get('name', ''),
                mconf)

    @staticmethod
    def find_postaggs_for(postagg_names, metrics_dict):
        """Return a list of metrics that are post aggregations"""
        postagg_metrics = [
            metrics_dict[name] for name in postagg_names
            if metrics_dict[name].metric_type == POST_AGG_TYPE
        ]
        # Remove post aggregations that were found
        for postagg in postagg_metrics:
            postagg_names.remove(postagg.metric_name)
        return postagg_metrics

    @staticmethod
    def recursive_get_fields(_conf):
        _type = _conf.get('type')
        _field = _conf.get('field')
        _fields = _conf.get('fields')
        field_names = []
        if _type in ['fieldAccess', 'hyperUniqueCardinality',
                     'quantile', 'quantiles']:
            field_names.append(_conf.get('fieldName', ''))
        if _field:
            field_names += DruidDatasource.recursive_get_fields(_field)
        if _fields:
            for _f in _fields:
                field_names += DruidDatasource.recursive_get_fields(_f)
        return list(set(field_names))
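
    # Illustrative sketch (hypothetical field names): for a nested config such as
    # {'type': 'arithmetic', 'fields': [
    #     {'type': 'fieldAccess', 'fieldName': 'revenue'},
    #     {'type': 'fieldAccess', 'fieldName': 'orders'}]}
    # recursive_get_fields returns ['revenue', 'orders'] (in arbitrary order,
    # since duplicates are dropped via set()).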

    @staticmethod
    def resolve_postagg(postagg, post_aggs, agg_names, visited_postaggs, metrics_dict):
        mconf = postagg.json_obj
        required_fields = set(
            DruidDatasource.recursive_get_fields(mconf) +
            mconf.get('fieldNames', []))
        # Check if the fields are already in aggs
        # or are from a previous postagg
        required_fields = set([
            field for field in required_fields
            if field not in visited_postaggs and field not in agg_names
        ])
        # First try to find postaggs that match
        if len(required_fields) > 0:
            missing_postaggs = DruidDatasource.find_postaggs_for(
                required_fields, metrics_dict)
            for missing_metric in required_fields:
                agg_names.add(missing_metric)
            for missing_postagg in missing_postaggs:
                # Add to visited first to avoid infinite recursion
                # if post aggregations are cyclically dependent
                visited_postaggs.add(missing_postagg.metric_name)
            for missing_postagg in missing_postaggs:
                DruidDatasource.resolve_postagg(
                    missing_postagg, post_aggs, agg_names, visited_postaggs, metrics_dict)
        post_aggs[postagg.metric_name] = DruidDatasource.get_post_agg(postagg.json_obj)

    @staticmethod
    def metrics_and_post_aggs(metrics, metrics_dict, druid_version=None):
        # Separate metrics into those that are aggregations
        # and those that are post aggregations
        saved_agg_names = set()
        adhoc_agg_configs = []
        postagg_names = []
        for metric in metrics:
            if utils.is_adhoc_metric(metric):
                adhoc_agg_configs.append(metric)
            elif metrics_dict[metric].metric_type != POST_AGG_TYPE:
                saved_agg_names.add(metric)
            else:
                postagg_names.append(metric)
        # Create the post aggregations, maintain order since postaggs
        # may depend on previous ones
        post_aggs = OrderedDict()
        visited_postaggs = set()
        for postagg_name in postagg_names:
            postagg = metrics_dict[postagg_name]
            visited_postaggs.add(postagg_name)
            DruidDatasource.resolve_postagg(
                postagg, post_aggs, saved_agg_names, visited_postaggs, metrics_dict)
        aggs = DruidDatasource.get_aggregations(
            metrics_dict,
            saved_agg_names,
            adhoc_agg_configs,
        )
        return aggs, post_aggs

    def values_for_column(self,
                          column_name,
                          limit=10000):
        """Retrieve some values for the given column"""
        logging.info(
            'Getting values for column [{}] limited to [{}]'
            .format(column_name, limit))
        # TODO: Use Lexicographic TopNMetricSpec once supported by PyDruid
        if self.fetch_values_from:
            from_dttm = utils.parse_human_datetime(self.fetch_values_from)
        else:
            from_dttm = datetime(1970, 1, 1)

        qry = dict(
            datasource=self.datasource_name,
            granularity='all',
            intervals=from_dttm.isoformat() + '/' + datetime.now().isoformat(),
            aggregations=dict(count=count('count')),
            dimension=column_name,
            metric='count',
            threshold=limit,
        )

        client = self.cluster.get_pydruid_client()
        client.topn(**qry)
        df = client.export_pandas()
        return [row[column_name] for row in df.to_records(index=False)]

    def get_query_str(self, query_obj, phase=1, client=None):
        return self.run_query(client=client, phase=phase, **query_obj)

    def _add_filter_from_pre_query_data(self, df, dimensions, dim_filter):
        ret = dim_filter
        if df is not None and not df.empty:
            new_filters = []
            for unused, row in df.iterrows():
                fields = []
                for dim in dimensions:
                    f = None
                    # Check if this dimension uses an extraction function
                    # If so, create the appropriate pydruid extraction object
                    if isinstance(dim, dict) and 'extractionFn' in dim:
                        (col, extraction_fn) = DruidDatasource._create_extraction_fn(dim)
                        dim_val = dim['outputName']
                        f = Filter(
                            dimension=col,
                            value=row[dim_val],
                            extraction_function=extraction_fn,
                        )
                    elif isinstance(dim, dict):
                        dim_val = dim['outputName']
                        if dim_val:
                            f = Dimension(dim_val) == row[dim_val]
                    else:
                        f = Dimension(dim) == row[dim]
                    if f:
                        fields.append(f)
                if len(fields) > 1:
                    term = Filter(type='and', fields=fields)
                    new_filters.append(term)
                elif fields:
                    new_filters.append(fields[0])
            if new_filters:
                ff = Filter(type='or', fields=new_filters)
                if not dim_filter:
                    ret = ff
                else:
                    ret = Filter(type='and', fields=[ff, dim_filter])
        return ret

    @staticmethod
    def druid_type_from_adhoc_metric(adhoc_metric):
        column_type = adhoc_metric['column']['type'].lower()
        aggregate = adhoc_metric['aggregate'].lower()

        if aggregate == 'count':
            return 'count'
        if aggregate == 'count_distinct':
            return 'cardinality'
        else:
            return column_type + aggregate.capitalize()
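
    # Illustrative note (not in the original source): an adhoc metric with column
    # type 'DOUBLE' and aggregate 'SUM' maps to the Druid aggregator type
    # 'doubleSum', while COUNT_DISTINCT always maps to 'cardinality' regardless
    # of the column type.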

    @staticmethod
    def get_aggregations(metrics_dict, saved_metrics, adhoc_metrics=[]):
        """
        Returns a dictionary of aggregation metric names to aggregation json objects

        :param metrics_dict: dictionary of all the metrics
        :param saved_metrics: list of saved metric names
        :param adhoc_metrics: list of adhoc metric names
        :raise SupersetException: if one or more metric names are not aggregations
        """
        aggregations = OrderedDict()
        invalid_metric_names = []
        for metric_name in saved_metrics:
            if metric_name in metrics_dict:
                metric = metrics_dict[metric_name]
                if metric.metric_type == POST_AGG_TYPE:
                    invalid_metric_names.append(metric_name)
                else:
                    aggregations[metric_name] = metric.json_obj
            else:
                invalid_metric_names.append(metric_name)
        if len(invalid_metric_names) > 0:
            raise SupersetException(
                _('Metric(s) {} must be aggregations.').format(invalid_metric_names))
        for adhoc_metric in adhoc_metrics:
            aggregations[adhoc_metric['label']] = {
                'fieldName': adhoc_metric['column']['column_name'],
                'fieldNames': [adhoc_metric['column']['column_name']],
                'type': DruidDatasource.druid_type_from_adhoc_metric(adhoc_metric),
                'name': adhoc_metric['label'],
            }
        return aggregations

    def check_restricted_metrics(self, aggregations):
        rejected_metrics = [
            m.metric_name for m in self.metrics
            if m.is_restricted and
            m.metric_name in aggregations.keys() and
            not security_manager.has_access('metric_access', m.perm)
        ]
        if rejected_metrics:
            raise MetricPermException(
                'Access to the metrics denied: ' + ', '.join(rejected_metrics),
            )

    def get_dimensions(self, groupby, columns_dict):
        dimensions = []
        groupby = [gb for gb in groupby if gb in columns_dict]
        for column_name in groupby:
            col = columns_dict.get(column_name)
            dim_spec = col.dimension_spec if col else None
            if dim_spec:
                dimensions.append(dim_spec)
            else:
                dimensions.append(column_name)
        return dimensions

    def intervals_from_dttms(self, from_dttm, to_dttm):
        # Couldn't find a way to just not filter on time...
        from_dttm = from_dttm or datetime(1901, 1, 1)
        to_dttm = to_dttm or datetime(2101, 1, 1)

        # add tzinfo to naive datetime with config
        from_dttm = from_dttm.replace(tzinfo=DRUID_TZ)
        to_dttm = to_dttm.replace(tzinfo=DRUID_TZ)
        return '{}/{}'.format(
            from_dttm.isoformat() if from_dttm else '',
            to_dttm.isoformat() if to_dttm else '',
        )
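
    # Illustrative sketch (hypothetical dates, assuming DRUID_TZ is UTC): bounds of
    # January 2018 produce the Druid interval string
    # '2018-01-01T00:00:00+00:00/2018-02-01T00:00:00+00:00'.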

    @staticmethod
    def _dimensions_to_values(dimensions):
        """
        Replace dimension specs with their `dimension`
        values, and ignore those without
        """
        values = []
        for dimension in dimensions:
            if isinstance(dimension, dict):
                if 'extractionFn' in dimension:
                    values.append(dimension)
                elif 'dimension' in dimension:
                    values.append(dimension['dimension'])
            else:
                values.append(dimension)
        return values
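
    # Illustrative sketch (hypothetical names):
    # ['country', {'dimension': 'user_id', 'outputName': 'user'}] becomes
    # ['country', 'user_id']; dicts that carry an 'extractionFn' are kept whole
    # so the extraction survives into the query.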

    @staticmethod
    def sanitize_metric_object(metric):
        """
        Update a metric with the correct type if necessary.

        :param dict metric: The metric to sanitize
        """
        if (
            utils.is_adhoc_metric(metric) and
            metric['column']['type'].upper() == 'FLOAT'
        ):
            metric['column']['type'] = 'DOUBLE'
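
    # Illustrative note (not in the original source): an adhoc metric whose column
    # reports type 'FLOAT' is rewritten in place to 'DOUBLE' when the cluster is
    # older than Druid 0.11.0 (see the version check in run_query below); saved
    # metrics are left untouched.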

    def run_query(  # noqa / druid
            self,
            groupby, metrics,
            granularity,
            from_dttm, to_dttm,
            filter=None,  # noqa
            is_timeseries=True,
            timeseries_limit=None,
            timeseries_limit_metric=None,
            row_limit=None,
            inner_from_dttm=None, inner_to_dttm=None,
            orderby=None,
            extras=None,  # noqa
            columns=None, phase=2, client=None,
            order_desc=True,
            prequeries=None,
            is_prequery=False,
    ):
        """Runs a query against Druid and returns a dataframe."""
        # TODO refactor into using a TBD Query object
        client = client or self.cluster.get_pydruid_client()
        row_limit = row_limit or conf.get('ROW_LIMIT')

        if not is_timeseries:
            granularity = 'all'

        if granularity == 'all':
            phase = 1
        inner_from_dttm = inner_from_dttm or from_dttm
        inner_to_dttm = inner_to_dttm or to_dttm

        timezone = from_dttm.replace(tzinfo=DRUID_TZ).tzname() if from_dttm else None

        query_str = ''
        metrics_dict = {m.metric_name: m for m in self.metrics}
        columns_dict = {c.column_name: c for c in self.columns}

        if (
            self.cluster and
            LooseVersion(self.cluster.get_druid_version()) < LooseVersion('0.11.0')
        ):
            for metric in metrics:
                self.sanitize_metric_object(metric)
            self.sanitize_metric_object(timeseries_limit_metric)

        aggregations, post_aggs = DruidDatasource.metrics_and_post_aggs(
            metrics,
            metrics_dict)

        self.check_restricted_metrics(aggregations)

        # the dimensions list with dimensionSpecs expanded
        dimensions = self.get_dimensions(groupby, columns_dict)
        extras = extras or {}
        qry = dict(
            datasource=self.datasource_name,
            dimensions=dimensions,
            aggregations=aggregations,
            granularity=DruidDatasource.granularity(
                granularity,
                timezone=timezone,
                origin=extras.get('druid_time_origin'),
            ),
            post_aggregations=post_aggs,
            intervals=self.intervals_from_dttms(from_dttm, to_dttm),
        )

        filters = DruidDatasource.get_filters(filter, self.num_cols, columns_dict)
        if filters:
            qry['filter'] = filters

        having_filters = self.get_having_filters(extras.get('having_druid'))
        if having_filters:
            qry['having'] = having_filters

        order_direction = 'descending' if order_desc else 'ascending'

        if columns:
            columns.append('__time')
            del qry['post_aggregations']
            del qry['aggregations']
            qry['dimensions'] = columns
            qry['metrics'] = []
            qry['granularity'] = 'all'
            qry['limit'] = row_limit
            client.scan(**qry)
        elif len(groupby) == 0 and not having_filters:
            logging.info('Running timeseries query for no groupby values')
            del qry['dimensions']
            client.timeseries(**qry)
        elif (
            not having_filters and
            len(groupby) == 1 and
            order_desc
        ):
            dim = list(qry.get('dimensions'))[0]
            logging.info('Running two-phase topn query for dimension [{}]'.format(dim))
            pre_qry = deepcopy(qry)
            if timeseries_limit_metric:
                order_by = utils.get_metric_name(timeseries_limit_metric)
                aggs_dict, post_aggs_dict = DruidDatasource.metrics_and_post_aggs(
                    [timeseries_limit_metric],
                    metrics_dict)
                if phase == 1:
                    pre_qry['aggregations'].update(aggs_dict)
                    pre_qry['post_aggregations'].update(post_aggs_dict)
                else:
                    pre_qry['aggregations'] = aggs_dict
                    pre_qry['post_aggregations'] = post_aggs_dict
            else:
                order_by = list(qry['aggregations'].keys())[0]
            # Limit the number of timeseries by running a two-phase query
            pre_qry['granularity'] = 'all'
            pre_qry['threshold'] = min(row_limit,
                                       timeseries_limit or row_limit)
            pre_qry['metric'] = order_by
            pre_qry['dimension'] = self._dimensions_to_values(qry.get('dimensions'))[0]
            del pre_qry['dimensions']

            client.topn(**pre_qry)
            logging.info('Phase 1 Complete')
            if phase == 2:
                query_str += '// Two phase query\n// Phase 1\n'
            query_str += json.dumps(
                client.query_builder.last_query.query_dict, indent=2)
            query_str += '\n'
            if phase == 1:
                return query_str
            query_str += (
                "// Phase 2 (built based on phase one's results)\n")
            df = client.export_pandas()
            qry['filter'] = self._add_filter_from_pre_query_data(
                df,
                [pre_qry['dimension']],
                filters)
            qry['threshold'] = timeseries_limit or 1000
            if row_limit and granularity == 'all':
                qry['threshold'] = row_limit
            qry['dimension'] = dim
            del qry['dimensions']
            qry['metric'] = list(qry['aggregations'].keys())[0]
            client.topn(**qry)
            logging.info('Phase 2 Complete')
        elif len(groupby) > 0 or having_filters:
            # If grouping on multiple fields or using a having filter
            # we have to force a groupby query
            logging.info('Running groupby query for dimensions [{}]'.format(dimensions))
            if timeseries_limit and is_timeseries:
                logging.info('Running two-phase query for timeseries')

                pre_qry = deepcopy(qry)
                pre_qry_dims = self._dimensions_to_values(qry['dimensions'])

                # Can't use set on an array with dicts
                # Use set with non-dict items only
                non_dict_dims = list(
                    set([x for x in pre_qry_dims if not isinstance(x, dict)]),
                )
                dict_dims = [x for x in pre_qry_dims if isinstance(x, dict)]
                pre_qry['dimensions'] = non_dict_dims + dict_dims

                order_by = None
                if metrics:
                    order_by = utils.get_metric_name(metrics[0])
                else:
                    order_by = pre_qry_dims[0]

                if timeseries_limit_metric:
                    order_by = utils.get_metric_name(timeseries_limit_metric)
                    aggs_dict, post_aggs_dict = DruidDatasource.metrics_and_post_aggs(
                        [timeseries_limit_metric],
                        metrics_dict)
                    if phase == 1:
                        pre_qry['aggregations'].update(aggs_dict)
                        pre_qry['post_aggregations'].update(post_aggs_dict)
                    else:
                        pre_qry['aggregations'] = aggs_dict
                        pre_qry['post_aggregations'] = post_aggs_dict

                # Limit the number of timeseries by running a two-phase query
                pre_qry['granularity'] = 'all'
                pre_qry['limit_spec'] = {
                    'type': 'default',
                    'limit': min(timeseries_limit, row_limit),
                    'intervals': self.intervals_from_dttms(
                        inner_from_dttm, inner_to_dttm),
                    'columns': [{
                        'dimension': order_by,
                        'direction': order_direction,
                    }],
                }
                client.groupby(**pre_qry)
                logging.info('Phase 1 Complete')
                query_str += '// Two phase query\n// Phase 1\n'
                query_str += json.dumps(
                    client.query_builder.last_query.query_dict, indent=2)
                query_str += '\n'
                if phase == 1:
                    return query_str
                query_str += (
                    "// Phase 2 (built based on phase one's results)\n")
                df = client.export_pandas()
                qry['filter'] = self._add_filter_from_pre_query_data(
                    df,
                    pre_qry['dimensions'],
                    filters,
                )
                qry['limit_spec'] = None
            if row_limit:
                dimension_values = self._dimensions_to_values(dimensions)
                qry['limit_spec'] = {
                    'type': 'default',
                    'limit': row_limit,
                    'columns': [{
                        'dimension': (
                            utils.get_metric_name(
                                metrics[0],
                            ) if metrics else dimension_values[0]
                        ),
                        'direction': order_direction,
                    }],
                }
            client.groupby(**qry)
            logging.info('Query Complete')
        query_str += json.dumps(
            client.query_builder.last_query.query_dict, indent=2)
        return query_str

    @staticmethod
    def homogenize_types(df, groupby_cols):
        """Converting all GROUPBY columns to strings

        When grouping by a numeric (say FLOAT) column, pydruid returns
        strings in the dataframe. This creates issues downstream related
        to having mixed types in the dataframe.

        Here we replace None with <NULL> and make the whole series a
        str instead of an object.
        """
        for col in groupby_cols:
            df[col] = df[col].fillna('<NULL>').astype('unicode')
        return df

    def query(self, query_obj):
        qry_start_dttm = datetime.now()
        client = self.cluster.get_pydruid_client()
        query_str = self.get_query_str(
            client=client, query_obj=query_obj, phase=2)
        df = client.export_pandas()

        if df is None or df.size == 0:
            return QueryResult(
                df=pandas.DataFrame([]),
                query=query_str,
                duration=datetime.now() - qry_start_dttm)

        df = self.homogenize_types(df, query_obj.get('groupby', []))
        df.columns = [
            DTTM_ALIAS if c in ('timestamp', '__time') else c
            for c in df.columns
        ]

        is_timeseries = query_obj['is_timeseries'] \
            if 'is_timeseries' in query_obj else True
        if (
                not is_timeseries and
                DTTM_ALIAS in df.columns):
            del df[DTTM_ALIAS]

        # Reordering columns
        cols = []
        if DTTM_ALIAS in df.columns:
            cols += [DTTM_ALIAS]
        cols += query_obj.get('groupby') or []
        cols += query_obj.get('columns') or []
        cols += query_obj.get('metrics') or []

        cols = utils.get_metric_names(cols)
        cols = [col for col in cols if col in df.columns]
        df = df[cols]

        time_offset = DruidDatasource.time_offset(query_obj['granularity'])

        def increment_timestamp(ts):
            dt = utils.parse_human_datetime(ts).replace(
                tzinfo=DRUID_TZ)
            return dt + timedelta(milliseconds=time_offset)
        if DTTM_ALIAS in df.columns and time_offset:
            df[DTTM_ALIAS] = df[DTTM_ALIAS].apply(increment_timestamp)

        return QueryResult(
            df=df,
            query=query_str,
            duration=datetime.now() - qry_start_dttm)

    @staticmethod
    def _create_extraction_fn(dim_spec):
        extraction_fn = None
        if dim_spec and 'extractionFn' in dim_spec:
            col = dim_spec['dimension']
            fn = dim_spec['extractionFn']
            ext_type = fn.get('type')

            if ext_type == 'lookup' and fn['lookup'].get('type') == 'map':
                replace_missing_values = fn.get('replaceMissingValueWith')
                retain_missing_values = fn.get('retainMissingValue', False)
                injective = fn.get('isOneToOne', False)
                extraction_fn = MapLookupExtraction(
                    fn['lookup']['map'],
                    replace_missing_values=replace_missing_values,
                    retain_missing_values=retain_missing_values,
                    injective=injective,
                )
            elif ext_type == 'regex':
                extraction_fn = RegexExtraction(fn['expr'])
            else:
                raise Exception(_('Unsupported extraction function: ' + ext_type))
        return (col, extraction_fn)

    @classmethod
    def get_filters(cls, raw_filters, num_cols, columns_dict):  # noqa
        """Given Superset filter data structure, returns pydruid Filter(s)"""
        filters = None
        for flt in raw_filters:
            col = flt.get('col')
            op = flt.get('op')
            eq = flt.get('val')
            if (
                    not col or
                    not op or
                    (eq is None and op not in ('IS NULL', 'IS NOT NULL'))):
                continue

            # Check if this dimension uses an extraction function
            # If so, create the appropriate pydruid extraction object
            column_def = columns_dict.get(col)
            dim_spec = column_def.dimension_spec if column_def else None
            extraction_fn = None
            if dim_spec and 'extractionFn' in dim_spec:
                (col, extraction_fn) = DruidDatasource._create_extraction_fn(dim_spec)

            cond = None
            is_numeric_col = col in num_cols
            is_list_target = op in ('in', 'not in')
            eq = cls.filter_values_handler(
                eq, is_list_target=is_list_target,
                target_column_is_numeric=is_numeric_col)

            # For these two ops, could have used Dimension,
            # but it doesn't support extraction functions
            if op == '==':
                cond = Filter(dimension=col, value=eq, extraction_function=extraction_fn)
            elif op == '!=':
                cond = ~Filter(dimension=col, value=eq, extraction_function=extraction_fn)
            elif op in ('in', 'not in'):
                fields = []
                # ignore the filter if it has no value
                if not len(eq):
                    continue
                # if it uses an extraction fn, use the "in" operator
                # as Dimension isn't supported
                elif extraction_fn is not None:
                    cond = Filter(
                        dimension=col,
                        values=eq,
                        type='in',
                        extraction_function=extraction_fn,
                    )
                elif len(eq) == 1:
                    cond = Dimension(col) == eq[0]
                else:
                    for s in eq:
                        fields.append(Dimension(col) == s)
                    cond = Filter(type='or', fields=fields)
                if op == 'not in':
                    cond = ~cond
            elif op == 'regex':
                cond = Filter(
                    extraction_function=extraction_fn,
                    type='regex',
                    pattern=eq,
                    dimension=col,
                )

            # For the ops below, could have used pydruid's Bound,
            # but it doesn't support extraction functions
            elif op == '>=':
                cond = Filter(
                    type='bound',
                    extraction_function=extraction_fn,
                    dimension=col,
                    lowerStrict=False,
                    upperStrict=False,
                    lower=eq,
                    upper=None,
                    alphaNumeric=is_numeric_col,
                )
            elif op == '<=':
                cond = Filter(
                    type='bound',
                    extraction_function=extraction_fn,
                    dimension=col,
                    lowerStrict=False,
                    upperStrict=False,
                    lower=None,
                    upper=eq,
                    alphaNumeric=is_numeric_col,
                )
            elif op == '>':
                cond = Filter(
                    type='bound',
                    extraction_function=extraction_fn,
                    lowerStrict=True,
                    upperStrict=False,
                    dimension=col,
                    lower=eq,
                    upper=None,
                    alphaNumeric=is_numeric_col,
                )
            elif op == '<':
                cond = Filter(
                    type='bound',
                    extraction_function=extraction_fn,
                    upperStrict=True,
                    lowerStrict=False,
                    dimension=col,
                    lower=None,
                    upper=eq,
                    alphaNumeric=is_numeric_col,
                )
            elif op == 'IS NULL':
                cond = Dimension(col) == None  # NOQA
            elif op == 'IS NOT NULL':
                cond = Dimension(col) != None  # NOQA

            if filters:
                filters = Filter(type='and', fields=[
                    cond,
                    filters,
                ])
            else:
                filters = cond
        return filters

    def _get_having_obj(self, col, op, eq):
        cond = None
        if op == '==':
            if col in self.column_names:
                cond = DimSelector(dimension=col, value=eq)
            else:
                cond = Aggregation(col) == eq
        elif op == '>':
            cond = Aggregation(col) > eq
        elif op == '<':
            cond = Aggregation(col) < eq

        return cond

    def get_having_filters(self, raw_filters):
        filters = None
        reversed_op_map = {
            '!=': '==',
            '>=': '<',
            '<=': '>',
        }

        for flt in raw_filters:
            if not all(f in flt for f in ['col', 'op', 'val']):
                continue
            col = flt['col']
            op = flt['op']
            eq = flt['val']
            cond = None
            if op in ['==', '>', '<']:
                cond = self._get_having_obj(col, op, eq)
            elif op in reversed_op_map:
                cond = ~self._get_having_obj(col, reversed_op_map[op], eq)

            if filters:
                filters = filters & cond
            else:
                filters = cond
        return filters

    @classmethod
    def query_datasources_by_name(
            cls, session, database, datasource_name, schema=None):
        return (
            session.query(cls)
            .filter_by(cluster_name=database.id)
            .filter_by(datasource_name=datasource_name)
            .all()
        )

    def external_metadata(self):
        self.merge_flag = True
        return [
            {
                'name': k,
                'type': v.get('type'),
            }
            for k, v in self.latest_metadata().items()
        ]


sa.event.listen(DruidDatasource, 'after_insert', security_manager.set_perm)
sa.event.listen(DruidDatasource, 'after_update', security_manager.set_perm)