A. Collect data into a dataframe
- From File
Upload “iris_v1.csv” to the Jupyter notebook
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
spark = SparkSession.builder.appName('iris').getOrCreate()
irisDF = spark.read.csv('iris_v1.csv', inferSchema=True, header=True)
irisDF.show()
- From Array
Build a dataframe from an array of tuples
sentenceDataFrame = spark.createDataFrame([
    (0, "Canada has very good universities"),
    (1, "UofT is very popular university in Canada"),
    (2, "Waterloo is very popular in math and computer science in Canada")], ["id", "u_sentence"])
sentenceDataFrame.show(5)
- From JSON File
Upload “iris_v1.json” to the Jupyter notebook
df_json = spark.read.json("iris_v1.json")
df_json.printSchema()
df_json.show(5)
- From JSON string
str="{\"sepal_length\": 5.1,\"sepal_width\": 3.5,\"petal_length\": 1.4,\"petal_width\": 0.2, \"petal_country\": \"USA\",\"species\": \"setosa\"}" arr=[str] rdd=spark.sparkContext.parallelize(arr) df_json2=spark.read.json(rdd) df_json2.show()
- Read from Hive table
df = spark.sql("select * from hive_table")
df.show()
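Note: querying a Hive table requires a SparkSession that was created with Hive support enabled and a reachable Hive metastore; hive_table above is a placeholder table name. A minimal sketch, assuming the session is built this way instead of with the plain builder used earlier:
# Build the session with Hive support (must be set when the session is first created)
spark = SparkSession.builder.appName('iris').enableHiveSupport().getOrCreate()
df = spark.sql("select * from hive_table")
df.show(5)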
- Read from HDFS file
1- Start all nodes: go to cmd and type “start-all”
2- Load the data into hdfs://localhost:9000/lab
>hadoop fs -put iris_v1.csv hdfs://localhost:9000/lab
hadoopdf = spark.read.csv('hdfs://localhost:9000/lab/iris_v1.csv', inferSchema=True, header=True)
hadoopdf.show(5)
B. Transform data
- Select specific columns
- Print schema
hadoopdf.printSchema()
- Choose two columns from the data set
hadoopdf.select('sepal_length','petal_country').show(5)
- Add new columns
from pyspark.sql.functions import concat, col, lit
from pyspark.sql import functions as sf
hadoopdf.withColumn('petal_country2',concat(sf.col('petal_country'),lit('-'))).show(5)
- Rename columns
- Rename the column petal_country to country and display all columns
hadoopdf.withColumnRenamed('petal_country','country').show(5)
- Use functions in a dataframe
from pyspark.sql.functions import mean as _mean, stddev as _stddev, col
hadoopdf.select(_mean(col('sepal_width')).alias('mean'),_stddev(col('sepal_width')).alias('stddev')).show(5)
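Related, describe() returns count, mean, stddev, min, and max in one call:
# Summary statistics for a single column
hadoopdf.describe('sepal_width').show()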
- GroupBy/Aggregate Dataframe with Max/Min/Count
Group by the column petal_country to count the number of rows
hadoopdf.select('sepal_length','petal_country').groupby('petal_country').count().show()
# a dict cannot hold two aggregations for the same column; use pyspark.sql.functions (below) for that
hadoopdf.select('sepal_length','petal_country').groupby('petal_country').agg({"sepal_length": "avg"}).show()
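The examples from here on also use a dataframe named continentDF that is not created earlier in this lab. A minimal sketch of how it could be built from an array, assuming the columns counteryID, country, and population referenced below (the rows are hypothetical sample data):
# Hypothetical sample data; column names match the later examples
continentDF = spark.createDataFrame([
    ("CA", "Canada", 38000000),
    ("USA", "United States", 331000000),
    ("DE", "Germany", 83000000)], ["counteryID", "country", "population"])
continentDF.show()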
Use defined functions to do the group-by and aggregation
import pyspark.sql.functions as f
continentDF.printSchema()
continentDF.groupby('country').agg(f.sum('population').alias('sum')).show()
hadoopdf.groupby('petal_country').agg(f.sum('sepal_length').alias('sum'),f.count('sepal_length').alias('count')).show()
C. Filter data
Without using a function:
irisDF.filter(irisDF.petal_country=="CA").show(5)
continentDF.filter(continentDF.population>80000000).show()
For two or more conditions, combine filters with:
| => Or
& => And
irisDF.filter(irisDF.petal_country.isin('CA') | irisDF.petal_country.isin('USA')).show(5)
Use the NOT condition (~)
irisDF.filter(~(irisDF.petal_country=='CA')).show(5)
With a function:
import pyspark.sql.functions as f
irisDF.filter(f.col('petal_country')=="CA").show(5)
continentDF.filter(f.col('population')>80000000).show()
D. Enrich data
- Join tables
We can implement the join at the query level as a regular SQL query, or at the dataframe level. The examples below join at the dataframe level.
The join types include: inner, left_outer, right_outer, and full_outer.
irisDF.join(continentDF, irisDF.petal_country == continentDF.counteryID, "right_outer").show(5)
irisDF.join(continentDF, irisDF.petal_country == continentDF.counteryID, "inner").show(5)
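For comparison, the same join can be written at the query level; a minimal sketch, registering both dataframes under the hypothetical view names iris and continent:
# Register both dataframes as temporary views, then join with Spark SQL
irisDF.createOrReplaceTempView("iris")
continentDF.createOrReplaceTempView("continent")
spark.sql("select * from iris i inner join continent c on i.petal_country = c.counteryID").show(5)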
- Union
irisDF.union(irisDF).show(5)
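Note that union() matches columns by position; to match columns by name instead, Spark also provides unionByName() (a minimal sketch):
# Union that resolves columns by name rather than by position
irisDF.unionByName(irisDF).show(5)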
E. Data modeling
- Build a function (UDF)
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
def check_currency(x):
    if x == "CA":
        return "CA-CURRENCY"
    elif x == "USA":
        return "USA-CURRENCY"
    else:
        return "No currency"
currencyUDF = udf(check_currency, StringType())
irisDF.select(currencyUDF(irisDF.petal_country)).show(5)
- Build a lambda function
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
curr2=lambda x:'CA-CURRENCY' if x=='CA' else 'USA-CURRENCY'
currencyUDF2 = udf(curr2, StringType())
irisDF.select(currencyUDF2(irisDF.petal_country).alias('currency')).show(5)
- Performance Tuning: Cache the table
Spark uses lazy evaluation: the transformations are not executed until an action is called, and by default the whole chain of transformations is recomputed from the beginning for each action. To avoid this recomputation we can cache the dataframe or table. Spark SQL caches tables in an in-memory columnar format, scanning only the required columns and tuning compression to minimize memory usage and GC pressure.
1- Cache the dataframe
hadoopdf.cache()
2- Cache a registered table: register the dataframe as a table, then cache the table
hadoopdf.createOrReplaceTempView("hadoopdf")
spark.sql("select * from hadoopdf").show(5)
spark.catalog.cacheTable("hadoopdf")
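To release the cached data when it is no longer needed, a minimal sketch:
# Release the cached dataframe and the cached registered table
hadoopdf.unpersist()
spark.catalog.uncacheTable("hadoopdf")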
- Performance Tuning: Broadcast the table
Spark SQL can broadcast a specific table when it is joined with another table; the broadcast table should be small enough to fit in memory on each executor.
from pyspark.sql.functions import broadcast
broadcast(continentDF).join(hadoopdf, hadoopdf.petal_country == continentDF.counteryID, "inner").show(5)
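Spark can also broadcast small tables automatically during joins; the size limit is controlled by the spark.sql.autoBroadcastJoinThreshold setting (a minimal sketch, value in bytes):
# Raise the automatic broadcast threshold from the 10 MB default to 50 MB
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)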
F. Machine learning feature extraction methods
- Features: VectorAssembler
Convert integer/double columns to a features vector: VectorAssembler combines the selected numeric columns into a single features column.
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(
    inputCols=["sepal_length","sepal_width","petal_length","petal_width"],
    outputCol="features")
outputdf = assembler.transform(irisDF.select("sepal_length","sepal_width","petal_length","petal_width"))
outputdf.show(5)
- Features: FeatureHasher
FeatureHasher hashes all of the input columns (integer/double/string) into a single features vector.
from pyspark.ml.feature import FeatureHasher
hasher = FeatureHasher(inputCols=["sepal_length","sepal_width","petal_length","petal_width","petal_country"], outputCol="features")
featurized = hasher.transform(irisDF)
featurized.show(3,truncate=False)
- Features: RFormula
RFormula applies an R-style model formula to convert a set of columns into a features column and a label column. The formula is written as: label_column ~ first_column + second_column
RFormula has two steps: fit and transform. The fit() step determines the mapping for the dataset (for example, how string columns are encoded into the vector). The transform() step applies the formula above to produce the features column and the label.
from pyspark.ml.feature import RFormula
formula = RFormula(formula="petal_country ~ sepal_length + sepal_width + petal_length + petal_width",
                   featuresCol="features", labelCol="label")
output = formula.fit(irisDF).transform(irisDF)
output.select("features", "label").show(8)
- Features: Tokenizer
Tokenizer splits sentences into words. This is a very important step in Natural Language Processing (NLP).
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
sentenceDataFrame = spark.createDataFrame([
(0, "Canada has very good universities"),
(1, "UofT is very popular unversity in Canada"),
(2, "Waterloo is very popular in math and computer science in Canada")], ["id", "u_sentence"]) tokenizer = Tokenizer(inputCol="u_sentence", outputCol="words") regexTokenizer = RegexTokenizer(inputCol="u_sentence",outputCol="words", pattern="\W")
# UDF that counts the number of tokens in a words array
countTokens = udf(lambda words: len(words), IntegerType())
tokenized = tokenizer.transform(sentenceDataFrame)
tokenized.show(truncate=False)
tokenized.select("u_sentence", "words").withColumn("tokens",countTokens(col("words"))).show(truncate=False)
- Features: StopWordsRemover
StopWordsRemover removes stop words such as has, have, is, are, very, in, on, out from the sentences. The input should be an array of words, so this step should come after the tokenization process.
from pyspark.ml.feature import StopWordsRemover
remover = StopWordsRemover(inputCol="words", outputCol="non-stop-words")
df = remover.transform(tokenized)
df.select("words", "non-stop-words").show(truncate=False)
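By default the remover uses the built-in English stop-word list; the list can be inspected or extended through the stopWords parameter (a minimal sketch adding a custom word):
# Load the default English stop words and append a custom one
custom_stop_words = StopWordsRemover.loadDefaultStopWords("english") + ["canada"]
remover2 = StopWordsRemover(inputCol="words", outputCol="non-stop-words", stopWords=custom_stop_words)
remover2.transform(tokenized).select("words", "non-stop-words").show(truncate=False)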
- Features: PCA
PCA is a statistical procedure that projects the features vector onto a smaller number of principal components (here k=1). The reduced feature column is sometimes useful for visualization and machine learning. This step should come after the columns have been combined into a features column.
from pyspark.ml.feature import PCA
import pyspark.sql.functions as f
pca = PCA(k=1, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(output)
result = model.transform(output).select("features", f.col("pcaFeatures"))
result.show(5)
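The fitted PCA model also reports the proportion of variance explained by each retained component:
# Proportion of variance explained by the k retained component(s)
print(model.explainedVariance)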
- Features: StringIndexer
StringIndexer converts a string (or numeric) column into an index column. We use it when converting the label/class column to an index, because ML models cannot work with string labels.
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol="species", outputCol="speciesIndex")
indexed = indexer.fit(irisDF).transform(irisDF)
indexed.select('species','speciesIndex').distinct().show()
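IndexToString can map the generated indexes back to the original label strings, which is useful after model predictions (a minimal sketch using the metadata attached by StringIndexer):
from pyspark.ml.feature import IndexToString
# Convert speciesIndex back to the original species string
converter = IndexToString(inputCol="speciesIndex", outputCol="speciesOriginal")
converter.transform(indexed).select("speciesIndex", "speciesOriginal").distinct().show()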