GCP: Python та BigQuery

Хмарні технології обробки даних

Автор
Приналежність

Ігор Мірошниченко

КНУ імені Тараса Шевченка | ФІТ

Вимоги

  1. Активувати BigQuery API в консолі GCP
  2. Встановити бібліотеку google-cloud-bigquery:
pip install --upgrade google-cloud-bigquery
pip install google-cloud-bigquery-datatransfer
pip install db-dtypes
  1. Створити сервісний ключ для доступу до BigQuery API (Role: Basic, Roles: Owner) та завантажити його на локальний комп’ютер.

Початок роботи з API

import os
from google.cloud import bigquery, bigquery_datatransfer
import google.auth
import time
import pandas as pd
import datetime
import json

# Завантаження сервісного ключа
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "./data/fit-cloud-course-key.json"

Створення клієнта

client = bigquery.Client('fit-cloud-course')

Запити до BigQuery

sql_query = """
SELECT station_id, name, dockcount
FROM `bigquery-public-data.san_francisco.bikeshare_stations`
LIMIT 5
"""

Виконання запиту

query_job = client.query(sql_query)

print(query_job)
QueryJob<project=fit-cloud-course, location=US, id=3d97ffac-0a07-4a5a-bd45-811d1123a875>

Результати запиту

for row in query_job:
    print(row)
Row((4, 'Santa Clara at Almaden', 11), {'station_id': 0, 'name': 1, 'dockcount': 2})
Row((84, 'Ryland Park', 15), {'station_id': 0, 'name': 1, 'dockcount': 2})
Row((8, 'San Salvador at 1st', 15), {'station_id': 0, 'name': 1, 'dockcount': 2})
Row((9, 'Japantown', 15), {'station_id': 0, 'name': 1, 'dockcount': 2})
Row((3, 'San Jose Civic Center', 15), {'station_id': 0, 'name': 1, 'dockcount': 2})
for row in query_job:
    print(row[0], row[1], row[2], sep=" | ")
4 | Santa Clara at Almaden | 11
84 | Ryland Park | 15
8 | San Salvador at 1st | 15
9 | Japantown | 15
3 | San Jose Civic Center | 15
for row in query_job:
    print(row.station_id, row.name, row.dockcount, sep=" | ")
4 | Santa Clara at Almaden | 11
84 | Ryland Park | 15
8 | San Salvador at 1st | 15
9 | Japantown | 15
3 | San Jose Civic Center | 15

Завантаження даних в BigQuery

JSON

За необхідності, переведемо дані у плоский формат:

json_data = json.load(open('./data/orders.json'))
data_file_path = './data/orders_upload.json'

with open(data_file_path, 'w') as f:
    f.write('\n'.join(json.dumps(record) for record in json_data))
dataset_id = 'dsongcp'
table_id = 'orders'
data_file_path = './data/orders.json'
job_config = bigquery.LoadJobConfig(
    schema=[
        bigquery.SchemaField("order_id", "INT64"),
        bigquery.SchemaField("creation_time", "DATETIME"),
        bigquery.SchemaField("product_ids", "INT64", mode="REPEATED"),
        ],
    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    autodetect=False,    
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE
)
table_ref = client.dataset(dataset_id).table(table_id)
with open(data_file_path, "rb") as source_file:
    job = client.load_table_from_file(source_file, table_ref, job_config=job_config)

while job.running():
    print('Завантаження даних...')
    time.sleep(3)
print(job.result())
Завантаження даних...
Завантаження даних...
LoadJob<project=fit-cloud-course, location=US, id=3260a434-b0d5-47eb-b7ff-84c64121945c>

GC Storage

bucket_name = 'fit-cloud-course-dsongcp'
source_uri = f"gs://{bucket_name}/flights/raw/*.csv"
table_id = 'flights_py'
job_config = bigquery.LoadJobConfig(
    autodetect=True,
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=1,
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE
)
table_ref = client.dataset(dataset_id).table(table_id)
job = client.load_table_from_uri(
    source_uri,
    table_ref,
    job_config=job_config
)

while job.running():
    print('Завантаження даних...')
    time.sleep(3)
print(job.result())
Завантаження даних...
Завантаження даних...
Завантаження даних...
Завантаження даних...
Завантаження даних...
Завантаження даних...
LoadJob<project=fit-cloud-course, location=US, id=65415b17-6160-4395-9dc1-6084ecdeb941>

CSV

dataset_id = 'dsongcp'
table_id = 'users'
data_file_path = './data/users.csv'

job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=1,
    autodetect=True,
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE
)

table_ref = client.dataset(dataset_id).table(table_id)
with open(data_file_path, "rb") as source_file:
    job = client.load_table_from_file(source_file, table_ref, job_config=job_config)

while job.running():
    print('Завантаження даних...')
    time.sleep(3)
print(job.result())
Завантаження даних...
LoadJob<project=fit-cloud-course, location=US, id=20d82c1a-126b-4981-8a51-1c4b0235567e>
table = client.get_table(table_ref)
print(f"Таблиця: {table}\nСпостережень: {table.num_rows}\nЗмінних: {len(table.schema)}")
Таблиця: fit-cloud-course.dsongcp.users
Спостережень: 20331
Змінних: 3

Google Sheets

Підключаємо Google Drive API. Копіюємо email-адресу сервісного акаунта.

credentials, project = google.auth.default(
    scopes=[
      "https://www.googleapis.com/auth/drive",
      "https://www.googleapis.com/auth/bigquery"
    ]
)
client = bigquery.Client(credentials=credentials, project=project)

dataset_id = 'dsongcp'
table_id = 'air_traffic'
table_ref = client.dataset(dataset_id).table(table_id)
table = bigquery.Table(table_ref, schema = [
    bigquery.SchemaField("Activity_Period", "INT64"),
    bigquery.SchemaField("Activity_Period_Start_Date", "DATE"),
    bigquery.SchemaField("Operating_Airline", "STRING"),
    bigquery.SchemaField("Operating_Airline_IATA_Code", "STRING"),
    bigquery.SchemaField("Published_Airline", "STRING"),
    bigquery.SchemaField("Published_Airline_IATA_Code", "STRING"),
    bigquery.SchemaField("GEO_Summary", "STRING"),
    bigquery.SchemaField("GEO_Region", "STRING"),
    bigquery.SchemaField("Activity_Type_Code", "STRING"),
    bigquery.SchemaField("Price_Category_Code", "STRING"),
    bigquery.SchemaField("Terminal", "STRING"),
    bigquery.SchemaField("Boarding_Area", "STRING"),
    bigquery.SchemaField("Passenger_Count", "INT64"),
    bigquery.SchemaField("data_as_of", "STRING"),
    bigquery.SchemaField("data_loaded_at", "STRING")
])

external_config = bigquery.ExternalConfig("GOOGLE_SHEETS")
sheet_url = "https://docs.google.com/spreadsheets/d/1vp7bCvxd3R2zciqxc8A4Qoq5DAvqhzfXJnljmG5OOw0/edit?usp=sharing"
external_config.source_uris = [sheet_url]

options = external_config.google_sheets_options
options.skip_leading_rows = 1
options.range = "air_traffic!A:O"

table.external_data_configuration = external_config
client.create_table(table)
Table(TableReference(DatasetReference('fit-cloud-course', 'dsongcp'), 'air_traffic'))

Мітки даних у BigQuery

Мітка – це пара ключ-значення, яку можна призначити ресурсам Google Cloud BigQuery. Ви можете прикріпити мітку до кожного ресурсу, а потім відфільтрувати ресурси на основі їхніх міток.

Ось кілька типових випадків використання міток:

  • Мітки команди: team:research або team:analytics.
  • Мітки компонентів: наприклад, component:redis, component:frontend, component:ingestта, component:dashboard тощо.
  • Мітки середовища: environment:production і environment:test.
  • Мітки стану: state:active, state:readytodelete і state:archive.
  • Мітки власності: використовуються для ідентифікації команд, які відповідають за операції, наприклад team:shopping-cart.

Створення таблиці

# Створення посилання на таблицю
dataset_ref = bigquery.DatasetReference(client.project, 'dsongcp')

# Створення таблиці
table_ref = bigquery.TableReference(dataset_ref, 'flights_auto')

Створення міток

# Створення міток
labels = {
    'type': 'auto',
    'category': 'transport'
}

# Створення таблиці з мітками
flights_auto_table = client.get_table(table_ref)
# Додавання міток
flights_auto_table.labels = labels
# Оновлення таблиці
client.update_table(flights_auto_table, ['labels'])
Table(TableReference(DatasetReference('fit-cloud-course', 'dsongcp'), 'flights_auto'))

Оновлення міток

# Оновлення міток
new_labels = {
    'type': 'auto',
    'category': 'transport',
    'year': '2015'
}

flights_auto_table = client.get_table(table_ref)
flights_auto_table.labels = new_labels
client.update_table(flights_auto_table, ['labels'])
Table(TableReference(DatasetReference('fit-cloud-course', 'dsongcp'), 'flights_auto'))

Видалення міток

table = client.get_table(table_ref)
labels = table.labels
labels = {k: None for k, v in labels.items()}
table.labels = labels
client.update_table(table, ['labels'])
Table(TableReference(DatasetReference('fit-cloud-course', 'dsongcp'), 'flights_auto'))

Додавання/видалення змінних

# Створення таблиці
dataset_ref = bigquery.DatasetReference(client.project, 'dsongcp')
table_ref = bigquery.TableReference(dataset_ref, 'flights_auto')
bigquery_table = client.get_table(table_ref)

Отримання схеми таблиці:

schema = bigquery_table.schema
schema[:5]
[SchemaField('Year', 'INTEGER', 'NULLABLE', None, None, (), None),
 SchemaField('Quarter', 'INTEGER', 'NULLABLE', None, None, (), None),
 SchemaField('Month', 'INTEGER', 'NULLABLE', None, None, (), None),
 SchemaField('DayofMonth', 'INTEGER', 'NULLABLE', None, None, (), None),
 SchemaField('DayOfWeek', 'INTEGER', 'NULLABLE', None, None, (), None)]

Створимо копію схеми:

new_schema = schema.copy()

Додамо нову змінну у схему:

new_schema.append(bigquery.SchemaField('Distance_km', 'FLOAT64', mode='NULLABLE'))
# Передамо оновлену схему таблиці bigquery_table
bigquery_table.schema = new_schema
Примітка

Тепер зробимо запит на оновлення таблиці:

client.update_table(bigquery_table, ['schema'])
Table(TableReference(DatasetReference('fit-cloud-course', 'dsongcp'), 'flights_auto'))

Переглянемо оновлену схему:

bigquery_table = client.get_table(table_ref)
schema = bigquery_table.schema
schema[-5:]
[SchemaField('Div5LongestGTime', 'STRING', 'NULLABLE', None, None, (), None),
 SchemaField('Div5WheelsOff', 'STRING', 'NULLABLE', None, None, (), None),
 SchemaField('Div5TailNum', 'STRING', 'NULLABLE', None, None, (), None),
 SchemaField('string_field_109', 'STRING', 'NULLABLE', None, None, (), None),
 SchemaField('Distance_km', 'FLOAT', 'NULLABLE', None, None, (), None)]

Видалення змінної з схеми:

query_job = client.query("""
    ALTER TABLE dsongcp.flights_auto
    DROP COLUMN IF EXISTS Distance_km;
""")

Іноді запит може займати деякий час на виконання. Щоб перевірити статус запиту, використовуйте метод query_job.state:

while query_job.state != 'DONE': 
    print('Запит виконується...')
    time.sleep(3)
    query_job.reload()
print(query_job.result())
Запит виконується...
<google.cloud.bigquery.table._EmptyRowIterator object at 0x000001B762ADC110>

Збереження результатів запиту в таблицю

Напишемо запит, який вибере середню затримку вильоту та прибуття для кожного аеропорту відправлення:

sql_query = """
SELECT
  ORIGIN,
  AVG(DepDelay) AS dep_delay,
  AVG(ArrDelay) AS arr_delay,
  COUNT(ArrDelay) AS num_flights
FROM
  `dsongcp.flights_auto`
GROUP BY
  ORIGIN
ORDER BY num_flights DESC
LIMIT 10
"""

Створимо окремий датасет, куди ми збережемо результати запиту:

dataset_id = client.project + '.dsongcp_results'
dataset = bigquery.Dataset(dataset_id)
dataset.location = "US"
dataset = client.create_dataset(dataset, exists_ok=True)
print(f"Створено датасет {client.project}.{dataset.dataset_id}")
Створено датасет fit-cloud-course.dsongcp_results

Визначимо таблицю, в яку ми збережемо результати запиту:

project_id = client.project
dataset_id = 'dsongcp_results'
table_id = 'flights_delay'

Використаємо метод from_string() для створення посилання на таблицю:

table_ref = bigquery.TableReference.from_string(f"{project_id}.{dataset_id}.{table_id}")
table_ref
TableReference(DatasetReference('fit-cloud-course', 'dsongcp_results'), 'flights_delay')

Виконаємо запит та збережемо результати в таблицю:

job_config = bigquery.QueryJobConfig(destination=table_ref, write_disposition="WRITE_TRUNCATE")
query_job = client.query(sql_query, job_config=job_config)
while query_job.state != 'DONE': 
    print('Запит виконується...')
    time.sleep(3)
    query_job.reload()
print(query_job.result())
Запит виконується...
<google.cloud.bigquery.table.RowIterator object at 0x000001B70D15DF10>
Примітка

Запит у DataFrame

Для зручності роботи з даними можна використовувати бібліотеку pandas. Для цього використаємо метод to_dataframe():

query_job = client.query(sql_query)
df = query_job.to_dataframe()
df
ORIGIN dep_delay arr_delay num_flights
0 ATL 7.265885 1.080248 29197
1 DFW 11.761812 9.371627 22571
2 ORD 19.962051 17.016132 22316
3 LAX 7.476341 5.542058 17048
4 DEN 15.506798 11.842325 16775
5 IAH 9.073786 5.353499 13191
6 PHX 8.066723 6.197787 13014
7 SFO 10.328127 9.038425 12570
8 LAS 8.566097 5.054353 11499
9 MCO 9.887441 5.820513 9867

Параметризовані запити

Параметризовані запити дозволяють використовувати змінні в запитах. Це дозволяє виконувати один і той же запит з різними значеннями змінних.

Для цього використовується символ @ перед змінною в запиті. Визначимо параметр month та виконаємо запит:

query_params = [
    bigquery.ScalarQueryParameter("month", "INT64", 1)
]
sql_query = """
SELECT
  ORIGIN,
  AVG(DepDelay) AS dep_delay,
  AVG(ArrDelay) AS arr_delay,
  COUNT(ArrDelay) AS num_flights
FROM
  `dsongcp.flights_auto`
WHERE
  Month = @month
GROUP BY
  ORIGIN
ORDER BY num_flights DESC
LIMIT 10
"""
job_config = bigquery.QueryJobConfig(query_parameters=query_params)
query_job = client.query(sql_query, job_config=job_config)
df = query_job.to_dataframe()
df
ORIGIN dep_delay arr_delay num_flights
0 ATL 7.265885 1.080248 29197
1 DFW 11.761812 9.371627 22571
2 ORD 19.962051 17.016132 22316
3 LAX 7.476341 5.542058 17048
4 DEN 15.506798 11.842325 16775
5 IAH 9.073786 5.353499 13191
6 PHX 8.066723 6.197787 13014
7 SFO 10.328127 9.038425 12570
8 LAS 8.566097 5.054353 11499
9 MCO 9.887441 5.820513 9867

У випадку коли потрібно використати зріз по даті:

query_params = [
    bigquery.ScalarQueryParameter("start_date", "DATE", datetime.date(2015, 1, 1)),
    bigquery.ScalarQueryParameter("end_date", "DATE", datetime.date(2015, 1, 7))
]

Тоді запит буде виглядати наступним чином:

sql_query = """
SELECT
  ORIGIN,
  AVG(DepDelay) AS dep_delay,
  AVG(ArrDelay) AS arr_delay,
  COUNT(ArrDelay) AS num_flights
FROM
  `dsongcp.flights_auto`
WHERE
  FlightDate BETWEEN @start_date AND @end_date
GROUP BY
  ORIGIN
ORDER BY num_flights DESC
LIMIT 10
"""
job_config = bigquery.QueryJobConfig(query_parameters=query_params)
query_job = client.query(sql_query, job_config=job_config)
df = query_job.to_dataframe()
df
ORIGIN dep_delay arr_delay num_flights
0 ATL 13.223205 6.869232 6653
1 DFW 28.783017 27.239105 5094
2 ORD 38.539200 38.549260 5065
3 DEN 34.550181 33.524051 4137
4 LAX 14.595703 15.126316 4085
5 IAH 19.919259 14.409795 3165
6 PHX 14.584003 14.319293 3110
7 SFO 12.263669 10.925950 3079
8 LAS 15.962792 13.408916 2602
9 MCO 21.239204 17.322841 2351

І третій варіант - позиційні параметри:

query_params = [
    bigquery.ScalarQueryParameter(None, "FLOAT64", 0),
    bigquery.ScalarQueryParameter(None, "FLOAT64", 0)

]
sql_query = """
SELECT
  ORIGIN,
  AVG(DepDelay) AS dep_delay,
  AVG(ArrDelay) AS arr_delay,
  COUNT(ArrDelay) AS num_flights
FROM
  `dsongcp.flights_auto`
WHERE
  DepDelay < ? AND ArrDelay < ?
GROUP BY
  ORIGIN
ORDER BY num_flights DESC
LIMIT 10
"""
job_config = bigquery.QueryJobConfig(query_parameters=query_params)
query_job = client.query(sql_query, job_config=job_config)
df = query_job.to_dataframe()
df
ORIGIN dep_delay arr_delay num_flights
0 ATL -3.487487 -11.962145 14265
1 DFW -4.595339 -10.509032 8968
2 ORD -4.753784 -13.876310 7729
3 LAX -5.053177 -11.830387 7240
4 DEN -4.942687 -13.282285 5374
5 PHX -4.864400 -11.197356 5295
6 SFO -5.341159 -10.963271 5282
7 IAH -4.987881 -11.642113 5281
8 LAS -5.255193 -11.990661 5247
9 SLC -5.100784 -13.151624 4465

Ще один варіант - використанням масивів:

query_params = [
    bigquery.ArrayQueryParameter("ORIGIN", "STRING", ["PHX", "ATL", "LAS"]),
    bigquery.ScalarQueryParameter("DepDelay", "FLOAT64", 10),
]
sql_query = """
SELECT
  ORIGIN,
  AVG(DepDelay) AS dep_delay,
  AVG(ArrDelay) AS arr_delay,
  COUNT(ArrDelay) AS num_flights
FROM
  `dsongcp.flights_auto`
WHERE
  ORIGIN IN UNNEST(@ORIGIN)
  AND DepDelay > @DepDelay
GROUP BY
  ORIGIN
ORDER BY num_flights DESC
"""
job_config = bigquery.QueryJobConfig(query_parameters=query_params)
query_job = client.query(sql_query, job_config=job_config)
df = query_job.to_dataframe()
df
ORIGIN dep_delay arr_delay num_flights
0 ATL 44.575120 38.036858 5589
1 PHX 44.808705 42.578725 2839
2 LAS 46.203759 42.609581 2651

Таблиця BG -> GC Storage

client = bigquery.Client('fit-cloud-course')
dataset_id = 'dsongcp'
bucket_name = 'fit-cloud-course-dsongcp'
table_ref = bigquery.TableReference.from_string(f"{client.project}.{dataset_id}.flights_auto")
job_config = bigquery.job.ExtractJobConfig()
# job_config.destination_format = bigquery.DestinationFormat.CSV
job_config.destination_format = bigquery.DestinationFormat.NEWLINE_DELIMITED_JSON
destination_uri = f"gs://{bucket_name}/results/flights_delay.json"

extract_job = client.extract_table(
    table_ref,
    destination_uri,
    job_config=job_config,
    location="US"
)
extract_job.result()
ExtractJob<project=fit-cloud-course, location=US, id=a90b3262-15f8-48a1-a259-39aa0e8137cb>

Робота з датасетами

Підключіть BigQuery Data Transfer API.

Створимо клас DatasetManager для роботи з датасетами:

class DatasetManager:
    def __init__(self, client):
        self.client = client

    def delete_dataset(self, dataset_id):
        self.client.delete_dataset(dataset_id, not_found_ok=True, delete_contents=True)

    def create_dataset(self, dataset_id, location='US'):
        dataset = bigquery.Dataset(dataset_id)
        dataset.location = location
        dataset_ = self.client.create_dataset(dataset, timeout=30)
        print(f"Датасет {self.client.project}.{dataset.dataset_id} створено")
        return dataset_

    def list_dataset(self):
        datasets = self.client.list_datasets()
        return [dataset.dataset_id for dataset in datasets]

    def copy_dataset(self, source_project_id, source_dataset_id, destination_project_id, destination_dataset_id, display_name):
        """
        Посилання:
        https://cloud.google.com/bigquery-transfer/docs/reference/datatransfer/rest/v1/projects.transferConfigs

        Надати доступ до IAM:
        https://cloud.google.com/bigquery/docs/enable-transfer-service#grant_bigqueryadmin_access

        Зауважте:
        - це також призведе до переміщення таблиць.
        - підключіть BigQuery Data Transfer API.
        """
        transfer_client = bigquery_datatransfer.DataTransferServiceClient()
        transfer_config = bigquery_datatransfer.TransferConfig(
            destination_dataset_id=destination_dataset_id,
            display_name=display_name,
            data_source_id="cross_region_copy", # 
            params={
                "source_project_id": source_project_id,
                "source_dataset_id": source_dataset_id,
            },
        )
        transfer_config = transfer_client.create_transfer_config(
            parent=transfer_client.common_project_path(destination_project_id),
            transfer_config=transfer_config,
        )
        print(f"Створено конфігурацію передачі: {transfer_config.name}")

Ініціалізуємо клієнт та клас:

client = bigquery.Client('fit-cloud-course')
dataset_manager = DatasetManager(client)

Виведемо список датасетів:

dataset_manager.list_dataset()
['dsongcp', 'dsongcp_results', 'food_delivery']

Створимо новий датасет:

dataset_name = f'{client.project}.ny_taxi_trips'
dataset = dataset_manager.create_dataset(dataset_name)
Датасет fit-cloud-course.ny_taxi_trips створено

Скопіюємо датасет:

source_project_id = 'bigquery-public-data'
source_dataset_id = 'new_york_taxi_trips'
destination_project_id = client.project
destination_dataset_id = 'ny_taxi_trips'
display_name = 'NY Taxi Trips'

dataset_manager.copy_dataset(source_project_id, source_dataset_id, destination_project_id, destination_dataset_id, display_name)
Створено конфігурацію передачі: projects/326133795346/locations/us/transferConfigs/65fb972b-0000-23f7-bdb8-089e0826d31c

Видалимо датасет:

dataset_manager.delete_dataset(dataset_name)