A. Collect data to dataframe

  • From File

Upload “iris_v1.csv” to the Jupyter notebook.

import findspark
findspark.init()  # run before importing pyspark so the Spark installation can be located
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
spark = SparkSession.builder.appName('iris').getOrCreate()
irisDF = spark.read.csv('iris_v1.csv', inferSchema=True, header=True)
irisDF.show()
  • From Array

Build a DataFrame from a Python list of tuples, passing the column names as the second argument.

sentenceDataFrame = spark.createDataFrame([
    (0, "Canada has very good universities"),
    (1, "UofT is very popular university in Canada"),
    (2, "Waterloo is very popular in math and computer science in Canada")], ["id", "u_sentence"])
sentenceDataFrame.show(5)
  • From JSON File
    Upload “iris_v1.json” to the Jupyter notebook.
df_json=spark.read.json("iris_v1.json")
df_json.printSchema()
df_json.show(5)
  • From JSON string
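# Build the JSON document from adjacent string literals (a quoted string cannot span two lines);
# the variable is named json_str so that it does not shadow the built-in str
json_str = ('{"sepal_length": 5.1, "sepal_width": 3.5, "petal_length": 1.4, "petal_width": 0.2, '
            '"petal_country": "USA", "species": "setosa"}')
arr = [json_str]
rdd = spark.sparkContext.parallelize(arr)
df_json2 = spark.read.json(rdd)
df_json2.show()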
  • Read from Hive table
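# Assumes the SparkSession was built with .enableHiveSupport() and that a table named hive_table exists
df = spark.sql("select * from hive_table")
df.show()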
  • Read from HDFS file

1- Start all nodes: open a command prompt and type “start-all”
2- Load the data to hdfs://localhost:9000/lab
>hadoop fs -put iris_v1.csv hdfs://localhost:9000/lab

hadoopdf = spark.read.csv('hdfs://localhost:9000/lab/iris_v1.csv',inferSchema=True,header=True)
hadoopdf.show(5)

B. Transform data

  • Select specific columns

- Print the schema

# printSchema() prints the schema itself and returns None, so it is not wrapped in print()
hadoopdf.printSchema()
- Choose two columns from the data set
hadoopdf.select('sepal_length','petal_country').show(5)

  • Add new columns

from pyspark.sql.functions import concat, col, lit
# Append a literal '-' to petal_country and store the result in a new column
hadoopdf.withColumn('petal_country2', concat(col('petal_country'), lit('-'))).show(5)

  • Rename columns

- Rename the column petal_country to country and display all columns

hadoopdf.withColumnRenamed('petal_country','country').show(5)

  • Use function in dataframe

from pyspark.sql.functions import mean as _mean, stddev as _stddev, col
hadoopdf.select(_mean(col('sepal_width')).alias('mean'),_stddev(col('sepal_width')).alias('stddev')).show(5)

  • GroupBy/Aggregate Dataframe with Max/Min/Count

Group by the column petal_country to count the number of rows

hadoopdf.select('sepal_length','petal_country').groupby('petal_country').count().show()

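# A dict passed to agg() can hold only one aggregate per column, so column expressions are used to get both avg and count
import pyspark.sql.functions as f
hadoopdf.select('sepal_length','petal_country').groupby('petal_country').agg(f.avg('sepal_length').alias('avg'), f.count('sepal_length').alias('count')).show()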
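
One caveat with the dictionary form of agg: a Python dict literal keeps only the last value given for a repeated key, so a single dictionary cannot request two aggregates of the same column. The plain-Python check below (no Spark needed) illustrates this; the name agg_spec is only for illustration.

```python
# A dict literal with a repeated key keeps only the last value.
agg_spec = {"sepal_length": "avg", "sepal_length": "count"}

print(agg_spec)        # the "avg" entry has been overwritten by "count"
print(len(agg_spec))   # only one key remains
```

To get both aggregates for one column, pass separate expressions such as f.avg('sepal_length') and f.count('sepal_length') to agg.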

Use the functions in pyspark.sql.functions for the groupby and aggregation

import pyspark.sql.functions as f
# continentDF is a second DataFrame that is assumed to be loaded already;
# the examples in this document use its columns counteryID, country and population
continentDF.printSchema()
continentDF.groupby('country').agg(f.sum('population').alias('sum')).show()

hadoopdf.groupby('petal_country').agg(f.sum('sepal_length').alias('sum'), f.count('sepal_length').alias('count')).show()

C. Filter data

Without function:

irisDF.filter(irisDF.petal_country=="CA").show(5)

continentDF.filter(continentDF.population>80000000).show()

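For two or more conditions:
| => Or
& => And
Wrap each comparison (for example ==) in parentheses when combining conditions, because | and & bind more tightly than the comparison operators.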

irisDF.filter(irisDF.petal_country.isin('CA') | irisDF.petal_country.isin('USA')).show(5)

Use the not operator (~) to negate a condition

irisDF.filter(~(irisDF.petal_country=='CA')).show(5)

With function:

import pyspark.sql.functions as f
irisDF.filter(f.col('petal_country')=="CA").show(5)

continentDF.filter(f.col('population')>80000000).show()

D. Enrich data

  • Join tables
    A join can be written at the query level as a regular SQL query, or at the DataFrame level. The examples here join at the DataFrame level.
    The join type is passed as the third argument of join(), for example "inner", "left_outer", "right_outer" or "full_outer".

irisDF.join(continentDF, irisDF.petal_country == continentDF.counteryID, "right_outer").show(5)
irisDF.join(continentDF, irisDF.petal_country == continentDF.counteryID, "inner").show(5)
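
To make the difference between the two join types concrete, here is a minimal plain-Python sketch of the semantics. It is not Spark code; the rows, the population values and the helper join_rows are made up for illustration, and only the column names come from the examples above.

```python
# Plain-Python sketch of inner vs. right outer join semantics (not Spark code).
# The rows and values below are made-up illustration data.
left = [{"petal_country": "CA", "sepal_length": 5.1},
        {"petal_country": "USA", "sepal_length": 4.9}]
right = [{"counteryID": "CA", "population": 1},
         {"counteryID": "MX", "population": 2}]

def join_rows(left_rows, right_rows, how):
    """Join on left.petal_country == right.counteryID."""
    out = []
    for r in right_rows:
        matches = [l for l in left_rows if l["petal_country"] == r["counteryID"]]
        for l in matches:
            out.append({**l, **r})
        if not matches and how == "right_outer":
            # Keep the unmatched right row; the left columns become None (null).
            out.append({"petal_country": None, "sepal_length": None, **r})
    return out

inner = join_rows(left, right, "inner")
right_outer = join_rows(left, right, "right_outer")
print(len(inner), len(right_outer))
```

The inner join drops the right-hand row that has no match, while the right outer join keeps it and fills the left-hand columns with nulls.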

  • Union

irisDF.union(irisDF).show(5)

E. Data modeling

  • Build function

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
def check_currency(x):
    if x=="CA":
        return "CA-CURRENCY"
    elif x=="USA":
        return "USA-CURRENCY"
    else:
        return "No currency"
currencyUDF = udf(check_currency, StringType())
irisDF.select(currencyUDF(irisDF.petal_country)).show(5)

  • Build lambda function

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
# Note: this lambda maps every value other than 'CA' to 'USA-CURRENCY'
curr2 = lambda x: 'CA-CURRENCY' if x=='CA' else 'USA-CURRENCY'
currencyUDF2 = udf(curr2, StringType())
irisDF.select(currencyUDF2(irisDF.petal_country).alias('currency')).show(5)

  • Performance Tuning: Cache the table

Spark evaluates lazily: transformations are only recorded, and they run when an action (such as show) is called. Without caching, every action recomputes the whole chain of transformations from the beginning. To avoid this repeated work, cache the DataFrame or the table so that later actions reuse the stored result. Spark SQL stores cached tables in an in-memory columnar format, which also helps to minimize memory usage and GC pressure.

      1- Cache the DataFrame
hadoopdf.cache()

      2- Cache a registered table
                - Register the DataFrame as a table
                - Cache the table
                  hadoopdf.createOrReplaceTempView("hadoopdf")
                  spark.sql("select * from hadoopdf").show(5)
                  spark.catalog.cacheTable("hadoopdf")
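
The effect of caching under lazy evaluation can be sketched in plain Python. This is a toy model under stated assumptions, not Spark's implementation: the class LazyData and its runs counter are hypothetical names used only to count how often the transformation is executed.

```python
# Plain-Python sketch of lazy evaluation and caching (not Spark's implementation).
class LazyData:
    def __init__(self, source, transform):
        self.source = source          # input rows
        self.transform = transform    # deferred transformation, not run yet
        self.runs = 0                 # how many times the transformation ran
        self._cached = None
        self._use_cache = False

    def cache(self):
        # Only marks the data for caching; nothing is computed here.
        self._use_cache = True
        return self

    def collect(self):
        # An "action": this is the point where computation is triggered.
        if self._use_cache and self._cached is not None:
            return self._cached
        self.runs += 1
        result = [self.transform(x) for x in self.source]
        if self._use_cache:
            self._cached = result
        return result

uncached = LazyData([1, 2, 3], lambda x: x * 10)
uncached.collect()
uncached.collect()

cached = LazyData([1, 2, 3], lambda x: x * 10).cache()
cached.collect()
cached.collect()

print(uncached.runs, cached.runs)
```

In this toy model the uncached object runs its transformation once per action, while the cached one runs it only for the first action and then serves the stored result.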

  • Performance Tuning: Broadcast the table
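    Spark SQL can broadcast a table that is joined with another table: a full copy of the broadcast table is sent to every executor, so the rows of the larger table do not have to be shuffled. The broadcast table should be small enough to fit in each executor's memory.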

from pyspark.sql.functions import broadcast
broadcast(continentDF).join(hadoopdf,hadoopdf.petal_country == continentDF.counteryID, "inner").show(5)
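
The idea behind a broadcast join can be sketched in plain Python as a hash join in which every partition of the large table receives a full copy of the small table. This is an illustration only, not Spark's implementation; the data, the partition layout and the helper join_partition are made up.

```python
# Plain-Python sketch of a broadcast (map-side) hash join; not Spark's implementation.
# Made-up illustration data: a small lookup table and a larger table split into partitions.
small_table = [("CA", "Canada"), ("USA", "United States")]
large_partitions = [
    [(5.1, "CA"), (4.9, "USA")],
    [(6.3, "CA"), (5.8, "MX")],
]

# "Broadcast": every partition gets the whole small table as a hash map.
lookup = dict(small_table)

def join_partition(rows, lookup):
    # Inner join done locally, without moving the rows of the large table.
    return [(length, code, lookup[code]) for length, code in rows if code in lookup]

joined = [row for part in large_partitions for row in join_partition(part, lookup)]
print(joined)
```

Because each partition is joined against its own copy of the lookup table, the cost is dominated by copying the small table, which is why the broadcast side should be the small one.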

F. Machine learning extraction methods

  • Features: VectorAssembler

VectorAssembler combines numeric (integer/double) columns into a single vector column of features.

from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(
inputCols=["sepal_length","sepal_width","petal_length","petal_width"], outputCol="features")
outputdf = assembler.transform(irisDF.select("sepal_length","sepal_width","petal_length","petal_width"))
outputdf.show(5)

  • Features: FeatureHasher

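FeatureHasher hashes a set of columns (integer, double or string) into a single feature vector. Numeric columns keep their value at the hashed index; string columns are treated as categorical.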

from pyspark.ml.feature import FeatureHasher
hasher = FeatureHasher(inputCols=["sepal_length","sepal_width","petal_length","petal_width","petal_country"], outputCol="features")
featurized = hasher.transform(irisDF)
featurized.show(3,truncate=False)
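
The hashing trick behind FeatureHasher can be sketched in plain Python. This is a rough illustration under stated assumptions: MD5 is used as a stand-in hash (Spark uses MurmurHash3, so the indices will differ), the vector is dense and only 16 slots long, and the helpers bucket and hash_features are hypothetical names.

```python
import hashlib

def bucket(key, num_features):
    # Stable stand-in hash (MD5); Spark itself uses MurmurHash3, so indices will differ.
    digest = hashlib.md5(key.encode("utf-8")).hexdigest()
    return int(digest, 16) % num_features

def hash_features(row, num_features=16):
    vec = [0.0] * num_features
    for name, value in row.items():
        if isinstance(value, str):
            # Categorical column: hash "column=value" and add an indicator of 1.0.
            vec[bucket(f"{name}={value}", num_features)] += 1.0
        else:
            # Numeric column: hash the column name and add the numeric value.
            vec[bucket(name, num_features)] += float(value)
    return vec

row = {"sepal_length": 5.1, "sepal_width": 3.5, "petal_country": "USA"}
vec = hash_features(row)
print(len(vec), sum(vec))
```

Two different keys can land in the same slot (a collision), in which case their values are added together; a larger vector makes collisions less likely.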

  • Features: RFormula

RFormula uses R's model-formula syntax to turn a set of columns into a features column and a label column. The formula is written as label_column ~ first_column + second_column.

RFormula works in two steps, fit and transform. The fit() step inspects the dataset to work out how each column maps into the vector (for example, how string values are indexed). The transform() step applies the formula to produce the features column and the label column.

from pyspark.ml.feature import RFormula
formula = RFormula(formula="petal_country ~ sepal_length + sepal_width + petal_length + petal_width", featuresCol="features",labelCol="label")
output = formula.fit(irisDF).transform(irisDF)
output.select("features", "label").show(8)
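
The fit and transform steps can be sketched in plain Python for the simple "label ~ a + b" case. This is a toy model, not Spark's RFormula: the class SimpleFormula and the helper parse_formula are hypothetical, the second and third rows are made-up data, and indexing the label by descending frequency is an assumption of this sketch.

```python
def parse_formula(formula):
    # "label ~ a + b" -> ("label", ["a", "b"])
    label, terms = formula.split("~")
    return label.strip(), [t.strip() for t in terms.split("+")]

class SimpleFormula:
    def __init__(self, formula):
        self.label_col, self.feature_cols = parse_formula(formula)
        self.label_index = {}

    def fit(self, rows):
        # Learn a string -> index mapping for the label, most frequent value first.
        counts = {}
        for row in rows:
            counts[row[self.label_col]] = counts.get(row[self.label_col], 0) + 1
        ordered = sorted(counts, key=lambda v: (-counts[v], v))
        self.label_index = {v: float(i) for i, v in enumerate(ordered)}
        return self

    def transform(self, rows):
        # Apply the formula: collect the feature columns and index the label.
        return [{"features": [row[c] for c in self.feature_cols],
                 "label": self.label_index[row[self.label_col]]} for row in rows]

rows = [
    {"sepal_length": 5.1, "sepal_width": 3.5, "petal_country": "USA"},
    {"sepal_length": 4.9, "sepal_width": 3.0, "petal_country": "CA"},
    {"sepal_length": 4.7, "sepal_width": 3.2, "petal_country": "USA"},
]
formula = SimpleFormula("petal_country ~ sepal_length + sepal_width")
output_rows = formula.fit(rows).transform(rows)
print(output_rows[0])
```

The mapping learned in fit() is what makes transform() repeatable: the same label string always receives the same index, including on data that was not used for fitting.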

  • Features: Tokenizer

Tokenizer splits sentences into words. This is an important step in Natural Language Processing (NLP).

from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
sentenceDataFrame = spark.createDataFrame([
    (0, "Canada has very good universities"),
    (1, "UofT is very popular university in Canada"),
    (2, "Waterloo is very popular in math and computer science in Canada")], ["id", "u_sentence"])
tokenizer = Tokenizer(inputCol="u_sentence", outputCol="words")
# Alternative tokenizer that splits on any non-word character (the raw string keeps the backslash)
regexTokenizer = RegexTokenizer(inputCol="u_sentence", outputCol="words", pattern=r"\W")
# UDF that counts the words in each tokenized row
countTokens = udf(lambda words: len(words), IntegerType())
tokenized = tokenizer.transform(sentenceDataFrame)
tokenized.show(truncate=False)
tokenized.select("u_sentence", "words").withColumn("tokens",countTokens(col("words"))).show(truncate=False)
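
What the two tokenizers do to a sentence can be approximated in plain Python. This is a rough sketch, not Spark's code: the helpers tokenize and regex_tokenize are hypothetical names, and the second test sentence is made up.

```python
import re

def tokenize(sentence):
    # Roughly like Tokenizer: lowercase, then split on whitespace.
    return sentence.lower().split()

def regex_tokenize(sentence, pattern=r"\W"):
    # Roughly like RegexTokenizer with the pattern used as a separator:
    # lowercase, split on the pattern, drop empty tokens.
    return [tok for tok in re.split(pattern, sentence.lower()) if tok]

words = tokenize("Canada has very good universities")
print(words, len(words))
print(regex_tokenize("math, and computer-science"))
```

The whitespace tokenizer leaves punctuation attached to the words, while the regex version also splits on commas and hyphens.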

  • Features: StopWordsRemover

StopWordsRemover removes stop words such as has, have, is, are, very, in, on and out from the sentences. The input must be an array of words, so this step comes after tokenization.

from pyspark.ml.feature import StopWordsRemover
remover = StopWordsRemover(inputCol="words", outputCol="non-stop-words")
df=remover.transform(tokenized)
df.select("words","non-stop-words").show(truncate=False)
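
Stop-word removal itself is a simple filter, as the plain-Python sketch below shows. The stop-word set here contains only the examples named in the text (Spark ships a longer default list), and the helper remove_stop_words is a hypothetical name.

```python
# Stop-word list taken from the examples in the text; Spark ships a longer default list.
STOP_WORDS = {"has", "have", "is", "are", "very", "in", "on", "out"}

def remove_stop_words(words, stop_words=STOP_WORDS):
    # Case-insensitive match, keeping the original order of the remaining words.
    return [w for w in words if w.lower() not in stop_words]

words = ["canada", "has", "very", "good", "universities"]
print(remove_stop_words(words))
```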

  • Features: PCA

PCA (principal component analysis) is a statistical procedure that projects the features vector onto a smaller number of principal components; with k=1 the features are reduced to a single value. A reduced feature column is sometimes useful in visualization and machine learning. This step comes after the columns have been converted to a features column.

from pyspark.ml.feature import PCA
# "output" is the DataFrame with a features column produced in the RFormula step
pca = PCA(k=1, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(output)
result = model.transform(output).select("features", "pcaFeatures")
result.show(5)
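
For intuition, the first principal component can be computed in plain Python with power iteration on the covariance matrix. This is a minimal sketch, not Spark's algorithm: the helper first_principal_component is a hypothetical name, the data points are made up, and the sketch centers the data before projecting, so its values may differ from Spark's output in sign or offset.

```python
import math

def first_principal_component(rows, iterations=100):
    # Assumes at least two rows and some variance in the data.
    n, d = len(rows), len(rows[0])
    means = [sum(r[j] for r in rows) / n for j in range(d)]
    centered = [[r[j] - means[j] for j in range(d)] for r in rows]
    # Sample covariance matrix (d x d).
    cov = [[sum(c[i] * c[j] for c in centered) / (n - 1) for j in range(d)]
           for i in range(d)]
    # Power iteration converges to the eigenvector with the largest eigenvalue
    # (as long as the start vector is not orthogonal to it).
    v = [1.0] * d
    for _ in range(iterations):
        w = [sum(cov[i][j] * v[j] for j in range(d)) for i in range(d)]
        norm = math.sqrt(sum(x * x for x in w))
        v = [x / norm for x in w]
    return v, centered

rows = [[1.0, 2.0], [2.0, 4.0], [3.0, 6.0]]   # made-up points on the line y = 2x
component, centered = first_principal_component(rows)
# Project each centered point onto the component: one value per row (k=1).
projected = [sum(c[j] * component[j] for j in range(2)) for c in centered]
print(component, projected)
```

Because the sample points lie exactly on a line, the single component captures all of their variance, and each two-dimensional point is described by one number.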

  • Features: StringIndexer

StringIndexer converts a string (or numeric) column to a column of indices. It is typically used to convert the label/class column to an index, because ML models cannot work with string labels.

from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol="species", outputCol="speciesIndex")
indexed = indexer.fit(irisDF).transform(irisDF)
indexed.select('species','speciesIndex').distinct().show()
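
The indexing step can be sketched in plain Python. The sketch assumes the most frequent label receives index 0.0 and that ties are broken alphabetically; the helper fit_string_indexer and the list of species values are made up for illustration.

```python
from collections import Counter

def fit_string_indexer(values):
    # Most frequent label gets index 0.0; ties are broken alphabetically here.
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {label: float(i) for i, label in enumerate(ordered)}

# Made-up illustration data.
species = ["setosa", "virginica", "virginica", "versicolor", "virginica", "setosa"]
mapping = fit_string_indexer(species)
indexed_values = [mapping[s] for s in species]
print(mapping)
print(indexed_values)
```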