Моделі та методи обробки великих даних
КНУ імені Тараса Шевченка, ФІТ
Великі дані — набори інформації (як структурованої, так і неструктурованої) настільки великих розмірів, що традиційні способи та підходи (здебільшого засновані на рішеннях класу бізнесової аналітики та системах управління базами даних) не можуть бути застосовані до них - Wikipedia
Примітка
Apache Spark на сьогоднішній день є кращим за Hadoop/MapReduce у більшості випадків, оскільки він швидший, простіший у використанні та підтримує різноманітні джерела даних.
sc
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 3.5.5
/_/
Using Python version 3.11.4 (tags/v3.11.4:d2340ef, Jun 7 2023 05:45:37)
Spark context Web UI available at http://AranaurPC:4040
Spark context available as 'sc' (master = local[*], app id = local-1745489304985).
SparkSession available as 'spark'.
parallelize()
textFile()
SparkContextmap()
та filter()
def
def
та lambda
map()
map()
застосовує функцію до всіх елементів у вхідному спискуmap()
map()
filter()
filter()
отримує функцію та список і повертає новий список, для якого функція повертає значення true
filter()
filter()
parallelize()
для створення RDD зі списків на PythontextFile()
для створення RDD із зовнішніх наборів данихПартиція (Partition) — це логічний поділ великого розподіленого набору даних
метод parallelize()
textFile()
Кількість партицій в RDD можна дізнатися за допомогою методу getNumPartitions()
map()
filter()
flatMap()
union()
map()
map()
застосовує функцію до всіх елементів у RDDfilter()
filter()
повертає новий RDD з елементами, які відповідають умовіflatMap()
flatMap()
перетворює RDD у новий RDD, де кожен елемент може бути розгорнутий у кілька елементівunion()
union()
об’єднує два RDD в одинЦе операції, які повертають значення після виконання обчислень на RDD
collect()
take(N)
first()
count()
collect()
та take()
collect()
повертає всі елементи набору даних у вигляді масивуtake(N)
повертає масив перших N
елементів набору данихfirst()
та count()
first()
виводить перший елемент RDDcount()
повертає кількість елементів у RDD
reduceByKey(func)
: Об’єднує значення з однаковим ключемgroupByKey()
: Згрупувати значення з однаковим ключемsortByKey()
: Повернути RDD, відсортований за ключемjoin()
: Об’єднати дві пари RDD на основі їх ключаreduceByKey(func)
reduceByKey(func)
об’єднує значення з однаковим ключемsortByKey()
sortByKey()
повертає RDD, відсортований за ключемgroupByKey()
groupByKey()
згруповує значення з однаковим ключемjoin()
join()
об’єднує дві пари RDD на основі їх ключаreduce()
reduce()
об’єднує всі елементи RDD в одинsaveAsTextFile()
saveAsTextFile()
зберігає RDD у текстовому файліcoalesce()
можна використовувати для збереження RDD у вигляді одного текстового файлуcountByKey()
collectAsMap()
countByKey()
countByKey()
доступний тільки для типу Pair RDDcountByKey()
підраховує кількість елементів для кожного ключаcollectAsMap()
collectAsMap()
повернути пари ключ-значення в RDD у вигляді словникаSELECT * from table
), так і вирази методи ( df.select()
)createDataFrame()
SparkSessionread()
SparkSessionfrom pyspark.sql import SparkSession
spark = SparkSession.builder.appName("DataFrame").getOrCreate()
iphones_RDD = sc.parallelize([
("XS", 2018, 5.65, 2.79, 6.24),
("XR", 2018, 5.94, 2.98, 6.84),
("X10", 2017, 5.65, 2.79, 6.13),
("8Plus", 2017, 6.23, 3.07, 7.12)
])
names = ['Model', 'Year', 'Height', 'Width', 'Weight']
iphones_df = spark.createDataFrame(iphones_RDD, schema=names)
type(iphones_df)
pyspark.sql.dataframe.DataFrame
header=True
inferSchema=True
select()
filter()
groupby()
orderby()
dropDuplicates()
withColumnRenamed()
head()
show()
count()
columns
describe()
Примітка
printSchema()
є методом для будь-якого набору даних/фрейму даних Spark
select()
та show()
select()
вибирає стовпці з DataFrameshow()
виводить перші рядки DataFramefrom pyspark.sql import SparkSession
from datetime import date as sysdate
spark = SparkSession.builder.appName("DataFrame").getOrCreate()
people = spark.read.csv("data/people.csv", header=True, inferSchema=True)
people.select('name').show(5)
+--------------+
| name|
+--------------+
|Penelope Lewis|
| David Anthony|
| Ida Shipp|
| Joanna Moore|
|Lisandra Ortiz|
+--------------+
only showing top 5 rows
filter()
filter()
вибирає рядки з DataFrame, які відповідають умові+---+---------+-----------------+------+-------------------+
|_c0|person_id| name| sex| date of birth|
+---+---------+-----------------+------+-------------------+
| 0| 100| Penelope Lewis|female|1990-08-31 00:00:00|
| 2| 102| Ida Shipp|female|1962-05-24 00:00:00|
| 3| 103| Joanna Moore|female|2017-03-10 00:00:00|
| 4| 104| Lisandra Ortiz|female|2020-08-05 00:00:00|
| 11| 111|Annabelle Rosseau|female|1989-07-13 00:00:00|
+---+---------+-----------------+------+-------------------+
only showing top 5 rows
groupby()
та count()
orderby()
+-----+---------+---------------+------+-------------------+
| _c0|person_id| name| sex| date of birth|
+-----+---------+---------------+------+-------------------+
|57359| 57459| Sharon Perez|female|1899-08-28 00:00:00|
|62233| 62333|Martina Morison|female|1901-04-21 00:00:00|
|96318| 96418| Lisa Garrett|female|1901-05-09 00:00:00|
|39703| 39803| Naomi Davis|female|1902-04-25 00:00:00|
|64563| 64663| Brenda French|female|1902-07-27 00:00:00|
+-----+---------+---------------+------+-------------------+
only showing top 5 rows
dropDuplicates()
withColumnRenamed()
withColumnRenamed()
перейменовує стовпець у DataFrame+---+---------+--------------+------+-------------------+
|_c0|person_id| name| sex| dob|
+---+---------+--------------+------+-------------------+
| 0| 100|Penelope Lewis|female|1990-08-31 00:00:00|
| 1| 101| David Anthony| male|1971-10-14 00:00:00|
| 2| 102| Ida Shipp|female|1962-05-24 00:00:00|
| 3| 103| Joanna Moore|female|2017-03-10 00:00:00|
| 4| 104|Lisandra Ortiz|female|2020-08-05 00:00:00|
+---+---------+--------------+------+-------------------+
only showing top 5 rows
printSchema()
columns
columns
повертає список стовпців у DataFramedescribe()
describe()
повертає статистику для числових стовпців у DataFrame+-------+-----------------+-----------------+-------------+------+
|summary| _c0| person_id| name| sex|
+-------+-----------------+-----------------+-------------+------+
| count| 100000| 100000| 100000| 98080|
| mean| 49999.5| 50099.5| NULL| NULL|
| stddev|28867.65779668774|28867.65779668774| NULL| NULL|
| min| 0| 100|Aaron Addesso|female|
| max| 99999| 100099| Zulma Biggs| male|
+-------+-----------------+-----------------+-------------+------+
sql()
виконує SQL-запитsql()
отримує SQL-запит як аргумент і повертає результат у вигляді DataFramepeople.createOrReplaceTempView("people")
df_sql = spark.sql("SELECT * FROM people LIMIT 5")
df_sql.show()
+---+---------+--------------+------+-------------------+
|_c0|person_id| name| sex| dob|
+---+---------+--------------+------+-------------------+
| 0| 100|Penelope Lewis|female|1990-08-31 00:00:00|
| 1| 101| David Anthony| male|1971-10-14 00:00:00|
| 2| 102| Ida Shipp|female|1962-05-24 00:00:00|
| 3| 103| Joanna Moore|female|2017-03-10 00:00:00|
| 4| 104|Lisandra Ortiz|female|2020-08-05 00:00:00|
+---+---------+--------------+------+-------------------+
query = '''SELECT * FROM people WHERE dob > '2000-01-01' '''
df_sql = spark.sql(query)
df_sql.show(5)
+---+---------+-----------------+------+-------------------+
|_c0|person_id| name| sex| dob|
+---+---------+-----------------+------+-------------------+
| 3| 103| Joanna Moore|female|2017-03-10 00:00:00|
| 4| 104| Lisandra Ortiz|female|2020-08-05 00:00:00|
| 9| 109| Everett Vadala| male|2005-05-24 00:00:00|
| 10| 110| Freddie Claridge| male|2002-05-07 00:00:00|
| 17| 117|Florence Eberhart|female|2024-06-01 00:00:00|
+---+---------+-----------------+------+-------------------+
only showing top 5 rows
Побудова графіків за допомогою фреймів даних PySpark виконується трьома методами
pyspark_dist_explore
toPandas()
HandySpark
pyspark_dist_explore
pyspark_dist_explore
— це бібліотека для візуалізації даних у PySparkhist()
, distplot()
та pandas_histogram()
pyspark_dist_explore
toPandas()
Примітка
Якщо ви маєте великі обсяги даних, використовувати toPandas()
не рекомендується
Rating
у pyspark.mllib.recommendations
Rating
— це обгортка навколо кортежу (користувач, товар і рейтинг)randomSplit()
randomSplit()
PySpark випадковим чином розбиває дані із заданими вагами і повертає декілька RDDspark.mllib
забезпечує колаборативну фільтраціюALS.train(ratings, rank, iterations)
predictAll()
predictAll()
повертає список прогнозованих оцінок для вхідної пари користувач і товар[((1, 1), 1.0), ((1, 2), 2.0), ((2, 1), 2.0)]
[((1, 1), 1.0000327845217156), ((1, 2), 1.9890339755906339)]
Vectors
та LabelledPoint
LabeledPoint
LabeledPoint
— це обгортка для вхідних ознак і прогнозованого значенняHashingTF
HashingTF()
використовується для зіставлення значення ознаки з індексами у векторі ознакLogisticRegressionWithLBFGS
Бібліотека PySpark MLlib наразі підтримує наступні моделі кластеризації
from math import sqrt
def error(point):
center = model.centers[model.predict(point)]
return sqrt(sum([x**2 for x in (point - center)]))
WSSSE = RDD.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print("Within Set Sum of Squared Error = " + str(WSSSE))
Within Set Sum of Squared Error = 136.79460814030136
from pyspark.sql import SparkSession
import pandas as pd
import matplotlib.pyplot as plt
spark = SparkSession.builder.appName("KMeans").getOrCreate()
wine_data_df = spark.createDataFrame(RDD, schema=["col0", "col1", "col2"])
wine_data_df_pandas = wine_data_df.toPandas()
cluster_centers_pandas = pd.DataFrame(
[center[1:] for center in model.clusterCenters], # пропускаємо col0
columns=["col1", "col2"]
)
plt.scatter(wine_data_df_pandas["col1"], wine_data_df_pandas["col2"], c=wine_data_df_pandas["col0"])
plt.scatter(cluster_centers_pandas["col1"], cluster_centers_pandas["col2"], c="red", marker="x")
plt.title("KMeans Clustering")
plt.show()