12 Spark Pipelines
12.1 Build an Estimator (plan)
Create a simple estimator that transforms data and fits a model
Use the `spark_lineitems` variable to create a new aggregation by `order_id`. Summarize the total sales and the number of items.

```r
spark_lineitems %>%
  mutate(price = as.double(price)) %>%
  group_by(order_id) %>%
  summarise(
    total_sales = sum(price, na.rm = TRUE),
    no_items = n()
  )
```

```
## # Source: spark<?> [?? x 3]
##    order_id total_sales no_items
##    <chr>          <dbl>    <dbl>
##  1 33531           45.4        6
##  2 33538           79.9       12
##  3 33541           70.0       10
##  4 33550           92.8       16
##  5 33561           46.6        8
##  6 33565           55.6        8
##  7 33568           76.6       10
##  8 33571           74         12
##  9 33572           54.4        8
## 10 33582           82.5       12
## # … with more rows
```
Assign the code to a new variable called `orders`.
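For reference, a sketch of what this assignment could look like, reusing the aggregation from the previous step:

```r
# Lazily-evaluated aggregation by order; nothing runs in Spark until it is used
orders <- spark_lineitems %>%
  mutate(price = as.double(price)) %>%
  group_by(order_id) %>%
  summarise(
    total_sales = sum(price, na.rm = TRUE),
    no_items = n()
  )
```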
Start a new code chunk that calls `ml_pipeline(sc)`.

```
## Pipeline (Estimator) with no stages 
## <pipeline_55c15862a53>
```
Pipe the `ml_pipeline()` code into an `ft_dplyr_transformer()` call. Use the `orders` variable as its argument.
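A sketch of the pipeline at this point, assuming the `orders` variable created above:

```r
ml_pipeline(sc) %>%
  ft_dplyr_transformer(orders)  # turns the dplyr query into a SQLTransformer stage
```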
```
## Pipeline (Estimator) with 1 stage 
## <pipeline_55c68ca32c1> 
##   Stages 
##   |--1 SQLTransformer (Transformer)
##   |    <dplyr_transformer_55c795af321>
##   |     (Parameters -- Column Names)
```
Add an `ft_binarizer()` step that determines if the total sale is above $50. Name the new variable `above_50`.
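A sketch of this step, consistent with the full pipeline code shown in the later steps:

```r
ml_pipeline(sc) %>%
  ft_dplyr_transformer(orders) %>%
  ft_binarizer(
    input_col  = "total_sales",  # column to test
    output_col = "above_50",     # 1 when total_sales > 50, otherwise 0
    threshold  = 50
  )
```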
```
## Pipeline (Estimator) with 2 stages 
## <pipeline_55c2f66e520> 
##   Stages 
##   |--1 SQLTransformer (Transformer)
##   |    <dplyr_transformer_55c74b98194>
##   |     (Parameters -- Column Names)
##   |--2 Binarizer (Transformer)
##   |    <binarizer_55cbe5bb73>
##   |     (Parameters -- Column Names)
##   |      input_col: total_sales
##   |      output_col: above_50
```
Using `ft_r_formula()`, add a step that sets the model’s formula to `above_50 ~ no_items`.

```r
ml_pipeline(sc) %>%
  ft_dplyr_transformer(orders) %>%
  ft_binarizer("total_sales", "above_50", 50) %>%
  ft_r_formula(above_50 ~ no_items)
```

```
## Pipeline (Estimator) with 3 stages 
## <pipeline_55c7014263c> 
##   Stages 
##   |--1 SQLTransformer (Transformer)
##   |    <dplyr_transformer_55c640e84e2>
##   |     (Parameters -- Column Names)
##   |--2 Binarizer (Transformer)
##   |    <binarizer_55c6073c45b>
##   |     (Parameters -- Column Names)
##   |      input_col: total_sales
##   |      output_col: above_50
##   |--3 RFormula (Estimator)
##   |    <r_formula_55c3c7df218>
##   |     (Parameters -- Column Names)
##   |      features_col: features
##   |      label_col: label
##   |     (Parameters)
##   |      force_index_label: FALSE
##   |      formula: above_50 ~ no_items
##   |      handle_invalid: error
##   |      stringIndexerOrderType: frequencyDesc
```
Finalize the pipeline by adding an `ml_logistic_regression()` step; no arguments are needed.

```r
ml_pipeline(sc) %>%
  ft_dplyr_transformer(orders) %>%
  ft_binarizer("total_sales", "above_50", 50) %>%
  ft_r_formula(above_50 ~ no_items) %>%
  ml_logistic_regression()
```

```
## Pipeline (Estimator) with 4 stages 
## <pipeline_55cc32e40c> 
##   Stages 
##   |--1 SQLTransformer (Transformer)
##   |    <dplyr_transformer_55c183307c4>
##   |     (Parameters -- Column Names)
##   |--2 Binarizer (Transformer)
##   |    <binarizer_55c10313fcc>
##   |     (Parameters -- Column Names)
##   |      input_col: total_sales
##   |      output_col: above_50
##   |--3 RFormula (Estimator)
##   |    <r_formula_55c69641535>
##   |     (Parameters -- Column Names)
##   |      features_col: features
##   |      label_col: label
##   |     (Parameters)
##   |      force_index_label: FALSE
##   |      formula: above_50 ~ no_items
##   |      handle_invalid: error
##   |      stringIndexerOrderType: frequencyDesc
##   |--4 LogisticRegression (Estimator)
##   |    <logistic_regression_55c412af3f9>
##   |     (Parameters -- Column Names)
##   |      features_col: features
##   |      label_col: label
##   |      prediction_col: prediction
##   |      probability_col: probability
##   |      raw_prediction_col: rawPrediction
##   |     (Parameters)
##   |      aggregation_depth: 2
##   |      elastic_net_param: 0
##   |      family: auto
##   |      fit_intercept: TRUE
##   |      max_iter: 100
##   |      reg_param: 0
##   |      standardization: TRUE
##   |      threshold: 0.5
##   |      tol: 1e-06
```
Assign the code to a new variable called `orders_plan`.
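A sketch of the assignment, capturing the same four-stage pipeline from the previous step in a variable:

```r
orders_plan <- ml_pipeline(sc) %>%
  ft_dplyr_transformer(orders) %>%
  ft_binarizer("total_sales", "above_50", 50) %>%
  ft_r_formula(above_50 ~ no_items) %>%
  ml_logistic_regression()
```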
Call `orders_plan` to confirm that all of the steps are present.

```
## Pipeline (Estimator) with 4 stages 
## <pipeline_55c13a42573> 
##   Stages 
##   |--1 SQLTransformer (Transformer)
##   |    <dplyr_transformer_55c59d87735>
##   |     (Parameters -- Column Names)
##   |--2 Binarizer (Transformer)
##   |    <binarizer_55c52b9fd9e>
##   |     (Parameters -- Column Names)
##   |      input_col: total_sales
##   |      output_col: above_50
##   |--3 RFormula (Estimator)
##   |    <r_formula_55c30166c14>
##   |     (Parameters -- Column Names)
##   |      features_col: features
##   |      label_col: label
##   |     (Parameters)
##   |      force_index_label: FALSE
##   |      formula: above_50 ~ no_items
##   |      handle_invalid: error
##   |      stringIndexerOrderType: frequencyDesc
##   |--4 LogisticRegression (Estimator)
##   |    <logistic_regression_55c5655d997>
##   |     (Parameters -- Column Names)
##   |      features_col: features
##   |      label_col: label
##   |      prediction_col: prediction
##   |      probability_col: probability
##   |      raw_prediction_col: rawPrediction
##   |     (Parameters)
##   |      aggregation_depth: 2
##   |      elastic_net_param: 0
##   |      family: auto
##   |      fit_intercept: TRUE
##   |      max_iter: 100
##   |      reg_param: 0
##   |      standardization: TRUE
##   |      threshold: 0.5
##   |      tol: 1e-06
```
12.2 Build a Transformer (fit)
Execute the planned changes to obtain a new model
Use `ml_fit()` to execute the changes in `orders_plan` using the `spark_lineitems` data. Assign the result to a new variable called `orders_fit`.
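A sketch of the fitting call, assuming the `orders_plan` and `spark_lineitems` objects from the previous steps:

```r
# ml_fit() runs the Estimator stages against the data and returns a fitted PipelineModel
orders_fit <- ml_fit(orders_plan, spark_lineitems)
```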
Call `orders_fit` to see the print-out of the newly fitted model.

```
## PipelineModel (Transformer) with 4 stages 
## <pipeline_55c13a42573> 
##   Stages 
##   |--1 SQLTransformer (Transformer)
##   |    <dplyr_transformer_55c59d87735>
##   |     (Parameters -- Column Names)
##   |--2 Binarizer (Transformer)
##   |    <binarizer_55c52b9fd9e>
##   |     (Parameters -- Column Names)
##   |      input_col: total_sales
##   |      output_col: above_50
##   |--3 RFormulaModel (Transformer)
##   |    <r_formula_55c30166c14>
##   |     (Parameters -- Column Names)
##   |      features_col: features
##   |      label_col: label
##   |     (Transformer Info)
##   |      formula: chr "above_50 ~ no_items"
##   |--4 LogisticRegressionModel (Transformer)
##   |    <logistic_regression_55c5655d997>
##   |     (Parameters -- Column Names)
##   |      features_col: features
##   |      label_col: label
##   |      prediction_col: prediction
##   |      probability_col: probability
##   |      raw_prediction_col: rawPrediction
##   |     (Transformer Info)
##   |      coefficient_matrix: num [1, 1] 2.21
##   |      coefficients: num 2.21
##   |      intercept: num -16.7
##   |      intercept_vector: num -16.7
##   |      num_classes: int 2
##   |      num_features: int 1
##   |      threshold: num 0.5
##   |      thresholds: num [1:2] 0.5 0.5
```
12.3 Predictions using Spark Pipelines
Overview of how to use a fitted pipeline to run predictions
Use `ml_transform()` to run predictions over `spark_lineitems` with the `orders_fit` model.
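A sketch of this step; the intermediate variable name `orders_pred` is illustrative and not part of the original instructions:

```r
# Apply the fitted pipeline to the raw line items; predictions come back as a Spark table
orders_pred <- orders_fit %>%
  ml_transform(spark_lineitems)
```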
With `count()`, compare the results in `above_50` against the predictions; the variable created by `ml_transform()` is called `prediction`.
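A sketch of the comparison, reusing the illustrative `orders_pred` variable from the previous step; the exact call that produced the output below may differ slightly:

```r
orders_pred %>%
  count(above_50, prediction)
```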
```
## # Source: spark<?> [?? x 3]
## # Groups: above_50
##   above_50 prediction     n
##      <dbl>      <dbl> <dbl>
## 1        0          1   783
## 2        0          0  9282
## 3        1          1 16387
## 4        1          0    92
```
12.4 Save the pipeline objects
Overview of how to save the Estimator and the Transformer
Use `ml_save()` to save `orders_plan` in a new folder called “saved_model”.
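A sketch of the save call; the `overwrite = TRUE` argument is an optional extra, assumed here so the example can be re-run:

```r
ml_save(orders_plan, "saved_model", overwrite = TRUE)
```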
```
## Model successfully saved.
```
Navigate to the “saved_model” folder to inspect its contents
Use `ml_save()` to save `orders_fit` in a new folder called “saved_pipeline”.
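As above, a sketch of the save call for the fitted pipeline, with the optional `overwrite = TRUE` assumed:

```r
ml_save(orders_fit, "saved_pipeline", overwrite = TRUE)
```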
```
## Model successfully saved.
```
Navigate to the “saved_pipeline” folder to inspect its contents