# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import hashlib
import re
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple, TYPE_CHECKING

import pandas as pd
from sqlalchemy import literal_column
from sqlalchemy.sql.expression import ColumnClause

from superset.db_engine_specs.base import BaseEngineSpec

if TYPE_CHECKING:
    from superset.models.core import Database  # pylint: disable=unused-import


class BigQueryEngineSpec(BaseEngineSpec):
    """Engine spec for Google's BigQuery

    As contributed by @mxmzdlv on issue #945"""

    engine = "bigquery"
    max_column_name_length = 128

    """
    https://www.python.org/dev/peps/pep-0249/#arraysize
    Raw connections bypass the pybigquery query execution context and deal
    with the raw DBAPI connection directly. If this value is not set, a
    default of 1 is used, as described here:
    https://googlecloudplatform.github.io/google-cloud-python/latest/_modules/google/cloud/bigquery/dbapi/cursor.html#Cursor
    The default value of 5000 is taken from pybigquery:
    https://github.com/mxmzdlv/pybigquery/blob/d214bb089ca0807ca9aaa6ce4d5a01172d40264e/pybigquery/sqlalchemy_bigquery.py#L102
    """
    arraysize = 5000

    _time_grain_functions = {
        None: "{col}",
        "PT1S": "TIMESTAMP_TRUNC({col}, SECOND)",
        "PT1M": "TIMESTAMP_TRUNC({col}, MINUTE)",
        "PT1H": "TIMESTAMP_TRUNC({col}, HOUR)",
        "P1D": "TIMESTAMP_TRUNC({col}, DAY)",
        "P1W": "TIMESTAMP_TRUNC({col}, WEEK)",
        "P1M": "TIMESTAMP_TRUNC({col}, MONTH)",
        "P0.25Y": "TIMESTAMP_TRUNC({col}, QUARTER)",
        "P1Y": "TIMESTAMP_TRUNC({col}, YEAR)",
    }
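
    # For example, the "P1M" grain applied to a column named "ts" renders as
    # TIMESTAMP_TRUNC(ts, MONTH), truncating each timestamp to the start of its
    # calendar month before grouping.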

    @classmethod
    def convert_dttm(cls, target_type: str, dttm: datetime) -> Optional[str]:
        tt = target_type.upper()
        if tt == "DATE":
            return f"CAST('{dttm.date().isoformat()}' AS DATE)"
        if tt == "DATETIME":
            return f"""CAST('{dttm.isoformat(timespec="microseconds")}' AS DATETIME)"""
        if tt == "TIMESTAMP":
            return f"""CAST('{dttm.isoformat(timespec="microseconds")}' AS TIMESTAMP)"""
        return None
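
    # For example, convert_dttm("DATE", datetime(2019, 1, 2, 3, 4, 5, 678900))
    # returns "CAST('2019-01-02' AS DATE)", while the DATETIME branch keeps the
    # full value: "CAST('2019-01-02T03:04:05.678900' AS DATETIME)".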

    @classmethod
    def fetch_data(cls, cursor: Any, limit: int) -> List[Tuple]:
        data = super().fetch_data(cursor, limit)
        # the BigQuery DBAPI returns Row objects rather than plain tuples;
        # unpack them so downstream consumers get plain sequences of values
        if data and type(data[0]).__name__ == "Row":
            data = [r.values() for r in data]  # type: ignore
        return data

    @staticmethod
    def _mutate_label(label: str) -> str:
        """
        BigQuery field names must start with a letter or underscore and contain
        only alphanumeric characters. Labels that start with a number are
        prefixed with an underscore. Any unsupported characters are replaced
        with underscores, and an md5 hash suffix is appended to the label to
        avoid possible collisions.

        :param label: Expected expression label
        :return: Conditionally mutated label
        """
        label_hashed = "_" + hashlib.md5(label.encode("utf-8")).hexdigest()

        # if the label starts with a number, add an underscore as the first character
        label_mutated = "_" + label if re.match(r"^\d", label) else label

        # replace non-alphanumeric characters with underscores
        label_mutated = re.sub(r"[^\w]+", "_", label_mutated)
        if label_mutated != label:
            # add an underscore plus the first 5 chars of the md5 hash to
            # avoid possible collisions
            label_mutated += label_hashed[:6]
        return label_mutated
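
    # Illustrative trace of _mutate_label (the exact suffix depends on the md5
    # digest of the input): "count(*)" becomes "count_" after the regex
    # substitution, then "count__xxxxx" once the underscore plus the first five
    # hex chars of md5("count(*)") are appended.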

    @classmethod
    def _truncate_label(cls, label: str) -> str:
        """
        Truncate a label by replacing it entirely with the md5 hash of the
        original label. BigQuery requires column names to start with either a
        letter or an underscore, so an underscore is prefixed to the hash to
        make sure this is always the case.

        :param label: expected expression label
        :return: truncated label
        """
        return "_" + hashlib.md5(label.encode("utf-8")).hexdigest()

    @classmethod
    def extra_table_metadata(
        cls, database: "Database", table_name: str, schema_name: str
    ) -> Dict[str, Any]:
        indexes = database.get_indexes(table_name, schema_name)
        if not indexes:
            return {}
        partitions_columns = [
            index.get("column_names", [])
            for index in indexes
            if index.get("name") == "partition"
        ]
        cluster_columns = [
            index.get("column_names", [])
            for index in indexes
            if index.get("name") == "clustering"
        ]
        return {
            "partitions": {"cols": partitions_columns},
            "clustering": {"cols": cluster_columns},
        }
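
    # Illustrative return value (column names here are hypothetical): for a
    # table partitioned by "ds" and clustered by "country", the dialect reports
    # pseudo-indexes named "partition" and "clustering", so this method returns
    # {"partitions": {"cols": [["ds"]]}, "clustering": {"cols": [["country"]]}}.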

    @classmethod
    def _get_fields(cls, cols: List[Dict[str, Any]]) -> List[ColumnClause]:
        """
        The BigQuery dialect requires that nested field names not be quoted
        with backticks; using literal_column handles that issue:
        https://docs.sqlalchemy.org/en/latest/core/tutorial.html#using-more-specific-text-with-table-literal-column-and-column
        Column labels are also specified explicitly so that nested fields
        don't produce duplicate column names in the result.
        """
        return [
            literal_column(c["name"]).label(c["name"].replace(".", "__")) for c in cols
        ]
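
    # For example, a nested column named "user.name" is selected as the raw
    # expression user.name AS user__name; quoting it with backticks would
    # instead make BigQuery treat `user.name` as a single, non-nested
    # identifier.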

    @classmethod
    def epoch_to_dttm(cls) -> str:
        return "TIMESTAMP_SECONDS({col})"

    @classmethod
    def epoch_ms_to_dttm(cls) -> str:
        return "TIMESTAMP_MILLIS({col})"

    @classmethod
    def df_to_sql(cls, df: pd.DataFrame, **kwargs: Any) -> None:
        """
        Upload data from a Pandas DataFrame to BigQuery. Calls
        ``DataFrame.to_gbq()``, which requires ``pandas_gbq`` to be installed.

        :param df: Dataframe with data to be uploaded
        :param kwargs: kwargs to be passed to the ``to_gbq()`` method. Requires
            both ``schema`` and ``name`` to be present in kwargs, which are
            combined and passed to ``to_gbq()`` as ``destination_table``.
        """
        try:
            import pandas_gbq
            from google.oauth2 import service_account
        except ImportError:
            raise Exception(
                "Could not import the library `pandas_gbq`, which is "
                "required to be installed in your environment in order "
                "to upload data to BigQuery"
            )

        if not ("name" in kwargs and "schema" in kwargs):
            raise Exception("name and schema need to be defined in kwargs")

        gbq_kwargs = {}
        gbq_kwargs["project_id"] = kwargs["con"].engine.url.host
        gbq_kwargs["destination_table"] = f"{kwargs.pop('schema')}.{kwargs.pop('name')}"

        # add credentials if they are set on the SQLAlchemy dialect
        creds = kwargs["con"].dialect.credentials_info
        if creds:
            credentials = service_account.Credentials.from_service_account_info(creds)
            gbq_kwargs["credentials"] = credentials

        # only pass through supported kwargs
        supported_kwarg_keys = {"if_exists"}
        for key in supported_kwarg_keys:
            if key in kwargs:
                gbq_kwargs[key] = kwargs[key]
        pandas_gbq.to_gbq(df, **gbq_kwargs)
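
    # A minimal usage sketch (the engine URL, dataset, and table names here
    # are hypothetical). The project id is taken from the host portion of the
    # SQLAlchemy URL, while `schema` and `name` are combined into the
    # destination table:
    #
    #     from sqlalchemy import create_engine
    #     engine = create_engine("bigquery://my-project")
    #     BigQueryEngineSpec.df_to_sql(
    #         df, con=engine, schema="my_dataset", name="my_table",
    #         if_exists="replace",
    #     )
    #
    # which uploads `df` to my-project.my_dataset.my_table via pandas_gbq.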