Sunday, March 31, 2013

Example using Accumulo's RegExFilter class



package com.affy;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.IteratorUtil;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iterators.SortedMapIterator;
import org.apache.accumulo.core.iterators.system.MapFileIterator;
import org.apache.accumulo.core.iterators.user.RegExFilter;
import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.Logger;

/**
 * Stand-alone demonstration of Accumulo's {@link RegExFilter}.
 *
 * <p>The filter is run over a small in-memory {@link SortedMap} wrapped in a
 * {@link SortedMapIterator}, and every entry the filter lets through is written
 * to the log. No Accumulo connection is opened by this class itself.
 */
public class AccumuloRegExIteratorPlayground {

    /** One logger for the class; it does not need to be created per instance. */
    private static final Logger log = Logger.getLogger(AccumuloRegExIteratorPlayground.class);

    /** Explicit charset so the sample values do not depend on the platform default. */
    private static final Charset UTF_8 = Charset.forName("UTF-8");

    /** Immutable "no column families" argument for {@code seek}. */
    private static final Collection<ByteSequence> EMPTY_COL_FAMS = Collections.<ByteSequence>emptyList();

    /**
     * Builds the sample data, runs it through a configured {@link RegExFilter}
     * and logs each surviving entry as {@code key --> value}.
     *
     * @throws IOException if initialising, seeking or advancing the iterator fails
     * @throws RuntimeException if the filter rejects the generated options
     */
    public void process() throws IOException {
        final String regularExpression = "/.*";

        final RegExFilter rei = createFilter(buildSampleData(), regularExpression);

        // Unbounded range, no column-family restriction.
        rei.seek(new Range(), EMPTY_COL_FAMS, false);

        while (rei.hasTop()) {
            final Key key = rei.getTopKey();
            final Value value = rei.getTopValue();
            log.info(key + " --> " + value);
            rei.next();
        }
    }

    /**
     * Creates the two sample entries: one whose row is {@code "1111"} and one
     * whose row is {@code "/1111"}. Everything else about them is identical.
     */
    private static SortedMap<Key, Value> buildSampleData() {
        final SortedMap<Key, Value> input = new TreeMap<Key, Value>();
        input.put(new Key("1111", "2222", "3333", 0), new Value("4444".getBytes(UTF_8)));
        input.put(new Key("/1111", "2222", "3333", 0), new Value("4444".getBytes(UTF_8)));
        return input;
    }

    /**
     * Configures, validates and initialises a {@link RegExFilter} over the given data.
     *
     * <p>The regular expression is passed as the first term of
     * {@code RegExFilter.setRegexs} (the row term per the RegExFilter API); the
     * other three terms are {@code null} and the final flag is {@code false}.
     *
     * @param input the in-memory entries the filter reads from
     * @param regularExpression the expression handed to the filter
     * @return the initialised filter, not yet seeked
     * @throws IOException if {@code init} fails
     * @throws RuntimeException if {@code validateOptions} returns {@code false}
     */
    private static RegExFilter createFilter(final SortedMap<Key, Value> input, final String regularExpression)
            throws IOException {
        final RegExFilter rei = new RegExFilter();
        final IteratorSetting is = new IteratorSetting(1, RegExFilter.class);
        RegExFilter.setRegexs(is, regularExpression, null, null, null, false);

        if (!rei.validateOptions(is.getOptions())) {
            throw new RuntimeException("invalid options.");
        }

        rei.init(new SortedMapIterator(input), is.getOptions(), new PlaygroundEnvironment());
        return rei;
    }

    /**
     * Minimal {@link IteratorEnvironment} for running an iterator outside a
     * tablet server. Only the configuration and map-file accessors are
     * implemented; the remaining methods throw
     * {@link UnsupportedOperationException}.
     *
     * <p>Declared as a static nested class so it holds no reference to an
     * enclosing instance.
     */
    private static final class PlaygroundEnvironment implements IteratorEnvironment {

        @Override
        public SortedKeyValueIterator<Key, Value> reserveMapFileReader(String mapFileName) throws IOException {
            Configuration conf = CachedConfiguration.getInstance();
            FileSystem fs = FileSystem.get(conf);
            return new MapFileIterator(AccumuloConfiguration.getDefaultConfiguration(), fs, mapFileName, conf);
        }

        @Override
        public AccumuloConfiguration getConfig() {
            return AccumuloConfiguration.getDefaultConfiguration();
        }

        @Override
        public IteratorUtil.IteratorScope getIteratorScope() {
            throw new UnsupportedOperationException("Not supported yet.");
        }

        @Override
        public boolean isFullMajorCompaction() {
            throw new UnsupportedOperationException("Not supported yet.");
        }

        @Override
        public void registerSideChannel(SortedKeyValueIterator<Key, Value> iter) {
            throw new UnsupportedOperationException("Not supported yet.");
        }
    }

    /**
     * Command-line entry point: creates a playground instance and runs it.
     *
     * @param args ignored
     * @throws IOException propagated from {@link #process()}
     */
    public static void main(final String[] args) throws IOException {
        AccumuloRegExIteratorPlayground driver = new AccumuloRegExIteratorPlayground();
        driver.process();
    }
}
Post a Comment