Not every data pipeline needs Databricks or Airflow. Sometimes you just need a Python script that runs on a schedule, pulls data from an API, transforms it, and loads it into a database. No orchestration framework. No managed platform. Just a $6/month server that quietly does its job.
I run several lightweight ETL pipelines exactly this way — on a simple VPS with cron and Python. It's the most cost-effective setup for small to mid-size data jobs that don't justify a full orchestration platform. In this guide, I'll walk you through setting up a production-ready scheduled pipeline from scratch.
What you'll need
- A VPS (I use DigitalOcean — $6/month gets you 1GB RAM, 25GB SSD, which handles most ETL jobs easily)
- Python 3.10+
- A data source (API, CSV, database)
- A target database or data store
- Basic familiarity with Linux and SSH
Step 1: Set up your VPS
If you don't already have a server, spin one up. I recommend DigitalOcean Droplets because the setup takes about 60 seconds and pricing is predictable — no surprise bills like you sometimes get with AWS.
Create a Droplet with these specs:
- Image: Ubuntu 24.04 LTS
- Plan: Basic, Regular, $6/month (1 vCPU, 1GB RAM, 25GB SSD)
- Region: Pick the one closest to your data source
- Authentication: SSH key (always use SSH keys, never password auth)
Once it's live, SSH in:
```bash
ssh root@your-server-ip
```
First thing — update the system and install Python:
```bash
apt update && apt upgrade -y
apt install python3 python3-pip python3-venv -y
```
Create a dedicated user for running pipelines (don't run ETL as root):
```bash
adduser etl-runner
usermod -aG sudo etl-runner
su - etl-runner
```
Step 2: Structure your pipeline project
A clean project structure makes your pipeline maintainable and debuggable. Here's what I use:
```text
~/pipelines/
├── daily_sales_sync/
│   ├── .env              # credentials (never commit this)
│   ├── config.py         # configuration and env loading
│   ├── extract.py        # data extraction logic
│   ├── transform.py      # data transformation logic
│   ├── load.py           # data loading logic
│   ├── pipeline.py       # main orchestrator
│   ├── requirements.txt  # dependencies
│   └── logs/             # log files
└── shared/
    ├── db.py             # shared database connection helpers
    └── notifications.py  # alerting (Slack, email, etc.)
```
Set up the virtual environment:
```bash
mkdir -p ~/pipelines/daily_sales_sync
cd ~/pipelines/daily_sales_sync
python3 -m venv venv
source venv/bin/activate
pip install requests psycopg2-binary python-dotenv pandas
pip freeze > requirements.txt
```
Step 3: Write the pipeline
Here's a real-world example — an ETL that extracts data from a REST API, transforms it, and loads it into a PostgreSQL database.
config.py
```python
import os

from dotenv import load_dotenv

load_dotenv()

API_URL = os.getenv("API_URL")
API_KEY = os.getenv("API_KEY")
DB_HOST = os.getenv("DB_HOST")
DB_NAME = os.getenv("DB_NAME")
DB_USER = os.getenv("DB_USER")
DB_PASSWORD = os.getenv("DB_PASSWORD")
DB_PORT = os.getenv("DB_PORT", "5432")
```
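One optional addition worth making to `config.py` is a fail-fast check, so a missing variable surfaces as a clear error at startup rather than as a confusing `None` somewhere mid-run. The `validate_config` helper and the list of required names below are my own suggestion, not part of the pipeline above:

```python
import os

# Variables the pipeline can't run without (DB_PORT has a default, so it's excluded)
REQUIRED_VARS = ["API_URL", "API_KEY", "DB_HOST", "DB_NAME", "DB_USER", "DB_PASSWORD"]


def validate_config():
    """Raise immediately if any required variable is missing or empty."""
    missing = [name for name in REQUIRED_VARS if not os.getenv(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
```

Call `validate_config()` at the top of your pipeline entry point and a typo in `.env` shows up in the first second of the run.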
extract.py
```python
import requests

from config import API_URL, API_KEY


def extract_data(endpoint, params=None):
    """Extract data from API with error handling."""
    headers = {"Authorization": f"Bearer {API_KEY}"}
    response = requests.get(
        f"{API_URL}/{endpoint}",
        headers=headers,
        params=params,
        timeout=30,
    )
    response.raise_for_status()
    data = response.json()
    print(f"Extracted {len(data)} records from {endpoint}")
    return data
```
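As we'll see in the cron section, cron itself has no retry logic, so transient API hiccups will fail the whole run unless you handle them in code. A small generic wrapper with exponential backoff covers most cases; the `with_retries` name and the delay values are my own choices, not from any library:

```python
import time


def with_retries(fn, attempts=3, base_delay=2):
    """Call fn(), retrying with exponential backoff on any exception.

    Waits base_delay, then base_delay * 2, base_delay * 4, ... between attempts,
    and re-raises the last exception if every attempt fails.
    """
    for attempt in range(attempts):
        try:
            return fn()
        except Exception:
            if attempt == attempts - 1:
                raise
            time.sleep(base_delay * (2 ** attempt))
```

In the orchestrator you would then wrap the extract call, e.g. `with_retries(lambda: extract_data("sales", params={"date": yesterday}))`.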
transform.py
```python
import pandas as pd

from datetime import datetime, timezone


def transform_sales(raw_data):
    """Clean and transform raw sales data."""
    df = pd.DataFrame(raw_data)

    # Drop duplicates on business key
    df = df.drop_duplicates(subset=["order_id"], keep="last")

    # Clean data types
    df["amount"] = pd.to_numeric(df["amount"], errors="coerce").fillna(0)
    df["order_date"] = pd.to_datetime(df["order_date"], errors="coerce")

    # Filter out invalid records
    df = df.dropna(subset=["order_date"])

    # Add metadata (timezone-aware; datetime.utcnow() is deprecated as of Python 3.12)
    df["loaded_at"] = datetime.now(timezone.utc)

    print(f"Transformed: {len(df)} clean records")
    return df
```
load.py
```python
import psycopg2
from psycopg2.extras import execute_values

from config import DB_HOST, DB_NAME, DB_USER, DB_PASSWORD, DB_PORT


def get_connection():
    """Create database connection."""
    return psycopg2.connect(
        host=DB_HOST,
        dbname=DB_NAME,
        user=DB_USER,
        password=DB_PASSWORD,
        port=DB_PORT,
    )


def load_to_postgres(df, table_name):
    """Load DataFrame into PostgreSQL using upsert."""
    conn = get_connection()
    try:
        with conn.cursor() as cursor:
            columns = list(df.columns)
            values = [tuple(row) for row in df.values]
            insert_query = f"""
                INSERT INTO {table_name} ({', '.join(columns)})
                VALUES %s
                ON CONFLICT (order_id)
                DO UPDATE SET
                    amount = EXCLUDED.amount,
                    order_date = EXCLUDED.order_date,
                    loaded_at = EXCLUDED.loaded_at
            """
            # execute_values batches rows into multi-row INSERTs for speed
            execute_values(cursor, insert_query, values, page_size=1000)
        conn.commit()
        print(f"Loaded {len(values)} records into {table_name}")
    finally:
        # Close the connection even if the load fails partway through
        conn.close()
```
pipeline.py (the main orchestrator)
```python
#!/usr/bin/env python3
"""Daily sales sync pipeline."""
import logging
import os
import sys
from datetime import datetime, timedelta

from extract import extract_data
from transform import transform_sales
from load import load_to_postgres

# Set up logging (create logs/ first so the handler doesn't fail on a fresh server)
os.makedirs("logs", exist_ok=True)
logging.basicConfig(
    filename=f"logs/pipeline_{datetime.now().strftime('%Y%m%d')}.log",
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)


def run():
    """Run the full ETL pipeline."""
    start_time = datetime.now()
    logging.info("Pipeline started")
    try:
        # Extract
        yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
        raw_data = extract_data("sales", params={"date": yesterday})
        if not raw_data:
            logging.info("No new data to process")
            return

        # Transform
        clean_data = transform_sales(raw_data)

        # Load
        load_to_postgres(clean_data, "sales_fact")

        duration = (datetime.now() - start_time).total_seconds()
        logging.info(f"Pipeline completed in {duration:.1f}s")
    except Exception as e:
        logging.error(f"Pipeline failed: {e}", exc_info=True)
        # Add your alerting here (Slack webhook, email, etc.)
        sys.exit(1)


if __name__ == "__main__":
    run()
```
Step 4: Schedule with cron
Cron is the simplest and most reliable scheduler for Linux. No dependencies, no daemons, no config files — just a one-liner.
Open the crontab editor:
```bash
crontab -e
```
Add a schedule. Here are common patterns:
```bash
# Run daily at 6:00 AM UTC
0 6 * * * cd /home/etl-runner/pipelines/daily_sales_sync && /home/etl-runner/pipelines/daily_sales_sync/venv/bin/python pipeline.py

# Run every 6 hours
0 */6 * * * cd /home/etl-runner/pipelines/daily_sales_sync && /home/etl-runner/pipelines/daily_sales_sync/venv/bin/python pipeline.py

# Run every Monday at midnight
0 0 * * 1 cd /home/etl-runner/pipelines/daily_sales_sync && /home/etl-runner/pipelines/daily_sales_sync/venv/bin/python pipeline.py
```
Two critical things people get wrong with cron:
- Always use the full path to the Python binary inside your venv. Cron doesn't activate your virtual environment — it has a minimal PATH. Using `/home/etl-runner/pipelines/daily_sales_sync/venv/bin/python` ensures the right Python and packages are used.
- Always `cd` into the project directory first. Relative paths in your code (like the `logs/` folder or `.env` file) won't resolve correctly if cron runs from a different working directory.
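A third pitfall worth guarding against is overlapping runs: if a job takes longer than its schedule interval, cron will happily start a second copy. `flock` (part of util-linux, preinstalled on Ubuntu) skips the run if the previous one is still holding the lock, and redirecting stdout/stderr captures any `print` output cron would otherwise discard. This is a config fragment; the lock file path and `cron.log` name are my own choices:

```bash
# Daily at 6:00 AM UTC; skip if the previous run still holds the lock,
# and append all output to a cron-specific log
0 6 * * * flock -n /tmp/daily_sales_sync.lock bash -c 'cd /home/etl-runner/pipelines/daily_sales_sync && venv/bin/python pipeline.py' >> /home/etl-runner/pipelines/daily_sales_sync/logs/cron.log 2>&1
```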
Verify your crontab was saved:
```bash
crontab -l
```
Step 5: Add monitoring and alerts
A pipeline that fails silently is worse than no pipeline at all. Here's a lightweight monitoring setup that doesn't require any additional tools.
Log rotation
Your log files will grow over time. Add a simple cleanup to your crontab:
```bash
# Delete logs older than 30 days, every Sunday
0 0 * * 0 find /home/etl-runner/pipelines/daily_sales_sync/logs -name "*.log" -mtime +30 -delete
```
Slack notifications on failure
Add a simple webhook notification to your pipeline. Create a notifications.py:
```python
import requests

from datetime import datetime, timezone

# Better: load this from your .env rather than hardcoding it
SLACK_WEBHOOK = "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"


def notify_failure(pipeline_name, error_message):
    """Send a Slack alert when a pipeline fails."""
    payload = {
        "text": f":red_circle: *Pipeline Failed*\n"
                f"*Pipeline:* {pipeline_name}\n"
                f"*Error:* {error_message}\n"
                f"*Time:* {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')}"
    }
    requests.post(SLACK_WEBHOOK, json=payload, timeout=10)
```
Then import `notify_failure` and update the except block in `pipeline.py`:

```python
from notifications import notify_failure  # add to the imports at the top of pipeline.py

    except Exception as e:
        logging.error(f"Pipeline failed: {e}", exc_info=True)
        notify_failure("daily_sales_sync", str(e))
        sys.exit(1)
```
Health check endpoint (optional)
For critical pipelines, I use a dead man's switch — a service like UptimeRobot or Healthchecks.io that expects a ping after every successful run. If the ping doesn't arrive, it alerts you:
```python
# Add at the end of a successful run
import requests

try:
    requests.get("https://hc-ping.com/your-unique-uuid", timeout=10)
except requests.RequestException:
    # Don't fail the pipeline just because the monitoring ping failed
    logging.warning("Health check ping failed")
```
Step 6: Make it production-ready
A few more things that separate a hobby script from a production pipeline.
Environment variables
Never hardcode credentials. Use a .env file:
```bash
# .env
API_URL=https://api.example.com/v1
API_KEY=your_secret_key_here
DB_HOST=your-db-host.com
DB_NAME=analytics
DB_USER=etl_user
DB_PASSWORD=your_db_password
DB_PORT=5432
```
And make sure it's not accessible to other users:
```bash
chmod 600 .env
```
Idempotent design
Your pipeline should produce the same result whether it runs once or five times. The ON CONFLICT ... DO UPDATE in our load function handles this — re-running the pipeline updates existing records instead of creating duplicates. (Sound familiar? That's the same principle from my previous article on fixing duplicate records in Delta tables.)
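Note that `ON CONFLICT (order_id)` only works if the target table has a primary key or unique constraint on `order_id`. You can see the principle in miniature without a Postgres instance, because SQLite (bundled with Python) supports the same upsert syntax; this toy schema is purely illustrative:

```python
import sqlite3

# In-memory stand-in for the real sales_fact table; the PRIMARY KEY on
# order_id is what makes ON CONFLICT possible.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales_fact (order_id TEXT PRIMARY KEY, amount REAL)")


def load(rows):
    """Idempotent upsert: re-running updates rows instead of duplicating them."""
    conn.executemany(
        """INSERT INTO sales_fact (order_id, amount) VALUES (?, ?)
           ON CONFLICT (order_id) DO UPDATE SET amount = excluded.amount""",
        rows,
    )
    conn.commit()


load([("A1", 10.0), ("A2", 20.0)])
load([("A1", 15.0), ("A2", 20.0)])  # re-run: A1 updated, no duplicates

count = conn.execute("SELECT COUNT(*) FROM sales_fact").fetchone()[0]
amount = conn.execute("SELECT amount FROM sales_fact WHERE order_id = 'A1'").fetchone()[0]
```

After running the load twice, the table still holds two rows, with A1's amount updated to the latest value.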
Resource monitoring
On a $6 VPS, you need to be mindful of both memory and disk. Logs and temp files fill 25GB faster than you'd expect, so add a disk check at the top of your pipeline:

```python
import shutil


def check_disk_space(min_gb=2):
    """Ensure enough disk space before running."""
    total, used, free = shutil.disk_usage("/")
    free_gb = free / (1024 ** 3)
    if free_gb < min_gb:
        raise RuntimeError(f"Low disk space: {free_gb:.1f}GB free")
```
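Memory is the other constraint on a 1GB box, especially with pandas holding the whole dataset in RAM. A rough Linux-only sketch along the same lines — the `check_free_memory` name and threshold are my own, and `SC_AVPHYS_PAGES` reports free physical pages without counting reclaimable page cache, so it's a conservative estimate:

```python
import os


def check_free_memory(min_mb=256):
    """Raise if available physical memory is below min_mb (Linux-only)."""
    page_size = os.sysconf("SC_PAGE_SIZE")       # bytes per page
    avail_pages = os.sysconf("SC_AVPHYS_PAGES")  # free physical pages
    free_mb = page_size * avail_pages / (1024 ** 2)
    if free_mb < min_mb:
        raise RuntimeError(f"Low memory: {free_mb:.0f}MB available")
    return free_mb
```

If either check trips regularly, that's your signal to clean up aggressively or move to a bigger droplet.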
When to upgrade from cron + VPS
This setup works great for 1-10 pipelines with simple dependencies. But consider upgrading when:
- You need complex DAGs — if pipeline B must wait for pipeline A to finish, cron gets messy. Switch to Apache Airflow or Prefect.
- You need retries with backoff — cron either runs or doesn't. For retry logic, you'll want an orchestrator.
- You're managing 20+ pipelines — the operational overhead of maintaining individual cron jobs becomes painful.
- You need a UI for monitoring — cron is invisible. Airflow and Prefect give you dashboards.
When that time comes, you can self-host Airflow on a DigitalOcean Droplet with 4GB RAM ($24/month) — still far cheaper than any managed orchestration platform.
Key takeaways
A VPS with cron and Python is the most underrated tool in a data engineer's stack. For $6/month, you get a production-grade pipeline scheduler that runs 24/7 without cold starts, without vendor lock-in, and without a $500/month platform bill.
The setup in this guide — structured project, proper logging, idempotent loads, failure alerts — is the same pattern I use for pipelines that have been running reliably for months. Start simple. Add complexity only when you need it.
If you're dealing with duplicates in your target tables, check out my previous guide on how to fix duplicate records in Delta tables — the deduplication patterns work the same whether you're loading into Delta Lake or PostgreSQL.
Running data pipelines on a budget? DigitalOcean gives you $200 in free credits to get started — that's over 30 months of running the setup in this guide. I use it for all my self-hosted data tools.
Want more practical data engineering guides? Subscribe to PipelinePulse for weekly tutorials on building production pipelines without the enterprise price tag.