Combining data from multiple sources with Spark and Zeppelin

I’ve been doing a lot with Spark lately, and I love how easy it is to pull in data from various locations, in various formats, and be able to query and manipulate it all with a unified interface.

Last week, I had a situation at work where I needed to get a list of users (from our MySQL database), filtered using ancillary data from JSON files in S3 and a CSV file that had been sent to me and was sitting on my local machine. Using Spark and Zeppelin, I was able to put this together in just a few minutes, and analyzing a few GBs of data from multiple sources and formats on my local machine only took a few minutes to execute as well (this approach would also work with much larger data – you’d just want to run it on a cluster).

I wanted to share a simplified, adapted version of that with others to show the power of Spark and Zeppelin. This is a really trivial example with a tiny amount of data, but hopefully it gives an idea of what is possible – and while it may seem kind of gimmicky, I’ve already used something very similar once for an actual business use case, and I can see lots of other situations where it could come in handy as well.

The Task

Marketing has sent us a list of users – some of whom they sent an email to, and some they didn’t. They want to know how many pages the users who received our email viewed versus the users who did not. Unfortunately, our analytics data only has the user ID, and marketing only has email addresses – we’ll have to use the data in our database to bridge them.

Requirements

Zeppelin
MySQL
AWS S3 (You can obviously change this to a local file, if you want)

Files

I’ve uploaded samples for all of the assets you need, as well as the Zeppelin notebook itself:

SQL file for creating the users table and the users in it
JSON file with analytics data to be loaded from S3
Email list with all of the users that marketing emailed in CSV format
Zeppelin file that has all of the code

I’ve posted screenshots in this post; if you’d like the code, grab the Zeppelin file.

Step 1: Imports

We need to import a MySQL driver, the AWS SDK (so we can load data from S3), and the Databricks spark-csv library, which can create DataFrames from CSV files.

[Screenshot: dependencies]
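In case the screenshot doesn’t come through, here is roughly what that dependency paragraph might look like. The artifacts and versions below are just examples – the right coordinates depend on your Spark/Hadoop setup:

    %dep
    // %dep paragraphs must run before the Spark interpreter starts
    z.reset()
    z.load("mysql:mysql-connector-java:5.1.38")    // MySQL JDBC driver
    z.load("com.databricks:spark-csv_2.10:1.4.0")  // Databricks CSV reader
    z.load("com.amazonaws:aws-java-sdk:1.7.4")     // AWS SDK for S3 access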

Step 2: Load users from MySQL

We want to create a DataFrame from the users table in our database. Spark will read the table’s schema and create a DataFrame with the same column names as the table, so we now have all of our users and their IDs/names/emails available to us.

[Screenshot: database]
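If you just want the shape of the code in the screenshot, a paragraph along these lines does the job (the connection URL, database name, and credentials are placeholders for your own):

    // sqlContext is provided by Zeppelin's Spark interpreter
    val users = sqlContext.read.format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/mydb")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("dbtable", "users")
      .option("user", "dbuser")
      .option("password", "dbpass")
      .load()

    // Register it so we can refer to it as "users" in SQL later
    users.registerTempTable("users")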

Step 3: Load JSON file from S3

Spark is really awesome at loading JSON files and making them queryable. In this case, we’re doing a little extra work to load the file from S3 – give Spark your access key and secret key, point it at the right bucket, and it will download the file and turn it into a DataFrame based on the JSON structure.

Now we have a record of all events from all of our users.

[Screenshot: events-s3]
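A sketch of what that paragraph looks like; the bucket and file names are placeholders:

    // sc is the SparkContext provided by Zeppelin
    sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "YOUR_ACCESS_KEY")
    sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "YOUR_SECRET_KEY")

    // Spark infers the schema from the JSON structure
    val events = sqlContext.read.json("s3n://my-bucket/events.json")
    events.registerTempTable("events")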

Step 4: Load the email list CSV from our local filesystem

We have a simple CSV file in the format <email address>,<promo email sent: true|false>. Let’s read it in now, too, so we know who got an email and who didn’t. In this case, our CSV has a header row that Spark will use to create the DataFrame columns, and it will attempt to infer the schema from the data in the file – we could also pass in an explicit schema if we needed to.

[Screenshot: emaillist-csv]
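Roughly, the spark-csv read looks like this – assuming the header row names the columns something like email and promo_sent:

    val emailList = sqlContext.read.format("com.databricks.spark.csv")
      .option("header", "true")      // first line of the file is the header
      .option("inferSchema", "true") // guess column types from the data
      .load("/path/to/email_list.csv")

    emailList.registerTempTable("email_list")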

Step 5: Query

Now, let’s write a query that joins them all together and tells us who got an email and how many events they produced. As you can see, this looks like any other SQL query, except that it is pulling data from a MySQL database, a JSON file we pulled from S3, and a local CSV file.

[Screenshot: query-1]
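The query in the screenshot looks roughly like this. The column names (id, email, user_id, promo_sent) are assumptions based on the sample files, and we assume each JSON record is a page-view event carrying a user_id:

    val result = sqlContext.sql("""
      SELECT u.email, el.promo_sent, COUNT(*) AS page_views
      FROM users u
      JOIN email_list el ON u.email = el.email
      JOIN events e ON e.user_id = u.id
      GROUP BY u.email, el.promo_sent
    """)

    result.show()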

Step 6: Graph

We can even use the %sql interpreter in Zeppelin to write a query without any Scala code and graph the results:

[Screenshot: query-2]
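For example, a %sql paragraph along these lines (again, column names are placeholders) produces a result set that Zeppelin can turn into a bar or pie chart with one click:

    %sql
    SELECT el.promo_sent, COUNT(*) AS page_views
    FROM users u
    JOIN email_list el ON u.email = el.email
    JOIN events e ON e.user_id = u.id
    GROUP BY el.promo_sent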

Conclusion

As you can see, the combination of Spark and Zeppelin is incredibly powerful. Data exploration and complex reporting – even with terabytes of data spread across multiple data sources – is fairly easy to do.

Aliased index rebuilds in Elasticsearch

Aliases are really useful tools in Elasticsearch. They allow you to create something that looks and acts like an index, but is really a pointer to another index (or multiple indices).

One really handy use of aliases is in doing full-index rebuilds. There are a handful of reasons you may need to do a rebuild:

  1. You only rebuild an index periodically – either on a schedule or in response to some event (such as a data import).
  2. You do online updates to your index, but want to periodically rebuild it to keep it in sync with the system of record in cases where you might miss new/updated data for various reasons.
  3. You do online updates to your index, but want to do a full rebuild in the background in response to certain events, because handling them online would be inefficient or cause delays to real-time events.
  4. You want to update your index in a way that is backwards compatible, and do so with no downtime.

The general idea behind an aliased index rebuild is that you never query an index directly – instead, you query an alias that points to the real index. When you need to rebuild the index, you create a new index, populate it, and then point your alias at it.

There are several advantages to doing this:

  1. Your index can be rebuilt as often as you’d like, with no downtime – even if it takes a long time to rebuild.
  2. You can keep a history (space allowing) of past index instances – if there is a problem with new data, you can easily roll back to a previous index.

There are two main strategies for how you structure your backing indexes:

  1. Rolling index. You choose to have 2 or more backing indices. When you start, the alias will point at index 0. Subsequent rebuilds increment the index counter until it reaches the number of indices you want to keep, then roll over and start at the beginning. So, for example, if you chose to have only 2 indices (one hot, one cold), the first time you built your index, the alias would point to index 0. When you rebuilt the index, index 1 would be created, and once it was populated, the alias would point to it. When you rebuilt a second time, index 0 would be deleted and rebuilt with new data, and the alias would switch from 1 back to 0.
  2. Time-based. You start with an index that has some element of time in its name (for example, 2016-04-09) and then create new ones, also named by time. The alias always points at the most recently built index. Pruning old indices once they are no longer needed is done by a separate process.

We’re going to look at how to do the first one – using a rolling index – in this example. Sample code that you can run can be found here. The steps are roughly as follows:

  1. Get a lock to make sure no other process can attempt to rebuild the index at the same time. Concurrent rebuilds could leave the alias/index in a bad state and would possibly result in downtime.
  2. Figure out which index (i), if any, the alias currently belongs to. If the alias doesn’t exist or doesn’t point to any index yet, we’ll start at 0. Otherwise, we start at i + 1. The underlying index name will be <alias name>-<index number>. So, if our alias was called ‘cars’ and we were building it for the first time, the index would be named ‘cars-0’.
  3. If an index with this name already exists, delete it.
  4. Create the index, with the given properties (index name, shards, replica count).
  5. Put the type mapping, if needed.
  6. Rebuild the index, using a user-supplied function.
  7. Create the alias (if it doesn’t exist), or switch an existing alias to point to our newly-created and populated index.
  8. Release the lock (if any errors occur, this probably should happen as well).

The sample project will go through all of these steps using the Java API. It isn’t production-ready code, but it will run out of the box against Elasticsearch 2.3 (run net.uresk.samples.elastic.aliasing.sample.AliasRebuildApp) and should get you pointed in the right direction. All of the interesting code is in net.uresk.samples.elastic.aliasing.AliasingRebuildService, which I’ll walk through now.

Step 1 – Get a lock

We want to create a distributed locking mechanism to ensure that only one thing is attempting to rebuild any given index at a time. If two processes were to rebuild an index at the same time, they would overwrite each other in various ways, which would almost certainly result in temporary availability issues with the index.

We can do this by creating an empty document in a special locks index in Elasticsearch. If we try to index a document and set create = true, it will fail if the document already exists. This guarantees that if we successfully create a document, we now hold the lock and can proceed.

This code attempts to create the document. It returns true if it succeeds, and false if it cannot (because the document already exists). If the result is false, we assume something else is rebuilding the index and terminate.
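The sample project does this with the Java API (in AliasingRebuildService); here is a rough Scala sketch of the same idea against the Elasticsearch 2.x transport client. The "locks" index and "lock" type names are just placeholders:

    import org.elasticsearch.client.Client
    import scala.util.control.NonFatal

    def acquireLock(client: Client, aliasName: String): Boolean = {
      try {
        client.prepareIndex("locks", "lock", aliasName)
          .setSource("{}")
          .setCreate(true) // the request fails if a document with this id already exists
          .get()
        true // we created the document, so we hold the lock
      } catch {
        case NonFatal(_) => false // someone else already holds the lock
      }
    }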

One thing we don’t address here is a timeout on the lock – Elasticsearch does support TTLs on documents, and we could use this so that a lock would expire after a certain amount of time. This is useful to handle cases where a rebuild job crashes for whatever reason and you don’t want future jobs to get blocked. However, figuring out a good TTL value takes a bit of work, and I’ve left it out of this example.

Step 2 – Decide which index to use

Here we want to figure out which index we should be writing to, based on where the alias is currently pointing. If the alias doesn’t exist, we’ll start with <alias name>-0. Otherwise, we’ll use <alias name>-(i+1), where i is the current index. If we’ve reached the maximum number of indices to create, we’ll start back at 0.

These three methods are useful for doing this. If we find an index whose name matches the alias name we are trying to use, or if the alias is pointing at more than one index, we throw an exception and stop processing, because we are in an unknown state and don’t want to break the current setup.
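As a simplified sketch of the index-picking logic (the real code in AliasingRebuildService does more validation than this):

    import org.elasticsearch.client.Client
    import scala.collection.JavaConverters._

    def nextIndexName(client: Client, alias: String, maxIndices: Int): String = {
      val response = client.admin().indices().prepareGetAliases(alias).get()
      val current = response.getAliases.keysIt().asScala.toList // indices the alias points at

      current match {
        case Nil => s"$alias-0" // no alias yet, start at 0
        case index :: Nil =>
          val i = index.substring(index.lastIndexOf('-') + 1).toInt
          s"$alias-${(i + 1) % maxIndices}" // roll over after maxIndices
        case _ =>
          throw new IllegalStateException(s"Alias $alias points at more than one index")
      }
    }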

Step 3 – Delete the index if it exists

This is pretty straightforward. If we are writing to a previously used index, we want to delete it first.
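Something like this, as a sketch:

    def deleteIndexIfExists(client: Client, indexName: String): Unit = {
      val exists = client.admin().indices().prepareExists(indexName).get().isExists
      if (exists) {
        client.admin().indices().prepareDelete(indexName).get()
      }
    }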

Step 4 – Create the index

Again, pretty straightforward – we create the index with our name (derived from the alias name) and the shard/replica counts specified.
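A sketch of the create call, with the shard and replica counts passed in:

    import org.elasticsearch.common.settings.Settings

    def createIndex(client: Client, indexName: String, shards: Int, replicas: Int): Unit = {
      client.admin().indices().prepareCreate(indexName)
        .setSettings(Settings.settingsBuilder()
          .put("index.number_of_shards", shards)
          .put("index.number_of_replicas", replicas))
        .get()
    }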

Step 5 – Put the type mapping, if needed

In the sample, I’m not doing this, but it is often necessary to specify type mappings (i.e., to define how certain fields should be stored or analyzed, or to tell Elasticsearch not to analyze certain fields). This will load JSON from src/main/mappings/<aliasName>.json and use it to create a type mapping.
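Roughly, once that JSON has been read into a string, applying it is a single call:

    def putMapping(client: Client, indexName: String, typeName: String, mappingJson: String): Unit = {
      client.admin().indices().preparePutMapping(indexName)
        .setType(typeName)
        .setSource(mappingJson)
        .get()
    }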

Step 6 – Rebuild the index

Here we just call a user-defined function for rebuilding the index. We can pass it the actual index name so it knows where to index documents. It can also return data about its results (a count, or a list of ids) so we can report it back.
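As a sketch of that idea, the supplied function takes the concrete index name and returns a count; the type name and document fields here are made up for the ‘cars’ example:

    def populateCars(client: Client)(indexName: String): Long = {
      client.prepareIndex(indexName, "car")
        .setSource("""{"make": "Honda", "model": "Civic", "year": 2016}""")
        .get()
      1L // number of documents indexed
    }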

Step 7 – Create/Move the alias

Now it is time to either create the alias or move an existing alias to point at our newly-created and populated index. It is important that moving the alias be done atomically; otherwise, you could end up in an inconsistent state (an alias pointing at 2 indices) or without an alias at all.
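Because the remove and add are sent in a single aliases request, the switch is atomic: there is no window where the alias is missing or pointing at both indices. A sketch:

    def switchAlias(client: Client, alias: String, oldIndex: Option[String], newIndex: String): Unit = {
      val request = client.admin().indices().prepareAliases()
      oldIndex.foreach(idx => request.removeAlias(idx, alias)) // only if the alias already exists
      request.addAlias(newIndex, alias)
      request.get()
    }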

Step 8 – Release the lock

Now that we’re done, we can release the lock by deleting the document we created in step 1.
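A one-liner sketch, mirroring the lock acquisition above:

    def releaseLock(client: Client, aliasName: String): Unit = {
      client.prepareDelete("locks", "lock", aliasName).get()
    }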

That’s it! Now our index is rebuilt and search traffic is hitting it, without any downtime.

Note: If you are doing online updates in addition to rebuilds, you’ll probably want to pre-create all of your indices ahead of time and do your online updates to all of them – this will minimize the amount of stale data that sits in your index due to data that changed or was inserted while the rebuild job was running.

If you have any problems, questions, or ideas for making this better, please let me know!

————————————————–
Also – If you are working with Elasticsearch, I’ve found Elasticsearch: The Definitive Guide to be a really useful resource.

————————————————–

New site – CashBack Optimizer

I’m pleased to announce a site I’ve been working really hard on the past month or so – CashBack Optimizer.

If you’re like me and like to maximize the cashback you get from your cashback credit cards by using the best one for a given store/category, you know that keeping track of which card to use where can be a little tricky. So, I built a simple website that shows you the best card for any given category – it automatically tracks the rotating bonus categories that Discover, Chase, and others use.

You can even sign up (for free) and create a wallet, which lists all of your cards and shows you the best card you own in every category. Finally, the site has user reviews of each card to help you learn more about any cards you may be interested in.

Cashback credit cards can be a great way to earn a little extra money every month (provided you are careful and never carry a balance), so I hope this site is useful to people.

Check it out and let me know what you think!

Books I’ve read in 2015

I read a bunch of books this year – a lot of them really good, some of them less so. Here are some of the more notable ones in a few categories, and a short paragraph about my thoughts on them. I already have a bunch of good ones lined up to read in 2016, but I’d love to hear about any you read (and liked) this year!

History

Dead Wake: The Last Crossing of the Lusitania
by Erik Larson

This was a really interesting book about the sinking of the Lusitania. It approaches the story from three angles: 1) the personal lives of many of the passengers and crew of the ill-fated vessel, 2) all of the things that had to go wrong in order for it to be in position to be sunk, and 3) how engaged Winston Churchill and the Admiralty were in getting the United States to enter World War I, and how disengaged Woodrow Wilson may have been (being more focused on a love interest). Ultimately, there were a number of missed opportunities and a myriad of ways the Lusitania could have made its voyage safely, and the author seems to question whether the British government at least allowed it – if not helped it – to be sunk. The book is gripping and well-researched. 5/5


The Bully Pulpit: Theodore Roosevelt, William Howard Taft, and the Golden Age of Journalism
by Doris Kearns Goodwin

Team of Rivals – written by the same author – is one of my favorite books, so I was excited to read Bully Pulpit this year. This book is interesting, but it is a long read that felt tedious at times. The journalism parts were interesting and were a good introduction to 20th century muckraking, but they also made the book less focused. The interactions between Taft and Roosevelt were fun to read, and there are some parallels to today’s politics in Roosevelt fracturing the Republican Party (Tea Party, anyone?) that ultimately make this worth reading, but this 750-page (admittedly well-researched) behemoth felt unfocused and slow to me. 3/5

In the Kingdom of Ice: The Grand and Terrible Polar Voyage of the USS Jeannette
by Hampton Sides

In the Kingdom of Ice is a fascinating tale of a young and competent captain eager to be the first to reach the North Pole for his young country, an ill-fated voyage doomed by the era’s poor understanding of polar geography (and one that wouldn’t have ended so disastrously if not for a few small events), and the survival of men pushed to limits we can only imagine. The book definitely starts out a little slow with the backstory, but it eventually pulls you in as you travel with the men of the Jeannette and eagerly read on to learn their fate. 4/5


The Box: How the Shipping Container Made the World Smaller and the World Economy Bigger
by Marc Levinson

I never realized how huge container ships were until I was walking near the Oakland–San Francisco Bay Bridge and saw one of them heading through the bay. This book does a decent job of covering what at first might seem like a dry topic, but it was really an interesting dive into a somewhat hidden aspect of our daily lives. It does a good job of showing how the container became dominant, how that affected global trade, and how the system works in general. Some downsides: it got bogged down in a lot of figures that would have communicated the point much better as graphs, it glossed over some technical details of how things worked, and it introduces a central character (McLean) but doesn’t give enough detail about his life to satisfy you as a reader. Still, a really good (and relatively quick) read. 4/5

Business

Predictive Analytics, Revised and Updated: The Power to Predict Who Will Click, Buy, Lie, or Die
by Eric Siegel

The latter half of 2015 at my job was spent building a system for analytics, something I already have experience in but wouldn’t consider myself an expert at. This book was a reasonably good introduction to the types of things analytics can tell us or help us figure out. It doesn’t really get into the “how” very much or give you much direction as to where you should turn to find out more about the “how”, but it does at least whet your appetite a bit and give you some ideas. It is a good book for a really high-level look at analytics. 3/5


The Hard Thing About Hard Things: Building a Business When There Are No Easy Answers
by Ben Horowitz

This is largely based on a bunch of blog posts by Ben, and the organization suffers for it. It is still, however, a good book that offers some valuable insights into running a startup and leadership in general. Ben is pretty candid about when and how he screwed up, and he is open about the hard parts of running a business. It is also a fun look back at the early days of Netscape and the internet in general, which I’m always a sucker for. The rap lyrics that start every chapter were a bit weird. 4/5

Technology

Countdown to Zero Day: Stuxnet and the Launch of the World’s First Digital Weapon
by Kim Zetter

Reverse-engineering, Capture-the-Flag competitions, and computer security in general are big hobbies of mine, so I was really excited to read this book. It covers the Stuxnet virus, which was a clever hack (all but certainly built by the United States and Israel) to cause Iran’s nuclear fuel production equipment to fail, causing setbacks to the program. This is a well-researched book that was interesting enough to keep me up late several nights trying to get to the end of it. This also raises a number of questions – how safe is our infrastructure from attacks like this? What are the ethics of virtual weapons? Highly recommended. 5/5


Turing’s Cathedral: The Origins of the Digital Universe
by George Dyson

The title is somewhat misleading – this isn’t a book about Alan Turing, but rather about John von Neumann and the development of the computer in the first few decades after World War II. The history in this thoroughly researched book is amazing, and I really enjoyed learning more about von Neumann and other pioneers of the computing industry. It is, however, somewhat of a dry read. Software developers will probably still find the many stories about the early days of our profession to be fascinating. I’d recommend this book to computer enthusiasts and developers, but probably not to people who don’t have an inherent interest in the field. 3/5

Hadoop: The Definitive Guide
by Tom White

Hadoop, Sqoop, Spark, YARN, HDFS, HBase, Pig, ZooKeeper… So many weird names, so many things to learn. As someone who has been trying to come up to speed on the Hadoop ecosystem this year, this has been an incredible resource. It isn’t something you are likely to read straight-through (except for the first handful of chapters), but it has been an invaluable resource for the core parts of the Hadoop ecosystem and a good high-level overview of other parts of it. 5/5

Fiction

All the Light We Cannot See
by Anthony Doerr

I don’t read enough fiction, but my wife got me this book that I’d had my eye on for a while. It is about two young children on opposite sides of World War II and how their worlds intersect. One is a blind French girl; the other is a curious and gifted, but naive, German boy. The book is beautifully written, if a little wordy at times. It is a good story and the author is talented, but I found the back-and-forth short chapters to be jarring. There were some historical inaccuracies that were distracting, and the ending felt rushed and, frankly, unsatisfying. Still a good read, and it won a Pulitzer Prize. 4/5

Data Twister 1.1 released

Data Twister 1.1 has been released to the Mac App Store. It has a few small changes:

1) Fixed an issue where the input/output boxes didn’t scroll correctly in some cases when the text overflowed.

2) Added the ability to load a file to use as the input. This makes it handy for importing large amounts of data and also doing quick hash checks on files.

Get it here.