analytical data streams & online analytical processing Python frameworks
How do you build and run a data analysis stream? Why streams? I am going to talk about how to use Brewery from the command line and from Python scripts.
Brewery is a Python framework and a way of analysing and auditing data. The basic principle is a flow of structured data through processing and analysing nodes. This architecture makes the data streaming process more transparent, understandable and maintainable.
You might want to use brewery when you:
There are many approaches to data analysis. Brewery brings a certain workflow to the analyst:
Brewery makes steps 2 and 3 easy: quick prototyping, alternative branching, comparison. It tries to keep the analyst's workflow clean and understandable.
There are two ways to create a stream: programmatically in Python, and from the command line without any knowledge of Python. Each way has two alternatives: one is quick and simple but has a limited feature set; the other is full-featured but more verbose.
The two programmatic alternatives for creating a stream are basic construction and “HOM” (forking) construction. The two command-line ways to run a stream are run and pipe. Let us now look at them more closely.

Note regarding the Zen of Python: this does not go against “There should be one – and preferably only one – obvious way to do it.” There is only one way: the basic construction. The others are higher-level ways or ways for different environments.
In the examples below we are going to demonstrate a simple linear (non-branching) stream that reads a CSV file, performs a very basic audit and “pretty prints” the result. The stream looks like this:

Brewery comes with a command-line utility, brewery, which can run streams
without requiring you to write a single line of Python code. Again, there are two
ways to describe a stream: JSON-based and a plain linear pipe.
The simplest usage is the brewery pipe command:
brewery pipe csv_source resource=data.csv audit pretty_printer
The pipe command expects a list of nodes and attribute=value pairs for node
configuration. If no source node is specified, CSV on standard input is
used. If there is no target node, CSV on standard output is assumed:
cat data.csv | brewery pipe audit
The actual stream with implicit nodes is:

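To make these argument rules concrete, here is a small hypothetical sketch of how a pipe-style argument list could be split into nodes with their attributes, and how the implicit CSV source and target could be added. It is not the actual brewery command implementation: the sets of source and target node names, the implicit node names csv_source/csv_target and the "<stdin>"/"<stdout>" placeholders are all assumptions.

```python
# Hypothetical sketch of "brewery pipe" argument handling (not the real tool).
# Assumed node-name sets used to decide whether implicit nodes are needed.
SOURCE_NODES = {"csv_source"}
TARGET_NODES = {"csv_target", "pretty_printer"}


def parse_pipe_args(args):
    """Split ['node', 'key=value', 'node', ...] into [(node, {key: value}), ...]."""
    nodes = []
    for arg in args:
        if "=" in arg:
            # attribute=value configures the most recently named node
            if not nodes:
                raise ValueError("attribute %r given before any node" % arg)
            key, value = arg.split("=", 1)
            nodes[-1][1][key] = value
        else:
            nodes.append((arg, {}))

    # Implicit CSV source on standard input and CSV target on standard output
    # (the node names and resource placeholders are assumptions).
    if not nodes or nodes[0][0] not in SOURCE_NODES:
        nodes.insert(0, ("csv_source", {"resource": "<stdin>"}))
    if nodes[-1][0] not in TARGET_NODES:
        nodes.append(("csv_target", {"resource": "<stdout>"}))
    return nodes


print(parse_pipe_args("csv_source resource=data.csv audit pretty_printer".split()))
print(parse_pipe_args(["audit"]))
```

The second call corresponds to `cat data.csv | brewery pipe audit`: only the audit node is named, so both ends of the stream are filled in implicitly.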
The JSON way is more verbose but full-featured: you can create complex
processing streams with many branches. stream.json:
{
    "nodes": {
        "source": { "type":"csv_source", "resource": "data.csv" },
        "audit": { "type":"audit" },
        "target": { "type":"pretty_printer" }
    },
    "connections": [
        ["source", "audit"],
        ["audit", "target"]
    ]
}
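The JSON description maps naturally onto a dictionary of named nodes plus a list of (source, target) connections. The sketch below uses only the standard json module to load such a description and check that every connection refers to a declared node. load_stream_description is a hypothetical helper, not part of brewery, and here each node is kept as a (type, attributes) pair rather than a real node instance.

```python
import json

# The stream description from above, embedded so the sketch is self-contained
# (normally it would be read from stream.json).
STREAM_JSON = """
{
    "nodes": {
        "source": { "type": "csv_source", "resource": "data.csv" },
        "audit":  { "type": "audit" },
        "target": { "type": "pretty_printer" }
    },
    "connections": [
        ["source", "audit"],
        ["audit", "target"]
    ]
}
"""


def load_stream_description(text):
    """Return (nodes, connections); nodes maps name -> (type, attributes)."""
    desc = json.loads(text)
    nodes = {}
    for name, spec in desc["nodes"].items():
        spec = dict(spec)               # copy so "type" can be removed
        node_type = spec.pop("type")    # the remaining keys are node attributes
        nodes[name] = (node_type, spec)

    connections = []
    for source, target in desc["connections"]:
        # every connection must refer to a declared node
        for name in (source, target):
            if name not in nodes:
                raise ValueError("unknown node in connection: %r" % name)
        connections.append((source, target))
    return nodes, connections


nodes, connections = load_stream_description(STREAM_JSON)
print(nodes["source"])   # ('csv_source', {'resource': 'data.csv'})
print(connections)       # [('source', 'audit'), ('audit', 'target')]
```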
And run:
$ brewery run stream.json
To list all available nodes do:
$ brewery nodes
To get more information about a node, run brewery nodes <node_name>:
$ brewery nodes string_strip
Note that data streaming from the command line is more limited than the Python way. You might not have access to nodes and node features that require the Python language, such as Python storage type nodes or functions.
The preferred programmatic way of creating streams is through higher order messaging (HOM), which is, in this case, just a fancy name for pretending to do something while in fact we are preparing the stream.
This way of creating a stream is more readable and maintainable. It is easier to insert nodes into the stream and create forks without losing the picture of the stream. It might not be suitable for very complex streams, though. Here is an example:
import brewery

b = brewery.create_builder()
b.csv_source("data.csv")
b.audit()
b.pretty_printer()
When this piece of code is executed, nothing actually happens to the data stream. The stream is just being prepared and you can run it anytime:
b.stream.run()
What actually happens? The builder b is a mostly empty object that accepts
almost any method call and tries to find a node that corresponds to the
name of the called method. The node is instantiated, added to the stream and
connected to the previous node.
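The mechanism described above can be sketched in plain Python with `__getattr__`, which is called only for attributes an object does not have. This is an illustration under assumptions: Builder, the minimal Stream and Node classes and NODE_REGISTRY are hypothetical stand-ins, not Brewery's own classes or lookup logic.

```python
class Node:
    """Minimal stand-in for a stream node (hypothetical, not Brewery's class)."""
    def __init__(self, *args, **kwargs):
        self.args = args
        self.kwargs = kwargs


class CSVSourceNode(Node): pass
class AuditNode(Node): pass
class PrettyPrinterNode(Node): pass

# Hypothetical registry: builder method name -> node class
NODE_REGISTRY = {
    "csv_source": CSVSourceNode,
    "audit": AuditNode,
    "pretty_printer": PrettyPrinterNode,
}


class Stream:
    def __init__(self):
        self.nodes = []
        self.connections = []


class Builder:
    """Accepts (almost) any method call and turns it into a stream node."""
    def __init__(self):
        self.stream = Stream()
        self.node = None        # the last node added, i.e. where we "are"

    def __getattr__(self, name):
        # Only reached for names that are not real attributes (stream, node).
        try:
            node_class = NODE_REGISTRY[name]
        except KeyError:
            raise AttributeError("no node named %r" % name) from None

        def add_node(*args, **kwargs):
            node = node_class(*args, **kwargs)          # instantiate the node
            self.stream.nodes.append(node)               # add it to the stream
            if self.node is not None:                    # connect to previous node
                self.stream.connections.append((self.node, node))
            self.node = node
            return node
        return add_node


b = Builder()
b.csv_source("data.csv")
b.audit()
b.pretty_printer()
print([type(n).__name__ for n in b.stream.nodes])
print(len(b.stream.connections))   # 2
```

Nothing is processed while the methods are called; only the node list and the connections grow, which is why the stream can be run later.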
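You can also create a branched stream:
import brewery

b = brewery.create_builder()
b.csv_source("data.csv")
b.audit()
# Branch off after the audit node and write the audit output to a CSV file
f = b.fork()
f.csv_target("audit.csv")
# ...and continue the main branch with a printer
b.pretty_printer()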
This is the lowest-level way of creating a stream and allows full customisation and control of the stream. In the basic construction method the programmer prepares all node instance objects and connects them explicitly, node by node. It might be too verbose for hand-written scripts; however, it is meant to be used by applications that construct streams either from a user interface or from some stream description. All other methods are built on top of this one.
from brewery import Stream
from brewery.nodes import CSVSourceNode, AuditNode, PrettyPrinterNode
stream = Stream()
# Create pre-configured node instances
src = CSVSourceNode("data.csv")
stream.add(src)
audit = AuditNode()
stream.add(audit)
printer = PrettyPrinterNode()
stream.add(printer)
# Connect nodes: source -> target
stream.connect(src, audit)
stream.connect(audit, printer)
stream.run()
It is also possible to pass nodes as a dictionary and connections as a list of (source, target) tuples:
stream = Stream(nodes, connections)
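As a rough illustration of how a (nodes, connections) pair can be executed, the sketch below orders the nodes with `graphlib.TopologicalSorter` (standard library, Python 3.9+) and calls each node with the outputs of its input nodes. This is a simplification: the nodes here are plain functions passing whole lists, whereas Brewery's nodes stream rows through pipes. run_stream and the example nodes are hypothetical.

```python
from graphlib import TopologicalSorter   # Python 3.9+


def run_stream(nodes, connections):
    """Run a stream given as {name: callable} and [(source, target), ...].

    Each node is called with the outputs of its input nodes (sorted by
    input name); source nodes are called with no arguments.
    """
    inputs = {name: [] for name in nodes}
    for source, target in connections:
        inputs[target].append(source)

    # TopologicalSorter takes {node: predecessors} and yields every node
    # only after all of its predecessors.
    order = TopologicalSorter({n: set(p) for n, p in inputs.items()}).static_order()

    outputs = {}
    for name in order:
        args = [outputs[p] for p in sorted(inputs[name])]
        outputs[name] = nodes[name](*args)
    return outputs


# Hypothetical nodes: plain functions instead of Brewery node classes
nodes = {
    "source": lambda: [{"year": 2010}, {"year": None}, {"year": 2011}],
    "audit": lambda rows: sum(r["year"] is None for r in rows) / len(rows),
    "printer": lambda ratio: "null ratio: %.2f" % ratio,
}
connections = [("source", "audit"), ("audit", "printer")]

print(run_stream(nodes, connections)["printer"])   # null ratio: 0.33
```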
What would be lovely to have in brewery?
Probing and data quality indicators – tools for simple data probing and an easy way of creating data quality indicators. This will allow something like “test-driven development”, but for data. This is the next step.
Stream optimisation – merge multiple nodes into a single processing unit before running the stream. Might be done in the near future.
Backend-based nodes and related data transfer between backend nodes – for example, two SQL nodes might pass data through a database table instead of the built-in data pipe, or two numpy/scipy-based nodes might use numpy/scipy structures to pass data and avoid unnecessary streaming. Not very soon, but in the foreseeable future.
Stream compilation – compile a stream into an optimised script. Not too soon, but I would like to have that one.
Last, but not least: currently there is a small performance cost because of the
nature of the Brewery implementation. This penalty will be explained in another
blog post; to make a long story short, it has to do with threads, the Python
GIL and a non-optimised stream graph. There is no time estimate for this
one, as it might be included step by step. Also, some Python 3 features look
promising, such as yield from in Python 3.3 (PEP 380).
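To illustrate why yield from is interesting here: nodes written as generators can be chained in a single thread, so rows flow from node to node without threads or locks. The sketch below only illustrates that idea and is not Brewery's implementation; csv_source, strip_strings and chain are hypothetical names.

```python
import csv
import io


def csv_source(text):
    """Source node: yield rows (dicts) parsed from CSV text."""
    yield from csv.DictReader(io.StringIO(text))


def strip_strings(rows):
    """Processing node: strip surrounding whitespace from every value."""
    for row in rows:
        yield {key: value.strip() for key, value in row.items()}


def chain(source, *nodes):
    """Connect nodes linearly; each node consumes the previous generator."""
    stream = source
    for node in nodes:
        stream = node(stream)
    yield from stream      # PEP 380: delegate to the last generator in the chain


data = "name,city\n Alice , Praha\nBob,  Brno \n"
for row in chain(csv_source(data), strip_strings):
    print(row)
```

Each row is pulled through the whole chain before the next one is read, so memory use stays constant regardless of input size.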
I’m glad to announce a new release of Brewery, the stream-based data auditing and analysis framework for Python.
There are quite a few updates; the notable ones are:
brewery runner with commands run and graph
Added several simple how-to examples, such as: aggregation of remote CSV, basic audit of a CSV, how to use a generator function.
Feedback and questions are welcome. I’ll help you.
Note that there are a couple of changes that break compatibility; however, affected code can be updated very easily. I apologize for the inconvenience, but until 1.0 such changes might happen more frequently. On the other hand, I will try to make them as painless as possible.
Full listing of news, changes and fixes is below.
Nodes can be configured with node.configure(dictionary, protected). If ‘protected’ is True, then protected attributes (specified in node info) cannot be set with this method.
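A hypothetical sketch of the configure(dictionary, protected) behaviour follows. The node_info layout with a "protected" list, and the choice to silently skip protected attributes rather than raise an error, are assumptions made for illustration; the real method may behave differently.

```python
class Node:
    """Hypothetical node illustrating configure(dictionary, protected)."""

    # Assumed stand-in for the "node info" metadata listing protected attributes
    node_info = {"protected": ["resource"]}

    def __init__(self):
        self.resource = None
        self.encoding = "utf-8"

    def configure(self, config, protected=False):
        """Set attributes from a dictionary.

        If protected is True, attributes listed as protected in node_info
        are not set (this sketch skips them silently).
        """
        if protected:
            protected_names = set(self.node_info.get("protected", []))
        else:
            protected_names = set()
        for name, value in config.items():
            if name in protected_names:
                continue
            setattr(self, name, value)


node = Node()
node.configure({"resource": "data.csv", "encoding": "latin2"}, protected=True)
print(node.resource, node.encoding)   # None latin2
node.configure({"resource": "data.csv"})
print(node.resource)                  # data.csv
```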
added node identifier to the node reference doc
added create_logger
added experimental retype feature (works for CSV only at the moment)
aggregates to measures, added measures as public node attribute
field_name(), now str(field) should be used
WARNING: Compatibility break:
__node_info__ and use plain node_info instead
Stream.update() now takes nodes and connections as two separate arguments
If you have any questions, comments, requests, do not hesitate to ask.
A new small release is out with quite a nice addition of documentation. It does not bring many new features, but it contains a refactoring towards a better package structure that breaks some compatibility.
Documentation updates
Framework Changes
Deprecated functions
Streams
Enjoy!
Data Cleansing introduction (for BigClean Prague 2011)
Presentation from BigClean Prague about data cleansing, with Brewery examples.
A new way of constructing streams has been implemented, which uses “higher order messaging”. What does that mean? Instead of constructing the stream from nodes and connections, you “call” functions that process your data. You pretend in your script that you work with data using functions:
...
main.csv_source("data.csv")
main.sample(1000)
main.aggregate(keys = ["year"])
main.formatted_printer()
...
Executing the functions as written in a script might be very expensive in terms of time and memory. What in fact happens is that, instead of executing the data processing functions, a stream network is constructed.
The construction is done using forked branches. You create an empty forked branch by forking an empty stream:
from brewery.streams import *
stream = Stream()
main = stream.fork()
...
Now you have a fork named main. Each function call on main appends a new processing node to the fork, and the new node is connected to the previous node of the fork.

Function names are based on node names in most cases. There might be custom function names for some nodes in the future, but for now there is just a simple rule:
CSVSourceNode to csv source node, then csv_source_node, then csv_source
Arguments to the function are the same as the arguments of the node constructor. If you want to configure the node further, you can access the current node through the node attribute of the fork:
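The naming rule can be written as a short function. This sketch assumes the rule is exactly: split the camel-case class name into words, join them with underscores, and drop the trailing _node. node_function_name is a hypothetical helper; nodes with custom function names would need an explicit lookup table instead.

```python
import re


def node_function_name(class_name):
    """Derive a HOM function name from a node class name.

    CSVSourceNode -> "csv source node" -> "csv_source_node" -> "csv_source"
    """
    # Insert a space at lower->Upper boundaries (eN in SourceNode) and before
    # the capital that starts a word after an acronym (V|S in CSVSource).
    words = re.sub(r"(?<=[a-z0-9])(?=[A-Z])|(?<=[A-Z])(?=[A-Z][a-z])", " ", class_name)
    name = words.lower().replace(" ", "_")
    if name.endswith("_node"):
        name = name[:-len("_node")]
    return name


for cls in ["CSVSourceNode", "AuditNode", "ValueThresholdNode", "CoalesceValueToTypeNode"]:
    print(cls, "->", node_function_name(cls))
```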
main.node.keys = ["country"]
Run the stream as if it was constructed manually from nodes and connections:
stream.run()
So far you are able to construct a single simple stream from a source to a target. There are plenty of situations where linear processing is not sufficient and you need branches. To create another branch, you fork() a fork. For example, to attach a data audit to the stream, insert the following code right after the node you want to audit:
# we are in main at node after which we want to have multiple branches
audit = main.fork()
audit.audit()
audit.value_threshold(...)
audit.formatted_printer(...)
# continue main.* branch here...
Here is a full example of how to use forking with HOM in Brewery:
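The forking mechanics can be sketched with a small hypothetical Fork class: every fork shares one stream, remembers its current node, and fork() starts a new branch at that node. Node names are plain strings here, and Stream/Fork are illustrative stand-ins, not Brewery's classes.

```python
class Stream:
    """Hypothetical minimal stream: node names plus (source, target) connections."""

    def __init__(self):
        self.nodes = []
        self.connections = []

    def fork(self):
        """Return an empty fork (no current node yet)."""
        return Fork(self, None)


class Fork:
    """A branch that appends nodes to a shared stream.

    Calling any method adds a node named after the method and connects it to
    the fork's current node; fork() starts a new branch at that node.
    """

    def __init__(self, stream, node):
        self.stream = stream
        self.node = node

    def fork(self):
        # Same stream, same starting point; the two forks diverge afterwards.
        return Fork(self.stream, self.node)

    def __getattr__(self, name):
        if name.startswith("_"):
            raise AttributeError(name)      # leave special lookups alone

        def add_node(*args, **kwargs):
            node = "%s#%d" % (name, len(self.stream.nodes))   # unique node id
            self.stream.nodes.append(node)
            if self.node is not None:
                self.stream.connections.append((self.node, node))
            self.node = node

        return add_node


stream = Stream()
main = stream.fork()
main.csv_source("data.csv")
main.coalesce_value_to_type()
audit = main.fork()            # branch off after coalesce_value_to_type
audit.audit()
audit.formatted_printer()
main.database_table_target()   # main continues from its own current node
for connection in stream.connections:
    print(connection)
```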
from brewery.streams import *
# Create the stream and empty fork
stream = Stream()
main = stream.fork()
# Start adding nodes by pretending that we are processing using functions
main.csv_source("data.csv", read_header = True, encoding = "utf-8")
main.coalesce_value_to_type()
# Create another fork for data audit:
audit = main.fork()
audit.audit(distinct_threshold = None)
audit.value_threshold(thresholds = [ ["null_record_ratio", 0.01] ],
bin_names = ("ok", "fail"))
audit.formatted_printer()
audit.node.header = u"field nulls status distinct\n" \
"------------------------------------------------------------"
audit.node.format = u"{field_name:<30.30} {null_record_ratio: >7.2%} {null_record_ratio_bin:>10} {distinct_count:>10}"
# ...and we continue in main branch
main.database_table_target(url = "postgres://localhost/sandbox",
table = "data",
create = True,
replace = True,
add_id_key = True)
# Run the stream and pretty-print the exception
try:
    stream.run()
except StreamRuntimeError as e:
    e.print_exception()
The constructed stream looks like this:

A new subproject sprouted in Brewery: Opendata. The new package will contain wrappers for various open data services with APIs for structured data. The first wrapper is for Scraper Wiki. There are two new classes: ScraperWikiDataSource for plain data reading and ScraperWikiSourceNode for stream processing.
Example with ScraperWikiDataSource: copy data from a Scraper Wiki source into a local database. The table will be automatically created and replaced according to the data structure of the source:
from brewery.opendata import *
from brewery.ds import *
src = ScraperWikiDataSource("seznam_insolvencnich_spravcu")
target = SQLDataTarget(url = "postgres://localhost/sandbox", table = "swiki_data",
create = True, replace = True)
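# Open the source, then give the target the same field structure
src.initialize()
target.fields = src.fields
# The target creates (and replaces) its table from these fields
target.initialize()
# Copy all rows
for row in src.rows():
    target.append(row)
# Release both ends
src.finalize()
target.finalize()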
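The same initialize / fields / rows / append / finalize pattern can be tried without a PostgreSQL server or Scraper Wiki. In the sketch below, ListDataSource and SQLiteDataTarget are hypothetical classes that only mimic the shape of the calls above, using the standard sqlite3 module and an in-memory database.

```python
import sqlite3


class ListDataSource:
    """Hypothetical in-memory source with the initialize/fields/rows/finalize shape."""

    def __init__(self, fields, rows):
        self.fields = fields
        self._rows = rows

    def initialize(self):
        pass

    def rows(self):
        return iter(self._rows)

    def finalize(self):
        pass


class SQLiteDataTarget:
    """Hypothetical target: (re)creates a table from `fields`, then inserts rows."""

    def __init__(self, connection, table, create=True, replace=True):
        self.connection = connection
        self.table = table
        self.create = create
        self.replace = replace
        self.fields = None

    def initialize(self):
        # Table and column names are interpolated, so they must be trusted values.
        if self.replace:
            self.connection.execute('DROP TABLE IF EXISTS "%s"' % self.table)
        if self.create:
            columns = ", ".join('"%s"' % f for f in self.fields)
            self.connection.execute('CREATE TABLE "%s" (%s)' % (self.table, columns))
        placeholders = ", ".join("?" for _ in self.fields)
        self._insert = 'INSERT INTO "%s" VALUES (%s)' % (self.table, placeholders)

    def append(self, row):
        self.connection.execute(self._insert, row)

    def finalize(self):
        self.connection.commit()


conn = sqlite3.connect(":memory:")
src = ListDataSource(["name", "city"], [("Alice", "Praha"), ("Bob", "Brno")])
target = SQLiteDataTarget(conn, "swiki_data", create=True, replace=True)

src.initialize()
target.fields = src.fields      # the table structure follows the source
target.initialize()
for row in src.rows():
    target.append(row)
src.finalize()
target.finalize()

print(conn.execute('SELECT COUNT(*) FROM "swiki_data"').fetchone()[0])   # 2
```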
Another example using streams: a simple completeness audit report of the source data. The fail threshold is set to 10%.
The stream looks like this:

And the source code for the stream set-up is:
nodes = {
"source": ScraperWikiSourceNode("seznam_insolvencnich_spravcu"),
"audit": AuditNode(distinct_threshold = None),
"threshold": ValueThresholdNode(),
"print": FormattedPrinterNode(),
}
connections = [
("source", "audit"),
("audit", "threshold"),
("threshold", "print")
]
nodes["print"].header = u"field nulls status distinct\n" \
"------------------------------------------------------------"
nodes["print"].format = u"{field_name:<30.30} {null_record_ratio: >7.2%} {null_record_ratio_bin:>10} {distinct_count:>10}"
nodes["threshold"].thresholds = [ ["null_record_ratio", 0.10] ]
nodes["threshold"].bin_names = ("ok", "fail")
stream = Stream(nodes, connections)
try:
    stream.run()
except StreamRuntimeError as e:
    e.print_exception()
Output:
field nulls status distinct
------------------------------------------------------------
cp_S                             0.00%         ok         84
cp_TP                           31.00%       fail         66
datumNarozeni                   18.00%       fail         83
denPozastaveni                 100.00%       fail          1
denVzniku                        0.00%         ok         91
denZaniku                      100.00%       fail          1
dne                             99.00%       fail          2
dobaPlatnosti                  100.00%       fail          1
...
nazev                           82.00%       fail         19
okres_S                          5.00%         ok         38
okres_TP                        38.00%       fail         35
...
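The report combines three steps: an audit computing per-field statistics, a value threshold assigning an ok/fail bin, and the format string from the example. The sketch below reproduces them on a tiny hypothetical dataset in plain Python. Only the format string is taken from the example; audit and value_threshold are hypothetical functions whose semantics are assumptions (None counts as a distinct value, as the 100% null rows with a distinct count of 1 above suggest, and values at or below the threshold fall into the "ok" bin).

```python
def audit(rows, fields):
    """Per-field null ratio and distinct count (None counts as a distinct value)."""
    stats = []
    for field in fields:
        values = [row.get(field) for row in rows]
        stats.append({
            "field_name": field,
            "null_record_ratio": sum(v is None for v in values) / len(values),
            "distinct_count": len(set(values)),
        })
    return stats


def value_threshold(stats, key, threshold, bin_names=("ok", "fail")):
    """Add <key>_bin: the first bin name if value <= threshold, else the second."""
    low, high = bin_names
    for record in stats:
        record[key + "_bin"] = low if record[key] <= threshold else high
    return stats


# Format string copied from the stream example above
FORMAT = u"{field_name:<30.30} {null_record_ratio: >7.2%} {null_record_ratio_bin:>10} {distinct_count:>10}"

rows = [
    {"name": "A", "city": None},
    {"name": "B", "city": "Praha"},
    {"name": "C", "city": None},
    {"name": "D", "city": "Brno"},
]
stats = value_threshold(audit(rows, ["name", "city"]), "null_record_ratio", 0.10)
for record in stats:
    print(FORMAT.format(**record))
```

Each printed line is 60 characters wide (30 + 7 + 10 + 10 plus three separating spaces), matching the width of the dashed header line.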
In this example you can see how successful your scraper is or how complete the provided data are. This simple stream helps you fine-tune your scraping method.
Besides development, a possible use is to integrate the stream into an automated process to get feedback on how complete your daily or monthly processing was.
In one of the following posts I will show you how to do a “join” (in the SQL sense) between datasets, for example how to enrich data from Scraper Wiki with details you have stored in a CSV file or in another scraper.
Freshly brewed clean data with analytical taste – that is what Data Brewery is for. The Python framework will allow you to:
You can use Brewery for analytical automation or just for ad-hoc analytical processing.
The project page is at databrewery.org. The source repository can be found at:
Documentation with examples and node reference can be found here.
Happy brewing!
