import os
from google.cloud import bigquery, bigquery_datatransfer
import google.auth
import time
import pandas as pd
import datetime
import json
# Завантаження сервісного ключа
"GOOGLE_APPLICATION_CREDENTIALS"] = "./data/fit-cloud-course-key.json" os.environ[
GCP: Python та BigQuery
Хмарні технології обробки даних
Вимоги
- Активувати BigQuery API в консолі GCP
- Встановити бібліотеку
google-cloud-bigquery
:
pip install --upgrade google-cloud-bigquery
pip install google-cloud-bigquery-datatransfer
pip install db-dtypes
- Створити сервісний ключ для доступу до BigQuery API (Role: Basic, Roles: Owner) та завантажити його на локальний комп’ютер.
Початок роботи з API
Створення клієнта
= bigquery.Client('fit-cloud-course') client
Запити до BigQuery
= """
sql_query SELECT station_id, name, dockcount
FROM `bigquery-public-data.san_francisco.bikeshare_stations`
LIMIT 5
"""
Виконання запиту
= client.query(sql_query)
query_job
print(query_job)
QueryJob<project=fit-cloud-course, location=US, id=3d97ffac-0a07-4a5a-bd45-811d1123a875>
Результати запиту
for row in query_job:
print(row)
Row((4, 'Santa Clara at Almaden', 11), {'station_id': 0, 'name': 1, 'dockcount': 2})
Row((84, 'Ryland Park', 15), {'station_id': 0, 'name': 1, 'dockcount': 2})
Row((8, 'San Salvador at 1st', 15), {'station_id': 0, 'name': 1, 'dockcount': 2})
Row((9, 'Japantown', 15), {'station_id': 0, 'name': 1, 'dockcount': 2})
Row((3, 'San Jose Civic Center', 15), {'station_id': 0, 'name': 1, 'dockcount': 2})
for row in query_job:
print(row[0], row[1], row[2], sep=" | ")
4 | Santa Clara at Almaden | 11
84 | Ryland Park | 15
8 | San Salvador at 1st | 15
9 | Japantown | 15
3 | San Jose Civic Center | 15
for row in query_job:
print(row.station_id, row.name, row.dockcount, sep=" | ")
4 | Santa Clara at Almaden | 11
84 | Ryland Park | 15
8 | San Salvador at 1st | 15
9 | Japantown | 15
3 | San Jose Civic Center | 15
Завантаження даних в BigQuery
JSON
За необхідності, переведемо дані у плоский формат:
= json.load(open('./data/orders.json'))
json_data = './data/orders_upload.json'
data_file_path
with open(data_file_path, 'w') as f:
'\n'.join(json.dumps(record) for record in json_data)) f.write(
= 'dsongcp'
dataset_id = 'orders'
table_id = './data/orders.json' data_file_path
= bigquery.LoadJobConfig(
job_config =[
schema"order_id", "INT64"),
bigquery.SchemaField("creation_time", "DATETIME"),
bigquery.SchemaField("product_ids", "INT64", mode="REPEATED"),
bigquery.SchemaField(
],=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
source_format=False,
autodetect=bigquery.WriteDisposition.WRITE_TRUNCATE
write_disposition )
= client.dataset(dataset_id).table(table_id)
table_ref with open(data_file_path, "rb") as source_file:
= client.load_table_from_file(source_file, table_ref, job_config=job_config)
job
while job.running():
print('Завантаження даних...')
3)
time.sleep(print(job.result())
Завантаження даних...
Завантаження даних...
LoadJob<project=fit-cloud-course, location=US, id=3260a434-b0d5-47eb-b7ff-84c64121945c>
GC Storage
= 'fit-cloud-course-dsongcp'
bucket_name = f"gs://{bucket_name}/flights/raw/*.csv"
source_uri = 'flights_py' table_id
= bigquery.LoadJobConfig(
job_config =True,
autodetect=bigquery.SourceFormat.CSV,
source_format=1,
skip_leading_rows=bigquery.WriteDisposition.WRITE_TRUNCATE
write_disposition )
= client.dataset(dataset_id).table(table_id)
table_ref = client.load_table_from_uri(
job
source_uri,
table_ref,=job_config
job_config
)
while job.running():
print('Завантаження даних...')
3)
time.sleep(print(job.result())
Завантаження даних...
Завантаження даних...
Завантаження даних...
Завантаження даних...
Завантаження даних...
Завантаження даних...
LoadJob<project=fit-cloud-course, location=US, id=65415b17-6160-4395-9dc1-6084ecdeb941>
CSV
= 'dsongcp'
dataset_id = 'users'
table_id = './data/users.csv'
data_file_path
= bigquery.LoadJobConfig(
job_config =bigquery.SourceFormat.CSV,
source_format=1,
skip_leading_rows=True,
autodetect=bigquery.WriteDisposition.WRITE_TRUNCATE
write_disposition
)
= client.dataset(dataset_id).table(table_id)
table_ref with open(data_file_path, "rb") as source_file:
= client.load_table_from_file(source_file, table_ref, job_config=job_config)
job
while job.running():
print('Завантаження даних...')
3)
time.sleep(print(job.result())
Завантаження даних...
LoadJob<project=fit-cloud-course, location=US, id=20d82c1a-126b-4981-8a51-1c4b0235567e>
= client.get_table(table_ref)
table print(f"Таблиця: {table}\nСпостережень: {table.num_rows}\nЗмінних: {len(table.schema)}")
Таблиця: fit-cloud-course.dsongcp.users
Спостережень: 20331
Змінних: 3
Google Sheets
Підключаємо Google Drive API. Копіюємо email-адресу сервісного акаунта.
= google.auth.default(
credentials, project =[
scopes"https://www.googleapis.com/auth/drive",
"https://www.googleapis.com/auth/bigquery"
] )
= bigquery.Client(credentials=credentials, project=project)
client
= 'dsongcp'
dataset_id = 'air_traffic'
table_id = client.dataset(dataset_id).table(table_id) table_ref
= bigquery.Table(table_ref, schema = [
table "Activity_Period", "INT64"),
bigquery.SchemaField("Activity_Period_Start_Date", "DATE"),
bigquery.SchemaField("Operating_Airline", "STRING"),
bigquery.SchemaField("Operating_Airline_IATA_Code", "STRING"),
bigquery.SchemaField("Published_Airline", "STRING"),
bigquery.SchemaField("Published_Airline_IATA_Code", "STRING"),
bigquery.SchemaField("GEO_Summary", "STRING"),
bigquery.SchemaField("GEO_Region", "STRING"),
bigquery.SchemaField("Activity_Type_Code", "STRING"),
bigquery.SchemaField("Price_Category_Code", "STRING"),
bigquery.SchemaField("Terminal", "STRING"),
bigquery.SchemaField("Boarding_Area", "STRING"),
bigquery.SchemaField("Passenger_Count", "INT64"),
bigquery.SchemaField("data_as_of", "STRING"),
bigquery.SchemaField("data_loaded_at", "STRING")
bigquery.SchemaField(
])
= bigquery.ExternalConfig("GOOGLE_SHEETS")
external_config = "https://docs.google.com/spreadsheets/d/1vp7bCvxd3R2zciqxc8A4Qoq5DAvqhzfXJnljmG5OOw0/edit?usp=sharing"
sheet_url = [sheet_url]
external_config.source_uris
= external_config.google_sheets_options
options = 1
options.skip_leading_rows range = "air_traffic!A:O"
options.
= external_config
table.external_data_configuration client.create_table(table)
Table(TableReference(DatasetReference('fit-cloud-course', 'dsongcp'), 'air_traffic'))
Мітки даних у BigQuery
Мітка – це пара ключ-значення, яку можна призначити ресурсам Google Cloud BigQuery. Ви можете прикріпити мітку до кожного ресурсу, а потім відфільтрувати ресурси на основі їхніх міток.
Ось кілька типових випадків використання міток:
- Мітки команди:
team:research
абоteam:analytics
. - Мітки компонентів: наприклад,
component:redis
,component:frontend
,component:ingestта
,component:dashboard
тощо. - Мітки середовища:
environment:production
іenvironment:test
. - Мітки стану:
state:active
,state:readytodelete
іstate:archive
. - Мітки власності: використовуються для ідентифікації команд, які відповідають за операції, наприклад
team:shopping-cart
.
Створення таблиці
# Створення посилання на таблицю
= bigquery.DatasetReference(client.project, 'dsongcp')
dataset_ref
# Створення таблиці
= bigquery.TableReference(dataset_ref, 'flights_auto') table_ref
Створення міток
# Створення міток
= {
labels 'type': 'auto',
'category': 'transport'
}
# Створення таблиці з мітками
= client.get_table(table_ref)
flights_auto_table # Додавання міток
= labels
flights_auto_table.labels # Оновлення таблиці
'labels']) client.update_table(flights_auto_table, [
Table(TableReference(DatasetReference('fit-cloud-course', 'dsongcp'), 'flights_auto'))
Оновлення міток
# Оновлення міток
= {
new_labels 'type': 'auto',
'category': 'transport',
'year': '2015'
}
= client.get_table(table_ref)
flights_auto_table = new_labels
flights_auto_table.labels 'labels']) client.update_table(flights_auto_table, [
Table(TableReference(DatasetReference('fit-cloud-course', 'dsongcp'), 'flights_auto'))
Видалення міток
= client.get_table(table_ref)
table = table.labels
labels = {k: None for k, v in labels.items()}
labels = labels
table.labels 'labels']) client.update_table(table, [
Table(TableReference(DatasetReference('fit-cloud-course', 'dsongcp'), 'flights_auto'))
Додавання/видалення змінних
# Створення таблиці
= bigquery.DatasetReference(client.project, 'dsongcp')
dataset_ref = bigquery.TableReference(dataset_ref, 'flights_auto')
table_ref = client.get_table(table_ref) bigquery_table
Отримання схеми таблиці:
= bigquery_table.schema
schema 5] schema[:
[SchemaField('Year', 'INTEGER', 'NULLABLE', None, None, (), None),
SchemaField('Quarter', 'INTEGER', 'NULLABLE', None, None, (), None),
SchemaField('Month', 'INTEGER', 'NULLABLE', None, None, (), None),
SchemaField('DayofMonth', 'INTEGER', 'NULLABLE', None, None, (), None),
SchemaField('DayOfWeek', 'INTEGER', 'NULLABLE', None, None, (), None)]
Створимо копію схеми:
= schema.copy() new_schema
Додамо нову змінну у схему:
'Distance_km', 'FLOAT64', mode='NULLABLE'))
new_schema.append(bigquery.SchemaField(# Передамо оновлену схему таблиці bigquery_table
= new_schema bigquery_table.schema
Типи даних в BigQuery: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types.
Тепер зробимо запит на оновлення таблиці:
'schema']) client.update_table(bigquery_table, [
Table(TableReference(DatasetReference('fit-cloud-course', 'dsongcp'), 'flights_auto'))
Переглянемо оновлену схему:
= client.get_table(table_ref)
bigquery_table = bigquery_table.schema
schema -5:] schema[
[SchemaField('Div5LongestGTime', 'STRING', 'NULLABLE', None, None, (), None),
SchemaField('Div5WheelsOff', 'STRING', 'NULLABLE', None, None, (), None),
SchemaField('Div5TailNum', 'STRING', 'NULLABLE', None, None, (), None),
SchemaField('string_field_109', 'STRING', 'NULLABLE', None, None, (), None),
SchemaField('Distance_km', 'FLOAT', 'NULLABLE', None, None, (), None)]
Видалення змінної з схеми:
= client.query("""
query_job ALTER TABLE dsongcp.flights_auto
DROP COLUMN IF EXISTS Distance_km;
""")
Іноді запит може займати деякий час на виконання. Щоб перевірити статус запиту, використовуйте метод query_job.state
:
while query_job.state != 'DONE':
print('Запит виконується...')
3)
time.sleep(reload()
query_job.print(query_job.result())
Запит виконується...
<google.cloud.bigquery.table._EmptyRowIterator object at 0x000001B762ADC110>
Збереження результатів запиту в таблицю
Напишемо запит, який вибере середню затримку вильоту та прибуття для кожного аеропорту відправлення:
= """
sql_query SELECT
ORIGIN,
AVG(DepDelay) AS dep_delay,
AVG(ArrDelay) AS arr_delay,
COUNT(ArrDelay) AS num_flights
FROM
`dsongcp.flights_auto`
GROUP BY
ORIGIN
ORDER BY num_flights DESC
LIMIT 10
"""
Створимо окремий датасет, куди ми збережемо результати запиту:
= client.project + '.dsongcp_results'
dataset_id = bigquery.Dataset(dataset_id)
dataset = "US"
dataset.location = client.create_dataset(dataset, exists_ok=True)
dataset print(f"Створено датасет {client.project}.{dataset.dataset_id}")
Створено датасет fit-cloud-course.dsongcp_results
Визначимо таблицю, в яку ми збережемо результати запиту:
= client.project
project_id = 'dsongcp_results'
dataset_id = 'flights_delay' table_id
Використаємо метод from_string()
для створення посилання на таблицю:
= bigquery.TableReference.from_string(f"{project_id}.{dataset_id}.{table_id}")
table_ref table_ref
TableReference(DatasetReference('fit-cloud-course', 'dsongcp_results'), 'flights_delay')
Виконаємо запит та збережемо результати в таблицю:
= bigquery.QueryJobConfig(destination=table_ref, write_disposition="WRITE_TRUNCATE")
job_config = client.query(sql_query, job_config=job_config)
query_job while query_job.state != 'DONE':
print('Запит виконується...')
3)
time.sleep(reload()
query_job.print(query_job.result())
Запит виконується...
<google.cloud.bigquery.table.RowIterator object at 0x000001B70D15DF10>
Більше про QueryJobConfig
за посиланням https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.QueryJobConfig
Запит у DataFrame
Для зручності роботи з даними можна використовувати бібліотеку pandas
. Для цього використаємо метод to_dataframe()
:
= client.query(sql_query)
query_job = query_job.to_dataframe()
df df
ORIGIN | dep_delay | arr_delay | num_flights | |
---|---|---|---|---|
0 | ATL | 7.265885 | 1.080248 | 29197 |
1 | DFW | 11.761812 | 9.371627 | 22571 |
2 | ORD | 19.962051 | 17.016132 | 22316 |
3 | LAX | 7.476341 | 5.542058 | 17048 |
4 | DEN | 15.506798 | 11.842325 | 16775 |
5 | IAH | 9.073786 | 5.353499 | 13191 |
6 | PHX | 8.066723 | 6.197787 | 13014 |
7 | SFO | 10.328127 | 9.038425 | 12570 |
8 | LAS | 8.566097 | 5.054353 | 11499 |
9 | MCO | 9.887441 | 5.820513 | 9867 |
Параметризовані запити
Параметризовані запити дозволяють використовувати змінні в запитах. Це дозволяє виконувати один і той же запит з різними значеннями змінних.
Для цього використовується символ @
перед змінною в запиті. Визначимо параметр month
та виконаємо запит:
= [
query_params "month", "INT64", 1)
bigquery.ScalarQueryParameter( ]
= """
sql_query SELECT
ORIGIN,
AVG(DepDelay) AS dep_delay,
AVG(ArrDelay) AS arr_delay,
COUNT(ArrDelay) AS num_flights
FROM
`dsongcp.flights_auto`
WHERE
Month = @month
GROUP BY
ORIGIN
ORDER BY num_flights DESC
LIMIT 10
"""
= bigquery.QueryJobConfig(query_parameters=query_params)
job_config = client.query(sql_query, job_config=job_config)
query_job = query_job.to_dataframe()
df df
ORIGIN | dep_delay | arr_delay | num_flights | |
---|---|---|---|---|
0 | ATL | 7.265885 | 1.080248 | 29197 |
1 | DFW | 11.761812 | 9.371627 | 22571 |
2 | ORD | 19.962051 | 17.016132 | 22316 |
3 | LAX | 7.476341 | 5.542058 | 17048 |
4 | DEN | 15.506798 | 11.842325 | 16775 |
5 | IAH | 9.073786 | 5.353499 | 13191 |
6 | PHX | 8.066723 | 6.197787 | 13014 |
7 | SFO | 10.328127 | 9.038425 | 12570 |
8 | LAS | 8.566097 | 5.054353 | 11499 |
9 | MCO | 9.887441 | 5.820513 | 9867 |
У випадку коли потрібно використати зріз по даті:
= [
query_params "start_date", "DATE", datetime.date(2015, 1, 1)),
bigquery.ScalarQueryParameter("end_date", "DATE", datetime.date(2015, 1, 7))
bigquery.ScalarQueryParameter( ]
Тоді запит буде виглядати наступним чином:
= """
sql_query SELECT
ORIGIN,
AVG(DepDelay) AS dep_delay,
AVG(ArrDelay) AS arr_delay,
COUNT(ArrDelay) AS num_flights
FROM
`dsongcp.flights_auto`
WHERE
FlightDate BETWEEN @start_date AND @end_date
GROUP BY
ORIGIN
ORDER BY num_flights DESC
LIMIT 10
"""
= bigquery.QueryJobConfig(query_parameters=query_params)
job_config = client.query(sql_query, job_config=job_config)
query_job = query_job.to_dataframe()
df df
ORIGIN | dep_delay | arr_delay | num_flights | |
---|---|---|---|---|
0 | ATL | 13.223205 | 6.869232 | 6653 |
1 | DFW | 28.783017 | 27.239105 | 5094 |
2 | ORD | 38.539200 | 38.549260 | 5065 |
3 | DEN | 34.550181 | 33.524051 | 4137 |
4 | LAX | 14.595703 | 15.126316 | 4085 |
5 | IAH | 19.919259 | 14.409795 | 3165 |
6 | PHX | 14.584003 | 14.319293 | 3110 |
7 | SFO | 12.263669 | 10.925950 | 3079 |
8 | LAS | 15.962792 | 13.408916 | 2602 |
9 | MCO | 21.239204 | 17.322841 | 2351 |
І третій варіант - позиційні параметри:
= [
query_params None, "FLOAT64", 0),
bigquery.ScalarQueryParameter(None, "FLOAT64", 0)
bigquery.ScalarQueryParameter(
]
= """
sql_query SELECT
ORIGIN,
AVG(DepDelay) AS dep_delay,
AVG(ArrDelay) AS arr_delay,
COUNT(ArrDelay) AS num_flights
FROM
`dsongcp.flights_auto`
WHERE
DepDelay < ? AND ArrDelay < ?
GROUP BY
ORIGIN
ORDER BY num_flights DESC
LIMIT 10
"""
= bigquery.QueryJobConfig(query_parameters=query_params)
job_config = client.query(sql_query, job_config=job_config)
query_job = query_job.to_dataframe()
df df
ORIGIN | dep_delay | arr_delay | num_flights | |
---|---|---|---|---|
0 | ATL | -3.487487 | -11.962145 | 14265 |
1 | DFW | -4.595339 | -10.509032 | 8968 |
2 | ORD | -4.753784 | -13.876310 | 7729 |
3 | LAX | -5.053177 | -11.830387 | 7240 |
4 | DEN | -4.942687 | -13.282285 | 5374 |
5 | PHX | -4.864400 | -11.197356 | 5295 |
6 | SFO | -5.341159 | -10.963271 | 5282 |
7 | IAH | -4.987881 | -11.642113 | 5281 |
8 | LAS | -5.255193 | -11.990661 | 5247 |
9 | SLC | -5.100784 | -13.151624 | 4465 |
Ще один варіант - використанням масивів:
= [
query_params "ORIGIN", "STRING", ["PHX", "ATL", "LAS"]),
bigquery.ArrayQueryParameter("DepDelay", "FLOAT64", 10),
bigquery.ScalarQueryParameter( ]
= """
sql_query SELECT
ORIGIN,
AVG(DepDelay) AS dep_delay,
AVG(ArrDelay) AS arr_delay,
COUNT(ArrDelay) AS num_flights
FROM
`dsongcp.flights_auto`
WHERE
ORIGIN IN UNNEST(@ORIGIN)
AND DepDelay > @DepDelay
GROUP BY
ORIGIN
ORDER BY num_flights DESC
"""
= bigquery.QueryJobConfig(query_parameters=query_params)
job_config = client.query(sql_query, job_config=job_config)
query_job = query_job.to_dataframe()
df df
ORIGIN | dep_delay | arr_delay | num_flights | |
---|---|---|---|---|
0 | ATL | 44.575120 | 38.036858 | 5589 |
1 | PHX | 44.808705 | 42.578725 | 2839 |
2 | LAS | 46.203759 | 42.609581 | 2651 |
Таблиця BG -> GC Storage
= bigquery.Client('fit-cloud-course')
client = 'dsongcp'
dataset_id = 'fit-cloud-course-dsongcp' bucket_name
= bigquery.TableReference.from_string(f"{client.project}.{dataset_id}.flights_auto") table_ref
= bigquery.job.ExtractJobConfig()
job_config # job_config.destination_format = bigquery.DestinationFormat.CSV
= bigquery.DestinationFormat.NEWLINE_DELIMITED_JSON job_config.destination_format
= f"gs://{bucket_name}/results/flights_delay.json"
destination_uri
= client.extract_table(
extract_job
table_ref,
destination_uri,=job_config,
job_config="US"
location
) extract_job.result()
ExtractJob<project=fit-cloud-course, location=US, id=a90b3262-15f8-48a1-a259-39aa0e8137cb>
Робота з датасетами
Підключіть BigQuery Data Transfer API.
Створимо клас DatasetManager
для роботи з датасетами:
class DatasetManager:
def __init__(self, client):
self.client = client
def delete_dataset(self, dataset_id):
self.client.delete_dataset(dataset_id, not_found_ok=True, delete_contents=True)
def create_dataset(self, dataset_id, location='US'):
= bigquery.Dataset(dataset_id)
dataset = location
dataset.location = self.client.create_dataset(dataset, timeout=30)
dataset_ print(f"Датасет {self.client.project}.{dataset.dataset_id} створено")
return dataset_
def list_dataset(self):
= self.client.list_datasets()
datasets return [dataset.dataset_id for dataset in datasets]
def copy_dataset(self, source_project_id, source_dataset_id, destination_project_id, destination_dataset_id, display_name):
"""
Посилання:
https://cloud.google.com/bigquery-transfer/docs/reference/datatransfer/rest/v1/projects.transferConfigs
Надати доступ до IAM:
https://cloud.google.com/bigquery/docs/enable-transfer-service#grant_bigqueryadmin_access
Зауважте:
- це також призведе до переміщення таблиць.
- підключіть BigQuery Data Transfer API.
"""
= bigquery_datatransfer.DataTransferServiceClient()
transfer_client = bigquery_datatransfer.TransferConfig(
transfer_config =destination_dataset_id,
destination_dataset_id=display_name,
display_name="cross_region_copy", #
data_source_id={
params"source_project_id": source_project_id,
"source_dataset_id": source_dataset_id,
},
)= transfer_client.create_transfer_config(
transfer_config =transfer_client.common_project_path(destination_project_id),
parent=transfer_config,
transfer_config
)print(f"Створено конфігурацію передачі: {transfer_config.name}")
Ініціалізуємо клієнт та клас:
= bigquery.Client('fit-cloud-course')
client = DatasetManager(client) dataset_manager
Виведемо список датасетів:
dataset_manager.list_dataset()
['dsongcp', 'dsongcp_results', 'food_delivery']
Створимо новий датасет:
= f'{client.project}.ny_taxi_trips'
dataset_name = dataset_manager.create_dataset(dataset_name) dataset
Датасет fit-cloud-course.ny_taxi_trips створено
Скопіюємо датасет:
= 'bigquery-public-data'
source_project_id = 'new_york_taxi_trips'
source_dataset_id = client.project
destination_project_id = 'ny_taxi_trips'
destination_dataset_id = 'NY Taxi Trips'
display_name
dataset_manager.copy_dataset(source_project_id, source_dataset_id, destination_project_id, destination_dataset_id, display_name)
Створено конфігурацію передачі: projects/326133795346/locations/us/transferConfigs/65fb972b-0000-23f7-bdb8-089e0826d31c
Видалимо датасет:
dataset_manager.delete_dataset(dataset_name)