Cornellius Yudha Wijaya
2025-05-05 10:00:00
www.kdnuggets.com

Data is a vital asset that businesses utilize to gain a competitive edge. With advancements in technology, it has become much easier to collect and store data. The catch is that processing gets slower as datasets grow.
Several tools can speed up data processing, and Dask is one of them. Dask is a powerful Python library that provides a Pandas-compatible API to scale data processing via parallel, out-of-core computation. It handles large datasets by splitting them into smaller partitions and processing those partitions concurrently across multiple cores or machines.
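To make the idea of partitioned, parallel computation concrete, here is a minimal plain-Python sketch. It is not Dask's implementation or API: the helper names (`partition`, `partial_sum_count`, `parallel_mean`) and the sample salaries are made up for illustration, and a thread pool stands in for Dask's scheduler.

```python
from concurrent.futures import ThreadPoolExecutor


def partition(values, size):
    """Split a list into consecutive chunks of at most `size` items."""
    return [values[i:i + size] for i in range(0, len(values), size)]


def partial_sum_count(chunk):
    """Reduce one chunk to a (sum, count) pair; chunks are independent."""
    return sum(chunk), len(chunk)


def parallel_mean(values, size=3, workers=4):
    """Compute a mean by combining per-chunk partial results."""
    chunks = partition(values, size)
    with ThreadPoolExecutor(max_workers=workers) as pool:
        partials = list(pool.map(partial_sum_count, chunks))
    total = sum(s for s, _ in partials)
    count = sum(c for _, c in partials)
    return total / count


# Hypothetical sample data, not taken from the Kaggle dataset.
salaries = [50_000, 62_000, 71_000, 88_000, 94_000, 120_000, 135_000]
print(parallel_mean(salaries))
```

Dask applies the same split-then-combine idea to DataFrame operations, and adds lazy evaluation and out-of-core scheduling on top of it.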
As Dask is a valuable tool, it is worth learning how to build an end-to-end data pipeline with it that any data professional can use. That is why this article will teach you how to set up a data pipeline with Dask.
Let’s get into it.
Preparation
For this tutorial to work, we need to set up a few things. First, we will establish a database to store our data. In this case, we will use MySQL as the database; therefore, simply download it and follow the standard installation instructions.
For the dataset, we will utilise the Data Scientist Salary dataset, which is publicly available on Kaggle. Save the data in a folder called ‘data’ and leave it for now.
Next, create a virtual environment using the following command.
python -m venv dask_pipeline
You can choose other names for your virtual environment, but I prefer a self-explanatory name. Activate the virtual environment and create a requirements.txt file, which will be populated with the necessary libraries for the project.
dask[complete]
pandas
numpy
sqlalchemy
PyMySQL
luigi
python-dotenv
setuptools
Once the file is ready, we will install the libraries using the following command.
pip install -r requirements.txt
Then, create a file called ‘.env’, where we will store all the variables used in this project, primarily for database access. Fill the file with the following information:
DB_USER=your_username
DB_PASS=your_password
DB_HOST=localhost
DB_PORT=3306
DB_NAME=analytics
Then, create a file called config.py, which will be used for connecting to the database.
from dotenv import load_dotenv
import os

load_dotenv()

DB_USER = os.getenv("DB_USER")
DB_PASS = os.getenv("DB_PASS")
DB_HOST = os.getenv("DB_HOST")
DB_PORT = os.getenv("DB_PORT")
DB_NAME = os.getenv("DB_NAME")

CONN_STR = (
    f"mysql+pymysql://{DB_USER}:{DB_PASS}@"
    f"{DB_HOST}:{DB_PORT}/{DB_NAME}"
)
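One thing to watch for in config.py: if the password contains characters such as `@`, `:` or `/`, pasting it straight into the URL produces a connection string that cannot be parsed correctly. Below is a minimal sketch of a stricter builder, assuming the same five variable names as the .env file. `build_conn_str` is a hypothetical helper and is not part of the original project.

```python
import os
from urllib.parse import quote_plus

REQUIRED_KEYS = ("DB_USER", "DB_PASS", "DB_HOST", "DB_PORT", "DB_NAME")


def build_conn_str(env=os.environ):
    """Hypothetical helper: build a MySQL/PyMySQL URL, failing early."""
    missing = [key for key in REQUIRED_KEYS if not env.get(key)]
    if missing:
        raise KeyError(f"Missing settings: {', '.join(missing)}")
    # Percent-encode the credentials so '@', ':' and '/' cannot be
    # mistaken for URL delimiters.
    user = quote_plus(env["DB_USER"])
    password = quote_plus(env["DB_PASS"])
    return (
        f"mysql+pymysql://{user}:{password}@"
        f"{env['DB_HOST']}:{env['DB_PORT']}/{env['DB_NAME']}"
    )


# Example with made-up credentials.
example = {
    "DB_USER": "etl_user",
    "DB_PASS": "p@ss/word",
    "DB_HOST": "localhost",
    "DB_PORT": "3306",
    "DB_NAME": "analytics",
}
print(build_conn_str(example))
```

Failing on a missing variable at import time is a design choice: without it, `os.getenv` returns `None` and the string "None" ends up inside the URL, which only surfaces later as a confusing connection error.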
With everything in place, we will then create our end-to-end data pipelines with Dask.
Data Pipeline with Dask
To set up the data pipeline, we will utilize the Luigi Python library, which is typically used to build complex pipelines for batch jobs. In our case, it will orchestrate a pipeline that uses Dask to ingest CSV data into the database, transform it, and load the result back into the database.
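Luigi decides what to run by checking whether each task's output target already exists, and it runs a task's requirements before the task itself. The toy sketch below mimics that behaviour with marker files in plain Python. It is not Luigi's API: the `ToyTask` class, the `build` function, and the task names are invented for illustration.

```python
import tempfile
from pathlib import Path


class ToyTask:
    """Toy stand-in for a task: complete when its marker file exists."""

    def __init__(self, name, workdir, requires=()):
        self.name = name
        self.marker = Path(workdir) / f"{name}_done.txt"
        self.requires = list(requires)

    def complete(self):
        return self.marker.exists()

    def run(self):
        # Real work would go here; the marker file records success.
        self.marker.write_text("ok")


def build(task, log):
    """Run dependencies first, skipping anything already complete."""
    if task.complete():
        return
    for dep in task.requires:
        build(dep, log)
    task.run()
    log.append(task.name)


with tempfile.TemporaryDirectory() as workdir:
    create_db = ToyTask("create_db", workdir)
    ingest = ToyTask("ingest", workdir, requires=[create_db])
    transform = ToyTask("transform", workdir, requires=[ingest])

    first_run, second_run = [], []
    build(transform, first_run)   # runs all three, dependencies first
    build(transform, second_run)  # nothing to do: markers already exist

print(first_run)   # ['create_db', 'ingest', 'transform']
print(second_run)  # []
```

The tasks defined later in this article rely on the same idea, with marker files written under tmp/.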
Let’s start the pipeline with the code that creates the database, which goes in a Python file called luigi_pipeline.py. We will import all the necessary libraries and create a task to establish a database.
import luigi
from luigi import LocalTarget, Parameter, IntParameter
from sqlalchemy import create_engine, text
import pandas as pd
from dask import delayed
import dask.dataframe as dd
from config import DB_USER, DB_PASS, DB_HOST, DB_PORT, DB_NAME, CONN_STR


class CreateDatabase(luigi.Task):
    def output(self):
        return LocalTarget("tmp/db_created.txt")

    def run(self):
        engine = create_engine(
            f"mysql+pymysql://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/"
        )
        with engine.connect() as conn:
            conn.execute(text(f"CREATE DATABASE IF NOT EXISTS {DB_NAME}"))
        self.output().makedirs()
        with self.output().open("w") as f:
            f.write("ok")
When run, the code above creates the database if it does not already exist. We will use this task as a requirement of the CSV ingestion task with Dask that we will set up below.
class IngestCSV(luigi.Task):
    csv_path = Parameter()
    table_name = Parameter(default="ds_salaries")

    def requires(self):
        return CreateDatabase()

    def output(self):
        return LocalTarget("tmp/ingest_done.txt")

    def run(self):
        url_no_db = f"mysql+pymysql://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/"
        engine0 = create_engine(url_no_db)
        with engine0.connect() as conn:
            conn.execute(text(f"CREATE DATABASE IF NOT EXISTS {DB_NAME}"))

        ddf = dd.read_csv(self.csv_path, assume_missing=True)
        engine = create_engine(CONN_STR)

        empty = ddf.head(0)
        empty.to_sql(self.table_name, con=engine, if_exists="replace", index=False)

        def append_part(pdf):
            pdf.to_sql(self.table_name, con=engine, if_exists="append", index=False)

        ddf.map_partitions(append_part, meta=()).compute()

        with self.output().open("w") as f:
            f.write("ok")
In the code above, Dask reads the CSV file lazily as a set of partitions. We first create an empty table from the column layout, then append each partition to it, so the whole file never has to be sent to the database in one piece.
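The ingestion task follows a create-then-append pattern: create the table once, then append one partition at a time. The sketch below shows the same pattern using only the standard library, with an in-memory SQLite database instead of MySQL and a small inline CSV instead of the Kaggle file. The `chunks` and `ingest` helpers, the sample rows, and the three-column layout are assumptions made for the example.

```python
import csv
import io
import sqlite3
from itertools import islice

# Made-up sample rows, not the real dataset.
CSV_TEXT = """work_year,employment_type,salary_in_usd
2021,FT,85000
2021,PT,30000
2022,FT,120000
2022,FT,110000
2023,FT,150000
"""


def chunks(rows, size):
    """Yield lists of at most `size` rows from an iterator."""
    while True:
        batch = list(islice(rows, size))
        if not batch:
            return
        yield batch


def ingest(csv_text, conn, table="ds_salaries", chunk_size=2):
    """Hypothetical helper: recreate `table`, then append chunk by chunk."""
    reader = csv.reader(io.StringIO(csv_text))
    header = next(reader)
    columns = ", ".join(header)
    placeholders = ", ".join("?" for _ in header)
    # Step 1: (re)create an empty table, like if_exists="replace".
    conn.execute(f"DROP TABLE IF EXISTS {table}")
    conn.execute(f"CREATE TABLE {table} ({columns})")
    # Step 2: append each chunk, like if_exists="append".
    for batch in chunks(reader, chunk_size):
        conn.executemany(
            f"INSERT INTO {table} VALUES ({placeholders})", batch
        )
    conn.commit()


conn = sqlite3.connect(":memory:")
ingest(CSV_TEXT, conn)
print(conn.execute("SELECT COUNT(*) FROM ds_salaries").fetchone()[0])
```

Because the table is recreated at the start, rerunning the ingestion replaces the earlier rows rather than duplicating them.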
As the next step of the pipeline, the ETL transformation task requires the CSV ingestion task, as shown in the code below.
class TransformETL(luigi.Task):
    csv_path = Parameter()
    table_name = Parameter(default="ds_salaries")
    chunk_size = IntParameter(default=100_000)

    def requires(self):
        return IngestCSV(csv_path=self.csv_path,
                         table_name=self.table_name)

    def output(self):
        return LocalTarget("tmp/etl_done.txt")

    def run(self):
        engine = create_engine(CONN_STR)

        # 1. Count total rows for chunking
        with engine.connect() as conn:
            total = conn.execute(
                text(f"SELECT COUNT(*) FROM {self.table_name}")
            ).scalar()

        # 2. Build delayed partitions
        @delayed
        def load_chunk(offset, limit):
            return pd.read_sql(
                f"SELECT * FROM {self.table_name} LIMIT {limit} OFFSET {offset}",
                engine
            )

        parts = [
            load_chunk(i * self.chunk_size, self.chunk_size)
            for i in range((total // self.chunk_size) + 1)
        ]

        # 3. Load zero-row metadata and cast to correct dtypes
        meta = (
            pd.read_sql(f"SELECT * FROM {self.table_name} LIMIT 0", engine)
            .astype({
                "work_year": "int64",
                "salary": "float64",
                "salary_in_usd": "float64",
                "remote_ratio": "int64",
                # leave the rest as object
            })
        )

        # 4. Create Dask DataFrame with corrected meta
        ddf = dd.from_delayed(parts, meta=meta)

        # 5. Filter & clean
        ddf = (
            ddf
            .dropna(subset=["salary_in_usd"])
            .assign(
                salary_in_usd=ddf["salary_in_usd"].astype(float)
            )
        )

        # 6. Keep only full-time roles
        ddf = ddf[ddf["employment_type"] == "FT"]

        # 7. Compute salary bracket at 10k USD
        bracket = (ddf["salary_in_usd"] // 10_000).astype(int) * 10_000
        ddf = ddf.assign(salary_bracket=bracket)

        # 8. Aggregate: average salary by year
        result = (
            ddf.groupby("work_year")["salary_in_usd"]
            .mean()
            .compute()
            .reset_index()
            .rename(columns={"salary_in_usd": "avg_salary_usd"})
        )

        # 9. Persist results
        result.to_sql("avg_salary_by_year",
                      con=engine, if_exists="replace", index=False)

        with self.output().open("w") as f:
            f.write("ok")


if __name__ == "__main__":
    # Entry point so that running this file from the command line
    # dispatches to the task named in the arguments
    luigi.run()
The code above performs multiple tasks using Dask to transform the data we have. Specifically, here is what Dask does within the pipeline:
- Load the dataset from the database in chunks.
- Set the metadata and create a Dask dataframe.
- Filter and clean the data with Dask.
- Transform the data with Dask.
- Write the aggregated result back to the database.
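The chunking, bracketing, and aggregation steps above come down to a little arithmetic, which the plain-Python sketch below spells out on made-up rows. `chunk_bounds`, `salary_bracket`, and `average_by_year` are hypothetical helpers; the pipeline computes these inline with Dask. Note that `chunk_bounds` uses ceiling division, whereas the pipeline uses `total // chunk_size + 1`, which yields one extra (empty) chunk when the row count is an exact multiple of the chunk size.

```python
from collections import defaultdict


def chunk_bounds(total_rows, chunk_size):
    """Return (offset, limit) pairs that cover `total_rows` rows."""
    n_chunks = -(-total_rows // chunk_size)  # ceiling division
    return [(i * chunk_size, chunk_size) for i in range(n_chunks)]


def salary_bracket(salary_usd, width=10_000):
    """Round a salary down to the start of its bracket."""
    return int(salary_usd // width) * width


def average_by_year(rows):
    """Mean salary per year, keeping full-time rows with a salary."""
    sums = defaultdict(float)
    counts = defaultdict(int)
    for year, employment_type, salary in rows:
        if employment_type != "FT" or salary is None:
            continue
        sums[year] += salary
        counts[year] += 1
    return {year: sums[year] / counts[year] for year in sums}


# Made-up rows: (work_year, employment_type, salary_in_usd).
rows = [
    (2021, "FT", 85_000.0),
    (2021, "PT", 30_000.0),
    (2022, "FT", 120_000.0),
    (2022, "FT", 110_000.0),
    (2023, "FT", None),
]

print(chunk_bounds(250_000, 100_000))
print(chunk_bounds(200_000, 100_000))
print(salary_bracket(87_500.0))  # 80000
print(average_by_year(rows))     # {2021: 85000.0, 2022: 115000.0}
```

One caveat worth keeping in mind: without an ORDER BY clause, SQL does not guarantee that successive LIMIT/OFFSET queries see the rows in the same order, so adding a stable sort key to the chunk query is worth considering.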
The data pipeline is then ready to use, and we can execute it with Python using the command below.
python luigi_pipeline.py TransformETL --csv-path data\ds_salaries.csv
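Luigi prints an execution summary when it finishes. The summary below is what appears when the task has already completed in an earlier run, so nothing needs to be executed again.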
===== Luigi Execution Summary =====
Scheduled 1 tasks of which:
* 1 complete ones were encountered:
- 1 TransformETL(csv_path=data\ds_salaries.csv, table_name=ds_salaries, chunk_size=100000)
Did not run any tasks
This progress looks :) because there were no failed tasks or missing dependencies
Then, you can check the Luigi UI, which is served by the central scheduler (luigid, at http://localhost:8082 by default), to see if the pipeline is working well.
You can see the dashboard output in the image below.
If the run was successful, you can check the result in your database with the following query.
SELECT * FROM analytics.avg_salary_by_year;
The output is shown below.
With that, you just built an end-to-end data pipeline with Dask. All the code is stored in the following GitHub repository.
Conclusion
Building a data pipeline is a crucial skill for data professionals, and Dask is a useful tool for it because it scales data processing and manipulation. In this article, we have learned how to build an end-to-end data pipeline, from ingesting data to loading the result back into the database.
I hope this has helped!
Cornellius Yudha Wijaya is a data science assistant manager and data writer. While working full-time at Allianz Indonesia, he loves to share Python and data tips via social media and writing media. Cornellius writes on a variety of AI and machine learning topics.