Steps To Big Data: Hello, Pail

Just before my first anniversary holiday since joining communityengine, I started a new role on the recommendation team. The team is relatively new, so we are open to using virtually any technology.

In the book Big Data (MEAP), Nathan Marz describes a framework called Pail (dfs-datastores), a data storage abstraction on top of Hadoop. It supports schemas, merges small files into larger chunks for better HDFS performance, and more.

We started to use Pail for our data collection. So what have we done?

  • a data schema defined in protobuf
  • an implementation of PailStructure
  • a job to run the data collection

Ready to dig deeper? Let’s get started.

Schema Definition

The protobuf definition shown here is a simple one: a Data message contains a DataUnit, a timestamp and the source the data was collected from.

A DataUnit holds either a PageViewEdge, which describes which URL was visited by whom, or a SearchEdge, which records a user's search query.

message PageViewEdge{
    required string user_id = 1;
    required string url = 2;
}

message SearchEdge{
    required string user_id = 1;
    required string query = 2;
}

message DataUnit{
    optional PageViewEdge pageViewEdge = 1;
    optional SearchEdge searchEdge = 2;
}

message Data{
    required int64 timestamp = 1;
    required string source = 2;
    required DataUnit dataUnit = 3;
}

The definition lives in the DataProtos.proto file. To compile it into Java classes, run protoc --java_out=. DataProtos.proto
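
Once compiled, the generated builders can be used to construct messages. Here is a minimal sketch; it assumes the default generated outer class name DataProtos (the .proto file sets no java_outer_classname):

// quick sanity check of the generated classes
DataProtos.Data data = DataProtos.Data.newBuilder()
    .setTimestamp(System.currentTimeMillis())
    .setSource("IIS")
    .setDataUnit(DataProtos.DataUnit.newBuilder()
        .setPageViewEdge(DataProtos.PageViewEdge.newBuilder()
            .setUserId("user-42")
            .setUrl("http://example.com/home")))
    .build();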

PailStructure Implementation

Implementing a PailStructure is pretty straightforward.

public final class DataPailStructure implements PailStructure<DataProtos.Data> {
    @Override
    public Class getType() {
        return DataProtos.Data.class;
    }

    @Override
    public DataProtos.Data deserialize(byte[] bytes) {
        return null;  // code for deserializing protobuf stream
    }

    @Override
    public byte[] serialize(DataProtos.Data data) {
        return new byte[0];  // serialize object into protobuf stream
    }

    @Override
    public List<String> getTarget(DataProtos.Data data) {
        return null;  // get the path (subpail) to store data
    }

    @Override
    public boolean isValidTarget(String... strings) {
        return false; // indicate if the target is correct
    }
}

The target is a little bit tricky: we use the protobuf field numbers as indexes and store each field defined in DataUnit in a separate sub-folder, i.e.:

PailDataset/
  1/    <- folder storing PageViewEdge records
  2/    <- folder storing SearchEdge records
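
With the implementation shown below, a Data record carrying a PageViewEdge (field number 1 in DataUnit) ends up under 1/ and a SearchEdge under 2/. A small sketch, reusing the data message built earlier:

DataPailStructure structure = new DataPailStructure();
structure.getTarget(data);       // ["1"] -> stored under PailDataset/1/
structure.isValidTarget("1");    // true: DataUnit has a field with number 1
structure.isValidTarget("99");   // false: DataUnit has no field 99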

The detailed implementation is below.

public final class DataPailStructure implements PailStructure<DataProtos.Data> {
    // we assume all property messages have a property-value field
    private final static String PROPERTYVALUE_FIELD_NAME = "value";

    // metadata about DataProtos.Data used to determine whether a target directory is valid or not
    private final Map<Integer, Set<Integer>> validDataTypes;
    private final Set<Integer> edgeTypes;

    // extract metadata about the DataProtos.Data type and its fields. We assume a somewhat rigid
    // structure here and this process will fail if these assumptions are violated.
    {
        final Map<Integer, Set<Integer>> vdts = new HashMap<Integer, Set<Integer>>();
        final Set<Integer> ets = new HashSet<Integer>();
        // Data must contain a DataUnit
        for (final Descriptors.FieldDescriptor fd : DataProtos.DataUnit.getDescriptor().getFields()) {
            final Integer fn = fd.getNumber();
            vdts.put(fn, new HashSet<Integer>());
            // all DataUnit fields MUST be of type Message. If the call to getMessageType() fails,
            // you've got an unsuitable schema. fix it!
            boolean isProperty = false;
            for (final Descriptors.FieldDescriptor pd : fd.getMessageType().getFields()) {
                // if the DataUnit field contains a value field, we assume that it represents a PropertyValue
                if (PROPERTYVALUE_FIELD_NAME.equals(pd.getName())) {
                    for (final Descriptors.FieldDescriptor pvd : pd.getMessageType().getFields())
                        vdts.get(fn).add(pvd.getNumber());
                    isProperty = true;
                    break;
                }
            }
            if (!isProperty)
                // we haven't found a value field, so we assume it's an edge
                ets.add(fn);
        }
        validDataTypes = Collections.unmodifiableMap(vdts);
        edgeTypes = Collections.unmodifiableSet(ets);
    }

    @Override
    public DataProtos.Data deserialize(byte[] serialized) {
        try {
            return DataProtos.Data.parseFrom(serialized);
        } catch (final Exception ex) {
            throw new RuntimeException(ex);
        }
    }

    @Override
    public byte[] serialize(final DataProtos.Data object) {
        return object.toByteArray();
    }

    @Override
    public Class getType() {
        return DataProtos.Data.class;
    }

    @Override
    public List<String> getTarget(final DataProtos.Data object) {
        final List<String> result = new ArrayList<String>(2);

        final DataProtos.DataUnit du = object.getDataUnit();
        final Descriptors.FieldDescriptor dd = getFirstSetField(du);
        if (dd == null)
            throw new IllegalArgumentException("supplied object must not contain an empty data unit");
        result.add(String.valueOf(dd.getNumber()));

        // a DataUnit will only contain fields of type Message
        final Message propertyValue = getPropertyValue((Message) du.getField(dd));
        if (propertyValue != null) {
            final Descriptors.FieldDescriptor pd = getFirstSetField(propertyValue);
            if (pd == null)
                throw new IllegalArgumentException("supplied object must not contain an empty property value");
            result.add(String.valueOf(pd.getNumber()));
        }
        return result;
    }

    private Descriptors.FieldDescriptor getFirstSetField(final Message message) {
        assert message != null;
        for (final Descriptors.FieldDescriptor fd : message.getDescriptorForType().getFields()) {
            if (message.hasField(fd)) {
                return fd;
            }
        }
        return null;
    }

    private Message getPropertyValue(final Message message) {
        assert message != null;
        for (final Descriptors.FieldDescriptor fd : message.getDescriptorForType().getFields()) {
            if (PROPERTYVALUE_FIELD_NAME.equals(fd.getName()))
                return (Message) message.getField(fd);
        }
        return null;
    }

    @Override
    public boolean isValidTarget(final String... dirs) {
        // this is somewhat ugly, as dirs only contains a filename as its last element if we
        // try to write something to the pail. when reading, dirs contains only the directories
        // on the path
        if (dirs.length == 0)
            return false;
        try {
            final Integer n = Integer.decode(dirs[0]);
            if (!isValidDataUnitType(n))
                return false;
            if (dirs.length > 1 && !isEdgeType(n)) {
                final Integer m = Integer.decode(dirs[1]);
                if (!isValidPropertyType(n, m))
                    return false;
            }
            return true;
        } catch (NumberFormatException ex) {
            return false;
        }
    }

    private boolean isValidDataUnitType(final int n) {
        return validDataTypes.containsKey(n);
    }

    private boolean isValidPropertyType(final int n, final int m) {
        return validDataTypes.containsKey(n) && validDataTypes.get(n).contains(m);
    }

    private boolean isEdgeType(final int n) {
        return edgeTypes.contains(n);
    }
}
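
A quick, illustrative way to sanity-check the structure (not part of the original job) is a serialize/deserialize round trip:

// round-trip check: a SearchEdge record should survive serialization
// and be routed to sub-folder "2"
DataPailStructure structure = new DataPailStructure();
DataProtos.Data original = DataProtos.Data.newBuilder()
    .setTimestamp(1234567890L)
    .setSource("IIS")
    .setDataUnit(DataProtos.DataUnit.newBuilder()
        .setSearchEdge(DataProtos.SearchEdge.newBuilder()
            .setUserId("user-42")
            .setQuery("big data")))
    .build();

DataProtos.Data restored = structure.deserialize(structure.serialize(original));
// restored.equals(original) is true, and the record is routed to sub-folder "2":
System.out.println(structure.getTarget(original));   // prints [2]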

Data Collecting Job

We use Amazon S3 to store the data. Hadoop can handle it as long as you pass a correctly formed path (s3://accesskey:secretkey@bucket/key). The job extracts data from the IIS logs and writes it into S3 using Pail.

public final class IISLogProcessor implements Runnable {
    private static final String LOG_SOURCE = "IIS";
    private final String accesskey = "your access key here";
    private final String secretkey = "your secret key here";
    private final String bucket = "datacollection";
    private final String path = "s3://" + accesskey + ":" + secretkey + "@" + bucket;
    private Iterator<String> logIterator;
    private Pail<DataProtos.Data>.TypedRecordOutputStream outputStream;
    private DataPailStructure pailStructure;

    public IISLogProcessor(Iterator<String> logIterator) {
        this.logIterator = logIterator;
        this.pailStructure = new DataPailStructure();
    }

    @Override
    public void run() {
        open();

        long nonce = 0;

        while (logIterator.hasNext()) {
            String line = logIterator.next();

            /*
                logic to extract userId, url and timestamp from the log line (elided)
            */

            appendPageView(userId, url, timestamp, nonce++);
        }

        close();
    }

    public void open() {
        try {
            if (outputStream == null)
                outputStream = Pail.create(path, pailStructure).openWrite();
        } catch (final Exception ex) {
            throw new DataSinkException("could not open pail for writing", ex);
        }
    }

    public void close() {
        try {
            if (outputStream != null)
                outputStream.close();
            outputStream = null;
        } catch (final Exception ex) {
            throw new DataSinkException("could not close pail", ex);
        }
    }

    public void appendPageView(String userId, String url, long timestamp, long nonce) {
        DataProtos.DataUnit dataUnit = DataProtos.DataUnit.newBuilder()
            .setPageViewEdge(DataProtos.PageViewEdge.newBuilder()
                .setUserId(userId)
                .setUrl(url)
            ).build();

        saveData(timestamp, dataUnit);
    }

    public void saveData(long timestamp, DataProtos.DataUnit dataUnit) {
        DataProtos.Data data = DataProtos.Data.newBuilder()
            .setTimestamp(timestamp)
            .setSource(LOG_SOURCE)
            .setDataUnit(dataUnit).build();

        outputStream.writeObject(data);
    }
}
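
To wire the job up, all the processor needs is an iterator over log lines. A hypothetical driver (the file-reading part is my own sketch, not from the original post) could look like this:

// hypothetical driver: read IIS log lines from a local file and run the processor
public final class IISLogProcessorMain {
    public static void main(final String[] args) throws IOException {
        final List<String> lines = new ArrayList<String>();
        final BufferedReader reader = new BufferedReader(new FileReader(args[0]));
        try {
            String line;
            while ((line = reader.readLine()) != null)
                lines.add(line);
        } finally {
            reader.close();
        }
        new IISLogProcessor(lines.iterator()).run();
    }
}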

What else

That’s pretty much it. After running the job, you will probably want to check that it worked. You can use a tool like CloudBerry Explorer to inspect your bucket; you should see a structure similar to the one below.
[image: pail folder structure]

If you want to preview the data, this DataPreviewer reads records from the pail and returns them in protobuf's readable text format.

public class DataPreviewer {
    private final Pail<DataProtos.Data> pail;

    public DataPreviewer(final Pail<DataProtos.Data> pail) {
        this.pail = pail;
    }

    public ArrayList<String> preview(final String relpath, final int limit) {
        int count = 0;

        ArrayList<String> result = new ArrayList<String>(limit);

        try {
            Pail<DataProtos.Data>.PailIterator iterator = pail.getSubPail(relpath).iterator();
            while (iterator.hasNext() && count++ < limit) {
                result.add(iterator.next().toString());
            }
            iterator.close();
        } catch (IOException e) {
            result.add(e.getMessage());
        }

        return result;
    }
}

e.g. preview the first 10 PageViewEdge records:

new DataPreviewer(pail).preview("1", 10)
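
Here pail is an already-opened Pail pointing at the dataset. Assuming the single-argument Pail constructor takes the dataset path (as in dfs-datastores), opening and previewing might look like this:

// sketch: open the existing pail by path and print the first 10 page views
// (the Pail constructor may throw IOException)
Pail<DataProtos.Data> pail = new Pail<DataProtos.Data>("s3://accesskey:secretkey@datacollection");
for (String record : new DataPreviewer(pail).preview("1", 10))
    System.out.println(record);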

3 thoughts on “Steps To Big Data: Hello, Pail”

  1. I’m trying to reproduce all the steps you mentioned above. I was able to compile the DataProtos.proto file into Java classes. However, I have no idea how to compile the data collecting job with Pail. Could you share the steps to do so? I appreciate any input. Thanks.

    • In short, the data collecting job uses Pail as the I/O layer to store protobuf-encoded information. You have successfully compiled the protobuf definition into Java, which is good.
      I failed to wire up the flow when I was simplifying the code; sorry about that.

      Before doing any processing, call open() to prepare a stream for writing, and at the end call close() to close the stream.
