Steps To Big Data: Hello, Cascading

In the previous post, Steps To Big Data: Hello, Pail, we defined our fact model and showed how to use Pail to store our data. What is a fact model?

A Fact Model, as we see it, structures basic knowledge about business operations from a business perspective. “Basic” means that the knowledge it represents cannot be derived or computed from any other knowledge. In that sense, a Fact Model is a crucial starting point for developing more advanced forms of business knowledge, including measures and rules.

As described above, a fact model cannot be derived or computed from any other knowledge; in other words, its purpose is to generate the raw data from which everything else is derived. The process of generating data from the fact model can be illustrated as below.
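As a rough sketch of such a fact in our case, the snippet below builds a single page-view fact; the actual Protobuf schema lives in the previous post, so the builder method names here are assumed from the getters (getTimestamp, getDataUnit, getPageViewEdge, getUrl, getReferrer) used later in GetUrlFunction, and the URLs are hypothetical.

// Sketch only: builder names mirror the getters used in GetUrlFunction below.
DataProtos.PageViewEdge pageView = DataProtos.PageViewEdge.newBuilder()
        .setUrl("/follow/toggle-follow?followAction=follow") // action URL (hypothetical)
        .setReferrer("/noticeboard/123")                     // context URL (hypothetical)
        .build();

DataProtos.Data fact = DataProtos.Data.newBuilder()
        .setTimestamp(System.currentTimeMillis() / 1000L)    // Unix seconds
        .setDataUnit(DataProtos.DataUnit.newBuilder()
                .setPageViewEdge(pageView))
        .build();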

Cascading lets us create and execute complex data-processing workflows on top of Hadoop using any JVM-based language (Java, Scala, JRuby, Clojure, etc.). Most Cascading-related libraries are available in the Conjars Maven repository: http://conjars.org/
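To get a feel for the moving parts before the real job, the smallest possible flow just wires a source tap, a pipe with no operations and a sink tap together; this is a minimal sketch, and the HDFS paths are placeholders.

public class HelloCascading {
    public static void main(String[] args) {
        // placeholder HDFS paths
        Tap source = new Hfs(new TextLine(new Fields("line")), "hello/input");
        Tap sink = new Hfs(new TextLine(new Fields("line")), "hello/output", SinkMode.REPLACE);

        // a pipe with no operations simply copies tuples from source to sink
        Pipe copy = new Pipe("copy");

        Properties properties = new Properties();
        AppProps.setApplicationJarClass(properties, HelloCascading.class);

        Flow flow = new HadoopFlowConnector(properties).connect(source, sink, copy);
        flow.complete(); // block until the underlying map/reduce job finishes
    }
}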

We are going to use Cascading to count users’ actions.

The first class is a simple job that defines the source, the sink and the workflow, and then wires them up.

public final class ActionsInContextCounter implements Serializable, Runnable {
    private final String pailRoot;
    private final String outputPath;
    private final long start;
    private final long end;

    public ActionsInContextCounter(final String pailRoot,
                                   final String outputPath,
                                   final long start,
                                   final long end) {
        this.start = start;
        this.end = end;
        this.pailRoot = pailRoot;
        this.outputPath = outputPath;
    }

    @Override
    public void run() {
        // set-up source and sink
        PailSpec spec = new PailSpec(new DataPailStructure());
        PailTap.PailTapOptions options = new PailTap.PailTapOptions(
            spec,
            "pageview", // initial field declaration
            new List[]{
                spec.getStructure().getTarget(DataPailStructure.PAGEVIEW_DATA)
            }, // get the edge index of the page view edge to retrieve data
            null);

        Tap source = new PailTap(pailRoot, options); // read data from pail source
        Tap sink = new Hfs(new TextDelimited(Fields.ALL, ","), // output into a csv file
                           outputPath,
                           SinkMode.REPLACE);

        Pipe assembly = new Pipe("action count");
        // tuple stream here: pageview (DataProtos.Data objects read from the pail)

        Fields fieldDeclaration = new Fields("start", "end", "context", "action");
        assembly = new Each(assembly, new Fields("pageview"),
                            new GetUrlFunction(fieldDeclaration, start, end),
                            fieldDeclaration);
        // tuple stream here: start, end, context, action

        assembly = new GroupBy(assembly, new Fields("start", "end", "context", "action"));
        assembly = new Every(assembly, new Count(new Fields("count")));
        // tuple stream here: start, end, context, action, count

        Properties properties = new Properties();
        AppProps.setApplicationJarClass(properties, ActionsInContextCounter.class);
        FlowConnector flowConnector = new HadoopFlowConnector(properties);
        Flow flow = flowConnector.connect(source, sink, assembly);
        flow.complete();
    }
}
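The job can then be kicked off from a plain main method; the pail root and output path below are placeholders, and the time window is given as Unix timestamps in seconds, matching the sample output later in this post.

public final class ActionCountJob {
    public static void main(String[] args) {
        long start = 1343743200L; // window start, Unix seconds
        long end = 1343829600L;   // window end, Unix seconds
        new ActionsInContextCounter("/data/pail/master",     // pail root (placeholder)
                                    "/data/reports/actions", // CSV output path (placeholder)
                                    start,
                                    end).run();
    }
}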

In the Each pipe above, we use GetUrlFunction to convert the data from Protobuf into tuples for further processing. The class is defined below.

public class GetUrlFunction extends BaseOperation<DataProtos.Data>
                            implements Function<DataProtos.Data> {
    private final long start;
    private final long end;

    public GetUrlFunction(final Fields fieldDeclaration, final long start, final long end) {
        super(fieldDeclaration);
        this.start = start;
        this.end = end;
    }

    @Override
    public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
        // the "pageview" field holds the deserialized Protobuf object
        Object object = functionCall.getArguments().getObject(0);
        DataProtos.Data data = (DataProtos.Data) object;
        long timestamp = data.getTimestamp();

        // only keep page views that fall inside the [start, end) window
        if (start <= timestamp && timestamp < end) {
            DataProtos.PageViewEdge pageViewEdge = data.getDataUnit().getPageViewEdge();
            String url = pageViewEdge.getUrl();
            String referrer = pageViewEdge.getReferrer();
            SiteAction action = SiteAction.actionFromUrl(url);
            SiteContext context = SiteContext.contextFromUrl(referrer);

            if (action != SiteAction.UNKNOWN) {  // just care about known actions
                Tuple tuple = new Tuple(start, end, context.toString(), action.toString());
                functionCall.getOutputCollector().add(tuple);
            }
        }
    }
}

We use an enum to define SiteAction, matching action URLs against regular expressions; SiteContext is similar (a sketch of it follows the SiteAction code below).

public enum SiteAction implements Serializable {
    RECOMMEND ("^/recommend/recommend/(\\d|\\w)+$"),
    FOLLOW ("^/follow/toggle-follow(.*(followAction=follow))?"),
    UNFOLLOW ("^/follow/unfollow"),
    POST ("^/spark/create"),
    COMMENT ("^/comment/create"),
    LIKE ("^/spark/toggle-like"),
    SHARE ("^/share/share-post$"),
    INITIATEPURCHASE ("^/step/offer/checkout"),
    COMPLETEPURCHASE ("^/step/offer/transaction"),
    UNKNOWN (null);

    private final Pattern urlPattern;

    SiteAction(final String urlPattern) {
        this.urlPattern = urlPattern != null ? Pattern.compile(urlPattern) : null;
    }

    public boolean isActionUrl(final String url) {
        return urlPattern != null && urlPattern.matcher(url).find();
    }

    public static SiteAction actionFromUrl(final String url) {
        for (final SiteAction action : SiteAction.values())
            if (action.isActionUrl(url))
                return action;
        return UNKNOWN;
    }
}
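Since SiteContext is not shown in this post, here is a minimal sketch of what it could look like; the context names are taken from the output below, but the referrer patterns are purely hypothetical.

public enum SiteContext implements Serializable {
    // Hypothetical referrer patterns; only the context names come from the real output below.
    NOTICEBOARD ("^/noticeboard"),
    NOTICEDETAIL ("^/notice/(\\d|\\w)+$"),
    BUSINESSPROFILE ("^/business/(\\d|\\w)+$"),
    USERPROFILE ("^/user/(\\d|\\w)+$"),
    LOCATIONPROFILE ("^/location/(\\d|\\w)+$"),
    POSTDETAIL ("^/spark/(\\d|\\w)+$"),
    OFFERDETAIL ("^/offer/(\\d|\\w)+$"),
    GIVEAWAY ("^/giveaway"),
    UNKNOWN (null);

    private final Pattern urlPattern;

    SiteContext(final String urlPattern) {
        this.urlPattern = urlPattern != null ? Pattern.compile(urlPattern) : null;
    }

    public boolean isContextUrl(final String url) {
        return urlPattern != null && urlPattern.matcher(url).find();
    }

    public static SiteContext contextFromUrl(final String url) {
        for (final SiteContext context : SiteContext.values())
            if (context.isContextUrl(url))
                return context;
        return UNKNOWN;
    }
}

With both enums in place, GetUrlFunction maps each page view to a (start, end, context, action) tuple, and the GroupBy/Every pair counts how often each combination occurs.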

The final result looks like this (columns are start, end, context, action, count):

1343743200,1343829600,BUSINESSPROFILE,COMMENT,17
1343743200,1343829600,BUSINESSPROFILE,FOLLOW,187
1343743200,1343829600,BUSINESSPROFILE,LIKE,18
1343743200,1343829600,BUSINESSPROFILE,POST,61
1343743200,1343829600,BUSINESSPROFILE,RECOMMEND,55
1343743200,1343829600,BUSINESSPROFILE,SHARE,2
1343743200,1343829600,BUSINESSPROFILE,UNFOLLOW,7
1343743200,1343829600,GIVEAWAY,FOLLOW,203
1343743200,1343829600,GIVEAWAY,RECOMMEND,15
1343743200,1343829600,LOCATIONPROFILE,FOLLOW,4
1343743200,1343829600,LOCATIONPROFILE,RECOMMEND,1
1343743200,1343829600,NOTICEBOARD,FOLLOW,587
1343743200,1343829600,NOTICEBOARD,RECOMMEND,90
1343743200,1343829600,NOTICEDETAIL,COMMENT,4
1343743200,1343829600,NOTICEDETAIL,FOLLOW,1
1343743200,1343829600,NOTICEDETAIL,LIKE,12
1343743200,1343829600,NOTICEDETAIL,RECOMMEND,20
1343743200,1343829600,OFFERDETAIL,COMMENT,6
1343743200,1343829600,OFFERDETAIL,INITIATEPURCHASE,7
1343743200,1343829600,OFFERDETAIL,LIKE,7
1343743200,1343829600,OFFERDETAIL,RECOMMEND,6
1343743200,1343829600,POSTDETAIL,COMMENT,8
1343743200,1343829600,POSTDETAIL,LIKE,15
1343743200,1343829600,POSTDETAIL,RECOMMEND,8
1343743200,1343829600,UNKNOWN,COMPLETEPURCHASE,4
1343743200,1343829600,USERPROFILE,COMMENT,2
1343743200,1343829600,USERPROFILE,FOLLOW,14
1343743200,1343829600,USERPROFILE,LIKE,14
1343743200,1343829600,USERPROFILE,POST,22
1343743200,1343829600,USERPROFILE,RECOMMEND,9
1343743200,1343829600,USERPROFILE,UNFOLLOW,1

To wrap up, Cascading is pretty handy, especially for developers: it is easy to extend and to integrate with various data providers, and most importantly, it relies on Hadoop to schedule the map/reduce jobs.
