Data Brewery

analytical data streams & online analytical processing Python frameworks

    by Stefan Urbanek
  • Data Streaming Basics in Brewery

    How do you build and run a data analysis stream? Why streams? I am going to talk about how to use Brewery from the command line and from Python scripts.

    Brewery is a Python framework and a way of analysing and auditing data. The basic principle is the flow of structured data through processing and analysing nodes. This architecture allows for a more transparent, understandable and maintainable data streaming process.

    You might want to use brewery when you:

    • want to learn more about data
    • encounter unknown datasets and/or you do not know what you have in your datasets
    • do not know exactly how to process your data and want to play around without getting lost
    • want to create alternative analysis paths and compare them
    • measure data quality and feed data quality results into the data processing process

    There are many approaches to data analysis. Brewery brings a certain workflow to the analyst:

    1. examine data
    2. prototype a stream (you can use data sampling so as not to overload the machine)
    3. see results and refine stream, create alternatives (at the same time)
    4. repeat 3. until satisfied

    Brewery makes steps 2 and 3 easy: quick prototyping, alternative branching, comparison. It tries to keep the analyst’s workflow clean and understandable.

    Building and Running a Stream

    There are two ways to create a stream: programmatically in Python, and from the command line without any knowledge of Python. Each way has two alternatives: one quick and simple, but with a limited feature set, and one full-featured but more verbose.

    The two programmatic alternatives are basic construction and “HOM” (forking) construction. The two command-line ways to run a stream are run and pipe. Let us look at each of them more closely.

    A note regarding the Zen of Python: this does not go against “There should be one – and preferably only one – obvious way to do it.” There is only one way: the basic (raw) construction. The others are higher-level ways built on top of it, or the same way in a different environment.

    In the examples below we demonstrate a simple linear (no branching) stream that reads a CSV file, performs a very basic audit and “pretty prints” the result. The stream looks like this: CSV source -> audit -> pretty printer.

    Command line

    Brewery comes with a command-line utility, brewery, which can run streams without you writing a single line of Python code. Again, there are two ways to describe a stream: JSON-based and a plain linear pipe.

    The simplest usage is the brewery pipe command:

    $ brewery pipe csv_source resource=data.csv audit pretty_printer
    

    The pipe command expects a list of nodes and attribute=value pairs for node configuration. If no source node is specified, CSV on standard input is used. If no target node is specified, CSV on standard output is assumed:

    $ cat data.csv | brewery pipe audit
    

    The actual stream, with the implicit nodes, is: CSV source (standard input) -> audit -> CSV target (standard output).
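    To make the implicit-node rule concrete, here is a minimal sketch of how pipe-style arguments could be turned into a linear list of nodes. It is not Brewery's code: the function parse_pipe, the SOURCE_NODES/TARGET_NODES sets and the "stdin"/"stdout" resource markers are hypothetical, and a real runner would look node types up in its own node registry.

```python
# Hypothetical node classification; a real runner would consult its node registry.
SOURCE_NODES = {"csv_source"}
TARGET_NODES = {"csv_target", "pretty_printer"}


def parse_pipe(args):
    """Turn pipe arguments into a list of (node_name, config) pairs.

    Bare words start a new node; attribute=value pairs configure the
    most recently named node.
    """
    nodes = []
    for arg in args:
        if "=" in arg:
            if not nodes:
                raise ValueError("attribute %r given before any node" % arg)
            key, value = arg.split("=", 1)
            nodes[-1][1][key] = value
        else:
            nodes.append((arg, {}))

    # Implicit source: CSV on standard input ("stdin" is a hypothetical marker).
    if not nodes or nodes[0][0] not in SOURCE_NODES:
        nodes.insert(0, ("csv_source", {"resource": "stdin"}))
    # Implicit target: CSV on standard output ("stdout" is a hypothetical marker).
    if nodes[-1][0] not in TARGET_NODES:
        nodes.append(("csv_target", {"resource": "stdout"}))
    return nodes


print(parse_pipe(["audit"]))
# [('csv_source', {'resource': 'stdin'}), ('audit', {}), ('csv_target', {'resource': 'stdout'})]
```

    With explicit source and target nodes, as in the first pipe example, nothing implicit is added and the list simply mirrors the command line.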

    The JSON way is more verbose but full-featured: you can create complex processing streams with many branches. Example stream.json:

        {
            "nodes": { 
                "source": { "type":"csv_source", "resource": "data.csv" },
                "audit":  { "type":"audit" },
                "target": { "type":"pretty_printer" }
            },
            "connections": [
                ["source", "audit"],
                ["audit", "target"]
            ]
        }
    

    And run:

    $ brewery run stream.json
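    The JSON description is just two pieces of data: a dictionary of named node configurations and a list of [source, target] pairs. As a sketch of what a runner has to check before executing such a file, the hypothetical function below parses it with the standard json module, verifies that every connection names a declared node, and derives an execution order with Kahn's topological sort. Whether brewery run does exactly this is an assumption; the sketch only illustrates the idea.

```python
import json
from collections import deque


def load_stream_description(text):
    """Parse a stream description; return (nodes, connections, run order)."""
    desc = json.loads(text)
    nodes = desc["nodes"]
    connections = [tuple(pair) for pair in desc.get("connections", [])]

    # Every connection must refer to declared nodes.
    for source, target in connections:
        for name in (source, target):
            if name not in nodes:
                raise ValueError("connection refers to unknown node %r" % name)

    # Kahn's algorithm: count incoming edges, start from nodes that have none.
    incoming = {name: 0 for name in nodes}
    outputs = {name: [] for name in nodes}
    for source, target in connections:
        incoming[target] += 1
        outputs[source].append(target)

    ready = deque(name for name in nodes if incoming[name] == 0)
    order = []
    while ready:
        name = ready.popleft()
        order.append(name)
        for target in outputs[name]:
            incoming[target] -= 1
            if incoming[target] == 0:
                ready.append(target)

    # Nodes left unvisited can only be part of a cycle.
    if len(order) != len(nodes):
        raise ValueError("stream contains a cycle")
    return nodes, connections, order
```

    Called with the contents of the stream.json above, it returns the order source, audit, target.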
    

    To list all available nodes, run:

    $ brewery nodes
    

    To get more information about a node, run brewery nodes <node_name>:

    $ brewery nodes string_strip
    

    Note that data streaming from the command line is more limited than the Python way. You might not have access to nodes and node features that require the Python language, such as Python storage-type nodes or functions.

    Higher order messaging

    The preferred programmatic way of creating streams is higher order messaging (HOM), which, in this case, is just a fancy name for pretending to do something while in fact we are preparing the stream.

    This way of creating a stream is more readable and maintainable. It is easier to insert nodes into the stream and to create forks without losing the picture of the stream. It might not be suitable for very complex streams, though. Here is an example:

        import brewery
    
        b = brewery.create_builder()
        b.csv_source("data.csv")
        b.audit()
        b.pretty_printer()
    

    When this piece of code is executed, nothing actually happens to the data stream. The stream is just being prepared and you can run it anytime:

        b.stream.run()
    

    What actually happens? The builder b is a mostly empty object that accepts nearly any method call and tries to find a node that corresponds to the called method. The node is instantiated, added to the stream and connected to the previous node.
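    The “accept anything” behaviour can be sketched in Python with __getattr__, which is called only for attributes that are not found the normal way. In the sketch below, Node, Stream, Builder and the KNOWN_NODES set are hypothetical stand-ins, not Brewery's classes; the point is only the dispatch: an unknown method name is looked up as a node type, a node is created with the call's arguments, appended to the stream and connected to the previously added node.

```python
class Node:
    """Hypothetical stand-in for a stream node: remembers its type and arguments."""

    def __init__(self, node_type, *args, **kwargs):
        self.node_type = node_type
        self.args = args
        self.kwargs = kwargs


class Stream:
    def __init__(self):
        self.nodes = []
        self.connections = []   # (source node, target node) pairs


class Builder:
    """Turns unknown method calls into 'append a node of that type'."""

    KNOWN_NODES = {"csv_source", "audit", "pretty_printer", "csv_target"}

    def __init__(self):
        self.stream = Stream()
        self.node = None        # the most recently added node

    def __getattr__(self, name):
        # Only reached for names that are not regular attributes.
        if name not in self.KNOWN_NODES:
            raise AttributeError("no node named %r" % name)

        def add_node(*args, **kwargs):
            node = Node(name, *args, **kwargs)
            self.stream.nodes.append(node)
            if self.node is not None:
                self.stream.connections.append((self.node, node))
            self.node = node
            return node

        return add_node
```

    With it, b = Builder(); b.csv_source("data.csv"); b.audit(); b.pretty_printer() leaves three nodes and two connections in b.stream, and no data has been touched yet.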

    You can also create a branched stream:

        b = brewery.create_builder()
        b.csv_source("data.csv")
        b.audit()
    
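        # fork a branch at the current (audit) node and write the audit results to CSV
        f = b.fork()
        f.csv_target("audit.csv")
    
        # the main branch continues from the audit node as well
        b.pretty_printer()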
    

    Basic Construction

    This is the lowest-level way of creating a stream and allows full customisation and control of it. In the basic construction method, the programmer prepares all node instances and connects them explicitly, node by node. It might be too verbose for hand-written scripts, but it is meant to be used by applications that construct streams from a user interface or from stream descriptions. All other methods use this one.

        from brewery import Stream
        from brewery.nodes import CSVSourceNode, AuditNode, PrettyPrinterNode
    
        stream = Stream()
    
        # Create pre-configured node instances
        src = CSVSourceNode("data.csv")
        stream.add(src)
    
        audit = AuditNode()
        stream.add(audit)
    
        printer = PrettyPrinterNode()
        stream.add(printer)
    
        # Connect nodes: source -> target
        stream.connect(src, audit)
        stream.connect(audit, printer)
    
        stream.run()
    

    It is also possible to pass nodes as a dictionary and connections as a list of (source, target) tuples of node names:

        stream = Stream(nodes, connections)
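    To illustrate the nodes-dictionary and connections-list shape, here is a toy, in-memory stream in which each source pushes records to its connected targets. SourceNode, FunctionNode, CollectNode and this Stream class are hypothetical and much simpler than Brewery's own implementation; note that a node connected to several targets, as in a fork, sends the same record to each branch.

```python
class SourceNode:
    """Hypothetical source: emits a fixed list of records."""

    def __init__(self, rows):
        self.rows = rows
        self.targets = []

    def run(self):
        for row in self.rows:
            # The same record object goes to every connected branch.
            for target in self.targets:
                target.push(row)


class FunctionNode:
    """Hypothetical processing node: applies func, drops records mapped to None."""

    def __init__(self, func=None):
        self.func = func
        self.targets = []

    def push(self, record):
        result = self.func(record) if self.func else record
        if result is None:
            return
        for target in self.targets:
            target.push(result)


class CollectNode(FunctionNode):
    """Hypothetical target: keeps every record it receives."""

    def __init__(self):
        super().__init__()
        self.records = []

    def push(self, record):
        self.records.append(record)


class Stream:
    def __init__(self, nodes, connections):
        self.nodes = nodes                      # name -> node instance
        for source, target in connections:      # (source name, target name)
            nodes[source].targets.append(nodes[target])

    def run(self):
        # Records are pushed from every source node through the graph.
        for node in self.nodes.values():
            if isinstance(node, SourceNode):
                node.run()
```

    Because records are shared between branches, processing functions in this sketch return new dictionaries instead of modifying the ones they receive.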
    

    Future plans

    What would be lovely to have in brewery?

    Probing and data quality indicators – tools for simple data probing and an easy way of creating data quality indicators. This will allow something like “test-driven development”, but for data. This is the next step.

    Stream optimisation – merge multiple nodes into a single processing unit before running the stream. This might be done in the near future.

    Backend-based nodes and related data transfer between backend nodes – for example, two SQL nodes might pass data through a database table instead of the built-in data pipe, or two numpy/scipy-based nodes might use numpy/scipy structures to pass data and avoid unnecessary streaming. Not very soon, but in the foreseeable future.

    Stream compilation – compile a stream into an optimised script. Not too soon, but I would like to have that one.

    Last, but not least: there is currently a small performance cost because of the nature of the Brewery implementation. This penalty will be explained in another blog post; to make a long story short, it has to do with threads, the Python GIL and a non-optimized stream graph. There is no prediction for this one, as it might be improved step by step. Also, some Python 3 features look promising, such as yield from in Python 3.3 (PEP 380).


  • Tags: brewery by Stefan Urbanek
  • Brewery 0.8 Released

    I’m glad to announce a new release of Brewery, the stream-based data auditing and analysis framework for Python.

    There are quite a few updates; the notable ones are:

    • new brewery runner with commands run and graph
    • new nodes: pretty printer node (for your terminal pleasure), generator function node
    • many CSV updates and fixes

    I have added several simple how-to examples, such as aggregation of a remote CSV, a basic audit of a CSV, and how to use a generator function. Feedback and questions are welcome; I’ll be glad to help.

    Note that there are a couple of changes that break compatibility; however, existing code can be updated very easily. I apologize for the inconvenience, but until 1.0 such changes might happen more frequently. On the other hand, I will try to make them as painless as possible.

    Full listing of news, changes and fixes is below.

    Version 0.8

    News

    • Changed license to MIT
    • Created new brewery runner commands: ‘run’ and ‘graph’:
      • ‘brewery run stream.json’ will execute the stream
      • ‘brewery graph stream.json’ will generate graphviz data
    • Nodes: Added pretty printer node - textual output as a formatted table
    • Nodes: Added source node for a generator function
    • Nodes: added analytical type to derive field node
    • Preliminary implementation of data probes (just a concept; the API is not 100% decided yet)
    • CSV: added empty_as_null option to read empty strings as Null values
    • Nodes can be configured with node.configure(dictionary, protected). If ‘protected’ is True, then protected attributes (specified in node info) cannot be set with this method.
    • added node identifier to the node reference doc
    • added create_logger
    • added experimental retype feature (works for CSV only at the moment)
    • Mongo backend: better handling of record iteration
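    A minimal sketch of how configure(dictionary, protected) could treat protected attributes, assuming node_info lists attributes as dictionaries with an optional "protected" flag. That layout and the attribute names are hypothetical; this version silently skips protected attributes, although raising an error would be an equally reasonable choice.

```python
class Node:
    # Hypothetical node_info layout; Brewery's real structure may differ.
    node_info = {
        "attributes": [
            {"name": "resource"},
            {"name": "encoding"},
            {"name": "read_header", "protected": True},
        ]
    }

    def configure(self, config, protected=False):
        """Set attributes from a dictionary, skipping protected ones if asked."""
        protected_names = set()
        if protected:
            protected_names = {attr["name"] for attr in self.node_info["attributes"]
                               if attr.get("protected")}
        for name, value in config.items():
            if name in protected_names:
                continue    # protected attributes are left untouched
            setattr(self, name, value)
```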

    Changes

    • CSV: resource is now an explicitly named argument in CSV*Node
    • CSV: convert fields according to field storage type (instead of all-strings)
    • Removed fields getter/setter (the implementation is now entirely up to the stream subclass)
    • AggregateNode: renamed aggregates to measures, added measures as a public node attribute
    • moved errors to brewery.common
    • removed field_name(), now str(field) should be used
    • use the named logger ‘brewery’ instead of the global one
    • better debug-log labels for nodes (node type identifier + python object ID)

    WARNING: Compatibility break:

    • deprecated __node_info__; use plain node_info instead
    • Stream.update() now takes nodes and connections as two separate arguments

    Fixes

    • added SQLSourceNode, added option to keep fields instead of dropping them in FieldMap and FieldMapNode (patch by laurentvasseur @ bitbucket)
    • better traceback handling on node failure (now actually the traceback is displayed)
    • return list of field names as string representation of FieldList
    • CSV: fixed output of zero numeric value in CSV (was empty string)


    If you have any questions, comments, requests, do not hesitate to ask.

  • Tags: brewery release announcement by Stefan Urbanek
  • Brewery 0.7 Released

    A new small release is out, with a quite nice addition of documentation. It does not bring many new features, but it contains a refactoring towards a better package structure, which breaks some compatibility.

    Documentation updates

    Framework Changes

    • added soft (optional) dependencies on backend libraries. An exception with useful information will be raised when functionality that depends on a missing package is used. Example: “Exception: Optional package ‘sqlalchemy’ is not installed. Please install the package from http://www.sqlalchemy.org/ to be able to use: SQL streams. Recommended version is > 0.7”
    • field-related classes and functions were moved from the ‘ds’ module to ‘metadata’ and included in the brewery top level: Field, FieldList, expand_record, collapse_record
    • added probes
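    The soft-dependency behaviour can be sketched as an import helper that returns a placeholder object instead of failing at import time; the placeholder raises an informative error only when the missing package is actually used. optional_import, MissingPackage and MissingPackageError are hypothetical names, not Brewery's; the message text follows the example above.

```python
import importlib


class MissingPackageError(Exception):
    pass


class MissingPackage:
    """Placeholder for an absent optional package; fails on first use."""

    def __init__(self, package, source, feature):
        self.package = package
        self.source = source
        self.feature = feature

    def __getattr__(self, name):
        # Any attribute not set in __init__ means someone tried to use the package.
        raise MissingPackageError(
            "Optional package '%s' is not installed. Please install the package "
            "from %s to be able to use: %s" % (self.package, self.source, self.feature))


def optional_import(package, source, feature):
    try:
        return importlib.import_module(package)
    except ImportError:
        return MissingPackage(package, source, feature)


# Importing the module never fails; using SQL features without sqlalchemy does.
sqlalchemy = optional_import("sqlalchemy", "http://www.sqlalchemy.org/", "SQL streams")
```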

    Deprecated functions

    Streams

    • new node: DeriveNode - derive new field with callables or string formula (python expression)
    • new SelectNode implementation: accepts callables or string with python code
    • former SelectNode renamed to FunctionSelectNode
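    Both nodes accept either a callable or a string containing a Python expression. A minimal sketch of that dual interface follows, assuming records are dictionaries whose field names become variables in the expression; compile_expression, derive and select are hypothetical helpers, not the nodes' real code. Removing builtins keeps the namespace small but is not a sandbox, so string formulas should come only from trusted sources.

```python
def compile_expression(expression):
    """Return a function record -> value from a callable or an expression string."""
    if callable(expression):
        return expression
    code = compile(expression, "<expression>", "eval")

    def evaluate(record):
        # Field names become local variables; builtins are removed (NOT a sandbox).
        return eval(code, {"__builtins__": {}}, dict(record))

    return evaluate


def derive(records, field_name, expression):
    """Yield copies of the records with one new derived field."""
    func = compile_expression(expression)
    for record in records:
        new_record = dict(record)
        new_record[field_name] = func(record)
        yield new_record


def select(records, condition):
    """Yield only the records for which the condition is true."""
    func = compile_expression(condition)
    return (record for record in records if func(record))
```

    For example, derive(rows, "double", "amount * 2") and derive(rows, "double", lambda r: r["amount"] * 2) produce the same records.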

    Enjoy!


  • Tags: announcement release brewery by Stefan Urbanek
  • Data Cleansing introduction (for BigClean Prague 2011)

    Presentation from BigClean Prague about data cleansing - with Brewery examples.

  • Tags: brewery slides by Stefan Urbanek
  • Forking Forks with Higher Order Messaging

    A new way of constructing streams, which uses “higher order messaging”, has been implemented. What does that mean? Instead of constructing the stream from nodes and connections, you “call” functions that process your data. In your script you pretend that you work with the data using functions:

    ...
    main.csv_source("data.csv")
    main.sample(1000)
    main.aggregate(keys = ["year"])
    main.formatted_printer()
    ...
    

    Executing the functions as written in a script might be very expensive in terms of time and memory. What in fact happens is that, instead of the data processing functions being executed, a stream network is constructed.

    Construction

    Construction is done using forked branches. You create an empty forked branch by forking an empty stream:

    from brewery.streams import *
    
    stream = Stream()
    main = stream.fork()
    ...
    

    Now you have the fork main. Each function call on main appends a new processing node to the fork and connects it to the previous node of the fork.

    Fork Construction

    Function names are based on node names in most cases. There might be custom function names for some nodes in the future, but for now there is just a simple rule:

    1. decamelize node name: CSVSourceNode to csv source node
    2. replace spaces with underscores: csv_source_node
    3. remove ‘node’ suffix: csv_source
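    The three-step naming rule above can be written as a small function. This is a sketch, not Brewery's own helper; in particular, keeping an acronym such as “CSV” together is an assumption based on the CSVSourceNode example.

```python
import re


def node_function_name(class_name):
    # 1. decamelize: split before capitals, keeping acronyms such as "CSV" together
    words = re.sub(r"([A-Z]+)([A-Z][a-z])", r"\1 \2", class_name)
    words = re.sub(r"([a-z0-9])([A-Z])", r"\1 \2", words).lower()
    # 2. replace spaces with underscores
    name = words.replace(" ", "_")
    # 3. remove the "node" suffix
    if name.endswith("_node"):
        name = name[:-len("_node")]
    return name


print(node_function_name("CSVSourceNode"))   # csv_source
```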

    The arguments of the function are the same as the arguments of the node constructor. If you want to configure the node further, you can access the current node through the node attribute of the fork:

    main.node.keys = ["country"]
    

    Running

    Run the stream as if it were constructed manually from nodes and connections:

    stream.run()
    

    Forking

    So far you are able to construct a single simple stream from a source to a target. There are plenty of situations where linear processing is not sufficient and you need branches. To create another branch, you fork() a fork. For example, to attach a data audit to the stream, insert the following code right after the node you want to audit:

    # we are in main at node after which we want to have multiple branches
    
    audit = main.fork()
    audit.audit()
    audit.value_threshold(...)
    audit.formatted_printer(...)
    
    # continue main.* branch here...
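    A fork can be thought of as a cursor into a shared stream: it appends nodes to the same stream but remembers its own last node, and fork() starts a new cursor at that node. The sketch below models only that bookkeeping. The classes are hypothetical, nodes are plain strings (so names must be unique here), and add() stands in for HOM method calls such as audit.audit().

```python
class Stream:
    def __init__(self):
        self.nodes = []
        self.connections = []       # (source, target) pairs

    def fork(self):
        return Fork(self)


class Fork:
    """A cursor into a shared stream that remembers its own last node."""

    def __init__(self, stream, node=None):
        self.stream = stream
        self.node = node            # last node of this branch

    def fork(self):
        # The new branch starts at this branch's current node.
        return Fork(self.stream, self.node)

    def add(self, name):
        """Append a node (here just its name) and connect it to this branch."""
        self.stream.nodes.append(name)
        if self.node is not None:
            self.stream.connections.append((self.node, name))
        self.node = name
        return self
```

    Running main.add("csv_source").add("coalesce"), then audit = main.fork() and audit.add("audit"), and finally main.add("database_table_target") connects both "audit" and "database_table_target" to "coalesce", which is the branching used in the full example.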
    

    Example

    Here is a full example of how to use forking with HOM in Brewery:

    from brewery.streams import *
    
    # Create the stream and empty fork
    stream = Stream()
    main = stream.fork()
    
    # Start adding nodes by pretending that we are processing using functions
    main.csv_source("data.csv", read_header = True, encoding = "utf-8")
    main.coalesce_value_to_type()
    
    # Create another fork for data audit:
    audit = main.fork()
    audit.audit(distinct_threshold = None)
    audit.value_threshold(thresholds = [ ["null_record_ratio", 0.01] ],
                            bin_names = ("ok", "fail"))
    
    audit.formatted_printer()
    audit.node.header = u"field                            nulls     status   distinct\n" \
                             "------------------------------------------------------------"
    audit.node.format = u"{field_name:<30.30} {null_record_ratio: >7.2%} {null_record_ratio_bin:>10} {distinct_count:>10}"
    
    # ...and we continue in main branch
    main.database_table_target(url = "postgres://localhost/sandbox", 
                                table = "data",
                                create = True,
                                replace = True,
                                add_id_key = True)
    
    # Run the stream and pretty-print the exception
    try:
        stream.run()
    except StreamRuntimeError as e:
        e.print_exception()
    

    The constructed stream looks like this:

    [Figure: Fork Example Stream]

  • Tags: feature howto brewery by Stefan Urbanek
  • Brew data from Scraper Wiki

    A new subproject has sprouted in Brewery: Opendata. The new package will contain wrappers for various open data services with APIs for structured data. The first wrapper is for Scraper Wiki. There are two new classes: ScraperWikiDataSource for plain data reading and ScraperWikiSourceNode for stream processing.

    Example with ScraperWikiDataSource: copy data from a Scraper Wiki source into a local database. The table will be automatically created and replaced according to the data structure of the source:

    from brewery.opendata import *
    from brewery.ds import *
    
    src = ScraperWikiDataSource("seznam_insolvencnich_spravcu")
    target = SQLDataTarget(url = "postgres://localhost/sandbox", table = "swiki_data",
                            create = True, replace = True)
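    # initialize the source first so that its field list is known
    src.initialize()
    # the target table is created from the source's field structure
    target.fields = src.fields
    target.initialize()
    
    # copy all rows from Scraper Wiki into the database table
    for row in src.rows():
        target.append(row)
    
    # finalize both data streams when done
    src.finalize()
    target.finalize()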
    

    Another example, using streams: a simple completeness audit report of the source data. The fail threshold is set to 10%.

    The stream looks like this:

    [Figure: Scraper Wiki simple example]

    1. feed data from Scraper Wiki to the data audit node
    2. based on a value threshold, generate a new textual field that states whether the data passed or failed the completeness test (there should be no more than 10% empty values)
    3. print formatted report
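    The audit and threshold steps can be approximated in plain Python to show what the report columns mean. This sketch is not Brewery's AuditNode or ValueThresholdNode: completeness_report is hypothetical, it counts both None and empty strings as nulls, and it treats a ratio equal to the threshold as “ok”; those are assumptions. The output keys mirror the field names used in the report's format string.

```python
def completeness_report(records, threshold=0.10, bin_names=("ok", "fail")):
    """Per-field null ratio, ok/fail bin and distinct value count."""
    fields = []                      # field names in order of first appearance
    for record in records:
        for name in record:
            if name not in fields:
                fields.append(name)

    total = len(records)
    report = []
    for name in fields:
        values = [record.get(name) for record in records]
        nulls = sum(1 for value in values if value is None or value == "")
        ratio = nulls / total if total else 0.0
        report.append({
            "field_name": name,
            "null_record_ratio": ratio,
            "null_record_ratio_bin": bin_names[0] if ratio <= threshold else bin_names[1],
            "distinct_count": len(set(values)),   # a null counts as one distinct value
        })
    return report


FORMAT = ("{field_name:<30.30} {null_record_ratio: >7.2%} "
          "{null_record_ratio_bin:>10} {distinct_count:>10}")
```

    Each entry can then be printed with FORMAT.format(**entry), giving lines in the same layout as the report output.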

    And the source code for the stream set-up is:

    nodes = {
        "source": ScraperWikiSourceNode("seznam_insolvencnich_spravcu"),
        "audit": AuditNode(distinct_threshold = None),
        "threshold": ValueThresholdNode(),
        "print": FormattedPrinterNode(),
    }
    
    connections = [ 
                    ("source", "audit"), 
                    ("audit", "threshold"),
                    ("threshold", "print")
                    ]
    
    nodes["print"].header = u"field                            nulls     status   distinct\n" \
                             "------------------------------------------------------------"
    nodes["print"].format = u"{field_name:<30.30} {null_record_ratio: >7.2%} {null_record_ratio_bin:>10} {distinct_count:>10}"
    
    nodes["threshold"].thresholds = [ ["null_record_ratio", 0.10] ]
    nodes["threshold"].bin_names = ("ok", "fail")
    
    stream = Stream(nodes, connections)
    
    try:
        stream.run()
    except StreamRuntimeError as e:
        e.print_exception()
    

    Output:

    field                            nulls     status   distinct
    ------------------------------------------------------------
    cp_S                             0.00%         ok         84
    cp_TP                           31.00%       fail         66
    datumNarozeni                   18.00%       fail         83
    denPozastaveni                 100.00%       fail          1
    denVzniku                        0.00%         ok         91
    denZaniku                      100.00%       fail          1
    dne                             99.00%       fail          2
    dobaPlatnosti                  100.00%       fail          1
    ...
    
    nazev                           82.00%       fail         19
    okres_S                          5.00%         ok         38
    okres_TP                        38.00%       fail         35
    ...
    

    In this example you can see how successful your scraper is, or how complete the provided data are. This simple stream helps you fine-tune your scraping method.

    Another possible use, besides development, is to integrate the stream into an automated process to get feedback on how complete your daily or monthly processing was.

    In one of the following posts I will show you how to do a “join” (in the SQL sense) between datasets, for example how to enrich data from Scraper Wiki with details you have stored in a CSV file or in another scraper.

  • Tags: opendata howto brewery by Stefan Urbanek
  • Introduction

    Freshly brewed clean data with analytical taste – that is what Data Brewery is for. The Python framework will allow you to:

    • stream structured data from various sources (CSV, XLS, SQL database, Google spreadsheet) to various structured targets
    • create analytical streams using flow-based programming: connect processing nodes together and let the structured data flow through them
    • measure data properties, such as data quality or numerical statistics
    • in the future, do advanced data mining, such as clustering or classification

    You can use Brewery for analytical automation or just for ad-hoc analytical processing.

    The project page is at databrewery.org. The source repository can be found at:

    Documentation with examples and node reference can be found here.

    Happy brewing!

    [Figure: Brewery stream example]

  • Tags: announcement brewery