Tuesday, September 16, 2014

Tuple Calculus using Java 8 Streams

This post presents a light-weight approach to processing arbitrary tabular data sets using Java 8 streams. Encapsulating stream functions and utilities as operations on typed tuple streams offers a coherent basis for processing tabular data sets, and the approach extends naturally into a straightforward domain specific language that is easy to explain and understand.

Introduction


Recently I came across the problem of processing arbitrary tabular data sets from various sources. Data sources included query results from relational databases, search engines and a bunch of flat files. Processing had to support filtering, calculated fields, aggregation, joining, sorting ..., basically the full computational power offered by SQL, Relational Algebra or Tuple Calculus.

This post summarizes a solution to this problem using a light-weight domain specific language on top of Java 8 streams. The approach offers an encapsulation of streams where:

  • Streamed objects are tuples, supporting field access by name and position.
  • Tuple streams are typed using schemas. Schemas are used to define stream operations too.
  • Tuple streams are defined as a composition of operations. Operations encapsulate the specifics of and variations in lambda types and stream composition.
  • These tuple stream definitions are expressed using a light-weight domain specific language.

Benford's Law


In this post I will be using Benford's law to illustrate the concepts and techniques introduced.

Benford's law refers to the frequency distribution of the first digit in real-life, numerical data. One would expect a uniform distribution of digits. Counterintuitively, the distribution is not uniform: digit 1 occurs as the first digit about 30% of the time, while digit 9 occurs as the first digit less than 5% of the time, as predicted by Benford's law.

I will use the World Bank 2013 country surface area data set and compare the frequencies predicted by Benford's law with the actual observations. The data set provides the surface area of over 200 countries. The data has been prepared in a CSV-file, say country.csv, which starts with the following content:

country       surface:Decimal
Afghanistan   652,230
Albania       28,750
Algeria       2,381,740

The country data can be processed using the following tuple stream definition:
read('country.csv')
.aggregate(
   { digit <- integer(substr(text(surface),0,1)) },
   { observation <- count()/count(*), prediction <- log10(1+1/digit) }
)
.sort({ digit })
The operations that constitute the tuple stream definition are:
  • The Read generator produces the content of a CSV-file as a stream of tuples. The name of the file is provided as an argument. The header row of the CSV-file is used as the schema definition of the content of the file.
  • The Aggregate operation groups the data, based on a group schema defining the tuple used for grouping and a value schema defining the aggregated values for each group. The result schema is the concatenation of the group and value schemas.
    • The digit field in the group schema is defined by converting the surface area to text, taking the first character, and converting it back to an integer.
    • The observation field in the value schema defines the relative frequency for each digit, dividing the absolute frequency by the total number of entries.
    • The prediction field in the value schema defines the theoretical relative frequency for each digit using Benford's law.
  • The Sort operation sorts these data on the digit field.
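As a cross-check, the same first-digit tally can be written directly with the Java 8 stream API, without the tuple machinery. A minimal sketch, using a handful of illustrative sample values rather than the full World Bank data set:

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class BenfordSketch {

    // First digit of a positive number, e.g. 652230 -> 6.
    public static int firstDigit(long value) {
        return Character.getNumericValue(Long.toString(value).charAt(0));
    }

    // Benford's predicted relative frequency for a leading digit.
    public static double prediction(int digit) {
        return Math.log10(1.0 + 1.0 / digit);
    }

    // Relative frequency of each leading digit in the sample.
    public static Map<Integer, Double> observe(long[] values) {
        double total = values.length;
        return LongStream.of(values).boxed()
            .collect(Collectors.groupingBy(BenfordSketch::firstDigit,
                TreeMap::new,
                Collectors.collectingAndThen(Collectors.counting(),
                    count -> count / total)));
    }

    public static void main(String[] args) {
        // Illustrative surface areas in square kilometers.
        long[] surfaces = {652_230, 28_750, 2_381_740, 1_246_700, 440};
        observe(surfaces).forEach((digit, freq) ->
            System.out.printf("%d: observed %.3f, predicted %.3f%n",
                digit, freq, prediction(digit)));
    }
}
```

The grouping, counting and division correspond to the digit, observation and prediction fields of the tuple stream definition above.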

The resulting tuple stream generates the following frequency values:

digit:Integer   observation:Decimal   prediction:Decimal
1               0.276995              0.301030
2               0.178404              0.176091
3               0.122066              0.124939
4               0.112676              0.096910
5               0.079812              0.079181
6               0.061033              0.066947
7               0.070423              0.057992
8               0.042254              0.051153
9               0.056338              0.045757

To compare the observations with the predictions we combine them into a chart:


Clearly, the observed values (blue column chart) nicely match the theoretical frequencies (red line chart). The chi-square test is left as an exercise.

The remainder of this post illustrates how the tuple calculus used in the example can be realized using Java 8 streams.

Building Blocks


Before coming to the actual Java 8 streams, I will introduce some concepts and the basics of their definition in Java.

I will be working with streams of tuples, Stream<Tuple>, where Tuple is an interface. A tuple represents an ordered collection of objects and provides access to these objects by name.
public interface Tuple extends Comparable<Tuple> {
   public Schema getSchema();
   public Object get(String code);
}
Tuples are comparable, using the ordering of the values they contain. Tuples are mutable, in the sense that the values they contain can be updated. For brevity, the setter functions have been left out.

The content of a tuple is described by a Schema. Basically, a schema is an ordered collection of fields. The Schema interface provides positional access to these fields.
public interface Schema {
   public int getSize();
   public Field getField(int i);
}
Besides the access functions mentioned here, a schema provides access to fields by name and some convenience functions to add fields and get a field iterator.

A field is defined by a name and a primitive data type. As schemas are used for multiple purposes, fields are somewhat overloaded and may specify an expression for calculated fields and a sort order for schemas used for sorting and merging streams.
public interface Field {
   public String getCode();
   public AttributeType getType();
   public Expression getExpression();
   public boolean getAscending();
}
An expression is the representation of an algebraic formula that supports the calculation of field values. I will come back to this subject when talking about a domain specific language for tuple streams.
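To make the interfaces concrete, here is a deliberately simplified, hypothetical tuple implementation in which the schema is reduced to an ordered list of field names (the real Schema carries types, expressions and sort orders):

```java
import java.util.Arrays;
import java.util.List;

// Simplified sketch: the schema is just an ordered list of field
// names, and the tuple stores its values positionally.
public class ArrayTuple {

    private final List<String> schema;   // field names, in order
    private final Object[] values;

    public ArrayTuple(List<String> schema, Object... values) {
        this.schema = schema;
        this.values = values;
    }

    // Access by position.
    public Object get(int i) {
        return values[i];
    }

    // Access by name, resolved through the schema.
    public Object get(String code) {
        return values[schema.indexOf(code)];
    }

    public static void main(String[] args) {
        List<String> schema = Arrays.asList("country", "surface");
        ArrayTuple tuple = new ArrayTuple(schema, "Albania", 28_750L);
        System.out.println(tuple.get("surface")); // 28750
    }
}
```

Sharing one schema instance across all tuples of a stream keeps the per-tuple footprint down to the value array itself.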

Stream Operations


Now that we have tuples, schemas and fields, we can start talking about operations. Operations are the building blocks of tuple stream definitions. A tuple stream definition is a composition of operations.

An operation typically operates on one or more source operations. An operation that has no source operations is a generator. It can be used as the starting point for a tuple stream definition. I will be talking about generators further on.

Before addressing some examples of operations, we will have a look at the Operation interface.
public interface Operation {
   public Schema getSchema();
   public Stream<Tuple> getStream();
}
An operation provides access to instances of the stream that it defines using the getStream function. It also provides access to the schema of this stream.

As a first example, consider the Filter operation, used for selecting a subset of the tuples in a stream. The selection criterion is provided as an expression that is evaluated for each tuple.

The implementation is straightforward, using the Java 8 Stream.filter function.
public class Filter extends BaseOperation {

   protected Expression condition;

   public Filter(Operation source, String condition) {
      this.condition = StreamUtils.parseExpression(condition);
      addSourceOperation(source);
   }

   public Schema getSchema() {
      return getSourceOperation(0).getSchema();
   }

   public Stream<Tuple> getStream() {
      return getSourceOperation(0).getStream()
         .filter(tuple -> condition.eval(new TupleContext(tuple)));
   }

}
The constructor of the Filter operation takes the source operation and the filter condition as arguments. As mentioned before, I will come back to expressions when talking about a domain specific language for tuple streams. For now, note that the expression is parsed once and evaluated many times, once per tuple that is processed.

The getSchema function is very simple, as the operation works on a single source stream, and the schema of the result stream is the same as the schema of the source stream. The management of source streams is implemented at the level of the BaseOperation class.

The getStream function takes the stream of the source operation and applies a filter using a Predicate that evaluates the condition expression. The TupleContext serves as the evaluation context based on the fields of the source tuple. The evaluation context merely provides a binding of field names to field values for that particular tuple.
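Stripped of the expression machinery, the pattern reduces to Stream.filter with a predicate over a name-to-value binding. In this sketch a plain Map stands in for both the tuple and its evaluation context, and the field name "surface" is illustrative:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class FilterSketch {

    // Keep only tuples whose "surface" value exceeds the threshold,
    // the hard-coded equivalent of the condition "surface > threshold".
    public static List<Map<String, Object>> largeCountries(
            Stream<Map<String, Object>> tuples, long threshold) {
        return tuples
            .filter(tuple -> ((Number) tuple.get("surface")).longValue() > threshold)
            .collect(Collectors.toList());
    }

    // Hypothetical helper building a map-based "tuple".
    public static Map<String, Object> tuple(String country, long surface) {
        Map<String, Object> map = new HashMap<>();
        map.put("country", country);
        map.put("surface", surface);
        return map;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> result = largeCountries(
            Stream.of(tuple("Albania", 28_750), tuple("Algeria", 2_381_740)),
            1_000_000);
        System.out.println(result.size()); // only Algeria passes the filter
    }
}
```

The Filter operation replaces the hard-coded lambda with an expression parsed from text, which is what makes the condition configurable.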

As a second example, consider the Select operation, used for selecting fields based on the fields of the source stream. The operation also supports the use of calculated fields, once again using expressions. The fields and calculated fields selected are provided as a schema.

Again, the implementation is straightforward, using the Java 8 Stream.map function.
public class Select extends BaseOperation {

   protected Schema schema;

   public Select(Operation source, Schema schema) {
      this.schema = schema;
      addSourceOperation(source);
   }

   public Schema getSchema() {
      return schema;
   }

   public Stream<Tuple> getStream() {
      return getSourceOperation(0).getStream()
         .map(tuple -> TupleUtils.select(schema, tuple));
   }

}
In this case, the schema of the result stream is the selection schema provided as a constructor argument. The stream of the operation is created based on the stream of the source operation, where the selected tuple fields are calculated out of the tuples of the source stream. The TupleUtils.select function evaluates the expressions in the schema argument based on the field values of the tuple argument.
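In plain stream terms, Select is Stream.map with a function that builds the target tuple out of the source tuple. A sketch with maps standing in for tuples and one hard-coded calculated field:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class SelectSketch {

    // Project each source tuple onto {radius, circumference}, where
    // circumference is a calculated field; all other fields are dropped.
    public static List<Map<String, Object>> select(
            Stream<Map<String, Object>> tuples) {
        return tuples.map(source -> {
            Map<String, Object> target = new HashMap<>();
            double radius = ((Number) source.get("radius")).doubleValue();
            target.put("radius", radius);
            target.put("circumference", 2 * radius * Math.PI);
            return target;
        }).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, Object> source = new HashMap<>();
        source.put("radius", 1.0);
        source.put("color", "red"); // not selected, so dropped
        System.out.println(select(Stream.of(source)));
    }
}
```

In the real Select operation the projection and the calculated fields come from the selection schema and its expressions instead of being hard-coded.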

As a somewhat more complex example, consider the Merge operation, used for merging a number of sorted source streams with identical schemas while preserving the sort order.

The implementation is greatly simplified by the Iterators.mergeSorted function, part of Google's Guava libraries. As a result, the implementation primarily consists of wrapping streams as iterators and vice versa.
public class Merge extends BaseOperation {

   protected Schema sortSchema;

   public Merge(Iterable<Operation> sources, Schema sortSchema) {
      this.sortSchema = sortSchema;
      for (Operation source : sources) {
         addSourceOperation(source);
      }
   }

   public Schema getSchema() {
      return getSourceOperation(0).getSchema();
   }

   public Stream<Tuple> getStream() {
      List<Iterator<Tuple>> iterators = getSourceOperationList()
         .stream()
         .map(operation -> operation.getStream().iterator())
         .collect(Collectors.toList());
      Iterator<Tuple> iterator = Iterators.mergeSorted(
         iterators,
         (a, b) -> TupleUtils.compare(sortSchema, a, b)
      );
      return StreamSupport.stream(
         Spliterators.spliteratorUnknownSize(
            iterator,
            Spliterator.ORDERED
         ),
         false
      );
   }

}
The getStream function collects an iterator for each source operation's stream and uses Iterators.mergeSorted to merge them. The TupleUtils.compare function compares tuples based on a sort schema. The function then wraps the merged iterator as a Spliterator and converts this to a stream.

As a last example, consider the Aggregate operation. It is used to aggregate tuples in a stream based on aggregation key and value schemas.
public class Aggregate extends BaseOperation {

   protected Schema aggregateSchema;
   protected Schema valueSchema;
   protected Schema schema;

   public Aggregate(Operation source, Schema aggregateSchema,
      Schema valueSchema) {
      this.aggregateSchema = aggregateSchema;
      this.valueSchema = valueSchema;
      addSourceOperation(source);
   }

   public Schema getSchema() {
      if (schema == null) {
         schema = SchemaUtils.concat(aggregateSchema, valueSchema);
      }
      return schema;
   }

   public Stream<Tuple> getStream() {
      Map<Tuple, Tuple> map = getSourceOperation(0).getStream()
         .collect(supplier(), accumulator(), combiner());
      return map.keySet().stream().map(
         key -> TupleUtils.concat(getSchema(), key, map.get(key))
      );
   }

}
The getSchema function simply returns the concatenation of the key and value schemas. The aggregation is implemented using a Collector based on a Map<Tuple, Tuple>, with the appropriate supplier, accumulator and combiner functions. The result stream is then created by streaming the key tuples in the map with the corresponding value tuples concatenated to them.
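The three-argument collect used here is the mutable-reduction form of the Java 8 stream API, taking exactly those supplier, accumulator and combiner functions. A reduced sketch, grouping strings by their first character and counting, with plain maps in place of key and value tuples:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;

public class AggregateSketch {

    // Group by first character and count occurrences, using the
    // mutable-reduction form of collect.
    public static Map<Character, Long> countByFirstChar(Stream<String> words) {
        return words.collect(
            HashMap::new,                                            // supplier
            (map, word) -> map.merge(word.charAt(0), 1L, Long::sum), // accumulator
            (left, right) -> right.forEach(                          // combiner
                (key, count) -> left.merge(key, count, Long::sum)));
    }

    public static void main(String[] args) {
        Map<Character, Long> counts =
            countByFirstChar(Stream.of("Albania", "Algeria", "Belgium"));
        System.out.println(counts);
    }
}
```

The combiner matters for parallel streams: each partition accumulates into its own map, and the partial maps are then merged.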

Stream Generators


As mentioned before, a generator is an operation that takes no source operations. It can be used as the starting point of a sequence of operations.

As a first example of a generator, consider the Read operation, that defines a tuple stream based on the content of a CSV-file.
public class Read extends BaseOperation {

   protected String file;
   protected Schema schema;

   public Read(String file) {
      this.file = file;
   }

   public Schema getSchema() {
      if (schema == null) {
         try {
            schema = SchemaUtils.getSchema(
               Files.lines(Paths.get(file)).findFirst().get()
            );
         } catch (IOException e) {
            throw new UncheckedIOException(e);
         }
      }
      return schema;
   }

   public Stream<Tuple> getStream() {
      try {
         return Files.lines(Paths.get(file)).skip(1).map(
            line -> TupleUtils.getTuple(getSchema(), line)
         );
      } catch (IOException e) {
         throw new UncheckedIOException(e);
      }
   }

}
The schema of the Read operation is defined by the header row in the CSV-file. This line should contain the separator-delimited labels of the columns in the CSV-file. Optionally, a label may be followed by a type indication, as in
count:Integer
The stream for the Read operation is created using the file stream features present in Java 8. The first line is skipped, as it is the header line. Subsequent lines are converted to tuples using a utility function.
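A minimal sketch of the header handling (the helper names are hypothetical, a semicolon separator is assumed, and the real SchemaUtils and TupleUtils also carry types and value conversions):

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ReadSketch {

    // Strip an optional type indication from a header label,
    // e.g. "count:Integer" -> "count".
    public static String fieldName(String label) {
        int colon = label.indexOf(':');
        return colon < 0 ? label : label.substring(0, colon);
    }

    // Split a header row into field names; a semicolon separator
    // is assumed here.
    public static List<String> schema(String header) {
        return Stream.of(header.split(";"))
            .map(ReadSketch::fieldName)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(schema("country;surface:Decimal"));
        // [country, surface]
    }
}
```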

As a second example, consider the Generate operation, which uses a start schema to generate an initial tuple and a step schema to generate subsequent tuples out of the previous tuple.
public class Generate extends BaseOperation {

   protected Schema startSchema;
   protected Schema stepSchema;

   public Generate(Schema startSchema, Schema stepSchema) {
      this.startSchema = startSchema;
      this.stepSchema = stepSchema;
   }

   public Schema getSchema() {
      return startSchema;
   }

   public Stream<Tuple> getStream() {
      return StreamSupport.stream(
         Spliterators.spliteratorUnknownSize(
            new TupleIterator(startSchema, stepSchema),
            Spliterator.ORDERED
         ),
         false
      );
   }

}
The stream for the Generate operation is created using a TupleIterator, which will evaluate expressions in the start and step schemas to generate tuples. Once again, a Java 8 Spliterator is used to wrap this iterator as a stream.
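For a single numeric field, the start/step idea is already available in Java 8 as Stream.iterate; the TupleIterator generalizes the same mechanism to whole tuples with expression-driven start and step schemas. A sketch:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class GenerateSketch {

    // Start value "n <- start", step "n <- n * 2": an infinite
    // stream, truncated to the requested number of elements.
    public static List<Integer> generate(int start, int count) {
        return Stream.iterate(start, n -> n * 2)
            .limit(count)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(generate(1, 5)); // [1, 2, 4, 8, 16]
    }
}
```

Note that the stream is infinite until a downstream operation such as limit bounds it, which is why generators combine naturally with other operations.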

Coding Convenience


You will have noticed the presence of some convenience classes such as StreamUtils, SchemaUtils and TupleUtils. The use of such convenience classes can be extended to simplify using operations too.

In particular, the creation of schemas can be made less cumbersome by providing a varargs SchemaUtils.create function. Using this function, schemas can be created fairly easily, as in
SchemaUtils.create(
   "radius",
   "circumference <- 2*radius*pi()",
   "surface <- sqr(radius)*pi()"
)
This schema could be used to select a radius field from a source stream and complete the result stream with the circumference and the surface area of a circle with that radius. Again we are using expressions, this time not only to specify calculated values, but also to specify their binding to schema fields.

Another convenience technique is to provide constructor shortcut functions on the BaseOperation class, as in
public class BaseOperation implements Operation {

   public Filter filter(String condition) {
      return new Filter(this, condition);
   }

}
This supports chaining of operations when defining complex tuple streams, as in
BaseOperation
   .read("country.csv")
   .filter("surface > 1000")
   .sort(SchemaUtils.create("name"))
This stream definition processes countries whose name and surface area are available in a CSV-file, keeping only those with a surface area above 1,000 and sorting the result by name.

The Sort operation uses a sort schema to sort a tuple stream using the natural sort order of tuples defined by the sort schema. I will leave the implementation as an exercise.

Towards a Domain Specific Language


The last example illustrates that tuple stream definitions can be formulated in an expressive way given the proper convenience classes and functions. We can simplify these definitions even more by the introduction of a domain specific language for tuple stream definitions. In fact, the expression syntax used so far already constitutes part of such a domain specific language.

Parsing these expressions is done using an operator precedence parser. Such a parser supports term notation and can be configured dynamically to add support for unary and binary operators. The parser creates an abstract syntax tree for parsed expressions. I hope to address the topic of the operator precedence parser in another post.
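As a taste of what such a parser looks like, here is a much-reduced precedence-climbing sketch that handles only numbers, the four arithmetic operators and parentheses; the real parser is dynamically configurable and produces an abstract syntax tree rather than a value:

```java
// A tiny precedence-climbing evaluator for numeric expressions
// with + - * / and parentheses.
public class ExprSketch {

    private final String input;
    private int pos;

    private ExprSketch(String input) {
        this.input = input.replaceAll("\\s", "");
    }

    public static double eval(String expression) {
        return new ExprSketch(expression).parse(0);
    }

    // Parse an expression whose operators bind at least as
    // tightly as minPrecedence.
    private double parse(int minPrecedence) {
        double left = parsePrimary();
        while (pos < input.length()) {
            char op = input.charAt(pos);
            int precedence = precedence(op);
            if (precedence < minPrecedence) break;
            pos++; // consume the operator
            double right = parse(precedence + 1); // left-associative
            left = apply(op, left, right);
        }
        return left;
    }

    // A primary is a parenthesized expression or a number literal.
    private double parsePrimary() {
        if (input.charAt(pos) == '(') {
            pos++; // consume '('
            double value = parse(0);
            pos++; // consume ')'
            return value;
        }
        int start = pos;
        while (pos < input.length()
                && (Character.isDigit(input.charAt(pos)) || input.charAt(pos) == '.')) {
            pos++;
        }
        return Double.parseDouble(input.substring(start, pos));
    }

    private static int precedence(char op) {
        switch (op) {
            case '+': case '-': return 1;
            case '*': case '/': return 2;
            default: return -1; // e.g. ')' ends the sub-expression
        }
    }

    private static double apply(char op, double a, double b) {
        switch (op) {
            case '+': return a + b;
            case '-': return a - b;
            case '*': return a * b;
            default: return a / b;
        }
    }

    public static void main(String[] args) {
        System.out.println(eval("1+2*3"));   // 7.0
        System.out.println(eval("(1+2)*3")); // 9.0
    }
}
```

Adding an operator is a matter of extending the precedence and apply tables, which is what makes the dynamically configurable variant attractive for a DSL.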

By adding the appropriate operator definitions, the expression syntax can be extended to:
  • Support chaining for the construction of operations using the . (dot) operator.
  • Provide a syntax for schema specifications using curly braces ({}) and the <- (arrow) operator.

More Operations


On top of the operations already mentioned in this post, the entire collection of stream functions provided by the Java 8 Streams library can be lifted to make them available as operations, including concat, distinct, limit, skip ..., each time leveraging the schema and expression features addressed in this post.

In the context of accessing data of many different source systems, one may imagine generators similar to the Read operation, such as a Sql operation to fetch data from a relational database, a Solr operation to fetch data from a Solr server, and even a Sax operation to stream the content of an XML-file as a sequence of SAX events.

Presenting aggregated results in a tabular layout can be supported by a Matrix operation that transforms a tuple stream with row labels, column labels and values into a matrix. The inverse operation Unwind (to give it a name) may convert data in a matrix layout to a tuple stream with row labels, column labels and values.

Conclusions


The encapsulation of Java 8 stream functions and utilities as operations on typed tuple streams offers a coherent basis for processing tabular data sets. Java 8 stream features can be lifted to the level of operations, and the approach adds a descriptive level for stream definitions. The focus of operations is mainly on the composition of streams, while Java 8 takes care of the actual processing and resource management.

The approach extends naturally into a straightforward domain specific language that is easy to explain and understand. The resulting formalism is light-weight, but it equals the computational power of SQL, Relational Algebra and Tuple Calculus. It provides a rich set of operations that offer excellent expressivity for data processing in a variety of contexts.

1 comment:

  1. Apparently, the idea has already been explored. Have a look at Apache Drill.
