import os
from google.cloud import bigquery, bigquery_datatransfer
import google.auth
import time
import pandas as pd
import datetime
import json
# Завантаження сервісного ключа
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "./data/fit-cloud-course-key.json"GCP: Python та BigQuery
Хмарні технології обробки даних
Вимоги
- Активувати BigQuery API в консолі GCP
- Встановити бібліотеку
google-cloud-bigquery:
pip install --upgrade google-cloud-bigquery
pip install google-cloud-bigquery-datatransfer
pip install db-dtypes- Створити сервісний ключ для доступу до BigQuery API (Role: Basic, Roles: Owner) та завантажити його на локальний комп’ютер.
Початок роботи з API
Створення клієнта
client = bigquery.Client('fit-cloud-course')Запити до BigQuery
sql_query = """
SELECT station_id, name, dockcount
FROM `bigquery-public-data.san_francisco.bikeshare_stations`
LIMIT 5
"""Виконання запиту
query_job = client.query(sql_query)
print(query_job)QueryJob<project=fit-cloud-course, location=US, id=3d97ffac-0a07-4a5a-bd45-811d1123a875>
Результати запиту
for row in query_job:
print(row)Row((4, 'Santa Clara at Almaden', 11), {'station_id': 0, 'name': 1, 'dockcount': 2})
Row((84, 'Ryland Park', 15), {'station_id': 0, 'name': 1, 'dockcount': 2})
Row((8, 'San Salvador at 1st', 15), {'station_id': 0, 'name': 1, 'dockcount': 2})
Row((9, 'Japantown', 15), {'station_id': 0, 'name': 1, 'dockcount': 2})
Row((3, 'San Jose Civic Center', 15), {'station_id': 0, 'name': 1, 'dockcount': 2})
for row in query_job:
print(row[0], row[1], row[2], sep=" | ")4 | Santa Clara at Almaden | 11
84 | Ryland Park | 15
8 | San Salvador at 1st | 15
9 | Japantown | 15
3 | San Jose Civic Center | 15
for row in query_job:
print(row.station_id, row.name, row.dockcount, sep=" | ")4 | Santa Clara at Almaden | 11
84 | Ryland Park | 15
8 | San Salvador at 1st | 15
9 | Japantown | 15
3 | San Jose Civic Center | 15
Завантаження даних в BigQuery
JSON
За необхідності, переведемо дані у плоский формат:
json_data = json.load(open('./data/orders.json'))
data_file_path = './data/orders_upload.json'
with open(data_file_path, 'w') as f:
f.write('\n'.join(json.dumps(record) for record in json_data))dataset_id = 'dsongcp'
table_id = 'orders'
data_file_path = './data/orders.json'job_config = bigquery.LoadJobConfig(
schema=[
bigquery.SchemaField("order_id", "INT64"),
bigquery.SchemaField("creation_time", "DATETIME"),
bigquery.SchemaField("product_ids", "INT64", mode="REPEATED"),
],
source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
autodetect=False,
write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE
)table_ref = client.dataset(dataset_id).table(table_id)
with open(data_file_path, "rb") as source_file:
job = client.load_table_from_file(source_file, table_ref, job_config=job_config)
while job.running():
print('Завантаження даних...')
time.sleep(3)
print(job.result())Завантаження даних...
Завантаження даних...
LoadJob<project=fit-cloud-course, location=US, id=3260a434-b0d5-47eb-b7ff-84c64121945c>
GC Storage
bucket_name = 'fit-cloud-course-dsongcp'
source_uri = f"gs://{bucket_name}/flights/raw/*.csv"
table_id = 'flights_py'job_config = bigquery.LoadJobConfig(
autodetect=True,
source_format=bigquery.SourceFormat.CSV,
skip_leading_rows=1,
write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE
)table_ref = client.dataset(dataset_id).table(table_id)
job = client.load_table_from_uri(
source_uri,
table_ref,
job_config=job_config
)
while job.running():
print('Завантаження даних...')
time.sleep(3)
print(job.result())Завантаження даних...
Завантаження даних...
Завантаження даних...
Завантаження даних...
Завантаження даних...
Завантаження даних...
LoadJob<project=fit-cloud-course, location=US, id=65415b17-6160-4395-9dc1-6084ecdeb941>
CSV
dataset_id = 'dsongcp'
table_id = 'users'
data_file_path = './data/users.csv'
job_config = bigquery.LoadJobConfig(
source_format=bigquery.SourceFormat.CSV,
skip_leading_rows=1,
autodetect=True,
write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE
)
table_ref = client.dataset(dataset_id).table(table_id)
with open(data_file_path, "rb") as source_file:
job = client.load_table_from_file(source_file, table_ref, job_config=job_config)
while job.running():
print('Завантаження даних...')
time.sleep(3)
print(job.result())Завантаження даних...
LoadJob<project=fit-cloud-course, location=US, id=20d82c1a-126b-4981-8a51-1c4b0235567e>
table = client.get_table(table_ref)
print(f"Таблиця: {table}\nСпостережень: {table.num_rows}\nЗмінних: {len(table.schema)}")Таблиця: fit-cloud-course.dsongcp.users
Спостережень: 20331
Змінних: 3
Google Sheets
Підключаємо Google Drive API. Копіюємо email-адресу сервісного акаунта.
credentials, project = google.auth.default(
scopes=[
"https://www.googleapis.com/auth/drive",
"https://www.googleapis.com/auth/bigquery"
]
)client = bigquery.Client(credentials=credentials, project=project)
dataset_id = 'dsongcp'
table_id = 'air_traffic'
table_ref = client.dataset(dataset_id).table(table_id)table = bigquery.Table(table_ref, schema = [
bigquery.SchemaField("Activity_Period", "INT64"),
bigquery.SchemaField("Activity_Period_Start_Date", "DATE"),
bigquery.SchemaField("Operating_Airline", "STRING"),
bigquery.SchemaField("Operating_Airline_IATA_Code", "STRING"),
bigquery.SchemaField("Published_Airline", "STRING"),
bigquery.SchemaField("Published_Airline_IATA_Code", "STRING"),
bigquery.SchemaField("GEO_Summary", "STRING"),
bigquery.SchemaField("GEO_Region", "STRING"),
bigquery.SchemaField("Activity_Type_Code", "STRING"),
bigquery.SchemaField("Price_Category_Code", "STRING"),
bigquery.SchemaField("Terminal", "STRING"),
bigquery.SchemaField("Boarding_Area", "STRING"),
bigquery.SchemaField("Passenger_Count", "INT64"),
bigquery.SchemaField("data_as_of", "STRING"),
bigquery.SchemaField("data_loaded_at", "STRING")
])
external_config = bigquery.ExternalConfig("GOOGLE_SHEETS")
sheet_url = "https://docs.google.com/spreadsheets/d/1vp7bCvxd3R2zciqxc8A4Qoq5DAvqhzfXJnljmG5OOw0/edit?usp=sharing"
external_config.source_uris = [sheet_url]
options = external_config.google_sheets_options
options.skip_leading_rows = 1
options.range = "air_traffic!A:O"
table.external_data_configuration = external_config
client.create_table(table)Table(TableReference(DatasetReference('fit-cloud-course', 'dsongcp'), 'air_traffic'))
Мітки даних у BigQuery
Мітка – це пара ключ-значення, яку можна призначити ресурсам Google Cloud BigQuery. Ви можете прикріпити мітку до кожного ресурсу, а потім відфільтрувати ресурси на основі їхніх міток.
Ось кілька типових випадків використання міток:
- Мітки команди:
team:researchабоteam:analytics. - Мітки компонентів: наприклад,
component:redis,component:frontend,component:ingestта,component:dashboardтощо. - Мітки середовища:
environment:productionіenvironment:test. - Мітки стану:
state:active,state:readytodeleteіstate:archive. - Мітки власності: використовуються для ідентифікації команд, які відповідають за операції, наприклад
team:shopping-cart.
Створення таблиці
# Створення посилання на таблицю
dataset_ref = bigquery.DatasetReference(client.project, 'dsongcp')
# Створення таблиці
table_ref = bigquery.TableReference(dataset_ref, 'flights_auto')Створення міток
# Створення міток
labels = {
'type': 'auto',
'category': 'transport'
}
# Створення таблиці з мітками
flights_auto_table = client.get_table(table_ref)
# Додавання міток
flights_auto_table.labels = labels
# Оновлення таблиці
client.update_table(flights_auto_table, ['labels'])Table(TableReference(DatasetReference('fit-cloud-course', 'dsongcp'), 'flights_auto'))
Оновлення міток
# Оновлення міток
new_labels = {
'type': 'auto',
'category': 'transport',
'year': '2015'
}
flights_auto_table = client.get_table(table_ref)
flights_auto_table.labels = new_labels
client.update_table(flights_auto_table, ['labels'])Table(TableReference(DatasetReference('fit-cloud-course', 'dsongcp'), 'flights_auto'))
Видалення міток
table = client.get_table(table_ref)
labels = table.labels
labels = {k: None for k, v in labels.items()}
table.labels = labels
client.update_table(table, ['labels'])Table(TableReference(DatasetReference('fit-cloud-course', 'dsongcp'), 'flights_auto'))
Додавання/видалення змінних
# Створення таблиці
dataset_ref = bigquery.DatasetReference(client.project, 'dsongcp')
table_ref = bigquery.TableReference(dataset_ref, 'flights_auto')
bigquery_table = client.get_table(table_ref)Отримання схеми таблиці:
schema = bigquery_table.schema
schema[:5][SchemaField('Year', 'INTEGER', 'NULLABLE', None, None, (), None),
SchemaField('Quarter', 'INTEGER', 'NULLABLE', None, None, (), None),
SchemaField('Month', 'INTEGER', 'NULLABLE', None, None, (), None),
SchemaField('DayofMonth', 'INTEGER', 'NULLABLE', None, None, (), None),
SchemaField('DayOfWeek', 'INTEGER', 'NULLABLE', None, None, (), None)]
Створимо копію схеми:
new_schema = schema.copy()Додамо нову змінну у схему:
new_schema.append(bigquery.SchemaField('Distance_km', 'FLOAT64', mode='NULLABLE'))
# Передамо оновлену схему таблиці bigquery_table
bigquery_table.schema = new_schemaТипи даних в BigQuery: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types.
Тепер зробимо запит на оновлення таблиці:
client.update_table(bigquery_table, ['schema'])Table(TableReference(DatasetReference('fit-cloud-course', 'dsongcp'), 'flights_auto'))
Переглянемо оновлену схему:
bigquery_table = client.get_table(table_ref)
schema = bigquery_table.schema
schema[-5:][SchemaField('Div5LongestGTime', 'STRING', 'NULLABLE', None, None, (), None),
SchemaField('Div5WheelsOff', 'STRING', 'NULLABLE', None, None, (), None),
SchemaField('Div5TailNum', 'STRING', 'NULLABLE', None, None, (), None),
SchemaField('string_field_109', 'STRING', 'NULLABLE', None, None, (), None),
SchemaField('Distance_km', 'FLOAT', 'NULLABLE', None, None, (), None)]
Видалення змінної з схеми:
query_job = client.query("""
ALTER TABLE dsongcp.flights_auto
DROP COLUMN IF EXISTS Distance_km;
""")Іноді запит може займати деякий час на виконання. Щоб перевірити статус запиту, використовуйте метод query_job.state:
while query_job.state != 'DONE':
print('Запит виконується...')
time.sleep(3)
query_job.reload()
print(query_job.result())Запит виконується...
<google.cloud.bigquery.table._EmptyRowIterator object at 0x000001B762ADC110>
Збереження результатів запиту в таблицю
Напишемо запит, який вибере середню затримку вильоту та прибуття для кожного аеропорту відправлення:
sql_query = """
SELECT
ORIGIN,
AVG(DepDelay) AS dep_delay,
AVG(ArrDelay) AS arr_delay,
COUNT(ArrDelay) AS num_flights
FROM
`dsongcp.flights_auto`
GROUP BY
ORIGIN
ORDER BY num_flights DESC
LIMIT 10
"""Створимо окремий датасет, куди ми збережемо результати запиту:
dataset_id = client.project + '.dsongcp_results'
dataset = bigquery.Dataset(dataset_id)
dataset.location = "US"
dataset = client.create_dataset(dataset, exists_ok=True)
print(f"Створено датасет {client.project}.{dataset.dataset_id}")Створено датасет fit-cloud-course.dsongcp_results
Визначимо таблицю, в яку ми збережемо результати запиту:
project_id = client.project
dataset_id = 'dsongcp_results'
table_id = 'flights_delay'Використаємо метод from_string() для створення посилання на таблицю:
table_ref = bigquery.TableReference.from_string(f"{project_id}.{dataset_id}.{table_id}")
table_refTableReference(DatasetReference('fit-cloud-course', 'dsongcp_results'), 'flights_delay')
Виконаємо запит та збережемо результати в таблицю:
job_config = bigquery.QueryJobConfig(destination=table_ref, write_disposition="WRITE_TRUNCATE")
query_job = client.query(sql_query, job_config=job_config)
while query_job.state != 'DONE':
print('Запит виконується...')
time.sleep(3)
query_job.reload()
print(query_job.result())Запит виконується...
<google.cloud.bigquery.table.RowIterator object at 0x000001B70D15DF10>
Більше про QueryJobConfig за посиланням https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.QueryJobConfig
Запит у DataFrame
Для зручності роботи з даними можна використовувати бібліотеку pandas. Для цього використаємо метод to_dataframe():
query_job = client.query(sql_query)
df = query_job.to_dataframe()
df| ORIGIN | dep_delay | arr_delay | num_flights | |
|---|---|---|---|---|
| 0 | ATL | 7.265885 | 1.080248 | 29197 |
| 1 | DFW | 11.761812 | 9.371627 | 22571 |
| 2 | ORD | 19.962051 | 17.016132 | 22316 |
| 3 | LAX | 7.476341 | 5.542058 | 17048 |
| 4 | DEN | 15.506798 | 11.842325 | 16775 |
| 5 | IAH | 9.073786 | 5.353499 | 13191 |
| 6 | PHX | 8.066723 | 6.197787 | 13014 |
| 7 | SFO | 10.328127 | 9.038425 | 12570 |
| 8 | LAS | 8.566097 | 5.054353 | 11499 |
| 9 | MCO | 9.887441 | 5.820513 | 9867 |
Параметризовані запити
Параметризовані запити дозволяють використовувати змінні в запитах. Це дозволяє виконувати один і той же запит з різними значеннями змінних.
Для цього використовується символ @ перед змінною в запиті. Визначимо параметр month та виконаємо запит:
query_params = [
bigquery.ScalarQueryParameter("month", "INT64", 1)
]sql_query = """
SELECT
ORIGIN,
AVG(DepDelay) AS dep_delay,
AVG(ArrDelay) AS arr_delay,
COUNT(ArrDelay) AS num_flights
FROM
`dsongcp.flights_auto`
WHERE
Month = @month
GROUP BY
ORIGIN
ORDER BY num_flights DESC
LIMIT 10
"""job_config = bigquery.QueryJobConfig(query_parameters=query_params)
query_job = client.query(sql_query, job_config=job_config)
df = query_job.to_dataframe()
df| ORIGIN | dep_delay | arr_delay | num_flights | |
|---|---|---|---|---|
| 0 | ATL | 7.265885 | 1.080248 | 29197 |
| 1 | DFW | 11.761812 | 9.371627 | 22571 |
| 2 | ORD | 19.962051 | 17.016132 | 22316 |
| 3 | LAX | 7.476341 | 5.542058 | 17048 |
| 4 | DEN | 15.506798 | 11.842325 | 16775 |
| 5 | IAH | 9.073786 | 5.353499 | 13191 |
| 6 | PHX | 8.066723 | 6.197787 | 13014 |
| 7 | SFO | 10.328127 | 9.038425 | 12570 |
| 8 | LAS | 8.566097 | 5.054353 | 11499 |
| 9 | MCO | 9.887441 | 5.820513 | 9867 |
У випадку коли потрібно використати зріз по даті:
query_params = [
bigquery.ScalarQueryParameter("start_date", "DATE", datetime.date(2015, 1, 1)),
bigquery.ScalarQueryParameter("end_date", "DATE", datetime.date(2015, 1, 7))
]Тоді запит буде виглядати наступним чином:
sql_query = """
SELECT
ORIGIN,
AVG(DepDelay) AS dep_delay,
AVG(ArrDelay) AS arr_delay,
COUNT(ArrDelay) AS num_flights
FROM
`dsongcp.flights_auto`
WHERE
FlightDate BETWEEN @start_date AND @end_date
GROUP BY
ORIGIN
ORDER BY num_flights DESC
LIMIT 10
"""job_config = bigquery.QueryJobConfig(query_parameters=query_params)
query_job = client.query(sql_query, job_config=job_config)
df = query_job.to_dataframe()
df| ORIGIN | dep_delay | arr_delay | num_flights | |
|---|---|---|---|---|
| 0 | ATL | 13.223205 | 6.869232 | 6653 |
| 1 | DFW | 28.783017 | 27.239105 | 5094 |
| 2 | ORD | 38.539200 | 38.549260 | 5065 |
| 3 | DEN | 34.550181 | 33.524051 | 4137 |
| 4 | LAX | 14.595703 | 15.126316 | 4085 |
| 5 | IAH | 19.919259 | 14.409795 | 3165 |
| 6 | PHX | 14.584003 | 14.319293 | 3110 |
| 7 | SFO | 12.263669 | 10.925950 | 3079 |
| 8 | LAS | 15.962792 | 13.408916 | 2602 |
| 9 | MCO | 21.239204 | 17.322841 | 2351 |
І третій варіант - позиційні параметри:
query_params = [
bigquery.ScalarQueryParameter(None, "FLOAT64", 0),
bigquery.ScalarQueryParameter(None, "FLOAT64", 0)
]sql_query = """
SELECT
ORIGIN,
AVG(DepDelay) AS dep_delay,
AVG(ArrDelay) AS arr_delay,
COUNT(ArrDelay) AS num_flights
FROM
`dsongcp.flights_auto`
WHERE
DepDelay < ? AND ArrDelay < ?
GROUP BY
ORIGIN
ORDER BY num_flights DESC
LIMIT 10
"""job_config = bigquery.QueryJobConfig(query_parameters=query_params)
query_job = client.query(sql_query, job_config=job_config)
df = query_job.to_dataframe()
df| ORIGIN | dep_delay | arr_delay | num_flights | |
|---|---|---|---|---|
| 0 | ATL | -3.487487 | -11.962145 | 14265 |
| 1 | DFW | -4.595339 | -10.509032 | 8968 |
| 2 | ORD | -4.753784 | -13.876310 | 7729 |
| 3 | LAX | -5.053177 | -11.830387 | 7240 |
| 4 | DEN | -4.942687 | -13.282285 | 5374 |
| 5 | PHX | -4.864400 | -11.197356 | 5295 |
| 6 | SFO | -5.341159 | -10.963271 | 5282 |
| 7 | IAH | -4.987881 | -11.642113 | 5281 |
| 8 | LAS | -5.255193 | -11.990661 | 5247 |
| 9 | SLC | -5.100784 | -13.151624 | 4465 |
Ще один варіант - використанням масивів:
query_params = [
bigquery.ArrayQueryParameter("ORIGIN", "STRING", ["PHX", "ATL", "LAS"]),
bigquery.ScalarQueryParameter("DepDelay", "FLOAT64", 10),
]sql_query = """
SELECT
ORIGIN,
AVG(DepDelay) AS dep_delay,
AVG(ArrDelay) AS arr_delay,
COUNT(ArrDelay) AS num_flights
FROM
`dsongcp.flights_auto`
WHERE
ORIGIN IN UNNEST(@ORIGIN)
AND DepDelay > @DepDelay
GROUP BY
ORIGIN
ORDER BY num_flights DESC
"""job_config = bigquery.QueryJobConfig(query_parameters=query_params)
query_job = client.query(sql_query, job_config=job_config)
df = query_job.to_dataframe()
df| ORIGIN | dep_delay | arr_delay | num_flights | |
|---|---|---|---|---|
| 0 | ATL | 44.575120 | 38.036858 | 5589 |
| 1 | PHX | 44.808705 | 42.578725 | 2839 |
| 2 | LAS | 46.203759 | 42.609581 | 2651 |
Таблиця BG -> GC Storage
client = bigquery.Client('fit-cloud-course')
dataset_id = 'dsongcp'
bucket_name = 'fit-cloud-course-dsongcp'table_ref = bigquery.TableReference.from_string(f"{client.project}.{dataset_id}.flights_auto")job_config = bigquery.job.ExtractJobConfig()
# job_config.destination_format = bigquery.DestinationFormat.CSV
job_config.destination_format = bigquery.DestinationFormat.NEWLINE_DELIMITED_JSONdestination_uri = f"gs://{bucket_name}/results/flights_delay.json"
extract_job = client.extract_table(
table_ref,
destination_uri,
job_config=job_config,
location="US"
)
extract_job.result()ExtractJob<project=fit-cloud-course, location=US, id=a90b3262-15f8-48a1-a259-39aa0e8137cb>
Робота з датасетами
Підключіть BigQuery Data Transfer API.
Створимо клас DatasetManager для роботи з датасетами:
class DatasetManager:
def __init__(self, client):
self.client = client
def delete_dataset(self, dataset_id):
self.client.delete_dataset(dataset_id, not_found_ok=True, delete_contents=True)
def create_dataset(self, dataset_id, location='US'):
dataset = bigquery.Dataset(dataset_id)
dataset.location = location
dataset_ = self.client.create_dataset(dataset, timeout=30)
print(f"Датасет {self.client.project}.{dataset.dataset_id} створено")
return dataset_
def list_dataset(self):
datasets = self.client.list_datasets()
return [dataset.dataset_id for dataset in datasets]
def copy_dataset(self, source_project_id, source_dataset_id, destination_project_id, destination_dataset_id, display_name):
"""
Посилання:
https://cloud.google.com/bigquery-transfer/docs/reference/datatransfer/rest/v1/projects.transferConfigs
Надати доступ до IAM:
https://cloud.google.com/bigquery/docs/enable-transfer-service#grant_bigqueryadmin_access
Зауважте:
- це також призведе до переміщення таблиць.
- підключіть BigQuery Data Transfer API.
"""
transfer_client = bigquery_datatransfer.DataTransferServiceClient()
transfer_config = bigquery_datatransfer.TransferConfig(
destination_dataset_id=destination_dataset_id,
display_name=display_name,
data_source_id="cross_region_copy", #
params={
"source_project_id": source_project_id,
"source_dataset_id": source_dataset_id,
},
)
transfer_config = transfer_client.create_transfer_config(
parent=transfer_client.common_project_path(destination_project_id),
transfer_config=transfer_config,
)
print(f"Створено конфігурацію передачі: {transfer_config.name}")Ініціалізуємо клієнт та клас:
client = bigquery.Client('fit-cloud-course')
dataset_manager = DatasetManager(client)Виведемо список датасетів:
dataset_manager.list_dataset()['dsongcp', 'dsongcp_results', 'food_delivery']
Створимо новий датасет:
dataset_name = f'{client.project}.ny_taxi_trips'
dataset = dataset_manager.create_dataset(dataset_name)Датасет fit-cloud-course.ny_taxi_trips створено
Скопіюємо датасет:
source_project_id = 'bigquery-public-data'
source_dataset_id = 'new_york_taxi_trips'
destination_project_id = client.project
destination_dataset_id = 'ny_taxi_trips'
display_name = 'NY Taxi Trips'
dataset_manager.copy_dataset(source_project_id, source_dataset_id, destination_project_id, destination_dataset_id, display_name)Створено конфігурацію передачі: projects/326133795346/locations/us/transferConfigs/65fb972b-0000-23f7-bdb8-089e0826d31c
Видалимо датасет:
dataset_manager.delete_dataset(dataset_name)