Forking Forks with Higher Order Messaging
New way of constructing streams has been implemented which uses “higher order messaging”. What does that mean? Instead of constructing the stream from nodes and connections, you “call” functions that process your data. You pretend in your script that you work with data using functions:
... main.csv_source("data.csv") main.sample(1000) main.aggregate(keys = ["year"]) main.formatted_printer() ...
Executing the functions as written in a script might be be very expensive from time and memory perspective. What is in fact happening is that instead of executing the data processing functions a stream network is being constructed.
Construction
Construction is being done by using forked branches. You create an empty forked branch by forking an empty stream:
from brewery.streams import * stream = Stream() main = stream.fork() ...
Now you have fork main
. Each function call on main
will append new processing node to the fork and the new node will be connected to the previous node of the fork.
Function names are based on node names in most of the cases. There might be custom function names for some nodes in the future, but now there is just simple rule:
- decamelize node name:
CSVSourceNode
tocsv source node
- replace spaces with underscores:
csv_source_node
- remove ‘node’ suffix:
csv_source
Arguments to the function are the same as arguments for node constructor. If you want to do more node configuration you can access current node with node
attribute of the fork:
main.node.keys = ["country"]
Running
Run the stream as if it was constructed manually from nodes and connections:
stream.run()
Forking
So far you are able to construct single simple stream from a source to a target. There are plenty of situations where linear processing is not sufficient and you will need to have branches. To create another branch, you fork()
a fork. For example, to attach data audit to the stream insert following code right after the node you want to audit:
# we are in main at node after which we want to have multiple branches audit = main.fork() audit.audit() audit.value_threshold(...) audit.formatted_printer(...) # continue main.* branch here...
Example
Here is full example how to use forking with HOM in Brewery:
# Create the stream and empty fork stream = Stream() main = stream.fork() # Start adding nodes by pretending that we are processing using functions main.csv_source("data.csv", read_header = True, encoding = "utf-8") main.coalesce_value_to_type() # Create another fork for data audit: audit = main.fork() audit.audit(distinct_threshold = None) audit.value_threshold(thresholds = [ ["null_record_ratio", 0.01] ], bin_names = ("ok", "fail")) audit.formatted_printer() audit.node.header = u"field nulls status distinct\n" \ "------------------------------------------------------------" audit.node.format = u"{field_name:7.2%} {null_record_ratio_bin:>10} {distinct_count:>10}" # ...and we continue in main branch main.database_table_target(url = "postgres://localhost/sandbox", table = "data", create = True, replace = True, add_id_key = True) # Run the stream and pretty-print the exception try: stream.run() except pipes.StreamRuntimeError, e: e.print_exception()
The constructed stream looks like this: