bigquery.py

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import hashlib
import re
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple, TYPE_CHECKING

import pandas as pd
from sqlalchemy import literal_column
from sqlalchemy.sql.expression import ColumnClause

from superset.db_engine_specs.base import BaseEngineSpec

if TYPE_CHECKING:
    from superset.models.core import Database  # pylint: disable=unused-import

class BigQueryEngineSpec(BaseEngineSpec):
    """Engine spec for Google's BigQuery

    As contributed by @mxmzdlv on issue #945"""

    engine = "bigquery"
    max_column_name_length = 128

    """
    https://www.python.org/dev/peps/pep-0249/#arraysize
    raw_connections bypass the pybigquery query execution context and deal with
    raw DB-API connections directly.
    If this value is not set, the default is 1, as described here:
    https://googlecloudplatform.github.io/google-cloud-python/latest/_modules/google/cloud/bigquery/dbapi/cursor.html#Cursor
    The default value of 5000 is derived from pybigquery:
    https://github.com/mxmzdlv/pybigquery/blob/d214bb089ca0807ca9aaa6ce4d5a01172d40264e/pybigquery/sqlalchemy_bigquery.py#L102
    """
    arraysize = 5000

    _time_grain_functions = {
        None: "{col}",
        "PT1S": "TIMESTAMP_TRUNC({col}, SECOND)",
        "PT1M": "TIMESTAMP_TRUNC({col}, MINUTE)",
        "PT1H": "TIMESTAMP_TRUNC({col}, HOUR)",
        "P1D": "TIMESTAMP_TRUNC({col}, DAY)",
        "P1W": "TIMESTAMP_TRUNC({col}, WEEK)",
        "P1M": "TIMESTAMP_TRUNC({col}, MONTH)",
        "P0.25Y": "TIMESTAMP_TRUNC({col}, QUARTER)",
        "P1Y": "TIMESTAMP_TRUNC({col}, YEAR)",
    }
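
    # Illustrative note (added, not in the original file): BaseEngineSpec
    # formats these templates with the time column's expression, so a daily
    # grain on a column named "ts" renders as:
    #   "TIMESTAMP_TRUNC({col}, DAY)".format(col="ts")  -> "TIMESTAMP_TRUNC(ts, DAY)"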

    @classmethod
    def convert_dttm(cls, target_type: str, dttm: datetime) -> Optional[str]:
        tt = target_type.upper()
        if tt == "DATE":
            return f"CAST('{dttm.date().isoformat()}' AS DATE)"
        if tt == "DATETIME":
            return f"""CAST('{dttm.isoformat(timespec="microseconds")}' AS DATETIME)"""
        if tt == "TIMESTAMP":
            return f"""CAST('{dttm.isoformat(timespec="microseconds")}' AS TIMESTAMP)"""
        return None
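
    # Illustrative examples (added, not in the original file):
    #   convert_dttm("DATE", datetime(2019, 1, 2))
    #   -> "CAST('2019-01-02' AS DATE)"
    #   convert_dttm("TIMESTAMP", datetime(2019, 1, 2, 3, 4, 5))
    #   -> "CAST('2019-01-02T03:04:05.000000' AS TIMESTAMP)"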

    @classmethod
    def fetch_data(cls, cursor: Any, limit: int) -> List[Tuple]:
        data = super(BigQueryEngineSpec, cls).fetch_data(cursor, limit)
        if data and type(data[0]).__name__ == "Row":
            data = [r.values() for r in data]  # type: ignore
        return data
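
    # Note (added): the BigQuery DB-API cursor yields google.cloud.bigquery
    # Row objects rather than plain tuples; Row.values() unwraps each row into
    # its tuple of cell values so downstream code sees ordinary sequences.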

    @staticmethod
    def _mutate_label(label: str) -> str:
        """
        BigQuery field names should start with a letter or underscore and
        contain only alphanumeric characters. Labels that start with a number
        are prefixed with an underscore. Any unsupported characters are
        replaced with underscores and an md5 hash is added to the end of the
        label to avoid possible collisions.

        :param label: Expected expression label
        :return: Conditionally mutated label
        """
        label_hashed = "_" + hashlib.md5(label.encode("utf-8")).hexdigest()
        # if the label starts with a number, prefix it with an underscore
        label_mutated = "_" + label if re.match(r"^\d", label) else label
        # replace non-alphanumeric characters with underscores
        label_mutated = re.sub(r"[^\w]+", "_", label_mutated)
        if label_mutated != label:
            # add the first 5 chars of the md5 hash to avoid possible collisions
            label_mutated += label_hashed[:6]
        return label_mutated
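
    # Illustrative examples (added, not in the original file); "xxxxx" stands
    # for the first 5 hex chars of the md5 of the original label:
    #   _mutate_label("count(*)")  -> "count__xxxxx"
    #   _mutate_label("1column")   -> "_1column_xxxxx"
    #   _mutate_label("my_col")    -> "my_col"  (already valid, left unchanged)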

    @classmethod
    def _truncate_label(cls, label: str) -> str:
        """BigQuery requires column names to start with either a letter or an
        underscore. To make sure this is always the case, an underscore is
        prefixed to the md5 hash of the original label.

        :param label: expected expression label
        :return: truncated label
        """
        return "_" + hashlib.md5(label.encode("utf-8")).hexdigest()
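
    # Note (added): unlike _mutate_label, this replaces the label wholesale;
    # any label exceeding max_column_name_length becomes an underscore plus
    # the 32-character md5 hexdigest (33 characters in total), which is
    # deterministic and well under the 128-character limit above.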

    @classmethod
    def extra_table_metadata(
        cls, database: "Database", table_name: str, schema_name: str
    ) -> Dict[str, Any]:
        indexes = database.get_indexes(table_name, schema_name)
        if not indexes:
            return {}
        partitions_columns = [
            index.get("column_names", [])
            for index in indexes
            if index.get("name") == "partition"
        ]
        cluster_columns = [
            index.get("column_names", [])
            for index in indexes
            if index.get("name") == "clustering"
        ]
        return {
            "partitions": {"cols": partitions_columns},
            "clustering": {"cols": cluster_columns},
        }
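
    # Illustrative example (added, not in the original file), assuming the
    # dialect reports a table partitioned on "ds" and clustered on "country"
    # as pseudo-indexes named "partition" and "clustering":
    #   extra_table_metadata(db, "my_table", "my_dataset")
    #   -> {"partitions": {"cols": [["ds"]]},
    #       "clustering": {"cols": [["country"]]}}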

    @classmethod
    def _get_fields(cls, cols: List[Dict[str, Any]]) -> List[ColumnClause]:
        """
        The BigQuery dialect requires that nested field names not be wrapped
        in backticks; using literal_column handles that issue:
        https://docs.sqlalchemy.org/en/latest/core/tutorial.html#using-more-specific-text-with-table-literal-column-and-column
        Column names are also explicitly labeled so we don't encounter
        duplicate column names in the result.
        """
        return [
            literal_column(c["name"]).label(c["name"].replace(".", "__")) for c in cols
        ]
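
    # Illustrative example (added, not in the original file): a nested column
    # {"name": "user.name"} is selected as the unquoted expression user.name
    # (backticks would break the nested reference) and aliased "user__name",
    # so it cannot collide with a sibling such as "user.id" -> "user__id".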

    @classmethod
    def epoch_to_dttm(cls) -> str:
        return "TIMESTAMP_SECONDS({col})"

    @classmethod
    def epoch_ms_to_dttm(cls) -> str:
        return "TIMESTAMP_MILLIS({col})"

    @classmethod
    def df_to_sql(cls, df: pd.DataFrame, **kwargs: Any) -> None:
        """
        Upload data from a Pandas DataFrame to BigQuery. Calls
        `DataFrame.to_gbq()`, which requires `pandas_gbq` to be installed.

        :param df: Dataframe with data to be uploaded
        :param kwargs: kwargs to be passed to the `to_gbq()` method. Requires
            both `schema` and `name` to be present in kwargs, which are
            combined and passed to `to_gbq()` as `destination_table`.
        """
        try:
            import pandas_gbq
            from google.oauth2 import service_account
        except ImportError:
            raise Exception(
                "Could not import the library `pandas_gbq`, which is "
                "required to be installed in your environment in order "
                "to upload data to BigQuery"
            )

        if not ("name" in kwargs and "schema" in kwargs):
            raise Exception("name and schema need to be defined in kwargs")

        gbq_kwargs = {}
        gbq_kwargs["project_id"] = kwargs["con"].engine.url.host
        gbq_kwargs["destination_table"] = f"{kwargs.pop('schema')}.{kwargs.pop('name')}"

        # add credentials if they are set on the SQLAlchemy dialect:
        creds = kwargs["con"].dialect.credentials_info
        if creds:
            credentials = service_account.Credentials.from_service_account_info(creds)
            gbq_kwargs["credentials"] = credentials

        # only pass through supported kwargs
        supported_kwarg_keys = {"if_exists"}
        for key in supported_kwarg_keys:
            if key in kwargs:
                gbq_kwargs[key] = kwargs[key]

        pandas_gbq.to_gbq(df, **gbq_kwargs)
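
    # Illustrative usage sketch (added, not in the original file). Here
    # "engine" is a hypothetical SQLAlchemy engine for a bigquery://my-project
    # URL, so engine.url.host carries the GCP project id:
    #   df = pd.DataFrame({"a": [1, 2, 3]})
    #   BigQueryEngineSpec.df_to_sql(
    #       df, con=engine, schema="my_dataset", name="my_table", if_exists="replace"
    #   )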