connector_registry.py 2.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. # pylint: disable=C,R,W
  2. from sqlalchemy.orm import subqueryload
  3. class ConnectorRegistry(object):
  4. """ Central Registry for all available datasource engines"""
  5. sources = {}
  6. @classmethod
  7. def register_sources(cls, datasource_config):
  8. for module_name, class_names in datasource_config.items():
  9. class_names = [str(s) for s in class_names]
  10. module_obj = __import__(module_name, fromlist=class_names)
  11. for class_name in class_names:
  12. source_class = getattr(module_obj, class_name)
  13. cls.sources[source_class.type] = source_class
  14. @classmethod
  15. def get_datasource(cls, datasource_type, datasource_id, session):
  16. return (
  17. session.query(cls.sources[datasource_type])
  18. .filter_by(id=datasource_id)
  19. .first()
  20. )
  21. @classmethod
  22. def get_all_datasources(cls, session):
  23. datasources = []
  24. for source_type in ConnectorRegistry.sources:
  25. source_class = ConnectorRegistry.sources[source_type]
  26. qry = session.query(source_class)
  27. qry = source_class.default_query(qry)
  28. datasources.extend(qry.all())
  29. return datasources
  30. @classmethod
  31. def get_datasource_by_name(cls, session, datasource_type, datasource_name,
  32. schema, database_name):
  33. datasource_class = ConnectorRegistry.sources[datasource_type]
  34. datasources = session.query(datasource_class).all()
  35. # Filter datasoures that don't have database.
  36. db_ds = [d for d in datasources if d.database and
  37. d.database.name == database_name and
  38. d.name == datasource_name and schema == schema]
  39. return db_ds[0]
  40. @classmethod
  41. def query_datasources_by_permissions(cls, session, database, permissions):
  42. datasource_class = ConnectorRegistry.sources[database.type]
  43. return (
  44. session.query(datasource_class)
  45. .filter_by(database_id=database.id)
  46. .filter(datasource_class.perm.in_(permissions))
  47. .all()
  48. )
  49. @classmethod
  50. def get_eager_datasource(cls, session, datasource_type, datasource_id):
  51. """Returns datasource with columns and metrics."""
  52. datasource_class = ConnectorRegistry.sources[datasource_type]
  53. return (
  54. session.query(datasource_class)
  55. .options(
  56. subqueryload(datasource_class.columns),
  57. subqueryload(datasource_class.metrics),
  58. )
  59. .filter_by(id=datasource_id)
  60. .one()
  61. )
  62. @classmethod
  63. def query_datasources_by_name(
  64. cls, session, database, datasource_name, schema=None):
  65. datasource_class = ConnectorRegistry.sources[database.type]
  66. return datasource_class.query_datasources_by_name(
  67. session, database, datasource_name, schema=None)