ML_Modeling

Spark-based analysis and modeling of the 2013 New York City taxi fare data

Project workflow

  1. Data loading, cleaning, and joining
  2. Exploratory data analysis and visualization
  3. Data preprocessing and feature engineering
  4. Modeling, hyperparameter tuning, prediction, and model saving
  5. Model evaluation

The project relies heavily on Spark SQL. In real-world industry projects, beyond the feature transformers that Spark MLlib provides out of the box, Spark SQL is also commonly used for feature engineering (computing all kinds of statistics and transformations).
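For instance, a minimal sketch of computing an aggregate statistic with Spark SQL and joining it back onto the rows as a feature (the view name "rides" and its columns are hypothetical, purely for illustration):

# Assume a view "rides" with columns pickup_hour and fare_amount (hypothetical example)
hourly_stats = spark.sql("""
    SELECT pickup_hour, AVG(fare_amount) AS avg_fare_per_hour
    FROM rides
    GROUP BY pickup_hour
""")
hourly_stats.createOrReplaceTempView("hourly_stats")

# Join the aggregate back onto the original rows as a new feature column
rides_with_stats = spark.sql("""
    SELECT r.*, s.avg_fare_per_hour
    FROM rides r
    JOIN hourly_stats s ON r.pickup_hour = s.pickup_hour
""")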

Data loading, cleaning, and joining
# Load the raw data and register it as temp views
trip = spark.read.csv(path=trip_file_loc, header=True, inferSchema=True)
fare = spark.read.csv(path=fare_file_loc, header=True, inferSchema=True)
trip.createOrReplaceTempView("trip")
fare.createOrReplaceTempView("fare")
# Inspect the schemas of both datasets
trip.printSchema()
fare.printSchema()
# Join, clean, and generate the feature data with Spark SQL
sqlStatement = """
SELECT t.medallion,
       t.hack_license,
       f.total_amount,
       f.tolls_amount,
       hour(f.pickup_datetime) as pickup_hour,
       f.vendor_id,
       f.fare_amount,
       f.surcharge,
       f.tip_amount,
       f.payment_type,
       t.rate_code,
       t.passenger_count,
       t.trip_distance,
       t.trip_time_in_secs
FROM trip t,
     fare f
WHERE t.medallion = f.medallion
  AND t.hack_license = f.hack_license
  AND t.pickup_datetime = f.pickup_datetime
  AND t.passenger_count > 0
  AND t.passenger_count < 8
  AND f.tip_amount >= 0
  AND f.tip_amount <= 25
  AND f.fare_amount >= 1
  AND f.fare_amount <= 250
  AND f.tip_amount < f.fare_amount
  AND t.trip_distance > 0
  AND t.trip_distance <= 100
  AND t.trip_time_in_secs >= 30
  AND t.trip_time_in_secs <= 7200
  AND t.rate_code <= 5
  AND f.payment_type in ('CSH','CRD')
"""
trip_fareDF = spark.sql(sqlStatement)

# REGISTER JOINED TRIP-FARE DF IN SQL-CONTEXT
trip_fareDF.createOrReplaceTempView("trip_fare")
Exploratory data analysis and visualization
# Exploratory analysis with Spark SQL (querying the trip_fare view registered above)
querySQL = '''
SELECT fare_amount, passenger_count, tip_amount
FROM trip_fare
WHERE passenger_count > 0
  AND passenger_count < 7
  AND fare_amount > 0
  AND fare_amount < 100
  AND tip_amount > 0
  AND tip_amount < 15
'''
sqlResultsPD = spark.sql(querySQL).toPandas()
# Univariate and bivariate analysis (use pyplot to visualize the distribution of each feature and how features interact)
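A minimal plotting sketch on the pandas frame above (column names follow the query; the bin count and figure styling are illustrative only):

import matplotlib.pyplot as plt

# Distribution of tip_amount
ax1 = sqlResultsPD[['tip_amount']].plot(kind='hist', bins=25, facecolor='lightblue')
ax1.set_title('Tip amount distribution')
ax1.set_xlabel('tip_amount'); ax1.set_ylabel('Counts')
plt.show()

# Tip amount by passenger count (bivariate view)
ax2 = sqlResultsPD.boxplot(column=['tip_amount'], by=['passenger_count'])
ax2.set_title('Tip amount by passenger count')
ax2.set_xlabel('passenger_count'); ax2.set_ylabel('tip_amount')
plt.suptitle(''); plt.show()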
Data preprocessing and feature engineering
# Data transformation and feature engineering: categorical columns are index-encoded and can then be one-hot encoded (OneHotEncoder); a sketch of building encodedFinal follows this block
# Split into training and test sets (randomSplit)
trainingFraction = 0.75; testingFraction = 1 - trainingFraction
seed = 1234

# SPLIT SAMPLED DATA-FRAME INTO TRAIN/TEST, WITH A RANDOM COLUMN ADDED FOR DOING CV (SHOWN LATER)
trainData, testData = encodedFinal.randomSplit([trainingFraction, testingFraction], seed=seed)

# CACHE DATA FRAMES IN MEMORY
trainData.persist(); trainData.count()
testData.persist(); testData.count()
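encodedFinal above is assumed to be the trip_fare data with its categorical columns encoded; a minimal sketch of how it could be built (the index column names match the formula used in the next section; the OneHotEncoder call uses the Spark 3.x API):

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder

taxiDF = spark.sql("SELECT * FROM trip_fare")

# Index-encode the categorical columns
sI_payment = StringIndexer(inputCol="payment_type", outputCol="paymentIndex")
sI_vendor = StringIndexer(inputCol="vendor_id", outputCol="vendorIndex")
sI_rate = StringIndexer(inputCol="rate_code", outputCol="rateIndex")
# TrafficTimeBinsIndex (used in the formula later) would be built similarly, e.g. by bucketing
# pickup_hour into rush-hour/off-peak bins and indexing the result

# Optionally one-hot encode the indices (Spark 3.x API; output column names are illustrative)
ohe = OneHotEncoder(inputCols=["paymentIndex", "vendorIndex", "rateIndex"],
                    outputCols=["paymentVec", "vendorVec", "rateVec"])

encodedFinal = Pipeline(stages=[sI_payment, sI_vendor, sI_rate, ohe]).fit(taxiDF).transform(taxiDF)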
Modeling, hyperparameter tuning, prediction, and model saving
# GBT Regression
import datetime
from pyspark.ml import Pipeline
from pyspark.ml.feature import RFormula, VectorIndexer
from pyspark.ml.regression import GBTRegressor, RandomForestRegressor
from pyspark.mllib.evaluation import RegressionMetrics

## DEFINE REGRESSION FORMULA
regFormula = RFormula(formula="tip_amount ~ paymentIndex + vendorIndex + rateIndex + TrafficTimeBinsIndex + pickup_hour + passenger_count + trip_time_in_secs + trip_distance + fare_amount")

## DEFINE INDEXER FOR CATEGORICAL VARIABLES
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=32)

## DEFINE GRADIENT BOOSTING TREE REGRESSOR
gBT = GBTRegressor(featuresCol="indexedFeatures", maxIter=10)

## FIT MODEL, WITH FORMULA AND OTHER TRANSFORMATIONS
model = Pipeline(stages=[regFormula, featureIndexer, gBT]).fit(trainData)

## PREDICT ON TEST DATA AND EVALUATE
predictions = model.transform(testData)
predictionAndLabels = predictions.select("label", "prediction").rdd
testMetrics = RegressionMetrics(predictionAndLabels)
print("RMSE = %s" % testMetrics.rootMeanSquaredError)
print("R-sqr = %s" % testMetrics.r2)

## PLOT ACTUALS VS. PREDICTIONS
predictions.select("label", "prediction").createOrReplaceTempView("tmp_results")

# Hyperparameter tuning
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

## DEFINE RANDOM FOREST MODEL
randForest = RandomForestRegressor(featuresCol='indexedFeatures', labelCol='label',
                                   featureSubsetStrategy="auto", impurity='variance', maxBins=100)

## DEFINE MODELING PIPELINE, INCLUDING FORMULA, FEATURE TRANSFORMATIONS, AND ESTIMATOR
pipeline = Pipeline(stages=[regFormula, featureIndexer, randForest])

## DEFINE PARAMETER GRID FOR RANDOM FOREST
paramGrid = ParamGridBuilder() \
    .addGrid(randForest.numTrees, [10, 25, 50]) \
    .addGrid(randForest.maxDepth, [3, 5, 7]) \
    .build()

## DEFINE CROSS VALIDATION
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(metricName="rmse"),
                          numFolds=3)

## TRAIN MODEL USING CV
cvModel = crossval.fit(trainData)

## PREDICT AND EVALUATE TEST DATA SET
predictions = cvModel.transform(testData)
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")
r2 = evaluator.evaluate(predictions)
print("R-squared on test data = %g" % r2)

## SAVE THE BEST MODEL
datestamp = datetime.datetime.now().strftime('%m-%d-%Y-%s')
fileName = "CV_RandomForestRegressionModel_" + datestamp
CVDirfilename = modelDir + fileName
cvModel.bestModel.save(CVDirfilename)
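The tmp_results view registered earlier is intended for plotting actuals against predictions; a minimal sketch of such a plot on a sampled pandas frame (the sampling fraction and styling are illustrative):

pred_pd = predictions.select("label", "prediction").sample(False, 0.1, seed=123).toPandas()

import matplotlib.pyplot as plt
ax = pred_pd.plot(kind='scatter', x='label', y='prediction', alpha=0.25, figsize=(6, 6))
ax.set_title('Actual vs. predicted tip amount')
ax.set_xlabel('Actual (label)'); ax.set_ylabel('Predicted')
plt.show()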
Model evaluation, saving, and loading
from pyspark.ml import PipelineModel

# Load the best model saved above (the cross-validated random forest model)
savedModel = PipelineModel.load(CVDirfilename)

predictions = savedModel.transform(testData)
predictionAndLabels = predictions.select("label", "prediction").rdd
testMetrics = RegressionMetrics(predictionAndLabels)
print("RMSE = %s" % testMetrics.rootMeanSquaredError)
print("R-sqr = %s" % testMetrics.r2)

# Save the predictions to HDFS
datestamp = datetime.datetime.now().strftime('%m-%d-%Y-%s')
fileName = "Predictions_CV_" + datestamp
predictionfile = dataDir + fileName
predictions.select("label", "prediction").write.mode("overwrite").csv(predictionfile)

Spark-based analysis and modeling of airline flight delay data

Data loading, cleaning, and joining
# The raw airline and weather data are assumed to be loaded and registered as the temp views "airline" and "weather"
# COUNT FLIGHTS BY AIRPORT
spark.sql("SELECT ORIGIN, COUNT(*) as CTORIGIN FROM airline GROUP BY ORIGIN").createOrReplaceTempView("countOrigin")
spark.sql("SELECT DEST, COUNT(*) as CTDEST FROM airline GROUP BY DEST").createOrReplaceTempView("countDest")

## CLEAN AIRLINE DATA WITH QUERY, FILTER OUT AIRPORTS WHICH HAVE VERY FEW FLIGHTS (<100)
sqlStatement = """
SELECT ARR_DEL15 as ArrDel15,
       YEAR as Year,
       MONTH as Month,
       DAY_OF_MONTH as DayOfMonth,
       DAY_OF_WEEK as DayOfWeek,
       UNIQUE_CARRIER as Carrier,
       ORIGIN_AIRPORT_ID as OriginAirportID,
       ORIGIN,
       DEST_AIRPORT_ID as DestAirportID,
       DEST,
       floor(CRS_DEP_TIME/100) as CRSDepTime,
       floor(CRS_ARR_TIME/100) as CRSArrTime
FROM airline
WHERE ARR_DEL15 in ('0.0', '1.0')
  AND ORIGIN IN (
      SELECT DISTINCT ORIGIN
      FROM countOrigin
      WHERE CTORIGIN > 100
  )
  AND DEST IN (
      SELECT DISTINCT DEST
      FROM countDest
      WHERE CTDEST > 100
  )
"""
airCleaned = spark.sql(sqlStatement)

# REGISTER CLEANED AIRLINE DATASET
airCleaned.createOrReplaceTempView("airCleaned")

## CLEAN WEATHER DATA WITH QUERY (AVERAGE READINGS PER AIRPORT AND HOUR)
sqlStatement = """
SELECT AdjustedYear,
       AdjustedMonth,
       AdjustedDay,
       AdjustedHour,
       AirportID,
       avg(Visibility) as Visibility,
       avg(DryBulbCelsius) as DryBulbCelsius,
       avg(DewPointCelsius) as DewPointCelsius,
       avg(RelativeHumidity) as RelativeHumidity,
       avg(WindSpeed) as WindSpeed,
       avg(Altimeter) as Altimeter
FROM weather
GROUP BY AdjustedYear,
         AdjustedMonth,
         AdjustedDay,
         AdjustedHour,
         AirportID
"""
weatherCleaned = spark.sql(sqlStatement)

# REGISTER CLEANED WEATHER DATASET
weatherCleaned.createOrReplaceTempView("weatherCleaned")
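## The year-based split below reads from a view named "joined", whose construction is not shown in this
## snippet. A minimal sketch, assuming airCleaned is joined to weatherCleaned twice (once on the origin
## airport at the scheduled departure hour, once on the destination airport at the scheduled arrival
## hour), with the weather columns suffixed Origin/Dest as used later in this document:
sqlStatement = """
SELECT a.*,
       wo.Visibility as VisibilityOrigin, wo.DryBulbCelsius as DryBulbCelsiusOrigin,
       wo.DewPointCelsius as DewPointCelsiusOrigin, wo.RelativeHumidity as RelativeHumidityOrigin,
       wo.WindSpeed as WindSpeedOrigin, wo.Altimeter as AltimeterOrigin,
       wd.Visibility as VisibilityDest, wd.DryBulbCelsius as DryBulbCelsiusDest,
       wd.DewPointCelsius as DewPointCelsiusDest, wd.RelativeHumidity as RelativeHumidityDest,
       wd.WindSpeed as WindSpeedDest, wd.Altimeter as AltimeterDest
FROM airCleaned a
JOIN weatherCleaned wo
  ON a.OriginAirportID = wo.AirportID AND a.Year = wo.AdjustedYear
 AND a.Month = wo.AdjustedMonth AND a.DayOfMonth = wo.AdjustedDay AND a.CRSDepTime = wo.AdjustedHour
JOIN weatherCleaned wd
  ON a.DestAirportID = wd.AirportID AND a.Year = wd.AdjustedYear
 AND a.Month = wd.AdjustedMonth AND a.DayOfMonth = wd.AdjustedDay AND a.CRSArrTime = wd.AdjustedHour
"""
spark.sql(sqlStatement).createOrReplaceTempView("joined")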
# Split the data by year: train on 2011, validate on 2012
sqlStatement = """SELECT * from joined WHERE Year = 2011"""
train = spark.sql(sqlStatement)

sqlStatement = """SELECT * from joined WHERE Year = 2012"""
validation = spark.sql(sqlStatement)

# SAVE JOINED TRAIN/VALIDATION DATA IN BLOB STORAGE
trainfilename = dataDir + "TrainData"
train.write.mode("overwrite").parquet(trainfilename)

validfilename = dataDir + "ValidationData"
validation.write.mode("overwrite").parquet(validfilename)

## PERSIST AND MATERIALIZE THE PROCESSED DF IN MEMORY
## (train_df is assumed to be the training partition, e.g. read back from the parquet file above)
train_df.persist()
train_df.count()
Exploratory data analysis and visualization
# Univariate and multivariate analysis
%%local
%matplotlib inline
import matplotlib.pyplot as plt
## %%local creates a pandas data-frame in the head-node memory from the spark data-frame,
## which can then be used for plotting. Sampling the data first is a good idea, depending on the memory of the head node.

# DISTRIBUTION OF WIND SPEED AT THE DESTINATION AIRPORT
ax1 = sqlResultsPD[['WindSpeedDest']].plot(kind='hist', bins=25, facecolor='lightblue')
ax1.set_title('WindSpeed @ Destination distribution')
ax1.set_xlabel('WindSpeedDest'); ax1.set_ylabel('Counts')
plt.figure(figsize=(4,4)); plt.suptitle(''); plt.show()

# WIND SPEED AT DESTINATION BY ARRIVAL-DELAY LABEL
ax2 = sqlResultsPD.boxplot(column=['WindSpeedDest'], by=['ArrDel15'])
ax2.set_title('WindSpeed Destination')
ax2.set_xlabel('ArrDel15'); ax2.set_ylabel('WindSpeed')
plt.figure(figsize=(4,4)); plt.suptitle(''); plt.show()
Data preprocessing and feature engineering
# Keep only rows where all the required fields are non-null
## EXAMPLES BELOW ALSO SHOW HOW TO USE SQL DIRECTLY ON DATAFRAMES
trainPartitionFilt = trainPartition.filter("ArrDel15 is not NULL and DayOfMonth is not NULL and DayOfWeek is not NULL \
    and Carrier is not NULL and OriginAirportID is not NULL and DestAirportID is not NULL \
    and CRSDepTime is not NULL and VisibilityOrigin is not NULL and DryBulbCelsiusOrigin is not NULL \
    and DewPointCelsiusOrigin is not NULL and RelativeHumidityOrigin is not NULL \
    and WindSpeedOrigin is not NULL and AltimeterOrigin is not NULL \
    and VisibilityDest is not NULL and DryBulbCelsiusDest is not NULL \
    and DewPointCelsiusDest is not NULL and RelativeHumidityDest is not NULL \
    and WindSpeedDest is not NULL and AltimeterDest is not NULL ")
trainPartitionFilt.persist(); trainPartitionFilt.count()
trainPartitionFilt.createOrReplaceTempView("TrainPartitionFilt")

# Filter nulls from the test set as well, and keep only categorical levels that also appear in the training set
testPartitionFilt = testPartition.filter("ArrDel15 is not NULL and DayOfMonth is not NULL and DayOfWeek is not NULL \
    and Carrier is not NULL and OriginAirportID is not NULL and DestAirportID is not NULL \
    and CRSDepTime is not NULL and VisibilityOrigin is not NULL and DryBulbCelsiusOrigin is not NULL \
    and DewPointCelsiusOrigin is not NULL and RelativeHumidityOrigin is not NULL \
    and WindSpeedOrigin is not NULL and AltimeterOrigin is not NULL \
    and VisibilityDest is not NULL and DryBulbCelsiusDest is not NULL \
    and DewPointCelsiusDest is not NULL and RelativeHumidityDest is not NULL \
    and WindSpeedDest is not NULL and AltimeterDest is not NULL") \
    .filter("OriginAirportID IN (SELECT distinct OriginAirportID FROM TrainPartitionFilt) \
    AND ORIGIN IN (SELECT distinct ORIGIN FROM TrainPartitionFilt) \
    AND DestAirportID IN (SELECT distinct DestAirportID FROM TrainPartitionFilt) \
    AND DEST IN (SELECT distinct DEST FROM TrainPartitionFilt) \
    AND Carrier IN (SELECT distinct Carrier FROM TrainPartitionFilt) \
    AND CRSDepTime IN (SELECT distinct CRSDepTime FROM TrainPartitionFilt) \
    AND DayOfMonth in (SELECT distinct DayOfMonth FROM TrainPartitionFilt) \
    AND DayOfWeek in (SELECT distinct DayOfWeek FROM TrainPartitionFilt)")
testPartitionFilt.persist(); testPartitionFilt.count()
testPartitionFilt.createOrReplaceTempView("TestPartitionFilt")

# Build the transformation pipeline
# TRANSFORM SOME FEATURES USING MLLIB TRANSFORMATION FUNCTIONS
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorIndexer, Bucketizer, Binarizer

sI0 = StringIndexer(inputCol='ArrDel15', outputCol='ArrDel15_ind')
bin0 = Binarizer(inputCol='ArrDel15_ind', outputCol='ArrDel15_bin', threshold=0.5)
sI1 = StringIndexer(inputCol="Carrier", outputCol="Carrier_ind")
transformPipeline = Pipeline(stages=[sI0, bin0, sI1])

# Fit the indexers on the filtered training data, then apply them to both partitions
transformedTrain = transformPipeline.fit(trainPartitionFilt).transform(trainPartitionFilt)
transformedTest = transformPipeline.fit(trainPartitionFilt).transform(testPartitionFilt)

transformedTrain.persist(); transformedTrain.count()
transformedTest.persist(); transformedTest.count()
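The modeling code in the next section reuses regFormula and featureIndexer, which are not defined in this snippet; a minimal sketch, assuming the label is ArrDel15_bin and the feature list mirrors the columns kept in the filtering step above:

from pyspark.ml.feature import RFormula, VectorIndexer

## The feature set below is an assumption based on the columns retained during filtering
regFormula = RFormula(formula="ArrDel15_bin ~ DayOfMonth + DayOfWeek + Carrier_ind + OriginAirportID + DestAirportID \
    + CRSDepTime + VisibilityOrigin + DryBulbCelsiusOrigin + DewPointCelsiusOrigin + RelativeHumidityOrigin \
    + WindSpeedOrigin + AltimeterOrigin + VisibilityDest + DryBulbCelsiusDest + DewPointCelsiusDest \
    + RelativeHumidityDest + WindSpeedDest + AltimeterDest")

## Index categorical features inside the assembled feature vector
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=32)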
Modeling, hyperparameter tuning, prediction, and model saving
# Logistic regression modeling, evaluated with ROC/AUC
import datetime
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.mllib.evaluation import BinaryClassificationMetrics

## DEFINE ELASTIC-NET-REGULARIZED LOGISTIC REGRESSION
eNet = LogisticRegression(featuresCol="indexedFeatures", maxIter=25, regParam=0.01, elasticNetParam=0.5)

## TRAINING PIPELINE: fit the model, with the formula and other transformations
model = Pipeline(stages=[regFormula, featureIndexer, eNet]).fit(transformedTrain)

# SAVE MODEL
datestamp = datetime.datetime.now().strftime('%m-%d-%Y-%s')
fileName = "logisticRegModel_" + datestamp
logRegDirfilename = modelDir + fileName
model.save(logRegDirfilename)

## Evaluate the model on the test set
predictions = model.transform(transformedTest)
predictionAndLabels = predictions.select("label","prediction").rdd
predictions.select("label","probability").createOrReplaceTempView("tmp_results")

metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)

# ROC curve plotting template
%%local
## PLOT ROC CURVE AFTER CONVERTING PREDICTIONS TO A PANDAS DATA FRAME
## (predictions_pddf is assumed to be the tmp_results view pulled into the local context, e.g. via %%sql -o)
from sklearn.metrics import roc_curve, auc
import matplotlib.pyplot as plt
%matplotlib inline

labels = predictions_pddf["label"]
prob = []
for dv in predictions_pddf["probability"]:
    prob.append(list(dv.values())[1][1])

fpr, tpr, thresholds = roc_curve(labels, prob, pos_label=1)
roc_auc = auc(fpr, tpr)

plt.figure(figsize=(5,5))
plt.plot(fpr, tpr, label='ROC curve (area = %0.2f)' % roc_auc)
plt.plot([0, 1], [0, 1], 'k--')
plt.xlim([0.0, 1.0]); plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate'); plt.ylabel('True Positive Rate')
plt.title('ROC Curve'); plt.legend(loc="lower right")
plt.show()

################################################
from pyspark.ml.classification import GBTClassifier, RandomForestClassifier
## DEFINE GRADIENT BOOSTING TREE CLASSIFIER
gBT = GBTClassifier(featuresCol="indexedFeatures", maxIter=10, maxBins=250)
## DEFINE RANDOM FOREST CLASSIFIER
randForest = RandomForestClassifier(featuresCol='indexedFeatures', labelCol='label', numTrees=20, maxDepth=6, maxBins=250)
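## The two classifiers above are only defined in this snippet; a minimal sketch (an assumption, not part
## of the original code) of fitting and evaluating the GBT classifier with the same formula and indexer:
modelGBT = Pipeline(stages=[regFormula, featureIndexer, gBT]).fit(transformedTrain)
predictionsGBT = modelGBT.transform(transformedTest)

from pyspark.ml.evaluation import BinaryClassificationEvaluator
aucGBT = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction",
                                       metricName="areaUnderROC").evaluate(predictionsGBT)
print("GBT area under ROC = %s" % aucGBT)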
# Hyperparameter tuning with grid search and cross-validation
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator

## DEFINE RANDOM FOREST CLASSIFIER
randForest = RandomForestClassifier(featuresCol='indexedFeatures', labelCol='label', numTrees=20,
                                    maxDepth=6, maxBins=250)

## DEFINE MODELING PIPELINE, INCLUDING FORMULA, FEATURE TRANSFORMATIONS, AND ESTIMATOR
pipeline = Pipeline(stages=[regFormula, featureIndexer, randForest])

## DEFINE PARAMETER GRID FOR RANDOM FOREST
paramGrid = ParamGridBuilder() \
    .addGrid(randForest.numTrees, [10, 25, 50]) \
    .addGrid(randForest.maxDepth, [3, 5, 7]) \
    .build()

## DEFINE CROSS VALIDATION
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(metricName="areaUnderROC"),
                          numFolds=3)

## TRAIN MODEL USING CV
cvModel = crossval.fit(transformedTrain)

## Evaluate the model on the test set
predictions = cvModel.transform(transformedTest)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)

## SAVE THE BEST MODEL
datestamp = datetime.datetime.now().strftime('%m-%d-%Y-%s')
fileName = "CV_RandomForestClassificationModel_" + datestamp
CVDirfilename = modelDir + fileName
cvModel.bestModel.save(CVDirfilename)
Model evaluation, saving, and loading
from pyspark.ml import PipelineModel

savedModel = PipelineModel.load(logRegDirfilename)

## Evaluate model on test set
predictions = savedModel.transform(transformedTest)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)
