How to use pipelines in your machine learning models

Using pipelines makes machine learning code cleaner, easier to maintain, and easier to move to production. It also lets you apply a range of additional techniques, such as tuning preprocessing and model settings together.

Picture by Martin Visser, Unsplash.

There’s often a great deal of repetition in machine learning projects. A typical machine learning workflow involves a number of common processes designed to clean, prepare, and transform data, so it’s in the right format to generate good results when it passes through the model.

Pipelines, which are part of scikit-learn, let you automate some of these processes. They cut down on code, improve readability, and make your work more reproducible. They also help you avoid mistakes such as data leakage, because each preprocessing step is fitted on the training data only and then applied unchanged to the test data. Here’s a quick guide to getting started with pipelines in your own projects.

Load the packages

For this project we’ll be using pandas, scikit-learn (imported as sklearn), and xgboost. Open up a Jupyter notebook and import the packages below. If you don’t have them installed, you can install them by typing pip3 install pandas scikit-learn xgboost in your terminal.

import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.metrics import accuracy_score, roc_auc_score
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder
from xgboost import XGBClassifier

Load the data

For this project I’ve used the Customer Churn Prediction 2020 dataset from Kaggle. This is data from a telecoms provider, so we’ll be using it here to create a contractual churn model. Since the aim is to show how to use pipelines, rather than how to create a contractual churn model, I’ll skip some steps and load only the train.csv file from the pre-split data. To keep things tidy, we’ll use the Pandas rename() function to convert the column names to lowercase.

df = pd.read_csv('train.csv')
df = df.rename(columns=str.lower)
df.head().T

As you can see below, this dataset mostly contains numeric values, but it also has a few categorical variables of varying complexity. Our pipeline will be able to handle these automatically.

                                           0              1              2              3              4
state                                     OH             NJ             OH             OK             MA
account_length                           107            137             84             75            121
area_code                      area_code_415  area_code_415  area_code_408  area_code_415  area_code_510
international_plan                        no             no            yes            yes             no
voice_mail_plan                          yes             no             no             no            yes
number_vmail_messages                     26              0              0              0             24
total_day_minutes                      161.6          243.4          299.4          166.7          218.2
total_day_calls                          123            114             71            113             88
total_day_charge                       27.47          41.38           50.9          28.34          37.09
total_eve_minutes                      195.5          121.2           61.9          148.3          348.5
total_eve_calls                          103            110             88            122            108
total_eve_charge                       16.62           10.3           5.26          12.61          29.62
total_night_minutes                    254.4          162.6          196.9          186.9          212.6
total_night_calls                        103            104             89            121            118
total_night_charge                     11.45           7.32           8.86           8.41           9.57
total_intl_minutes                      13.7           12.2            6.6           10.1            7.5
total_intl_calls                           3              5              7              3              7
total_intl_charge                        3.7           3.29           1.78           2.73           2.03
number_customer_service_calls              1              0              2              3              3
churn                                     no             no             no             no             no

Examine the data

If you run the Pandas value_counts() function, you’ll see that this is an imbalanced dataset: the positive class (customers who churned) is much smaller than the negative class (customers who didn’t). Imbalanced classification is harder, because a model can achieve high accuracy simply by favouring the majority class, so we’ll need to compensate for the imbalance when we configure our model.

df.churn.value_counts()
no     3652
yes     598
Name: churn, dtype: int64
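To put the imbalance in numbers, the short sketch below rebuilds a churn column from the counts above (it doesn’t load the real file) and shows that always predicting the majority class would already be right about 86% of the time. That baseline is worth keeping in mind when judging the model’s accuracy later.

```python
import pandas as pd

# Rebuild a churn column from the counts above: 3652 'no' and 598 'yes'
churn = pd.Series(['no'] * 3652 + ['yes'] * 598, name='churn')

# Share of each class
shares = churn.value_counts(normalize=True)
print(shares.round(4))

# Accuracy of a "model" that always predicts the majority class ('no')
baseline_accuracy = shares.max()
print(round(baseline_accuracy, 4))  # 0.8593
```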

Since the churn column won’t go through our pipeline, we’ll convert its yes and no values to 1 and 0 ourselves using map(), so the model can use it as the target variable.

df['churn'] = df['churn'].map({'yes': 1, 'no': 0})

Check for missing values

This dataset has probably been pre-cleansed, as df.isnull().sum() shows us that we don’t have any null values to deal with at all, which will save us a step later.

df.isnull().sum()
state                            0
account_length                   0
area_code                        0
international_plan               0
voice_mail_plan                  0
number_vmail_messages            0
total_day_minutes                0
total_day_calls                  0
total_day_charge                 0
total_eve_minutes                0
total_eve_calls                  0
total_eve_charge                 0
total_night_minutes              0
total_night_calls                0
total_night_charge               0
total_intl_minutes               0
total_intl_calls                 0
total_intl_charge                0
number_customer_service_calls    0
churn                            0
dtype: int64

Examine categorical data cardinality

Next we’ll look at the “cardinality” of the categorical variables, which is simply the number of unique values each one contains. Columns with low cardinality, such as area_code, international_plan, and voice_mail_plan, can be treated differently from the state column, which has a high cardinality of 51.

df.select_dtypes(include=['object']).agg(['count','nunique']).T
                    count  nunique
state                4250       51
area_code            4250        3
international_plan   4250        2
voice_mail_plan      4250        2
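One way to act on this is to split the categorical columns by cardinality. This is a minimal sketch on a toy frame built from a few of the values shown earlier. The max_unique cut-off of 3 is a hypothetical choice for this tiny example; on the full data you’d pick a value that separates state (51 values) from the others.

```python
import pandas as pd

# Toy frame using a few values from the df.head() output above
X = pd.DataFrame({
    'state': ['OH', 'NJ', 'OH', 'OK', 'MA'],
    'area_code': ['area_code_415', 'area_code_415', 'area_code_408',
                  'area_code_415', 'area_code_510'],
    'international_plan': ['no', 'no', 'yes', 'yes', 'no'],
    'account_length': [107, 137, 84, 75, 121],
})

max_unique = 3  # hypothetical cut-off; choose one that suits your data

# Count unique values in every non-numeric column, then split on the cut-off
categorical = X.select_dtypes(exclude='number').columns
cardinality = X[categorical].nunique()
low_cardinality = cardinality[cardinality <= max_unique].index.tolist()
high_cardinality = cardinality[cardinality > max_unique].index.tolist()

print(low_cardinality)   # ['area_code', 'international_plan']
print(high_cardinality)  # ['state']
```

Each list could then be given its own transformer, for example one-hot encoding for the low-cardinality columns and a more compact encoding for the rest.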
Running df.describe().T summarises the numeric columns. Because churn has already been binarised, its mean of 0.14 confirms that only around 14% of customers churned.

df.describe().T
                                count        mean        std  min       25%     50%       75%     max
account_length                 4250.0  100.236235  39.698401  1.0   73.0000  100.00  127.0000  243.00
number_vmail_messages          4250.0    7.631765  13.439882  0.0    0.0000    0.00   16.0000   52.00
total_day_minutes              4250.0  180.259600  54.012373  0.0  143.3250  180.45  216.2000  351.50
total_day_calls                4250.0   99.907294  19.850817  0.0   87.0000  100.00  113.0000  165.00
total_day_charge               4250.0   30.644682   9.182096  0.0   24.3650   30.68   36.7500   59.76
total_eve_minutes              4250.0  200.173906  50.249518  0.0  165.9250  200.70  233.7750  359.30
total_eve_calls                4250.0  100.176471  19.908591  0.0   87.0000  100.00  114.0000  170.00
total_eve_charge               4250.0   17.015012   4.271212  0.0   14.1025   17.06   19.8675   30.54
total_night_minutes            4250.0  200.527882  50.353548  0.0  167.2250  200.45  234.7000  395.00
total_night_calls              4250.0   99.839529  20.093220  0.0   86.0000  100.00  113.0000  175.00
total_night_charge             4250.0    9.023892   2.265922  0.0    7.5225    9.02   10.5600   17.77
total_intl_minutes             4250.0   10.256071   2.760102  0.0    8.5000   10.30   12.0000   20.00
total_intl_calls               4250.0    4.426353   2.463069  0.0    3.0000    4.00    6.0000   20.00
total_intl_charge              4250.0    2.769654   0.745204  0.0    2.3000    2.78    3.2400    5.40
number_customer_service_calls  4250.0    1.559059   1.311434  0.0    1.0000    1.00    2.0000    9.00
churn                          4250.0    0.140706   0.347759  0.0    0.0000    0.00    0.0000    1.00

Split the training and test data

Next we’ll split our dataset into the X feature set and the y target data, and we’ll use the train_test_split() function to create our test and training datasets. I’ve allocated 30% of the data to the test dataset and have set the random_state value to 0 to give reproducible results.

X = df.drop(['churn'], axis=1)
y = df['churn']

X_train, X_test, y_train, y_test = train_test_split(X, y, 
                                                    test_size=0.3, 
                                                    random_state=0)
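As a quick check on the split sizes, the sketch below uses a synthetic stand-in with the same 4,250 rows and class balance as our data (the single feature is random noise). With test_size=0.3, the test set gets 1,275 rows and the training set 2,975. It also shows stratify=y, an optional variant not used in this article, which keeps the churn rate the same in both splits and can be worth considering on imbalanced data.

```python
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split

# Synthetic stand-in with the same size and class balance as the churn data
# (3652 zeros, 598 ones); the feature values themselves are random
rng = np.random.default_rng(0)
y = pd.Series([0] * 3652 + [1] * 598, name='churn')
X = pd.DataFrame({'feature': rng.normal(size=len(y))})

# Same settings as the article: 30% test data, fixed random_state
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=0)
print(len(X_train), len(X_test))  # 2975 1275

# Optional variant (not used in the article): stratify=y keeps the churn rate
# (about 14%) the same in the training and test splits
_, _, y_train_s, y_test_s = train_test_split(
    X, y, test_size=0.3, random_state=0, stratify=y
)
print(y_train_s.mean(), y_test_s.mean())
```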

Identify the numeric and categorical columns

As we want to treat the categorical data differently from the numeric data, I’ll create two lists containing the corresponding column names. The code below assigns the columns with an object datatype to categorical_columns, then uses exclude to assign all the remaining columns to numeric_columns.

categorical_columns = X_train.select_dtypes(include=['object']).columns.tolist()
categorical_columns
['state', 'area_code', 'international_plan', 'voice_mail_plan']
numeric_columns = X_train.select_dtypes(exclude=['object']).columns.tolist()
numeric_columns
['account_length',
 'number_vmail_messages',
 'total_day_minutes',
 'total_day_calls',
 'total_day_charge',
 'total_eve_minutes',
 'total_eve_calls',
 'total_eve_charge',
 'total_night_minutes',
 'total_night_calls',
 'total_night_charge',
 'total_intl_minutes',
 'total_intl_calls',
 'total_intl_charge',
 'number_customer_service_calls']
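As an alternative to building the lists up front, scikit-learn’s make_column_selector() can pick columns by dtype when the ColumnTransformer is fitted. This is a sketch on a toy two-column frame that selects numeric columns with dtype_include='number' and everything else with dtype_exclude='number'. The median imputer here is an illustrative choice, not the one used later in the article.

```python
import pandas as pd
from sklearn.compose import ColumnTransformer, make_column_selector
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import OneHotEncoder

# Toy frame with one numeric and one categorical column (illustrative only)
X = pd.DataFrame({
    'account_length': [107, 137, 84],
    'international_plan': ['no', 'no', 'yes'],
})

# Columns are chosen by dtype at fit time instead of from fixed lists
preprocessor = ColumnTransformer(transformers=[
    ('numeric', SimpleImputer(strategy='median'),
     make_column_selector(dtype_include='number')),
    ('categorical', OneHotEncoder(handle_unknown='ignore'),
     make_column_selector(dtype_exclude='number')),
])

Xt = preprocessor.fit_transform(X)
print(Xt.shape)  # (3, 3): one numeric column plus two one-hot columns
```

Explicit lists, as used in this article, make the column choices easy to inspect; selectors are handy when the same pipeline is reused on datasets with different columns.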

Determine the correct scale_pos_weight

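Since we’re creating an XGBoost classification model on an imbalanced dataset, it helps to set the scale_pos_weight parameter, which increases the weight given to the positive class. A common starting point, suggested in the XGBoost documentation, is the number of negative examples divided by the number of positive examples. The function below calculates this and returns 6 for our data, which we can pass to XGBoost to get a small improvement in how it handles the class imbalance.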

def get_scale_pos_weight(target, square_root=False, gridsearch=False):
    """Return the scale_pos_weight parameter for the XGBoost model when data are imbalanced.
    The scale_pos_weight parameter is calculated from the ratio of the negative class over
    the positive class. The exact scale_pos_weight sometimes does not give the best result,
    so by passing the gridsearch=True parameter you can return a list of values to test with
    GridSearchCV. In addition, passing square_root=True changes the scale_pos_weight to the
    square root value, which can sometimes be beneficial on extremely imbalanced data.
    
    :param target: Pandas dataframe column containing the binary target
    :param square_root: Optional boolean parameter to convert to square root on extremely unbalanced data
    :param gridsearch: Optional boolean parameter to return a bracketed list for use in GridSearchCV
    
    Usage:
        scale_pos_weight = get_scale_pos_weight(df['target'], square_root=False, gridsearch=True)
        
    """
    
    import math
    
    scale_pos_weight = round((len(target) - sum(target)) / sum(target))
    
    if square_root:
        scale_pos_weight = round(math.sqrt(scale_pos_weight))
    
    if gridsearch:
        scale_pos_weight = [scale_pos_weight-2, scale_pos_weight-1, scale_pos_weight, 
                            scale_pos_weight+1, scale_pos_weight+2]
    
    return scale_pos_weight
get_scale_pos_weight(df['churn'], square_root=False, gridsearch=False)
6
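The arithmetic behind that 6 is simple. Using the class counts from value_counts() earlier, the ratio of negative to positive examples is about 6.1, which the function rounds to 6. With square_root=True, it would instead round the square root of 6 to 2.

```python
import math

# Class counts from df.churn.value_counts() above
negatives = 3652  # churn == 0
positives = 598   # churn == 1

ratio = negatives / positives
print(round(ratio, 3))                 # 6.107
print(round(ratio))                    # 6, the value passed to XGBClassifier
print(round(math.sqrt(round(ratio))))  # 2, what square_root=True would return
```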

Create the pipelines

Finally, with all of that out of the way, we can build the pipelines. First, we’ll create a numeric_transformer which uses SimpleImputer() to impute missing values. With strategy='constant' and no fill_value, any missing numeric values would be replaced with 0. Strategies such as 'median' would instead fill them with a statistic learned from the training data.

numeric_transformer = SimpleImputer(strategy='constant')
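To see the difference between imputation strategies, here’s a small sketch on a made-up single-column array. It also shows that the imputer learns its fill value during fit() and reuses it when transforming new data, which is what happens to X_test when it passes through a fitted pipeline.

```python
import numpy as np
from sklearn.impute import SimpleImputer

# Made-up training column with one missing value, plus a new row to transform
X_train = np.array([[1.0], [np.nan], [3.0], [10.0]])
X_new = np.array([[np.nan]])

# strategy='constant' with no fill_value fills numeric gaps with 0
constant = SimpleImputer(strategy='constant').fit(X_train)
print(constant.transform(X_new))  # [[0.]]

# strategy='median' learns the median of the training column (3.0 here)
# during fit() and reuses it for any data passed to transform()
median = SimpleImputer(strategy='median').fit(X_train)
print(median.statistics_)       # [3.]
print(median.transform(X_new))  # [[3.]]
```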

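Next, we’ll create a categorical_transformer that chains a SimpleImputer, which fills any missing values with the most frequent category, and a OneHotEncoder, which binarises our categorical variables. Setting handle_unknown='ignore' means categories not seen during training are encoded as all zeros rather than raising an error. One-hot encoding isn’t ideal for the high-cardinality state column, since it creates a separate column for each of up to 51 states, but it’s fine for demonstration purposes.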

categorical_transformer = Pipeline(steps=[
    ('simple_imputer', SimpleImputer(strategy='most_frequent')),
    ('onehot_encoder', OneHotEncoder(handle_unknown='ignore')),    
])
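Here’s a sketch of the categorical_transformer on a made-up state column containing one missing value, transformed against test data that includes a state ('CA') never seen during fitting. The missing value is filled with the most frequent category, and because handle_unknown='ignore' is set, the unseen state is encoded as all zeros.

```python
import numpy as np
import pandas as pd
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder

categorical_transformer = Pipeline(steps=[
    ('simple_imputer', SimpleImputer(strategy='most_frequent')),
    ('onehot_encoder', OneHotEncoder(handle_unknown='ignore')),
])

# Made-up data: 'OH' is the most frequent training value and one value is
# missing; 'CA' only appears in the test data
train = pd.DataFrame({'state': pd.Series(['OH', 'NJ', 'OH', np.nan], dtype=object)})
test = pd.DataFrame({'state': pd.Series(['NJ', 'CA'], dtype=object)})

categorical_transformer.fit(train)

# Categories learned from the imputed training data: ['NJ', 'OH']
print(categorical_transformer.named_steps['onehot_encoder'].categories_)

# 'NJ' becomes [1, 0]; the unseen 'CA' becomes [0, 0]
encoded = categorical_transformer.transform(test).toarray()
print(encoded)
```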

Now we’ll build a preprocessor using ColumnTransformer(), which runs the numeric_transformer on the columns in our numeric_columns list and the categorical_transformer on those in our categorical_columns list. Note that this only defines the preprocessing; nothing has been run yet. (For alternatives to one-hot encoding that cope better with high-cardinality columns such as state, check out the Category Encoders package.)

preprocessor = ColumnTransformer(
    transformers=[
        ('numeric', numeric_transformer, numeric_columns),
        ('categorical', categorical_transformer, categorical_columns),        
    ]
)

Define the model and bundle the pipeline

Next we’ll define a simple XGBoost model, using the scale_pos_weight value we calculated earlier to improve its handling of the class imbalance, and then use Pipeline to bundle the preprocessor and model together. When the bundled pipeline is fitted, it will preprocess the data and then fit the model on the result.

model = XGBClassifier(random_state=0, scale_pos_weight=6)
bundled_pipeline = Pipeline(steps=[
    ('preprocessor', preprocessor),
    ('model', model)
])
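Because each step in bundled_pipeline has a name, the parameters of every step can be tuned together with GridSearchCV. This is what the gridsearch=True option of get_scale_pos_weight() is intended for. The sketch below shows the naming pattern on synthetic data. To keep it dependency-light, it uses scikit-learn’s LogisticRegression as a stand-in for XGBClassifier, and the data, columns, and parameter values are all illustrative. With the real bundled_pipeline, the equivalent key for the model would be 'model__scale_pos_weight'.

```python
import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder

# Synthetic stand-in data: random values, not the Kaggle churn dataset
rng = np.random.default_rng(0)
n = 400
X = pd.DataFrame({
    'total_day_minutes': rng.normal(180, 54, n),
    'international_plan': rng.choice(['no', 'yes'], size=n, p=[0.9, 0.1]).astype(object),
})
# Illustrative target: "churn" if minutes are high or the customer has an international plan
y = ((X['total_day_minutes'] > 240) | (X['international_plan'] == 'yes')).astype(int)
X.loc[X.index[::25], 'total_day_minutes'] = np.nan  # a few gaps for the imputer

preprocessor = ColumnTransformer(transformers=[
    ('numeric', SimpleImputer(strategy='median'), ['total_day_minutes']),
    ('categorical', OneHotEncoder(handle_unknown='ignore'), ['international_plan']),
])
bundled_pipeline = Pipeline(steps=[
    ('preprocessor', preprocessor),
    ('model', LogisticRegression(max_iter=1000)),  # stand-in for XGBClassifier
])

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=0)

# Parameters are addressed as <step name>__<parameter>; nested steps chain the
# names, so preprocessor__numeric__strategy reaches the numeric SimpleImputer
param_grid = {
    'model__C': [0.1, 1.0, 10.0],
    'preprocessor__numeric__strategy': ['mean', 'median'],
}
search = GridSearchCV(bundled_pipeline, param_grid, scoring='roc_auc', cv=3)
search.fit(X_train, y_train)

print(search.best_params_)
print(search.score(X_test, y_test))  # ROC AUC of the refitted best pipeline on the test split
```

Each parameter combination refits the whole pipeline, imputer and encoder included, on the training folds only, so nothing from the validation folds leaks into the preprocessing.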

Preprocess the training data and fit the model

Now we can call fit() on our bundled_pipeline, passing in our X_train and y_train data. This fits each step of the preprocessor, transforms the training data, and fits the model, all in a single line of code.

bundled_pipeline.fit(X_train, y_train)
Pipeline(steps=[('preprocessor',
                 ColumnTransformer(transformers=[('numeric',
                                                  SimpleImputer(strategy='constant'),
                                                  ['account_length',
                                                   'number_vmail_messages',
                                                   'total_day_minutes',
                                                   'total_day_calls',
                                                   'total_day_charge',
                                                   'total_eve_minutes',
                                                   'total_eve_calls',
                                                   'total_eve_charge',
                                                   'total_night_minutes',
                                                   'total_night_calls',
                                                   'total_night_charge',
                                                   'total_intl_minutes',...
                               colsample_bytree=1, gamma=0, gpu_id=-1,
                               importance_type='gain',
                               interaction_constraints='',
                               learning_rate=0.300000012, max_delta_step=0,
                               max_depth=6, min_child_weight=1, missing=nan,
                               monotone_constraints='()', n_estimators=100,
                               n_jobs=0, num_parallel_tree=1, random_state=0,
                               reg_alpha=0, reg_lambda=1, scale_pos_weight=6,
                               subsample=1, tree_method='exact',
                               validate_parameters=1, verbosity=None))])

Generate predictions

To generate predictions from our X_test data, which hasn’t been through any of the above steps, we call predict() on the bundled_pipeline. This transforms the test data using the imputation values and encodings learned from the training data, so exactly the same processing is applied to both datasets before the result reaches the model.

y_pred = bundled_pipeline.predict(X_test)

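Now that’s run, we can examine the model’s performance. We get an accuracy of 96.94%, well above the roughly 86% that always predicting “no churn” would achieve on this data, and a ROC AUC of 0.90, which is pretty good for a first attempt. Note that the ROC AUC below is calculated from the predicted class labels returned by predict(), rather than from predicted probabilities, so it reflects a single decision threshold.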

roc_auc = roc_auc_score(y_test, y_pred)
accuracy = accuracy_score(y_test, y_pred)
print('ROC/AUC', roc_auc)
print('Accuracy', accuracy)
ROC/AUC 0.9043384073098834
Accuracy 0.9694117647058823
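To see why the input to roc_auc_score() matters, here’s a toy example with made-up labels and probabilities. Passing hard 0/1 predictions collapses the ROC curve to a single threshold, and for binary labels the result equals the balanced accuracy. Passing probabilities instead measures how well the model ranks churners above non-churners across all thresholds. With the article’s pipeline, the probabilities would come from bundled_pipeline.predict_proba(X_test)[:, 1].

```python
import numpy as np
from sklearn.metrics import balanced_accuracy_score, roc_auc_score

# Made-up true labels and predicted churn probabilities for four customers
y_true = np.array([0, 0, 1, 1])
y_prob = np.array([0.1, 0.6, 0.4, 0.9])

# Hard labels from a 0.5 threshold, similar to what predict() returns
y_pred = (y_prob >= 0.5).astype(int)

print(roc_auc_score(y_true, y_pred))            # 0.5
print(balanced_accuracy_score(y_true, y_pred))  # 0.5
print(roc_auc_score(y_true, y_prob))            # 0.75
```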

There are a number of other things we could do to improve the performance of the churn model itself, but I’ll cover these separately. Hopefully this shows you how effective and useful the pipeline approach can be.

Matt Clarke, Thursday, August 12, 2021

Matt Clarke Matt is an Ecommerce and Marketing Director who uses data science to help in his work. Matt has a Master's degree in Internet Retailing (plus two other Master's degrees in different fields) and specialises in the technical side of ecommerce and marketing.