Using #Kettle (#Hop) with #Apache #Airflow

A few days ago someone mentioned using airflow to orchestrate their messaging solution, so I figured it was worth a look. In essence, it’s a scheduler that understands the individual steps in your job, and the dependencies therein.  Of course, it scales out to workers, etc etc.  All good.

In fact, i recommend watching a few videos on youtube and you’ll get a feel for it.  Then, whilst searching for something else I came across this blog – Do take a read, it is quite interesting.

This is particularly interesting, reading through it, the barrier to entry was learning all of Airflows operators.  (And learning to love all the bugs/quirks of each one!). They propose a generic operator, and then the developers just concentrate on the code.  In fact the philosophy of airflow is and should be to separate the actual workflow code from the job, so this makes a lot of sense.

Now; You know where i’m going with this.  Kettle IS that generic operator, so this could be incredibly powerful. So lets have a go…  I think the devil in the detail is at what level you allow the orchestration to happen in Airflow and at what level you do it in the job definition.  My personal feeling is that jobs in Kettle should always be self contained anyway, so keeping to that you wont go far wrong.  There’ll always be a need to run jobs independently, so don’t lose that feature – And you don’t want duplication either as they will get out of sync. However Airflow is EXACTLY the scheduler we’ve been looking for to manage the interactions of your 100’s of jobs.  Unfortunately if the job does all of the orchestration you lose the clever bits of Airflow including re-runability/resume and performance tracking.  See, i told you there’s a devil here.  Some hybrid may be the best way to go, where you do replicate a job for manual runs, but break out the stages for airflow.  I would only put really high level breaks, and only very simple logic into airflow however – For example, at this level:

  • Load data to stage
  • Process dimensions
  • Process Facts
    • Could be argued to be broken down further, if you already have rerunnability setup.

So; Having written all that out, and looked at the typical DAGs in Airflow, i have concluded that you can generate a DAG from a Kettle Job, and boom! You have the best of both worlds.  Of course, this would be a post commit hook and happen automatically so they can never get out of sync. #CI

Note: this approach will of course break down if you’re ingesting data in a metadata driven fashion (and therefore, only running one job)  But maybe Airflow has an API and you can report progress that way? One to research…

OK. Let’s get it started.  Well thats easy – Just 4 commands, follow the Quick Start.  If you’re using Anaconda then be sure to install using conda, not pip.  Once done, it starts straight away, talk about easy.  Interesting that the UI is a separate service to the scheduler too. Nice – a sensible separation there.

So how are we to do this? Well the quick and easy approach is of course to use the bash operator and call Kitchen.  There’s some discussion of this approach in StackOverflow

And in fact, it’s super simple.  What I love about Airflow, is the schedule itself is in the code or the DAG. And that means all you need to do to update your schedules is a pull of your latest code.  So here it is:

Screenshot 2019-10-07 at 10.10.17

You’ll notice the schedule is very familiar – it’s basically a cron string. And thats it, our job is now running.  Airflow then captures duration, failure, logs etc etc etc.

Another approach would be to build a new operator I guess, which would do little more than launch kettle anyway. Perhaps pyJava can be used, but i suspect that’s overkill. The advantage of the bash operator would be around separation of the processes, resources etc.

But hang on, there’s a http operator. Why don’t we just call Carte?  Not quite sure how that would work in scaleout terms, perhaps each airflow worker has a single carte running on it, and when you call carte you just always call the localhost one?  Or perhaps you can devise a way to teach airflow about the carte cluster.

So what else is there? Well they’re very dynamic – DAGs can of course instantiate other DAGs.  And the original product came from Airbnb. I guess thats a good thing too? 🙂 . It’s easy to pass parameters of course as well and there’s lots of examples of how to do that.

For anyone putting up with Windows Scheduler, or cron jobs, and the sometimes obtuse places you have to look for logs/problems, then I do recommend taking a closer look at airflow. Let me know how you get on!

PS: The DAG itself is as simple as:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('KettleGetBeerTags', default_args=default_args, schedule_interval=timedelta(days=1))

t1 = BashOperator(
    task_id='RunKettle',
    bash_command='/Users/codek/apps/kettle-remix-20190130/data-integration/kitchen.sh -file=/Users/codek/projects/airflow/jb-extract-beer-tags.kjb',
    dag=dag)

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s