12 Spark Pipelines

12.1 Build an Estimator (plan)

Create a simple estimator that transforms data and fits a model (a sketch of the full plan follows the steps below)

  1. Use the spark_lineitems variable to create a new aggregation by order_id. Summarize the total sales and number of items

    ## # Source: spark<?> [?? x 3]
    ##    order_id total_sales no_items
    ##    <chr>          <dbl>    <dbl>
    ##  1 33531           45.4        6
    ##  2 33538           79.9       12
    ##  3 33541           70.0       10
    ##  4 33550           92.8       16
    ##  5 33561           46.6        8
    ##  6 33565           55.6        8
    ##  7 33568           76.6       10
    ##  8 33571           74         12
    ##  9 33572           54.4        8
    ## 10 33582           82.5       12
    ## # … with more rows
  2. Assign the code to a new variable called orders

  3. Start a new code chunk by calling ml_pipeline(sc)

    ## Pipeline (Estimator) with no stages
    ## <pipeline_55c15862a53>
  4. Pipe the ml_pipeline() code into an ft_dplyr_transformer() call. Use the orders variable for its argument

    ## Pipeline (Estimator) with 1 stage
    ## <pipeline_55c68ca32c1> 
    ##   Stages 
    ##   |--1 SQLTransformer (Transformer)
    ##   |    <dplyr_transformer_55c795af321> 
    ##   |     (Parameters -- Column Names)
  5. Add an ft_binarizer() step that determines if the total sale is above $50. Name the new variable above_50

    ## Pipeline (Estimator) with 2 stages
    ## <pipeline_55c2f66e520> 
    ##   Stages 
    ##   |--1 SQLTransformer (Transformer)
    ##   |    <dplyr_transformer_55c74b98194> 
    ##   |     (Parameters -- Column Names)
    ##   |--2 Binarizer (Transformer)
    ##   |    <binarizer_55cbe5bb73> 
    ##   |     (Parameters -- Column Names)
    ##   |      input_col: total_sales
    ##   |      output_col: above_50
  6. Using ft_r_formula(), add a step that sets the model’s formula to: above_50 ~ no_items

    ## Pipeline (Estimator) with 3 stages
    ## <pipeline_55c7014263c> 
    ##   Stages 
    ##   |--1 SQLTransformer (Transformer)
    ##   |    <dplyr_transformer_55c640e84e2> 
    ##   |     (Parameters -- Column Names)
    ##   |--2 Binarizer (Transformer)
    ##   |    <binarizer_55c6073c45b> 
    ##   |     (Parameters -- Column Names)
    ##   |      input_col: total_sales
    ##   |      output_col: above_50
    ##   |--3 RFormula (Estimator)
    ##   |    <r_formula_55c3c7df218> 
    ##   |     (Parameters -- Column Names)
    ##   |      features_col: features
    ##   |      label_col: label
    ##   |     (Parameters)
    ##   |      force_index_label: FALSE
    ##   |      formula: above_50 ~ no_items
    ##   |      handle_invalid: error
    ##   |      stringIndexerOrderType: frequencyDesc
  7. Finalize the pipeline by adding an ml_logistic_regression() step; no arguments are needed

    ## Pipeline (Estimator) with 4 stages
    ## <pipeline_55cc32e40c> 
    ##   Stages 
    ##   |--1 SQLTransformer (Transformer)
    ##   |    <dplyr_transformer_55c183307c4> 
    ##   |     (Parameters -- Column Names)
    ##   |--2 Binarizer (Transformer)
    ##   |    <binarizer_55c10313fcc> 
    ##   |     (Parameters -- Column Names)
    ##   |      input_col: total_sales
    ##   |      output_col: above_50
    ##   |--3 RFormula (Estimator)
    ##   |    <r_formula_55c69641535> 
    ##   |     (Parameters -- Column Names)
    ##   |      features_col: features
    ##   |      label_col: label
    ##   |     (Parameters)
    ##   |      force_index_label: FALSE
    ##   |      formula: above_50 ~ no_items
    ##   |      handle_invalid: error
    ##   |      stringIndexerOrderType: frequencyDesc
    ##   |--4 LogisticRegression (Estimator)
    ##   |    <logistic_regression_55c412af3f9> 
    ##   |     (Parameters -- Column Names)
    ##   |      features_col: features
    ##   |      label_col: label
    ##   |      prediction_col: prediction
    ##   |      probability_col: probability
    ##   |      raw_prediction_col: rawPrediction
    ##   |     (Parameters)
    ##   |      aggregation_depth: 2
    ##   |      elastic_net_param: 0
    ##   |      family: auto
    ##   |      fit_intercept: TRUE
    ##   |      max_iter: 100
    ##   |      reg_param: 0
    ##   |      standardization: TRUE
    ##   |      threshold: 0.5
    ##   |      tol: 1e-06
  8. Assign the code to a new variable called orders_plan

  9. Call orders_plan to confirm that all of the steps are present

    ## Pipeline (Estimator) with 4 stages
    ## <pipeline_55c13a42573> 
    ##   Stages 
    ##   |--1 SQLTransformer (Transformer)
    ##   |    <dplyr_transformer_55c59d87735> 
    ##   |     (Parameters -- Column Names)
    ##   |--2 Binarizer (Transformer)
    ##   |    <binarizer_55c52b9fd9e> 
    ##   |     (Parameters -- Column Names)
    ##   |      input_col: total_sales
    ##   |      output_col: above_50
    ##   |--3 RFormula (Estimator)
    ##   |    <r_formula_55c30166c14> 
    ##   |     (Parameters -- Column Names)
    ##   |      features_col: features
    ##   |      label_col: label
    ##   |     (Parameters)
    ##   |      force_index_label: FALSE
    ##   |      formula: above_50 ~ no_items
    ##   |      handle_invalid: error
    ##   |      stringIndexerOrderType: frequencyDesc
    ##   |--4 LogisticRegression (Estimator)
    ##   |    <logistic_regression_55c5655d997> 
    ##   |     (Parameters -- Column Names)
    ##   |      features_col: features
    ##   |      label_col: label
    ##   |      prediction_col: prediction
    ##   |      probability_col: probability
    ##   |      raw_prediction_col: rawPrediction
    ##   |     (Parameters)
    ##   |      aggregation_depth: 2
    ##   |      elastic_net_param: 0
    ##   |      family: auto
    ##   |      fit_intercept: TRUE
    ##   |      max_iter: 100
    ##   |      reg_param: 0
    ##   |      standardization: TRUE
    ##   |      threshold: 0.5
    ##   |      tol: 1e-06
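
Putting the steps above together, a minimal sketch of the full plan could look like the code below. The column used to compute total_sales (sale_price here) is an assumption, since the spark_lineitems schema is not shown; the binarizer threshold of 50 comes from step 5.

    library(sparklyr)
    library(dplyr)

    # Assumes an active connection `sc` and the `spark_lineitems` Spark table
    # from the earlier exercises; `sale_price` is an assumed column name

    # Steps 1-2: aggregate line items by order
    orders <- spark_lineitems %>%
      group_by(order_id) %>%
      summarise(
        total_sales = sum(sale_price, na.rm = TRUE),
        no_items = n()
      )

    # Steps 3-8: assemble the four-stage estimator pipeline
    orders_plan <- ml_pipeline(sc) %>%
      ft_dplyr_transformer(orders) %>%
      ft_binarizer(
        input_col = "total_sales",
        output_col = "above_50",
        threshold = 50
      ) %>%
      ft_r_formula(above_50 ~ no_items) %>%
      ml_logistic_regression()

    # Step 9: print the plan to confirm all four stages are present
    orders_plan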

12.2 Build a Transformer (fit)

Execute the planned changes to obtain a new model (a short code sketch follows the steps below)

  1. Use ml_fit() to execute the changes in orders_plan using the spark_lineitems data. Assign to a new variable called orders_fit

  2. Call orders_fit to see the print-out of the newly fitted model

    ## PipelineModel (Transformer) with 4 stages
    ## <pipeline_55c13a42573> 
    ##   Stages 
    ##   |--1 SQLTransformer (Transformer)
    ##   |    <dplyr_transformer_55c59d87735> 
    ##   |     (Parameters -- Column Names)
    ##   |--2 Binarizer (Transformer)
    ##   |    <binarizer_55c52b9fd9e> 
    ##   |     (Parameters -- Column Names)
    ##   |      input_col: total_sales
    ##   |      output_col: above_50
    ##   |--3 RFormulaModel (Transformer)
    ##   |    <r_formula_55c30166c14> 
    ##   |     (Parameters -- Column Names)
    ##   |      features_col: features
    ##   |      label_col: label
    ##   |     (Transformer Info)
    ##   |      formula:  chr "above_50 ~ no_items" 
    ##   |--4 LogisticRegressionModel (Transformer)
    ##   |    <logistic_regression_55c5655d997> 
    ##   |     (Parameters -- Column Names)
    ##   |      features_col: features
    ##   |      label_col: label
    ##   |      prediction_col: prediction
    ##   |      probability_col: probability
    ##   |      raw_prediction_col: rawPrediction
    ##   |     (Transformer Info)
    ##   |      coefficient_matrix:  num [1, 1] 2.21 
    ##   |      coefficients:  num 2.21 
    ##   |      intercept:  num -16.7 
    ##   |      intercept_vector:  num -16.7 
    ##   |      num_classes:  int 2 
    ##   |      num_features:  int 1 
    ##   |      threshold:  num 0.5 
    ##   |      thresholds:  num [1:2] 0.5 0.5
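
A minimal sketch of the fitting step, assuming the orders_plan estimator and the spark_lineitems data from the previous section:

    # Step 1: fit the estimator pipeline against the source data
    orders_fit <- ml_fit(orders_plan, spark_lineitems)

    # Step 2: print the fitted PipelineModel
    orders_fit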

12.3 Predictions using Spark Pipelines

Overview of how to use a fitted pipeline to run predictions (see the sketch after the steps)

  1. Use ml_transform() to run predictions with the orders_fit model over spark_lineitems

  2. With count(), compare the results from above_50 against the predictions; the variable created by ml_transform() is called prediction

    ## # Source: spark<?> [?? x 3]
    ## # Groups: above_50
    ##   above_50 prediction     n
    ##      <dbl>      <dbl> <dbl>
    ## 1        0          1   783
    ## 2        0          0  9282
    ## 3        1          1 16387
    ## 4        1          0    92
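
A sketch of the prediction steps; orders_preds is just an illustrative name for the intermediate result:

    # Step 1: run the fitted pipeline over the source data
    orders_preds <- ml_transform(orders_fit, spark_lineitems)

    # Step 2: compare the binarized label against the model's prediction
    orders_preds %>%
      count(above_50, prediction)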

12.4 Save the pipeline objects

Overview of how to save the Estimator and the Transformer (see the sketch after the steps)

  1. Use ml_save() to save orders_plan in a new folder called “saved_model”

    ## Model successfully saved.
  2. Navigate to the “saved_model” folder to inspect its contents

  3. Use ml_save() to save orders_fit in a new folder called “saved_pipeline”

    ## Model successfully saved.
  4. Navigate to the “saved_pipeline” folder to inspect its contents
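
A sketch of the two save calls; the overwrite = TRUE argument is optional and included here only so the chunk can be re-run:

    # Steps 1 and 3: save the estimator (plan) and the fitted transformer
    ml_save(orders_plan, "saved_model", overwrite = TRUE)
    ml_save(orders_fit, "saved_pipeline", overwrite = TRUE)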