The code we’ll be working with in this piece is a set of Python functions that use Pandas to read in and process data. It features a function to read the raw data in chunks, plus a couple of functions that perform transformations on that raw data.
# data_processing.py
import pandas as pd
from pandas import DataFrame


def read_raw_data(file_path: str, chunk_size: int = 1000) -> DataFrame:
    csv_reader = pd.read_csv(file_path, chunksize=chunk_size)
    processed_chunks = []

    for chunk in csv_reader:
        # drop repeated header rows and rows with missing values
        chunk = chunk.loc[chunk["Order ID"] != "Order ID"].dropna()
        processed_chunks.append(chunk)

    return pd.concat(processed_chunks, axis=0)


def split_purchase_address(df_to_process: DataFrame) -> DataFrame:
    df_address_split = df_to_process["Purchase Address"].str.split(
        ",", n=3, expand=True
    )
    df_address_split.columns = ["Street Name", "City", "State and Postal Code"]

    df_state_postal_split = (
        df_address_split["State and Postal Code"]
        .str.strip()
        .str.split(" ", n=2, expand=True)
    )
    df_state_postal_split.columns = ["State Code", "Postal Code"]

    return pd.concat([df_to_process, df_address_split, df_state_postal_split], axis=1)


def extract_product_pack_information(df_to_process: DataFrame) -> DataFrame:
    # pull the text between parentheses, e.g. "(4 pack)" -> "4 pack"
    df_to_process["Pack Information"] = (
        df_to_process["Product"].str.extract(r".*\((.*)\).*").fillna("Not Pack")
    )

    return df_to_process


def one_hot_encode_product_column(df_to_process: DataFrame) -> DataFrame:
    return pd.get_dummies(df_to_process, columns=["Product"])


def process_raw_data(file_path: str, chunk_size: int) -> DataFrame:
    df = read_raw_data(file_path=file_path, chunk_size=chunk_size)

    return (
        df.pipe(split_purchase_address)
        .pipe(extract_product_pack_information)
        .pipe(one_hot_encode_product_column)
    )
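Before moving on to testing, here is a minimal usage sketch of my own (assuming the sample file Updated_sales.csv used throughout this piece is in the working directory):

# run the full pipeline on the sample CSV and peek at the result
df = process_raw_data(file_path="Updated_sales.csv", chunk_size=1000)
print(df.shape)
print(df.columns.tolist())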
Next, we can start implementing our first data validation test. If you're going to follow along in a notebook or IDE, you should import the following in a new file (or in another cell in your notebook):
import pandas as pd
import numpy as np
import pytest
from pandas import DataFrame
from data_processing import (
read_raw_data,
split_purchase_address,
extract_product_pack_information,
one_hot_encode_product_column,
)
from pandas.testing import assert_series_equal, assert_index_equal
You can read more on how to actually run pytest (naming conventions for files and how tests are discovered) here, but for our case, all you need to do is create a new file called test_data_processing.py (pytest discovers files whose names start with test_ and the test functions inside them), and in your IDE, as you add to the file, you can simply run pytest, optionally with the --verbose flag.
A Quick Introduction to pytest and a Simple Data Validation Check
Pytest is a testing framework in Python that makes it easy to write tests for your data pipelines. You'll primarily make use of the assert statement, which checks whether the condition you place after assert evaluates to True or False. If it evaluates to False, it raises an AssertionError (and when used within pytest, causes the test to fail).
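As a bare-bones illustration (separate from our pipeline), both outcomes look like this:

assert 1 + 1 == 2  # condition is True, so nothing happens
assert 2 + 2 == 5, "something is off"  # condition is False, so this raises AssertionError: something is off

The optional message after the comma is what pytest surfaces when the assertion fails.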
So first, let's test something simple. All we're going to do is check whether the output of one of our functions (the first one, which reads the raw data) is a DataFrame.
As a quick aside, you'll notice that in the original function we use the arrow ->
syntax to add a type hint saying the function should return a DataFrame. This means that if you write your function to return something other than a DataFrame, your IDE will flag it as returning an invalid output (but this won't technically break your code or prevent it from running).
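To make that concrete, here's a small hypothetical example (the function name is mine): the annotation is only a hint, so Python happily runs a mislabeled function.

from pandas import DataFrame

def mislabeled() -> DataFrame:
    # annotated to return a DataFrame but actually returns a string;
    # this runs without error -- only your IDE or a static checker will complain
    return "not a DataFrame"

result = mislabeled()  # no runtime error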
To actually check that the function returns a DataFrame, we'll implement a function to test the read_raw_data
function and simply call it test_read_raw_data
.
def test_read_raw_data():
    """Testing output of raw table read in is DataFrame"""
    test_df = read_raw_data(file_path="Updated_sales.csv", chunk_size=1000)
    assert isinstance(test_df, DataFrame)  # checking if it is a DataFrame
In this function, we add a one-line docstring to explain that our test just checks whether the output is a DataFrame. Then, we assign the output of the existing read_raw_data
function to a variable and use isinstance
, which returns True or False depending on whether the given object is of the type you pass in. In this case, we check whether test_df
is a DataFrame
.
We can do the same for the rest of our functions, which take a DataFrame as input and are expected to return a DataFrame as output. Implementing that can look like this:
def test_pipe_functions_output_df():
    """Testing output of each pipe function is a DataFrame"""
    test_df = read_raw_data(file_path="Updated_sales.csv", chunk_size=1000)
    all_pipe_functions = [
        split_purchase_address,
        extract_product_pack_information,
        one_hot_encode_product_column,
    ]
    for function in all_pipe_functions:
        assert isinstance(function(test_df), DataFrame)
Note that you can also use the assert
statement inside a for loop, so we just go through each of the functions, passing in a DataFrame as input and checking that the output is also a DataFrame.
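As an aside, pytest also offers @pytest.mark.parametrize as an alternative to looping inside a single test; each function then gets reported as its own pass or fail. A minimal sketch under the same imports (the test name here is my own):

@pytest.mark.parametrize(
    "pipe_function",
    [split_purchase_address, extract_product_pack_information, one_hot_encode_product_column],
)
def test_pipe_function_outputs_df(pipe_function):
    """Each pipe function is reported as a separate test case."""
    df = read_raw_data(file_path="Updated_sales.csv", chunk_size=1000)
    assert isinstance(pipe_function(df), DataFrame)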
Implementing fixtures in pytest for more efficient testing
You can see above that we had to write the exact same line twice in our two different test functions:
test_df = read_raw_data(file_path="Updated_sales.csv", chunk_size=1000)
This is because both test functions needed a DataFrame as input so the test could check whether the output of our data processing functions was also a DataFrame. To avoid copying the same code in all your test functions, you can use fixtures, which let you write some code that pytest will let you reuse across your different tests. Doing so looks like this:
@pytest.fixture
def test_df() -> DataFrame:
    return read_raw_data(file_path="Updated_sales.csv", chunk_size=1000)


def test_read_raw_data(test_df):
    """Testing output of raw table read in is DataFrame"""
    assert isinstance(test_df, DataFrame)  # checking if it is a DataFrame


def test_pipe_functions_output_df(test_df):
    """Testing output of each pipe function is a DataFrame"""
    all_pipe_functions = [
        split_purchase_address,
        extract_product_pack_information,
        one_hot_encode_product_column,
    ]
    for function in all_pipe_functions:
        assert isinstance(function(test_df), DataFrame)
This time, we define test_df
in a fixture function that returns the raw DataFrame. Then, in our test functions, we just include test_df
as a parameter, and we can use it exactly as we did before.
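One fixture detail worth knowing: by default, a fixture runs once per test function, so the CSV is re-read for every test. pytest lets you widen that with the scope argument, though note that extract_product_pack_information adds a column to its input in place, so sharing one DataFrame across tests could leak state between them; the default function scope is the safer choice for this pipeline. A sketch of the wider scope anyway (the fixture name shared_df is mine):

@pytest.fixture(scope="module")
def shared_df() -> DataFrame:
    # read the CSV once per test module instead of once per test function;
    # only safe if no test mutates the DataFrame in place
    return read_raw_data(file_path="Updated_sales.csv", chunk_size=1000)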
Next, let's get into checking our split_purchase_address
function, which essentially outputs the same DataFrame it receives as input but with additional address columns. Our test function will look like this:
def test_split_purchase_address(test_df):
    """Testing multiple columns in output and rows unchanged"""
    split_purchase_address_df = split_purchase_address(test_df)
    assert len(split_purchase_address_df.columns) > len(test_df.columns)
    assert len(split_purchase_address_df.index) == len(test_df.index)
    assert_index_equal(split_purchase_address_df.index, test_df.index)  # using the Pandas testing module
Here, we'll check two main things:
- Does the output DataFrame have more columns than the original DataFrame?
- Does the output DataFrame have the same index as the original DataFrame?
First, we run the split_purchase_address
function, passing test_df
as input and assigning the result to a new variable. This gives us the output of the original function, which we can then test.
To actually do the test, we could check whether a specific column exists in the output DataFrame, but a simpler (not necessarily better) way of doing it is just checking whether the output DataFrame has more columns than the original with the assert
statement. Similarly, we can assert
that the length of the index is the same for each DataFrame.
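If you wanted the more explicit version of that first check, a sketch could assert directly on the new column names the split is supposed to create:

# explicit variant: assert on the specific columns the split should add
for new_column in ["Street Name", "City", "State Code", "Postal Code"]:
    assert new_column in split_purchase_address_df.columns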
You can also check the Pandas testing documentation for some built-in testing functions, but there are only a few, and they essentially just check whether two DataFrames, indexes, or Series are equal. We use the assert_index_equal
function to do the same thing we did with len() on the index.
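For reference, the module exposes just a handful of equality helpers:

from pandas.testing import (
    assert_frame_equal,  # check two DataFrames are equal
    assert_series_equal,  # check two Series are equal
    assert_index_equal,  # check two Indexes are equal
    assert_extension_array_equal,  # check two ExtensionArrays are equal
)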
As mentioned before, we can also check whether a DataFrame contains a specific column. We'll move on to the next function, extract_product_pack_information
, which should always output the original DataFrame with an additional column called "Pack Information". Our test function will look like this:
def test_extract_product_pack_information(test_df):
    """Test specific output column in new DataFrame"""
    product_pack_df = extract_product_pack_information(test_df)
    assert "Pack Information" in product_pack_df.columns
Here, all we do is call columns
again on the output of the original function, but this time check specifically that the "Pack Information" column is in the list of columns. If for some reason we edited our original extract_product_pack_information
function to return additional columns or renamed the output column, this test would fail. This would be a reminder to check whether whatever consumes the final data (like a machine learning model) also took that change into account.
We could then do one of two things:
- Make changes downstream in our code pipeline (like code that refers to the "Pack Information" column);
- Edit our tests to reflect the changes in our processing function.
Another thing we should be doing is checking that the DataFrame returned by our functions has columns of the data types we expect. For example, if we're doing calculations on numerical columns, we should check that those columns come back as an int
or float
, depending on what we need.
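A hedged sketch of such a check (the column name "Quantity Ordered" is an illustrative placeholder here, not one we've confirmed in this dataset):

def test_numeric_column_dtype(test_df):
    """Hypothetical sketch: confirm a column is numeric before doing math on it."""
    # "Quantity Ordered" is a placeholder column name for illustration
    assert pd.api.types.is_numeric_dtype(test_df["Quantity Ordered"])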
Let's test data types on our one_hot_encode_product_column
function, where we perform a typical feature engineering step on one of the categorical columns in the original DataFrame. We expect all the new columns to be of the uint8
dtype (what the get_dummies
function in Pandas returns by default), so we can test that like this.
def test_one_hot_encode_product_column(test_df):
    """Testing if column types are correct"""
    encoded_df = one_hot_encode_product_column(test_df)
    encoded_columns = [column for column in encoded_df.columns if "_" in column]
    for encoded_column in encoded_columns:
        assert encoded_df[encoded_column].dtype == np.dtype("uint8")
The get_dummies
function returns columns that contain an underscore in their names (this, of course, could be done better by checking the actual column names, like we did for a specific column in the previous test function).
Here, all we're doing is looping over the target columns and checking that each of them has the np.dtype("uint8")
data type. I verified this beforehand in a notebook by checking the data type of one of the output columns with column.dtype
.
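One caveat worth hedging against: pandas 2.0 changed the default get_dummies output dtype from uint8 to bool, so on newer pandas this exact assertion will fail. A version-tolerant variant of the loop could look like this:

for encoded_column in encoded_columns:
    # accept uint8 (older pandas) or bool (pandas >= 2.0)
    assert encoded_df[encoded_column].dtype in (np.dtype("uint8"), np.dtype("bool"))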
Another good practice, in addition to testing the individual functions that make up your data processing and transformation pipelines, is testing the final output of your pipeline.
To do so, we'll simulate running the entire pipeline within the test, and then check the resulting DataFrame.
def test_process_raw_data(test_df):
    """Testing the final output DataFrame as a final sanity check"""
    processed_df = (
        test_df.pipe(split_purchase_address)
        .pipe(extract_product_pack_information)
        .pipe(one_hot_encode_product_column)
    )

    # check if all original columns are still in DataFrame
    for column in test_df.columns:
        if column not in processed_df.columns:
            raise AssertionError(f"COLUMN -- {column} -- not in final DataFrame")

    # equivalent one-line check, without the custom message
    assert all(
        column in list(processed_df.columns) for column in list(test_df.columns)
    )

    # check if final DataFrame doesn't have duplicates
    # (assert_series_equal raises on its own, so no extra assert is needed)
    assert_series_equal(
        processed_df["Order ID"].drop_duplicates(), test_df["Order ID"]
    )
Our final test_process_raw_data
will check for two final things:
- Checking that the original columns are still present in the final DataFrame: this isn't always a requirement, but it might be that you want all the raw data to still be available (and not transformed) in your output. Doing so is simple: we just need to check whether each column in test_df is still present in processed_df. This time, we raise an AssertionError (similar to just using an assert statement) if a column isn't present. This is a nice example of how you can output a specific message in your tests when needed.
- Checking that the final DataFrame doesn't have any duplicates: there are a lot of different ways you could do this; in this case, we're just using the "Order ID" (which we expect to behave like an index) and assert_series_equal
to see whether the output DataFrame generated any duplicate rows.
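As a simpler alternative sketch for that second check, you could also assert directly on the Series duplicated method inside the same test, under the same assumption that "Order ID" should be unique:

# alternative duplicate check: no two rows share an "Order ID"
assert not processed_df["Order ID"].duplicated().any()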
Checking the pytest output
For a quick look at what running pytest looks like, in your IDE just run:
pytest --verbose
Pytest will discover the new test file with all the test functions and run them! This is a simple implementation of a series of data validation and testing checks on your data processing pipeline. If you run the above, the output should look something like this:
You can see that our final test failed, specifically on the part of the test where we check that all the columns from the initial DataFrame are present in the final one. You can also see that the custom error message in the AssertionError
we defined earlier is populating correctly: the "Product" column from our original DataFrame isn't showing up in the final DataFrame (see if you can find why based on our initial data processing functions).
There's a lot more room to improve on this testing; we just have a very simple implementation with basic testing and data validation cases. For more complex pipelines, you'll likely want a lot more testing, both for your individual data processing functions and for your raw and final output DataFrames, to make sure the data you end up using is data you can trust.