Основи великих даних з PySpark

Моделі та методи обробки великих даних

Ігор Мірошниченко

КНУ імені Тараса Шевченка, ФІТ

Вступ до аналізу великих даних за допомогою Spark

Що таке великі дані?

Великі дані — набори інформації (як структурованої, так і неструктурованої) настільки великих розмірів, що традиційні способи та підходи (здебільшого засновані на рішеннях класу бізнесової аналітики та системах управління базами даних) не можуть бути застосовані до них - Wikipedia

Три «V» великих даних

  • Обсяг (Volume) — обсяг даних, які потрібно зберігати та обробляти. Це може бути величезна кількість даних, що перевищує можливості традиційних систем зберігання.
  • Швидкість (Velocity) — швидкість, з якою дані генеруються та обробляються. Це може бути потік даних у реальному часі або дані, які надходять з різних джерел.
  • Різноманітність (Variety) — різноманітність типів даних, які потрібно обробляти. Це можуть бути структуровані, напівструктуровані та неструктуровані дані з різних джерел.

Концепції та термінологія великих даних

  • Кластерні обчислення: Об’єднання ресурсів декількох машин
  • Паралельні обчислення: Одночасні обчислення на одному комп’ютері
  • Розподілені обчислення: Сукупність вузлів (мережевих комп’ютерів), які працюють паралельно
  • Пакетна обробка: Розбиття завдання на невеликі частини і запуск їх на окремих машинах
  • Обробка в реальному часі: Негайна обробка даних

Системи обробки великих даних

  • Hadoop/MapReduce: Масштабований та відмовостійкий фреймворк на Java
    • Відкритий вихідний код
    • Пакетна обробка
  • Apache Spark: Універсальна та швидка кластерна обчислювальна система
    • Відкритий вихідний код
    • Пакетна обробка даних та обробка даних у реальному часі

Примітка

Apache Spark на сьогоднішній день є кращим за Hadoop/MapReduce у більшості випадків, оскільки він швидший, простіший у використанні та підтримує різноманітні джерела даних.

Особливості фреймворку Apache Spark

  • Фреймворк розподілених кластерних обчислень
  • Ефективні обчислення в пам’яті для великих масивів даних
  • Блискавична платформа для обробки даних
  • Забезпечує підтримку Java, Scala, Python, R та SQL

Компоненти Apache Spark

SparkArchitecture Core Apache Spark Core RDD API SQL Spark SQL Core->SQL ML MLlib Machine Learning Core->ML GraphX GraphX Core->GraphX Streaming Spark Streaming Core->Streaming

Режими розгортання Spark

  • Локальний режим: Одна машина, наприклад, ваш ноутбук
    • Локальна модель зручна для тестування, налагодження та демонстрації
  • Кластерний режим: Набір заздалегідь визначених машин
    • Добре підходить для реальних сценаріїв, коли потрібно обробляти великі обсяги даних
  • Робочий процес: Локально \(\rightarrow\) кластери
  • Не потрібно змінювати код

PySpark: Spark з Python

Огляд PySpark

  • Apache Spark написано на Scala
  • Для підтримки Python зі Spark, спільнота Apache Spark випустила PySpark
  • Подібна швидкість та потужність обчислень до Scala
  • API PySpark схожі на Pandas та Scikit-learn

Що таке оболонка Spark?

  • Інтерактивне середовище для виконання завдань Spark
  • Допомагає для швидкого інтерактивного прототипування
  • Оболонки Spark дозволяють взаємодіяти з даними на диску або в пам’яті
  • Три різні оболонки Spark:
    • Spark-оболонка для Scala
    • PySpark-оболонка для Python
    • SparkR для R

PySpark shell

  • Оболонка PySpark — це інструмент командного рядка на основі Python
  • Оболонка PySpark дозволяє аналітикам даних взаємодіяти зі структурами даних Spark
  • PySpark shell підтримує підключення до кластера

Розуміння SparkContext

  • SparkContext — це точка входу у світ Spark
  • Точка входу — це спосіб підключення до кластера Spark
  • Точка входу — це як ключ до будинку
  • PySpark має SparkContext за замовчуванням, який називається sc

Перевірка SparkContext

  • Версія: Щоб отримати версію SparkContext
Terminal
pyspark --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.5.5
      /_/

Using Python version 3.11.4 (tags/v3.11.4:d2340ef, Jun  7 2023 05:45:37)
Spark context Web UI available at http://AranaurPC:4040
Spark context available as 'sc' (master = local[*], app id = local-1745489304985).
SparkSession available as 'spark'.
from pyspark import SparkContext

sc = SparkContext(master = 'local[*]')
print(sc.version)
3.5.5
  • Версія Python:
print(sc.pythonVer)
3.11
  • Майстер: URL кластера або локального рядка для запуску у локальному режимі SparkContext
print(sc.master)
local[*]

Завантаження даних у PySpark

  • Метод SparkContext parallelize()
rdd = sc.parallelize([1,2,3,4,5])
  • Метод textFile() SparkContext
rdd = sc.textFile("data/data.txt")

Функціональне програмування у Python

Що таке анонімні функції в Python?

  • Лямбда-функції — анонімні функції в Python
  • Дуже потужна функція, яка використовується у Python. Досить ефективні з map() та filter()
  • Лямбда-функції створюють функції для подальшого виклику подібно до def
  • Повертає функції без імені (тобто анонімні)

Синтаксис лямбда-функції

  • Загальний вигляд лямбда-функції має вигляд
lambda arguments: expression
  • Приклад лямбда-функції
double = lambda x: x * 2
print(double(3))
6

Різниця між функціями def та lambda

  • Код на Python для ілюстрації куба числа
def cube(x):
  return x ** 3

g = lambda x: x ** 3

print(g(10))
print(cube(10))
1000
1000
  • Немає оператора повернення для лямбда-функції
  • Можна розміщувати лямбда-функцію будь-де

map()

  • map() застосовує функцію до всіх елементів у вхідному списку
  • Загальний синтаксис map()
map(function, list)
  • Приклад map()
items = [1, 2, 3, 4]
new_items = list(map(lambda x: x + 2 , items))
print(new_items)
[3, 4, 5, 6]

filter()

  • функція filter() отримує функцію та список і повертає новий список, для якого функція повертає значення true
  • Загальний синтаксис filter()
filter(function, list)
  • Приклад filter()
items = [1, 2, 3, 4]
new_items = list(filter(lambda x: (x%2 != 0), items))

print(new_items)
[1, 3]

Програмування в PySpark RDD

Що таке RDD?

  • RDD (Resilient Distributed Datasets) — це основна структура даних у Spark

RDDDistribution cluster_0 Кластер DataFile Файл даних на диску Driver Драйвер Spark створює RDD й розподіляє між вузлами DataFile->Driver Node1 Розділ вузла RDD 4 Driver->Node1 Node2 Розділ вузла RDD 3 Driver->Node2 Node3 Розділ вузла RDD 2 Driver->Node3 Node4 Розділ вузла RDD 1 Driver->Node4

Декомпозиція RDD

  • Стійкі розподілені набори даних
    • Відмовостійкість: Здатність протистояти збоям
    • Розподілені: Розподілені між декількома машинами
    • Набори даних: Колекція розділених даних, наприклад, масивів, таблиць, кортежів тощо.

Створення RDD. Як це зробити?

  • Розпаралелювання існуючої колекції об’єктів
  • Зовнішні набори даних:
    • Файли в HDFS
    • Об’єкти в Amazon S3 bucket
    • рядки в текстовому файлі
  • З існуючих RDD

Розпаралелений збір (розпаралелювання)

  • parallelize() для створення RDD зі списків на Python
numRDD = sc.parallelize([1,2,3,4])

helloRDD = sc.parallelize("Hello world")

type(helloRDD)
pyspark.rdd.RDD

Із зовнішніх наборів даних

  • textFile() для створення RDD із зовнішніх наборів даних
fileRDD = sc.textFile("data/data.txt")

type(fileRDD)
pyspark.rdd.RDD

Розуміння партицій у PySpark

  • Партиція (Partition) — це логічний поділ великого розподіленого набору даних

  • метод parallelize()

numRDD = sc.parallelize(range(10), numSlices = 6)
  • метод textFile()
fileRDD = sc.textFile("README.md", numSlices = 6)

Кількість партицій в RDD можна дізнатися за допомогою методу getNumPartitions()

RDD operations in PySpark

Огляд PySpark operations

SparkOperationsEmoji Transformations 🐛 ➡️ 🐛 ➡️ 🐛 ➡️ 🦋 Transformations Operations Spark 🧠 Operations Transformations->Operations Actions 🖨️ Actions Actions->Operations

  • Перетворення створюють нові RDD
  • Дії виконують обчислення на RDD

RDD Transformations

  • Трансформації виконують ліниві обчислення
  • Базові перетворення RDD
    • map()
    • filter()
    • flatMap()
    • union()

RDDPipeline Storage Сховище RDD1 RDD1 Storage->RDD1 Rdd створюється шляхом читання даних зі сховища RDD2 RDD2 RDD1->RDD2 transformation RDD3 RDD3 RDD2->RDD3 transformation Result Результат RDD3->Result action

map()

  • перетворення map() застосовує функцію до всіх елементів у RDD

MapTransformation A1 1 MAP map x * x A1->MAP A2 2 A2->MAP A3 3 A3->MAP A4 4 A4->MAP B1 1 MAP->B1 B2 4 MAP->B2 B3 9 MAP->B3 B4 16 MAP->B4

RDD = sc.parallelize([1,2,3,4])
RDD_map = RDD.map(lambda x: x * x)

filter()

  • filter() повертає новий RDD з елементами, які відповідають умові

FilterTransformation A1 1 FILTER filter x : x > 2 A1->FILTER A2 2 A2->FILTER A3 3 A3->FILTER A4 4 A4->FILTER B3 3 FILTER->B3 B4 4 FILTER->B4

RDD = sc.parallelize([1,2,3,4])
RDD_filter = RDD.filter(lambda x: x > 2)

flatMap()

  • flatMap() перетворює RDD у новий RDD, де кожен елемент може бути розгорнутий у кілька елементів

FlatMapTransformation A1 ['Hello world', 'How are you?'] FLATMAP flatMap x : x.split(' ') A1->FLATMAP B1 ['Hello', 'world', 'How', 'are', 'you?'] FLATMAP->B1

RDD = sc.parallelize(["hello world", "how are you"])
RDD_flatmap = RDD.flatMap(lambda x: x.split(" "))

union()

  • union() об’єднує два RDD в один

RDD_Pipeline inputRDD inputRDD filter1 Filter inputRDD->filter1 filter2 Filter inputRDD->filter2 errorsRDD errorsRDD union Union errorsRDD->union warningsRDD warningsRDD warningsRDD->union badlinesRDD badlinesRDD filter1->errorsRDD filter2->warningsRDD union->badlinesRDD

inputRDD = sc.textFile("logs.txt")
errorRDD = inputRDD.filter(lambda x: "error" in x.split())
warningsRDD = inputRDD.filter(lambda x: "warnings" in x.split())
combinedRDD = errorRDD.union(warningsRDD)

RDD Actions

Це операції, які повертають значення після виконання обчислень на RDD

  • Базові дії з RDD
    • collect()
    • take(N)
    • first()
    • count()

collect() та take()

  • collect() повертає всі елементи набору даних у вигляді масиву
  • take(N) повертає масив перших N елементів набору даних
RDD_map.collect()
[1, 4, 9, 16]
RDD_map.take(2)
[1, 4]

first() та count()

  • first() виводить перший елемент RDD
RDD_map.first()
1
  • count() повертає кількість елементів у RDD
RDD_flatmap.count()
5

Робота з Pair RDD в PySpark

Вступ до Pair RDD у PySpark

  • Реальні набори даних зазвичай є парами ключ/значення
  • Кожен рядок є ключем і співвідноситься з одним або декількома значеннями
  • Pair RDD — це спеціальна структура даних для роботи з такими наборами даних
  • Pair RDD: ключ — це ідентифікатор, а значення — це дані

Створення Pair RDD

  • Два найпоширеніші способи створення парних RDD
    • Зі списку кортежу ключ-значення
    • Зі звичайного RDD
  • Отримання даних у формі ключ/значення для парного RDD
my_tuple = [('Sam', 23), ('Mary', 34), ('Peter', 25)]
pairRDD_tuple = sc.parallelize(my_tuple)


my_list = ['Sam 23', 'Mary 34', 'Peter 25']
regularRDD = sc.parallelize(my_list)
pairRDD_RDD = regularRDD.map(lambda s: (s.split(' ')[0], s.split(' ')[1]))

Перетворення Pair RDD

  • Всі регулярні перетворення працюють на парі RDD
  • Потрібно передавати функції, які оперують парами значень ключів, а не окремими елементами
  • Приклади парних перетворень RDD
    • reduceByKey(func): Об’єднує значення з однаковим ключем
    • groupByKey(): Згрупувати значення з однаковим ключем
    • sortByKey(): Повернути RDD, відсортований за ключем
    • join(): Об’єднати дві пари RDD на основі їх ключа

reduceByKey(func)

  • reduceByKey(func) об’єднує значення з однаковим ключем
  • Він виконує паралельні операції для кожного ключа в наборі даних
  • Це перетворення, а не дія
regularRDD = sc.parallelize([("Messi", 23), ("Ronaldo", 34),
                              ("Neymar", 22), ("Messi", 24)])

pairRDD_reducebykey = regularRDD.reduceByKey(lambda x,y : x + y)
pairRDD_reducebykey.collect()
[('Neymar', 22), ('Messi', 47), ('Ronaldo', 34)]

sortByKey()

  • sortByKey() повертає RDD, відсортований за ключем
pairRDD_reducebykey_rev = pairRDD_reducebykey.map(lambda x: (x[1], x[0]))

pairRDD_reducebykey_rev.sortByKey(ascending=False).collect()
[(47, 'Messi'), (34, 'Ronaldo'), (22, 'Neymar')]

groupByKey()

  • groupByKey() згруповує значення з однаковим ключем
airports = [("US", "JFK"),("UK", "LHR"),("FR", "CDG"),("US", "SFO")]

regularRDD = sc.parallelize(airports)
pairRDD_group = regularRDD.groupByKey().collect()

for cont, air in pairRDD_group:
  print(cont, list(air))
US ['JFK', 'SFO']
UK ['LHR']
FR ['CDG']

join()

  • join() об’єднує дві пари RDD на основі їх ключа
RDD1 = sc.parallelize([("Messi", 34),("Ronaldo", 32),("Neymar", 24)])
RDD2 = sc.parallelize([("Ronaldo", 80),("Neymar", 120),("Messi", 100)])

RDD1.join(RDD2).collect()
[('Ronaldo', (32, 80)), ('Neymar', (24, 120)), ('Messi', (34, 100))]

Додаткові дії (actions)

reduce()

  • reduce() об’єднує всі елементи RDD в один
  • Функція повинна бути комутативною (зміна порядку операндів не змінює результат) та асоціативною
x = [1,3,4,6]
RDD = sc.parallelize(x)
RDD.reduce(lambda x, y : x + y)
14

saveAsTextFile()

  • saveAsTextFile() зберігає RDD у текстовому файлі
RDD.saveAsTextFile("tempFile")
  • coalesce() можна використовувати для збереження RDD у вигляді одного текстового файлу
RDD.coalesce(1).saveAsTextFile("tempFile")

Action операції над парами RDD

  • Action RDD, доступні для парних RDD у PySpark
  • Парні дії RDD використовують дані ключ-значення
  • Кілька прикладів дій парного RDD включають
    • countByKey()
    • collectAsMap()

countByKey()

  • countByKey() доступний тільки для типу Pair RDD
  • countByKey() підраховує кількість елементів для кожного ключа
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])

for kee, val in rdd.countByKey().items():
  print(kee, val)
a 2
b 1

collectAsMap()

  • collectAsMap() повернути пари ключ-значення в RDD у вигляді словника
sc.parallelize([(1, 2), (3, 4)]).collectAsMap()
{1: 2, 3: 4}

PySpark SQL та DataFrames

PySpark DataFrames

  • PySpark SQL — це бібліотека Spark для структурованих даних. Вона надає більше інформації про структуру даних та обчислення
  • PySpark DataFrame — незмінна розподілена колекція даних з іменованими стовпцями
  • Призначений для обробки як структурованих (наприклад, реляційна база даних), так і напівструктурованих даних (наприклад, JSON)
  • API фреймів даних доступний у Python, R, Scala та Java
  • DataFrames в PySpark підтримують як SQL запити ( SELECT * from table ), так і вирази методи ( df.select() )

SparkSession — точка входу для API DataFrame

  • SparkContext є основною точкою входу для створення RDD
  • SparkSession забезпечує єдину точку входу для взаємодії з фреймами даних Spark
  • SparkSession використовується для створення DataFrame, реєстрації DataFrame, виконання SQL запитів
  • SparkSession доступний в оболонці PySpark як spark

Створення DataFrame

  • Два різні методи створення DataFrame в PySpark
    • З існуючих RDD за допомогою методу createDataFrame() SparkSession
    • З різних джерел даних (CSV, JSON, TXT) за допомогою методу read() SparkSession
  • Схема контролює дані і допомагає DataFrames оптимізувати запити
  • Схема надає інформацію про назву стовпця, тип даних у стовпці, порожні значення тощо.

Створення DataFrame

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("DataFrame").getOrCreate()

iphones_RDD = sc.parallelize([
  ("XS", 2018, 5.65, 2.79, 6.24),
  ("XR", 2018, 5.94, 2.98, 6.84),
  ("X10", 2017, 5.65, 2.79, 6.13),
  ("8Plus", 2017, 6.23, 3.07, 7.12)
])

names = ['Model', 'Year', 'Height', 'Width', 'Weight']

iphones_df = spark.createDataFrame(iphones_RDD, schema=names)
type(iphones_df)
pyspark.sql.dataframe.DataFrame

DataFrame з CSV/JSON/TXT

df_csv = spark.read.csv("people.csv", header=True, inferSchema=True)

df_json = spark.read.json("people.json")

df_txt = spark.read.txt("people.txt")
  • Шлях до файлу та два необов’язкові параметри
  • Два необов’язкові параметри
    • header=True
    • inferSchema=True

Взаємодія з DataFrame

Оператори DataFrame у PySpark

  • Операції з DataFrame: Перетворення та дії
  • Перетворення фрейму даних:
    • select()
    • filter()
    • groupby()
    • orderby()
    • dropDuplicates()
    • withColumnRenamed()
  • Дії з фреймом даних:
    • head()
    • show()
    • count()
    • columns
    • describe()

Примітка

printSchema() є методом для будь-якого набору даних/фрейму даних Spark

select() та show()

  • select() вибирає стовпці з DataFrame
  • show() виводить перші рядки DataFrame
from pyspark.sql import SparkSession
from datetime import date as sysdate

spark = SparkSession.builder.appName("DataFrame").getOrCreate()

people = spark.read.csv("data/people.csv", header=True, inferSchema=True)

people.select('name').show(5)
+--------------+
|          name|
+--------------+
|Penelope Lewis|
| David Anthony|
|     Ida Shipp|
|  Joanna Moore|
|Lisandra Ortiz|
+--------------+
only showing top 5 rows

filter()

  • filter() вибирає рядки з DataFrame, які відповідають умові
people.filter(people.sex == 'female').show(5)
+---+---------+-----------------+------+-------------------+
|_c0|person_id|             name|   sex|      date of birth|
+---+---------+-----------------+------+-------------------+
|  0|      100|   Penelope Lewis|female|1990-08-31 00:00:00|
|  2|      102|        Ida Shipp|female|1962-05-24 00:00:00|
|  3|      103|     Joanna Moore|female|2017-03-10 00:00:00|
|  4|      104|   Lisandra Ortiz|female|2020-08-05 00:00:00|
| 11|      111|Annabelle Rosseau|female|1989-07-13 00:00:00|
+---+---------+-----------------+------+-------------------+
only showing top 5 rows

groupby() та count()

people.groupby('sex').count().show()
+------+-----+
|   sex|count|
+------+-----+
|  NULL| 1920|
|female|49014|
|  male|49066|
+------+-----+

orderby()

people.orderBy('date of birth').show(5)
+-----+---------+---------------+------+-------------------+
|  _c0|person_id|           name|   sex|      date of birth|
+-----+---------+---------------+------+-------------------+
|57359|    57459|   Sharon Perez|female|1899-08-28 00:00:00|
|62233|    62333|Martina Morison|female|1901-04-21 00:00:00|
|96318|    96418|   Lisa Garrett|female|1901-05-09 00:00:00|
|39703|    39803|    Naomi Davis|female|1902-04-25 00:00:00|
|64563|    64663|  Brenda French|female|1902-07-27 00:00:00|
+-----+---------+---------------+------+-------------------+
only showing top 5 rows

dropDuplicates()

print(people.select('name').count())

print(people.select('name').dropDuplicates().count())
100000
92712

withColumnRenamed()

  • withColumnRenamed() перейменовує стовпець у DataFrame
people = people.withColumnRenamed("date of birth", "dob")
people.show(5)
+---+---------+--------------+------+-------------------+
|_c0|person_id|          name|   sex|                dob|
+---+---------+--------------+------+-------------------+
|  0|      100|Penelope Lewis|female|1990-08-31 00:00:00|
|  1|      101| David Anthony|  male|1971-10-14 00:00:00|
|  2|      102|     Ida Shipp|female|1962-05-24 00:00:00|
|  3|      103|  Joanna Moore|female|2017-03-10 00:00:00|
|  4|      104|Lisandra Ortiz|female|2020-08-05 00:00:00|
+---+---------+--------------+------+-------------------+
only showing top 5 rows

printSchema()

people.printSchema()
root
 |-- _c0: integer (nullable = true)
 |-- person_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- dob: timestamp (nullable = true)

columns

  • columns повертає список стовпців у DataFrame
people.columns
['_c0', 'person_id', 'name', 'sex', 'dob']

describe()

  • describe() повертає статистику для числових стовпців у DataFrame
people.describe().show()
+-------+-----------------+-----------------+-------------+------+
|summary|              _c0|        person_id|         name|   sex|
+-------+-----------------+-----------------+-------------+------+
|  count|           100000|           100000|       100000| 98080|
|   mean|          49999.5|          50099.5|         NULL|  NULL|
| stddev|28867.65779668774|28867.65779668774|         NULL|  NULL|
|    min|                0|              100|Aaron Addesso|female|
|    max|            99999|           100099|  Zulma Biggs|  male|
+-------+-----------------+-----------------+-------------+------+

PySpark SQL

DataFrame API vs SQL запити

  • У PySpark ви можете взаємодіяти з SparkSQL через DataFrame API та SQL запити
  • DataFrame API надає програмну мову, специфічну для домену (DSL) для даних
  • Перетворення та дії DataFrame легше конструювати програмно
  • SQL-запити можуть бути стислими, простішими для розуміння та перенесення
  • Операції над DataFrame також можна виконувати за допомогою SQL запитів

SQL запити

  • Метод SparkSession sql() виконує SQL-запит
  • Метод sql() отримує SQL-запит як аргумент і повертає результат у вигляді DataFrame
people.createOrReplaceTempView("people")
df_sql = spark.sql("SELECT * FROM people LIMIT 5")
df_sql.show()
+---+---------+--------------+------+-------------------+
|_c0|person_id|          name|   sex|                dob|
+---+---------+--------------+------+-------------------+
|  0|      100|Penelope Lewis|female|1990-08-31 00:00:00|
|  1|      101| David Anthony|  male|1971-10-14 00:00:00|
|  2|      102|     Ida Shipp|female|1962-05-24 00:00:00|
|  3|      103|  Joanna Moore|female|2017-03-10 00:00:00|
|  4|      104|Lisandra Ortiz|female|2020-08-05 00:00:00|
+---+---------+--------------+------+-------------------+

SQL-запит

query = '''SELECT name FROM people LIMIT 5'''
df_sql = spark.sql(query)
df_sql.show()
+--------------+
|          name|
+--------------+
|Penelope Lewis|
| David Anthony|
|     Ida Shipp|
|  Joanna Moore|
|Lisandra Ortiz|
+--------------+

Групування та агрегація

query = '''SELECT sex, COUNT(*) as count FROM people GROUP BY sex'''
df_sql = spark.sql(query)
df_sql.show()
+------+-----+
|   sex|count|
+------+-----+
|  NULL| 1920|
|female|49014|
|  male|49066|
+------+-----+

Фільтрація

query = '''SELECT * FROM people WHERE dob > '2000-01-01' '''
df_sql = spark.sql(query)
df_sql.show(5)
+---+---------+-----------------+------+-------------------+
|_c0|person_id|             name|   sex|                dob|
+---+---------+-----------------+------+-------------------+
|  3|      103|     Joanna Moore|female|2017-03-10 00:00:00|
|  4|      104|   Lisandra Ortiz|female|2020-08-05 00:00:00|
|  9|      109|   Everett Vadala|  male|2005-05-24 00:00:00|
| 10|      110| Freddie Claridge|  male|2002-05-07 00:00:00|
| 17|      117|Florence Eberhart|female|2024-06-01 00:00:00|
+---+---------+-----------------+------+-------------------+
only showing top 5 rows

Візуалізація даних у PySpark

Візуалізація даних у PySpark

Побудова графіків за допомогою фреймів даних PySpark виконується трьома методами

  • бібліотека pyspark_dist_explore
  • toPandas()
  • бібліотека HandySpark

pyspark_dist_explore

  • pyspark_dist_explore — це бібліотека для візуалізації даних у PySpark
  • Наразі доступні три функції: hist() , distplot() та pandas_histogram()
from pyspark_dist_explore import hist
import matplotlib.pyplot as plt

df = spark.read.csv("data/salaries.csv", header=True, inferSchema=True)

fig, ax = plt.subplots()
hist(ax, df.select('salary_in_usd'), bins = 20);

pyspark_dist_explore

toPandas()

df_pandas = df.toPandas()
df_pandas.hist(column='salary_in_usd', bins=20);

Примітка

Якщо ви маєте великі обсяги даних, використовувати toPandas() не рекомендується

Pandas vs PySpark

  • Pandas DataFrames — це односерверні структури в пам’яті, що базуються на одному сервері, а операції над PySpark виконуються паралельно
  • Результат генерується, коли ми застосовуємо будь-яку операцію в Pandas, в той час як операції в PySpark DataFrame є лінивими
  • Pandas DataFrame є змінюваним, а PySpark DataFrame є незмінним
  • Pandas API підтримує більше операцій, ніж PySpark Dataframe API

PySpark MLlib

Огляд PySpark MLlib

  • MLlib - це компонент Apache Spark для машинного навчання
  • До складу MLlib входять різноманітні інструменти:
    • Алгоритми ML: спільна фільтрація, класифікація та кластеризація
    • Функціоналізація: вилучення ознак, перетворення, зменшення розмірності та вибірка
    • Конвеєри: інструменти для побудови, оцінки та налаштування конвеєрів ML

Чому PySpark MLlib?

  • Scikit-learn - популярна бібліотека Python для інтелектуального аналізу даних та машинного навчання
  • Алгоритми Scikit-learn працюють тільки для невеликих наборів даних на одній машині
  • Алгоритми MLlib Spark призначені для паралельної обробки на кластері
  • Підтримує такі мови, як Scala, Java та R
  • Надає високорівневий API для побудови конвеєрів машинного навчання

Алгоритми PySpark MLlib

  • Класифікація (бінарна та багатокласова) та регресія: ЛLinear SVMs, logistic regression, decision trees, random forests, gradient-boosted trees, naive Bayes, linear least squares, Lasso, ridge regression, isotonic regression
  • Колаборативна фільтрація: Алгоритм змінних найменших квадратів (Alternating least squares, ALS)
  • Кластеризація: K-means, Gaussian mixture, Bisecting K-means and Streaming K-Means

Три «С» машинного навчання в PySpark MLlib

  • Collaborative filtering (рекомендаційні системи): Створення рекомендацій
  • Classification: Визначення, до якої з категорій належить нове спостереження
  • Clustering: Групування даних на основі схожих характеристик

PySpark MLlib imports

  • Collaborative filtering
from pyspark.mllib.recommendation import ALS
  • Classification
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
  • Clustering
from pyspark.mllib.clustering import KMeans

Вступ до Collaborative filtering

Collaborative filtering

  • Колаборативна фільтрація — це пошук користувачів, які мають спільні інтереси
  • Колаборативна фільтрація зазвичай використовується для рекомендаційних систем
  • Підходи до спільної фільтрації:
    • User-User: Знаходить користувачів, схожих на цільового користувача
    • Item-Item: Знаходить і рекомендує елементи, схожі на елементи з цільовим користувачем

Клас Rating у pyspark.mllib.recommendations

  • Клас Rating — це обгортка навколо кортежу (користувач, товар і рейтинг)
  • Корисний для розбору RDD та створення кортежу з користувача, товару та рейтингу
from pyspark.mllib.recommendation import Rating

r = Rating(user = 1, product = 2, rating = 5.0)
print(r)
Rating(user=1, product=2, rating=5.0)

randomSplit()

  • Метод randomSplit() PySpark випадковим чином розбиває дані із заданими вагами і повертає декілька RDD
data = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
training, test=data.randomSplit([0.6, 0.4])
training.collect()
test.collect()
[1, 3, 5, 7]

Alternating Least Squares (ALS)

  • ALS у spark.mllib забезпечує колаборативну фільтрацію
  • ALS.train(ratings, rank, iterations)
r1 = Rating(1, 1, 1.0)
r2 = Rating(1, 2, 2.0)
r3 = Rating(2, 1, 2.0)
ratings = sc.parallelize([r1, r2, r3])
ratings.collect()
[Rating(user=1, product=1, rating=1.0),
 Rating(user=1, product=2, rating=2.0),
 Rating(user=2, product=1, rating=2.0)]
model = ALS.train(ratings, rank=10, iterations=10)

predictAll()

  • Метод predictAll() повертає список прогнозованих оцінок для вхідної пари користувач і товар
  • Метод отримує RDD без оцінок, щоб згенерувати рейтинги
unrated_RDD = sc.parallelize([(1, 2), (1, 1)])

predictions = model.predictAll(unrated_RDD)
predictions.collect()
[Rating(user=1, product=1, rating=1.0000327845217156),
 Rating(user=1, product=2, rating=1.9890339755906339)]

Оцінка моделі

rates = ratings.map(lambda x: ((x[0], x[1]), x[2]))
rates.collect()
[((1, 1), 1.0), ((1, 2), 2.0), ((2, 1), 2.0)]


preds = predictions.map(lambda x: ((x[0], x[1]), x[2]))
preds.collect()
[((1, 1), 1.0000327845217156), ((1, 2), 1.9890339755906339)]


rates_preds = rates.join(preds)
rates_preds.collect()
[((1, 2), (2.0, 1.9890339755906339)), ((1, 1), (1.0, 1.0000327845217156))]
MSE = rates_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
print("Mean Squared Error = " + str(MSE))
Mean Squared Error = 6.012738308583879e-05

Класифікація

Робота з векторами

  • PySpark MLlib містить спеціальні типи даних Vectors та LabelledPoint
  • Два типи векторів
    • Dense Vector: зберігає всі свої записи в масиві чисел з плаваючою комою
    • Sparse Vector: зберігає лише ненульові значення та їхні індекси
from pyspark.mllib.linalg import Vectors, SparseVector, DenseVector

denseVec = Vectors.dense([1.0, 2.0, 3.0])
sparseVec = Vectors.sparse(3, {0: 1.0, 2: 3.0})

print(denseVec)
print(sparseVec)
[1.0,2.0,3.0]
(3,[0,2],[1.0,3.0])

LabeledPoint

  • LabeledPoint — це обгортка для вхідних ознак і прогнозованого значення
  • Для бінарної класифікації логістичної регресії міткою є або 0 (від’ємне значення), або 1 (додатне значення)
from pyspark.mllib.regression import LabeledPoint

positive = LabeledPoint(1.0, [1.0, 0.0, 3.0])
negative = LabeledPoint(0.0, [2.0, 1.0, 1.0])
print(positive)
print(negative)
(1.0,[1.0,0.0,3.0])
(0.0,[2.0,1.0,1.0])

HashingTF

  • Алгоритм HashingTF() використовується для зіставлення значення ознаки з індексами у векторі ознак
from pyspark.mllib.feature import HashingTF

sentence = "hello hello world"
words = sentence.split()
tf = HashingTF(10000)
tf.transform(words)
SparseVector(10000, {1593: 2.0, 5690: 1.0})

Логістична регресія з використанням LogisticRegressionWithLBFGS

data = [
  LabeledPoint(0.0, [0.0, 1.0]),
  LabeledPoint(1.0, [1.0, 0.0]),
]

RDD = sc.parallelize(data)

model = LogisticRegressionWithLBFGS.train(RDD, iterations=10)
model.predict([0.0, 1.0])
model.predict([1.0, 0.0])
1

Кластеризація

Кластеризація

Бібліотека PySpark MLlib наразі підтримує наступні моделі кластеризації

  • K-means
  • Gaussian mixture
  • Power iteration clustering (PIC)
  • Bisecting k-means
  • Streaming k-means

K-means

RDD = sc.textFile("data/wine_data.csv") \
  .filter(lambda line: not line.startswith("class_label")) \
  .map(lambda x: x.split(",")) \
  .map(lambda x: [float(x[0]), float(x[1]), float(x[2])])

RDD.take(5)
[[1.0, 14.23, 1.71],
 [1.0, 13.2, 1.78],
 [1.0, 13.16, 2.36],
 [1.0, 14.37, 1.95],
 [1.0, 13.24, 2.59]]

Навчання K-means

from pyspark.mllib.clustering import KMeans

model = KMeans.train(RDD, k = 3, maxIterations = 10)
model.clusterCenters
[array([ 2.12121212, 12.27030303,  1.62227273]),
 array([ 2.62962963, 13.0737037 ,  3.77259259]),
 array([ 1.0862069 , 13.76362069,  1.81172414])]

Оцінка моделі

from math import sqrt

def error(point):
  center = model.centers[model.predict(point)]
  return sqrt(sum([x**2 for x in (point - center)]))

WSSSE = RDD.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print("Within Set Sum of Squared Error = " + str(WSSSE))
Within Set Sum of Squared Error = 136.79460814030136

Візуалізація

from pyspark.sql import SparkSession
import pandas as pd
import matplotlib.pyplot as plt

spark = SparkSession.builder.appName("KMeans").getOrCreate()

wine_data_df = spark.createDataFrame(RDD, schema=["col0", "col1", "col2"])
wine_data_df_pandas = wine_data_df.toPandas()

cluster_centers_pandas = pd.DataFrame(
    [center[1:] for center in model.clusterCenters],  # пропускаємо col0
    columns=["col1", "col2"]
)

plt.scatter(wine_data_df_pandas["col1"], wine_data_df_pandas["col2"], c=wine_data_df_pandas["col0"])
plt.scatter(cluster_centers_pandas["col1"], cluster_centers_pandas["col2"], c="red", marker="x")
plt.title("KMeans Clustering")
plt.show()

Візуалізація

Дякую за увагу!



Матеріали курсу

ihor.miroshnychenko@knu.ua

Data Mirosh

@ihormiroshnychenko

@aranaur

aranaur.rbind.io