Saturday, April 21, 2012

Implement Word Count in Accumulo

It seems like the first map-reduce program that everyone tries is counting words. This first program reads a piece of text using the mapper to tokenize the text and outputs a "1" for each token. Then the reducer adds up the "1" values to produce the word counts.
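
For readers who have not written that first program, the two phases can be sketched in plain Java without any Hadoop dependency. This is only an illustration of the idea: the class and method names are made up for this post, and a real job would use the Hadoop Mapper and Reducer classes instead.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, dependency-free sketch of the word count idea.
class WordCountSketch {

    // "Map" phase: split the text into tokens; each token stands for a (word, 1) pair.
    static String[] tokenize(String text) {
        String trimmed = text.trim();
        return trimmed.isEmpty() ? new String[0] : trimmed.split("\\s+");
    }

    // "Reduce" phase: add up the 1s emitted for each distinct word.
    static Map<String, Long> countWords(String text) {
        Map<String, Long> counts = new TreeMap<>();
        for (String token : tokenize(text)) {
            counts.merge(token, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Prints {Parker=3, Robert=2}
        System.out.println(countWords("Parker Robert Parker Robert Parker"));
    }
}
```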

Accumulo provides the same functionality without needing to write a single line of code by using a SummingCombiner iterator. Below is a complete example.

This example is actually more powerful than a plain word count, because the same configuration can be used to sum across any time dimension: the time bucket is simply stored in the column family.

This example shows how to sum across days. First start the Accumulo shell, then follow these steps:

> createtable --no-default-iterators wordtrack
wordtrack> setiter -t wordtrack -p 10 -scan -minc -majc -class org.apache.accumulo.core.iterators.user.SummingCombiner
SummingCombiner interprets Values as Longs and adds them together.  A variety of encodings (variable length, fixed length, or string) are available
----------> set SummingCombiner parameter all, set to true to apply Combiner to every column, otherwise leave blank. if true, columns option will be ignored.: true
----------> set SummingCombiner parameter columns, <col fam>[:<col qual>]{,<col fam>[:<col qual>]} escape non-alphanum chars using %<hex>.: 
----------> set SummingCombiner parameter lossy, if true, failed decodes are ignored. Otherwise combiner will error on failed decodes (default false): <TRUE|FALSE>: 
----------> set SummingCombiner parameter type, <VARLEN|FIXEDLEN|STRING|fullClassName>: STRING

Insert records for a daily rollup.

wordtrack> insert "Robert" "2011.Nov.12" "" 1
wordtrack> insert "Robert" "2011.Nov.12" "" 1
wordtrack> insert "Parker" "2011.Nov.12" "" 1
wordtrack> insert "Parker" "2011.Nov.12" "" 1
wordtrack> insert "Parker" "2011.Nov.12" "" 1
wordtrack> insert "Parker" "2011.Nov.23" "" 1
wordtrack> scan
Parker 2011.Nov.12: []    3
Parker 2011.Nov.23: []    1
Robert 2011.Nov.12: []    2

Get all counts for a given day:

wordtrack> scan -c 2011.Nov.12
Parker 2011.Nov.12: []    3
Robert 2011.Nov.12: []    2
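
To make the behaviour in the session above easier to reason about, here is a small stand-alone Java model of it. It does not use the Accumulo API at all. The class, its methods, and the use of a single "row family" string as the key are simplifications invented for this sketch. It assumes the STRING encoding chosen above, so values are decimal strings that are decoded, added, and re-encoded.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of a table with a summing combiner on every column.
class SummingCombinerSketch {

    // Stand-in for the sorted table: "row family" -> value stored as a decimal string.
    private final TreeMap<String, String> cells = new TreeMap<>();

    // Models an insert followed by combining: decode both strings as longs, add, re-encode.
    void insert(String row, String family, String value) {
        String key = row + " " + family;
        cells.merge(key, value, (oldV, newV) ->
                Long.toString(Long.parseLong(oldV) + Long.parseLong(newV)));
    }

    // Models a plain "scan": every entry, in sorted key order.
    Map<String, String> scan() {
        return new TreeMap<>(cells);
    }

    // Models "scan -c <family>": keep only entries whose column family matches.
    Map<String, String> scanFamily(String family) {
        Map<String, String> result = new TreeMap<>();
        for (Map.Entry<String, String> e : cells.entrySet()) {
            if (e.getKey().endsWith(" " + family)) {
                result.put(e.getKey(), e.getValue());
            }
        }
        return result;
    }

    // Replays the six inserts from the shell session.
    static SummingCombinerSketch sample() {
        SummingCombinerSketch table = new SummingCombinerSketch();
        table.insert("Robert", "2011.Nov.12", "1");
        table.insert("Robert", "2011.Nov.12", "1");
        table.insert("Parker", "2011.Nov.12", "1");
        table.insert("Parker", "2011.Nov.12", "1");
        table.insert("Parker", "2011.Nov.12", "1");
        table.insert("Parker", "2011.Nov.23", "1");
        return table;
    }

    public static void main(String[] args) {
        // Prints {Parker 2011.Nov.12=3, Robert 2011.Nov.12=2}
        System.out.println(sample().scanFamily("2011.Nov.12"));
    }
}
```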

Let's talk about that "--no-default-iterators" parameter for a moment. Normally, Accumulo configures an iterator on every new table that keeps only one value (the value with the latest timestamp) for each unique combination of row ID, column family, and column qualifier. If you leave that iterator in place, your counters will essentially be reset to one each time a compaction is done.
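
The difference between the two behaviours can be shown with a few lines of plain Java. This is only a model of the explanation above, not Accumulo code, and the class and method names are invented for the illustration.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical contrast between "keep the latest version" and "sum all versions".
class VersioningVersusSumming {

    // Models the default behaviour described above: only the newest version survives.
    static long latestOnly(List<Long> versionsOldestFirst) {
        return versionsOldestFirst.get(versionsOldestFirst.size() - 1);
    }

    // Models the summing behaviour: all versions are folded into one total.
    static long summed(List<Long> versionsOldestFirst) {
        long total = 0;
        for (long v : versionsOldestFirst) {
            total += v;
        }
        return total;
    }

    public static void main(String[] args) {
        // Three inserts of "1" for Parker on 2011.Nov.12.
        List<Long> parker = Arrays.asList(1L, 1L, 1L);
        System.out.println("latest only: " + latestOnly(parker)); // latest only: 1
        System.out.println("summed:      " + summed(parker));     // summed:      3
    }
}
```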

Wednesday, April 11, 2012

Using Accumulo To Calculate Seven Day Rolling Average

Without commenting on whether this is a good idea, let me show how you can use Accumulo to store the seven values needed to compute a rolling average. Log into the shell and create the table. Then configure the versioning iterator to retain seven values instead of the default single value. Finally, insert some values.
bin/accumulo shell -u root -p password
> createtable rolling
rolling> config -t rolling -s table.iterator.scan.vers.opt.maxVersions=7
rolling> config -t rolling -s table.iterator.minc.vers.opt.maxVersions=7
rolling> config -t rolling -s table.iterator.majc.vers.opt.maxVersions=7

rolling> insert 2012.02.20 "" "" 21
rolling> insert 2012.02.20 "" "" 22
rolling> insert 2012.02.20 "" "" 23
rolling> insert 2012.02.20 "" "" 24
rolling> insert 2012.02.20 "" "" 25
rolling> insert 2012.02.20 "" "" 26
rolling> insert 2012.02.20 "" "" 27
rolling> insert 2012.02.20 "" "" 28
rolling> insert 2012.02.20 "" "" 29
rolling> insert 2012.02.20 "" "" 30

rolling> insert 2012.02.21 "" "" 51
rolling> insert 2012.02.21 "" "" 52
rolling> insert 2012.02.21 "" "" 53
rolling> insert 2012.02.21 "" "" 54
rolling> insert 2012.02.21 "" "" 55
rolling> insert 2012.02.21 "" "" 56
rolling> insert 2012.02.21 "" "" 57
rolling> insert 2012.02.21 "" "" 58
rolling> insert 2012.02.21 "" "" 59
rolling> insert 2012.02.21 "" "" 60
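
Ten values were inserted per row but only seven are retained, so it helps to see which ones survive. The plain Java sketch below models that retention. It is not Accumulo code, the names are invented for the illustration, and it assumes that a scan returns the newest version first.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of retaining only the newest maxVersions values for one key.
class MaxVersionsSketch {

    static List<Integer> retainNewest(int[] valuesInInsertOrder, int maxVersions) {
        Deque<Integer> kept = new ArrayDeque<>();
        for (int v : valuesInInsertOrder) {
            kept.addFirst(v);                 // newest value goes to the front
            if (kept.size() > maxVersions) {
                kept.removeLast();            // drop the oldest value
            }
        }
        return new ArrayList<>(kept);         // newest first
    }

    public static void main(String[] args) {
        int[] inserted = {51, 52, 53, 54, 55, 56, 57, 58, 59, 60};
        // Prints [60, 59, 58, 57, 56, 55, 54]
        System.out.println(retainNewest(inserted, 7));
    }
}
```

For row 2012.02.21 this leaves 54 through 60, so the average over the retained values is 57.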
You can use the 'scan' command to see all information in the table, or 'scan -b 2012.02.21 -e 2012.02.21' to see the information for a single row ID. You can use a Java program to calculate the rolling average:
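import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.security.Authorizations;

public class RollingAverageDriver {

 public static void main(String[] args) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
  // Connection settings for a local development instance.
  String instanceName = "development";
  String zooKeepers = "localhost";
  String user = "root";
  byte[] pass = "password".getBytes();
  String tableName = "rolling";

  ZooKeeperInstance instance = new ZooKeeperInstance(instanceName, zooKeepers);
  Connector connector = instance.getConnector(user, pass);

  Scanner scanner = connector.createScanner(tableName, new Authorizations());

  // Require seven stored values before reporting an average.
  RollingAverageCalculator raCalculator = new RollingAverageCalculator(scanner, 7);
  int rollingAverage = raCalculator.calculate("2012.02.21");

  System.out.println("7 Day Rolling Average: " + rollingAverage);

  System.out.println("END");
 }
}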
Of course, you'll also need the RollingAverageCalculator class:
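import java.util.Iterator;
import java.util.Map;

import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;

public class RollingAverageCalculator {

 private final Scanner scanner;
 private final int minNumberOfValues;

 public RollingAverageCalculator(Scanner scanner, int minNumberOfValues) {
  this.scanner = scanner;
  this.minNumberOfValues = minNumberOfValues;
 }

 // Averages every value stored under rowId; returns 0 when fewer than
 // minNumberOfValues values are available.
 public int calculate(final String rowId) {
  // Restrict the scan to the single row.
  scanner.setRange(new Range(rowId, rowId));

  int sum = 0;
  int count = 0;
  Iterator<Map.Entry<Key,Value>> iterator = scanner.iterator();
  while (iterator.hasNext()) {
   Map.Entry<Key,Value> entry = iterator.next();
   Value value = entry.getValue();
   // Values were inserted from the shell as decimal strings.
   String sValue = new String(value.get());
   sum += Integer.parseInt(sValue);
   count++;
  }
  // Integer division: any fractional part of the average is truncated.
  return count < minNumberOfValues ? 0 : (sum / count);
 }
}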
It should be fairly straightforward to change the code to perform any kind of rolling average.
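
As one possible variation, the averaging step can be separated from the scan and given a configurable window. The sketch below is plain Java over a list of already-decoded values. The class name, the newest-first ordering, and the use of OptionalDouble to signal "not enough values" are choices made for this illustration, not part of the code above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.OptionalDouble;

// Hypothetical windowed average over values that have already been read from a row.
class RollingAverageSketch {

    // Averages the newest `window` values; empty when fewer than `window` are available.
    static OptionalDouble average(List<Integer> valuesNewestFirst, int window) {
        if (window <= 0 || valuesNewestFirst.size() < window) {
            return OptionalDouble.empty();
        }
        long sum = 0;
        for (int i = 0; i < window; i++) {
            sum += valuesNewestFirst.get(i);
        }
        // Floating-point division keeps the fractional part of the average.
        return OptionalDouble.of((double) sum / window);
    }

    public static void main(String[] args) {
        List<Integer> retained = Arrays.asList(60, 59, 58, 57, 56, 55, 54);
        System.out.println(average(retained, 7).getAsDouble()); // 57.0
        System.out.println(average(retained, 3).getAsDouble()); // 59.0
        System.out.println(average(retained, 8).isPresent());   // false
    }
}
```

Returning an empty result instead of 0 avoids confusing "not enough data" with a genuine average of zero.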

Reading A Row With Accumulo

Here is another example of reading information with Accumulo, this time printing every entry in a single row. This example needs the accumulo-core and hadoop-core jar files on the classpath.
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableExistsException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.hadoop.io.Text;

public class Accumulo_Scan_A_Row_Driver {

 public static void main(String[] args) throws AccumuloException, AccumuloSecurityException, TableNotFoundException, TableExistsException {
  String instanceName = "development";
  String zooKeepers = "localhost";
  String user = "root";
  byte[] pass = "password".getBytes();
  String tableName = "user";
  String rowId = "John";

  ZooKeeperInstance instance = new ZooKeeperInstance(instanceName, zooKeepers);
  Connector connector = instance.getConnector(user, pass);

  Scanner scan = connector.createScanner(tableName, new Authorizations());
  scan.setRange(new Range(rowId, rowId));

  Iterator<Map.Entry<Key,Value>> iterator = scan.iterator();
  while (iterator.hasNext()) {
   Map.Entry<Key,Value> entry = iterator.next();
   Key key = entry.getKey();
   Value value = entry.getValue();
   System.out.println(key + " ==> " + value);
  }
 }
}