This is a very basic example on using Luigi as a task pipeline
It is incredibly easy to write a script to process some data in python. But if you have a lot of tasks that depend on each other, and you need to create a robust work flow, then thinking in terms of a data pipeline is useful.
Luigi is a framework for building data pipelines, and managing workflows. The onus of setting up each unit of work is on you as the developer. Luigi taks care of resolving dependencies, manages the overall workflow, and most importantly handles failures. As a bonus Luigi provides a rather nice visualization tool and a command line interface.
In Luigi, a data pipeline is built by defining Task instances. For every Task, you can define its dependency by specifying the requires method for the Task. Every Task can define an output method to specify the Target where the results of the Task should go. Lets look at a simple example to get our feet wet, and gradually build complex cases.
This example is rather self-explanatory. I use the MockFile class as the Target just so that I can print to console. One can instead use luigi.LocalFileTarget(filename) to use the file system as the target. The main_task_cls specifies SimpleTask as the task to run. The actual processing part of the task is encapsulated in the run method of the SimpleTask class.
When the script is executed, you should see an output that looks like this:
DEBUG: Checking if SimpleTask() is complete INFO: Scheduled SimpleTask() (PENDING) INFO: Done scheduling tasks INFO: Running Worker with 1 processes DEBUG: Asking scheduler for work... DEBUG: Pending tasks: 1 INFO: [pid 30338] Worker Worker(salt=329921834, host=G-ubuntu, username=myuser, pid=30338) running SimpleTask() SimpleTask: Hello World! INFO: [pid 30338] Worker Worker(salt=329921834, host=G-ubuntu, username=myuser, pid=30338) done SimpleTask() DEBUG: 1 running tasks, waiting for next task to finish DEBUG: Asking scheduler for work... INFO: Done INFO: There are no more tasks to run at this time INFO: Worker Worker(salt=329921834, host=G-ubuntu, username=myuser, pid=30338) was stopped. Shutting down Keep-Alive thread
There you go! You have learnt a basic example.
The above example was a good starter example. Though we did not really do much in terms of building a pipeline. Lets modify the above code a little bit, so we can build a pipeline.
This example is built on top of the "Hello World" example from above. The SimpleTask outputs the text "Hello World!". The DecoratedTask takes this output from SimpleTask and prefixes with the word "Decorated".
I use the MockFile here just as a way to see the results on the console. This is usually a good testing tool, though I am not sure if it is production ready approach.
The output for the above example would look like:
DEBUG: Checking if DecoratedTask() is complete INFO: Scheduled DecoratedTask() (PENDING) DEBUG: Checking if SimpleTask() is complete INFO: Scheduled SimpleTask() (PENDING) INFO: Done scheduling tasks INFO: Running Worker with 1 processes DEBUG: Asking scheduler for work... DEBUG: Pending tasks: 2 INFO: [pid 30648] Worker Worker(salt=907129510, host=G-ubuntu, username=myuser, pid=30648) running SimpleTask() SimpleTask: Hello World! INFO: [pid 30648] Worker Worker(salt=907129510, host=G-ubuntu, username=v, pid=30648) done SimpleTask() DEBUG: 1 running tasks, waiting for next task to finish DEBUG: Asking scheduler for work... DEBUG: Pending tasks: 1 INFO: [pid 30648] Worker Worker(salt=907129510, host=G-ubuntu, username=myuser, pid=30648) running DecoratedTask() DecoratedTask: Decorated Hello World! INFO: [pid 30648] Worker Worker(salt=907129510, host=G-ubuntu, username=myuser, pid=30648) done DecoratedTask() DEBUG: 1 running tasks, waiting for next task to finish DEBUG: Asking scheduler for work... INFO: Done INFO: There are no more tasks to run at this time INFO: Worker Worker(salt=907129510, host=G-ubuntu, username=myuser, pid=30648) was stopped. Shutting down Keep-Alive thread
Here we learnt a really basic example that should give you some sense of building Luigi based task pipelines.