266-x Multiple Arguments in pipeline.task()

Hi, the last slide of Building A Pipeline Class says:

There are a couple drawbacks with our pipeline that we did not discuss. First, we can only input one argument for each task, and second, we only have a linear dependency mapping between the tasks. In the next mission, we will expand on this pipeline, and enhance it with task scheduling, a proper dependency tree, and other options.

However, I didn’t catch how Multiple Dependencies Pipeline actually passes multiple arguments between tasks. How is that done?

Thanks, and sorry if this is obvious!

Hey, Scott.

Can you please take a look at this guide on how to ask a good question and edit your post? By making it easier for other people to understand your question, you’ll make it easier for them to answer you.

OK, let me know if you can help.


Hi @scott.j.simontacchi,

I believe I understand your question now. In my answer, I will assume that we are using the final version of the Pipeline class developed in that mission:

from collections import deque

class DAG:

    def __init__(self):
        self.graph = {}
            
    def in_degrees(self):
        self.degrees = {}
        for node in self.graph:
            if node not in self.degrees:
                self.degrees[node] = 0
            for pointed in self.graph[node]:
                if pointed not in self.degrees:
                    self.degrees[pointed] = 0
                self.degrees[pointed] += 1
    
    def sort(self):
        self.in_degrees()
        to_visit = deque()
        for node in self.graph:
            if self.degrees[node] == 0:
                to_visit.append(node)
        searched = []
        while to_visit:
            node = to_visit.popleft()
            for pointer in self.graph[node]:
                self.degrees[pointer] -= 1
                if self.degrees[pointer] == 0:
                    to_visit.append(pointer)
            searched.append(node)
        return searched
    
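    def add(self, node, to=None):
        if node not in self.graph:
            self.graph[node] = []
        if to:
            if to not in self.graph:
                self.graph[to] = []
            self.graph[node].append(to)
        # If the topological sort cannot reach every node, the new edge created a cycle
        if len(self.sort()) != len(self.graph):
            raise Exception('adding this edge would create a cycle')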

    def __str__(self):
        s = ''
        for node in self.graph:
            s += '{}: {}\n'.format(node, self.graph[node])
        return s

class Pipeline:

    def __init__(self):
        self.tasks = DAG()
        
    def task(self, depends_on=None):
        def inner(f):
            self.tasks.add(f)
            if depends_on:
                self.tasks.add(depends_on, f)
            return f
        return inner
    
    def run(self):
        scheduled = self.tasks.sort()
        completed = {}
        for task in scheduled:
            # If `task` depends on `node`, call it with node's return value
            for node, values in self.tasks.graph.items():
                if task in values:
                    completed[task] = task(completed[node])
            # Tasks without dependencies are called with no arguments
            if task not in completed:
                completed[task] = task()
        return completed
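Data only moves between tasks in the inner loop of `run()`, and each call passes exactly one upstream output. To make the limitation from the slide concrete, here is a stripped-down, hypothetical reproduction of that loop: a plain dict stands in for `DAG.graph`, and the task names `load_a`, `load_b`, and `combine` are made up. A task with two dependencies ends up being called once per dependency, each time with a single argument, and only the last result is kept.

```python
# Hypothetical stand-ins: two independent tasks and one that depends on both
def load_a():
    return 'a'

def load_b():
    return 'b'

received = []  # records every argument combine() is called with

def combine(value):
    received.append(value)
    return value

# Same shape as DAG.graph: each task maps to the tasks that depend on it
graph = {load_a: [combine], load_b: [combine], combine: []}
scheduled = [load_a, load_b, combine]  # a valid topological order

# The loop from Pipeline.run(), unchanged
completed = {}
for task in scheduled:
    for node, values in graph.items():
        if task in values:
            completed[task] = task(completed[node])
    if task not in completed:
        completed[task] = task()

print(received)            # ['a', 'b']
print(completed[combine])  # b
```

So with this class, multiple arguments have to travel inside a single return value from one upstream task.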

Using the above Pipeline class, if task2 depends on task1, then the output of task1 is used as the input of task2. Therefore, if task1 returns several values, task2 receives all of them together as a single tuple argument.

Example:

pipeline = Pipeline()

@pipeline.task()
def task1():
    print('running task1')
    return 1, 2  # task with two outputs

@pipeline.task(depends_on=task1)
def task2(args):
    print('running task2')
    print('input1:', args[0])
    print('input2:', args[1])

pipeline.run()

If you run this, you will see that the two outputs from task1 are passed to task2 as one tuple and can be accessed as args[0] and args[1]:

running task1
running task2
input1: 1
input2: 2
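This works because `return 1, 2` returns a single tuple, which `run()` passes to task2 as its one argument. If indexing with args[0] and args[1] feels hard to read, task2 can unpack the tuple into names first. A plain-Python sketch without the pipeline (the names `first`, `second`, `output`, and `total` are only illustrative):

```python
def task1():
    # `return 1, 2` builds and returns one tuple, (1, 2)
    return 1, 2

def task2(args):
    # Unpack the tuple into named variables instead of indexing args[0] and args[1]
    first, second = args
    print('input1:', first)
    print('input2:', second)
    return first + second

output = task1()
print(type(output).__name__)  # tuple
total = task2(output)         # the same call run() makes: task2(completed[task1])
```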

Let me know if this helps :slight_smile:


Very cool! Is there a way to do this with kwargs instead, for readability? Would only the task method need to be edited, or do you think the change belongs in the DAG? Thanks!! I’m new to decorators and still wrapping my head around them :slight_smile:

@scott.j.simontacchi,

I think (but am not 100% sure) that this is not possible with the current implementation. Moreover, I think supporting it would involve significant changes.

What I would do instead is return a dictionary rather than several values, using meaningful keys for the values you want to return. You can then access the values by those keys, which makes the code more readable. Here is an example:

pipeline = Pipeline()

@pipeline.task()
def task1():
    print('running task1')
    return {'output1': 1, 'output2': 2}

@pipeline.task(depends_on=task1)
def task2(args):
    print('running task2')
    print('input1:', args['output1'])
    print('input2:', args['output2'])

pipeline.run()

In a real case, output1 and output2 would be replaced by descriptive names so that the code is more readable.
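If you do want task2 to take real keyword arguments, one possible approach (my own untested assumption, not something from the mission) is to change only the place where `run()` calls a dependent task. The helper `call_task` below is hypothetical: it unpacks a dict output with `**` and otherwise falls back to the current single-argument behavior.

```python
def call_task(task, upstream_output):
    # Hypothetical helper, not part of the course's Pipeline class.
    # A dict output is unpacked into keyword arguments; anything else
    # is passed as one positional argument, as run() does today.
    if isinstance(upstream_output, dict):
        return task(**upstream_output)
    return task(upstream_output)

def task1():
    return {'output1': 1, 'output2': 2}

def task2(output1, output2):
    # Parameter names must match the keys of task1's returned dict
    print('input1:', output1)
    print('input2:', output2)
    return output1 + output2

result = call_task(task2, task1())
```

Inside `Pipeline.run()`, the line `completed[task] = task(completed[node])` would become `completed[task] = call_task(task, completed[node])`; under this sketch, neither the `task()` decorator nor the DAG changes. The trade-off is that a downstream task's parameter names become coupled to the upstream task's dict keys, and a mismatch raises a TypeError at run time.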