2011-03-28 by Stefan Urbanek

Forking Forks with Higher Order Messaging

A new way of constructing streams has been implemented, based on “higher order messaging” (HOM). What does that mean? Instead of constructing the stream from nodes and connections, you “call” functions that process your data. In your script you pretend that you are working with the data through function calls:

...
main.csv_source("data.csv")
main.sample(1000)
main.aggregate(keys = ["year"])
main.formatted_printer()
...

Executing the functions as written in the script could be very expensive in both time and memory. What actually happens is that, instead of executing the data processing functions, a stream network is constructed.

Construction

Construction is done using forked branches. You create an empty forked branch by forking an empty stream:

from brewery.streams import *

stream = Stream()
main = stream.fork()
...

Now you have a fork named main. Each function call on main appends a new processing node to the fork and connects it to the fork's previous node.
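To make the idea concrete, here is a toy model of how such a fork could work. It is only a sketch and not Brewery's actual implementation: ToyStream, ToyFork and the tuple-based "nodes" are hypothetical names invented for illustration. Unknown method names are caught with `__getattr__`, recorded as nodes and connected to the previous node; no data is processed.

```python
# Toy model only: ToyStream and ToyFork are hypothetical names,
# not Brewery classes.

class ToyStream(object):
    def __init__(self):
        self.nodes = []        # recorded (name, args, kwargs) tuples
        self.connections = []  # (source_index, target_index) pairs

    def fork(self):
        # An empty fork has no current node yet
        return ToyFork(self, last=None)


class ToyFork(object):
    def __init__(self, stream, last):
        self.stream = stream
        self.last = last  # index of the most recently appended node

    def __getattr__(self, name):
        # Called only when normal attribute lookup fails, so every
        # unknown public name is treated as a request to append a node.
        if name.startswith("_"):
            raise AttributeError(name)

        def append_node(*args, **kwargs):
            self.stream.nodes.append((name, args, kwargs))
            index = len(self.stream.nodes) - 1
            if self.last is not None:
                # Connect the previous node of this fork to the new one
                self.stream.connections.append((self.last, index))
            self.last = index
            return self

        return append_node


stream = ToyStream()
main = stream.fork()
main.csv_source("data.csv")
main.sample(1000)
main.aggregate(keys=["year"])

print([name for name, args, kwargs in stream.nodes])
print(stream.connections)
```

The calls only fill in the node list and the connection list. In this toy model, running the network would be a separate step.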

[Figure: Fork Construction]

Function names are based on node names in most cases. There might be custom function names for some nodes in the future, but for now there is just one simple rule:

  1. decamelize node name: CSVSourceNode to csv source node
  2. replace spaces with underscores: csv_source_node
  3. remove ‘node’ suffix: csv_source
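The three steps above can be sketched as a small function. This is an illustration of the rule, not the code Brewery uses; the function name node_function_name is hypothetical, and the regular expressions are one possible way to decamelize while keeping acronyms such as CSV together.

```python
import re


def node_function_name(class_name):
    """Derive a fork function name from a node class name (sketch)."""
    # 1. decamelize, keeping acronyms such as "CSV" together
    words = re.sub(r"([A-Z]+)([A-Z][a-z])", r"\1 \2", class_name)
    words = re.sub(r"([a-z0-9])([A-Z])", r"\1 \2", words).lower()
    # 2. replace spaces with underscores
    name = words.replace(" ", "_")
    # 3. remove the 'node' suffix
    if name.endswith("_node"):
        name = name[:-len("_node")]
    return name


print(node_function_name("CSVSourceNode"))  # csv_source
```

Applied to a class name such as FormattedPrinterNode (inferred here from the formatted_printer function used in this post), the same function gives formatted_printer.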

Arguments to the function are the same as the arguments of the node constructor. If you want to configure the node further, you can access the current node through the node attribute of the fork:

main.node.keys = ["country"]

Running

Run the stream as if it had been constructed manually from nodes and connections:

stream.run()

Forking

So far you are able to construct a single simple stream from a source to a target. There are plenty of situations where linear processing is not sufficient and you need branches. To create another branch, you fork() a fork. For example, to attach a data audit to the stream, insert the following code right after the node you want to audit:

# we are in main, at the node after which we want to have multiple branches

audit = main.fork()
audit.audit()
audit.value_threshold(...)
audit.formatted_printer(...)

# continue main.* branch here...
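What forking does to the network can be shown with a toy model. This is a sketch under the assumption that a new fork starts at the current node of the fork it was created from; ToyBranch and its add() method are hypothetical names, not part of Brewery, and nodes are represented only by their names.

```python
# Toy model only: ToyBranch is a hypothetical name, not a Brewery class.

class ToyBranch(object):
    def __init__(self, edges, last=None):
        self.edges = edges  # shared list of (source, target) node names
        self.last = last    # name of the current node of this branch

    def add(self, name):
        # Append a node and connect it to the branch's current node
        if self.last is not None:
            self.edges.append((self.last, name))
        self.last = name

    def fork(self):
        # The new branch shares the edge list and starts at the
        # current node of this branch.
        return ToyBranch(self.edges, self.last)


edges = []
main = ToyBranch(edges)
main.add("csv_source")
main.add("coalesce_value_to_type")

audit = main.fork()
audit.add("audit")
audit.add("value_threshold")
audit.add("formatted_printer")

# main continues from the node where audit was forked off
main.add("database_table_target")

for source, target in edges:
    print("%s -> %s" % (source, target))
```

In the resulting edge list, coalesce_value_to_type has two outgoing connections: one to audit and one to database_table_target. That node is where the two branches split.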

Example

Here is a full example of how to use forking with HOM in Brewery:

from brewery.streams import *

# Create the stream and empty fork
stream = Stream()
main = stream.fork()

# Start adding nodes by pretending that we are processing using functions
main.csv_source("data.csv", read_header = True, encoding = "utf-8")
main.coalesce_value_to_type()

# Create another fork for data audit:
audit = main.fork()
audit.audit(distinct_threshold = None)
audit.value_threshold(thresholds = [ ["null_record_ratio", 0.01] ],
                        bin_names = ("ok", "fail"))

audit.formatted_printer()
audit.node.header = u"field                            nulls     status   distinct\n" \
                         "------------------------------------------------------------"
audit.node.format = u"{field_name:<30.30} {null_record_ratio: >7.2%} " \
                         "{null_record_ratio_bin:>10} {distinct_count:>10}"

# ...and we continue in main branch
main.database_table_target(url = "postgres://localhost/sandbox", 
                            table = "data",
                            create = True,
                            replace = True,
                            add_id_key = True)

# Run the stream and pretty-print the exception
try:
    stream.run()
except pipes.StreamRuntimeError as e:
    e.print_exception()

The constructed stream looks like this:

[Figure: Fork Example Stream]