Ozelot 0.2.2 and Recursive Task Clearing

Posted on Fri 01 December 2017 in articles

Ozelot version 0.2.2 has been released yesterday. This release contains a few bug fixes and improvements. Most notably, it fixes the order in which tasks are cleared when using ozelot.etl.tasks.check_completion(...) with the clear=True flag.

As always, you can find the latest code on GitHub, while the documentation is on readthedocs.

Let me elaborate a bit on the task clearing fix: Let's assume that your ORM model contains a class Employee describing an employee and a class Unit describing organizational units of a company. An Employee instance references its respective Unit instance via a foreign key reference. Let's further assume that there is a task LoadUnits that loads organizational units and their metadata, and a task LoadEmployees that loads employees and links each employee to its respective Unit instance. As a result, LoadEmployees must have LoadUnits as a requirement, since we cannot load the employees and link to the units before we have imported the latter. Finally, let there be a top-level LoadEverything wrapper task to run the whole pipeline.

Now assume you need to change the structure of your Unit class and re-run LoadUnits. In order to do that, you want to re-process only those parts of your pipeline that depend on LoadUnits. You can do this by running (imports for models and tasks described above are omitted):

from ozelot.etl import tasks
import luigi

LoadUnits.mark_incomplete()
tasks.check_completion(LoadEverything(), clear=True)
luigi.build([LoadEverything()])

This would mark LoadUnits as incomplete, check recursively to find all now-incomplete tasks, clear their output, and re-build the necessary parts of the pipeline.

The catch lies in the order in which check_completion() has to clear the tasks. The recursive completion checking works 'upstream', i.e. it starts at LoadEverything and works its way recursively through all requirements and their requirements, to find incomplete tasks. However, once incomplete tasks are found, they must not be cleared right away.

The recursive task checking will end up finding LoadUnits as the first incomplete task. 'Downstream' tasks are only marked as incomplete later, when running back towards LoadEverything. However, trying to clear LoadUnits right away, which means deleting all Unit objects, will result in failed foreign key constraints: there would be Employee instances left over pointing at non-existent Unit instances.

Instead, once all to-be-cleared tasks have been identified, clearing has to be performed in 'upstream' direction': starting at LoadEverything and running towards LoadUnits. While looping over the to-be-cleared tasks several times, at each iteration only those tasks can be cleared that are required by no other task. This has now been implemented. Apologies for any confusion caused by the previous code.