Моделі та методи обробки великих даних
КНУ імені Тараса Шевченка, ФІТ
Великі дані — набори інформації (як структурованої, так і неструктурованої) настільки великих розмірів, що традиційні способи та підходи (здебільшого засновані на рішеннях класу бізнесової аналітики та системах управління базами даних) не можуть бути застосовані до них - Wikipedia
Примітка
Apache Spark на сьогоднішній день є кращим за Hadoop/MapReduce у більшості випадків, оскільки він швидший, простіший у використанні та підтримує різноманітні джерела даних.
scWelcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 3.5.5
/_/
Using Python version 3.11.4 (tags/v3.11.4:d2340ef, Jun 7 2023 05:45:37)
Spark context Web UI available at http://AranaurPC:4040
Spark context available as 'sc' (master = local[*], app id = local-1745489304985).
SparkSession available as 'spark'.
parallelize()textFile() SparkContextmap() та filter()defdef та lambdamap()map() застосовує функцію до всіх елементів у вхідному спискуmap()map()filter()filter() отримує функцію та список і повертає новий список, для якого функція повертає значення truefilter()filter()parallelize() для створення RDD зі списків на PythontextFile() для створення RDD із зовнішніх наборів данихПартиція (Partition) — це логічний поділ великого розподіленого набору даних
метод parallelize()
textFile()Кількість партицій в RDD можна дізнатися за допомогою методу getNumPartitions()
map()filter()flatMap()union()map()map() застосовує функцію до всіх елементів у RDDfilter()filter() повертає новий RDD з елементами, які відповідають умовіflatMap()flatMap() перетворює RDD у новий RDD, де кожен елемент може бути розгорнутий у кілька елементівunion()union() об’єднує два RDD в одинЦе операції, які повертають значення після виконання обчислень на RDD
collect()take(N)first()count()collect() та take()collect() повертає всі елементи набору даних у вигляді масивуtake(N) повертає масив перших N елементів набору данихfirst() та count()first() виводить перший елемент RDDcount() повертає кількість елементів у RDD
reduceByKey(func): Об’єднує значення з однаковим ключемgroupByKey(): Згрупувати значення з однаковим ключемsortByKey(): Повернути RDD, відсортований за ключемjoin(): Об’єднати дві пари RDD на основі їх ключаreduceByKey(func)reduceByKey(func) об’єднує значення з однаковим ключемsortByKey()sortByKey() повертає RDD, відсортований за ключемgroupByKey()groupByKey() згруповує значення з однаковим ключемjoin()join() об’єднує дві пари RDD на основі їх ключаreduce()reduce() об’єднує всі елементи RDD в одинsaveAsTextFile()saveAsTextFile() зберігає RDD у текстовому файліcoalesce() можна використовувати для збереження RDD у вигляді одного текстового файлуcountByKey()collectAsMap()countByKey()countByKey() доступний тільки для типу Pair RDDcountByKey() підраховує кількість елементів для кожного ключаcollectAsMap()collectAsMap() повернути пари ключ-значення в RDD у вигляді словникаSELECT * from table ), так і вирази методи ( df.select() )createDataFrame() SparkSessionread() SparkSessionfrom pyspark.sql import SparkSession
spark = SparkSession.builder.appName("DataFrame").getOrCreate()
iphones_RDD = sc.parallelize([
("XS", 2018, 5.65, 2.79, 6.24),
("XR", 2018, 5.94, 2.98, 6.84),
("X10", 2017, 5.65, 2.79, 6.13),
("8Plus", 2017, 6.23, 3.07, 7.12)
])
names = ['Model', 'Year', 'Height', 'Width', 'Weight']
iphones_df = spark.createDataFrame(iphones_RDD, schema=names)
type(iphones_df)pyspark.sql.dataframe.DataFrame
header=TrueinferSchema=Trueselect()filter()groupby()orderby()dropDuplicates()withColumnRenamed()head()show()count()columnsdescribe()Примітка
printSchema() є методом для будь-якого набору даних/фрейму даних Spark
select() та show()select() вибирає стовпці з DataFrameshow() виводить перші рядки DataFramefrom pyspark.sql import SparkSession
from datetime import date as sysdate
spark = SparkSession.builder.appName("DataFrame").getOrCreate()
people = spark.read.csv("data/people.csv", header=True, inferSchema=True)
people.select('name').show(5)+--------------+
| name|
+--------------+
|Penelope Lewis|
| David Anthony|
| Ida Shipp|
| Joanna Moore|
|Lisandra Ortiz|
+--------------+
only showing top 5 rows
filter()filter() вибирає рядки з DataFrame, які відповідають умові+---+---------+-----------------+------+-------------------+
|_c0|person_id| name| sex| date of birth|
+---+---------+-----------------+------+-------------------+
| 0| 100| Penelope Lewis|female|1990-08-31 00:00:00|
| 2| 102| Ida Shipp|female|1962-05-24 00:00:00|
| 3| 103| Joanna Moore|female|2017-03-10 00:00:00|
| 4| 104| Lisandra Ortiz|female|2020-08-05 00:00:00|
| 11| 111|Annabelle Rosseau|female|1989-07-13 00:00:00|
+---+---------+-----------------+------+-------------------+
only showing top 5 rows
groupby() та count()orderby()+-----+---------+---------------+------+-------------------+
| _c0|person_id| name| sex| date of birth|
+-----+---------+---------------+------+-------------------+
|57359| 57459| Sharon Perez|female|1899-08-28 00:00:00|
|62233| 62333|Martina Morison|female|1901-04-21 00:00:00|
|96318| 96418| Lisa Garrett|female|1901-05-09 00:00:00|
|39703| 39803| Naomi Davis|female|1902-04-25 00:00:00|
|64563| 64663| Brenda French|female|1902-07-27 00:00:00|
+-----+---------+---------------+------+-------------------+
only showing top 5 rows
dropDuplicates()withColumnRenamed()withColumnRenamed() перейменовує стовпець у DataFrame+---+---------+--------------+------+-------------------+
|_c0|person_id| name| sex| dob|
+---+---------+--------------+------+-------------------+
| 0| 100|Penelope Lewis|female|1990-08-31 00:00:00|
| 1| 101| David Anthony| male|1971-10-14 00:00:00|
| 2| 102| Ida Shipp|female|1962-05-24 00:00:00|
| 3| 103| Joanna Moore|female|2017-03-10 00:00:00|
| 4| 104|Lisandra Ortiz|female|2020-08-05 00:00:00|
+---+---------+--------------+------+-------------------+
only showing top 5 rows
printSchema()columnscolumns повертає список стовпців у DataFramedescribe()describe() повертає статистику для числових стовпців у DataFrame+-------+-----------------+-----------------+-------------+------+
|summary| _c0| person_id| name| sex|
+-------+-----------------+-----------------+-------------+------+
| count| 100000| 100000| 100000| 98080|
| mean| 49999.5| 50099.5| NULL| NULL|
| stddev|28867.65779668774|28867.65779668774| NULL| NULL|
| min| 0| 100|Aaron Addesso|female|
| max| 99999| 100099| Zulma Biggs| male|
+-------+-----------------+-----------------+-------------+------+
sql() виконує SQL-запитsql() отримує SQL-запит як аргумент і повертає результат у вигляді DataFramepeople.createOrReplaceTempView("people")
df_sql = spark.sql("SELECT * FROM people LIMIT 5")
df_sql.show()+---+---------+--------------+------+-------------------+
|_c0|person_id| name| sex| dob|
+---+---------+--------------+------+-------------------+
| 0| 100|Penelope Lewis|female|1990-08-31 00:00:00|
| 1| 101| David Anthony| male|1971-10-14 00:00:00|
| 2| 102| Ida Shipp|female|1962-05-24 00:00:00|
| 3| 103| Joanna Moore|female|2017-03-10 00:00:00|
| 4| 104|Lisandra Ortiz|female|2020-08-05 00:00:00|
+---+---------+--------------+------+-------------------+
query = '''SELECT * FROM people WHERE dob > '2000-01-01' '''
df_sql = spark.sql(query)
df_sql.show(5)+---+---------+-----------------+------+-------------------+
|_c0|person_id| name| sex| dob|
+---+---------+-----------------+------+-------------------+
| 3| 103| Joanna Moore|female|2017-03-10 00:00:00|
| 4| 104| Lisandra Ortiz|female|2020-08-05 00:00:00|
| 9| 109| Everett Vadala| male|2005-05-24 00:00:00|
| 10| 110| Freddie Claridge| male|2002-05-07 00:00:00|
| 17| 117|Florence Eberhart|female|2024-06-01 00:00:00|
+---+---------+-----------------+------+-------------------+
only showing top 5 rows
Побудова графіків за допомогою фреймів даних PySpark виконується трьома методами
pyspark_dist_exploretoPandas()HandySparkpyspark_dist_explorepyspark_dist_explore — це бібліотека для візуалізації даних у PySparkhist() , distplot() та pandas_histogram()pyspark_dist_exploretoPandas()Примітка
Якщо ви маєте великі обсяги даних, використовувати toPandas() не рекомендується
Rating у pyspark.mllib.recommendationsRating — це обгортка навколо кортежу (користувач, товар і рейтинг)randomSplit()randomSplit() PySpark випадковим чином розбиває дані із заданими вагами і повертає декілька RDDspark.mllib забезпечує колаборативну фільтраціюALS.train(ratings, rank, iterations)predictAll()predictAll() повертає список прогнозованих оцінок для вхідної пари користувач і товар[((1, 1), 1.0), ((1, 2), 2.0), ((2, 1), 2.0)]
[((1, 1), 1.0000327845217156), ((1, 2), 1.9890339755906339)]
Vectors та LabelledPointLabeledPointLabeledPoint — це обгортка для вхідних ознак і прогнозованого значенняHashingTFHashingTF() використовується для зіставлення значення ознаки з індексами у векторі ознакLogisticRegressionWithLBFGSБібліотека PySpark MLlib наразі підтримує наступні моделі кластеризації
from math import sqrt
def error(point):
center = model.centers[model.predict(point)]
return sqrt(sum([x**2 for x in (point - center)]))
WSSSE = RDD.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print("Within Set Sum of Squared Error = " + str(WSSSE))Within Set Sum of Squared Error = 136.79460814030136
from pyspark.sql import SparkSession
import pandas as pd
import matplotlib.pyplot as plt
spark = SparkSession.builder.appName("KMeans").getOrCreate()
wine_data_df = spark.createDataFrame(RDD, schema=["col0", "col1", "col2"])
wine_data_df_pandas = wine_data_df.toPandas()
cluster_centers_pandas = pd.DataFrame(
[center[1:] for center in model.clusterCenters], # пропускаємо col0
columns=["col1", "col2"]
)
plt.scatter(wine_data_df_pandas["col1"], wine_data_df_pandas["col2"], c=wine_data_df_pandas["col0"])
plt.scatter(cluster_centers_pandas["col1"], cluster_centers_pandas["col2"], c="red", marker="x")
plt.title("KMeans Clustering")
plt.show()