CloudStudio for AWS now available

I’m excited to announce the initial release of a project I’ve been working on – CloudStudio.

Amazon’s AWS provides a ton of great services, but not all of them have easy UIs for testing and interacting with them. As a frequent user of services like Kinesis and Firehose, I’ve often wished I could have an easy way to push a message onto a stream, or take a peek at messages coming in, so I started building a suite of tools to just do that.

This initial release focuses on Kinesis, Firehose, and SQS. You can send messages to all 3, and you can poll messages from Kinesis, all with a simple and easy-to-use GUI. It is available now for Mac for $9.99, but I’m wrapping up work on Windows and Linux versions in the next week or so as well.

It may be starting out simple, but I have some really big plans to make it even more useful in the near future, by doing things like allowing you to stream to Kinesis from a file, tail an S3 file, and more. I’ve made my roadmap open on Trello, and I’ve added a forum to solicit feedback on what features you want.

I want this to be a really powerful tool for AWS users, so I hope you’ll check it out here and give me some feedback on how I can make it more useful for you.

Running a single Mocha test file in VS Code

I’ve been doing a lot of Node development lately, after doing mostly Java the past 10 years. There are lots of comparisons between the two, and I’ve come away with a better understanding of where one is better than the other and what I wish I could take from each one.

One thing I miss about doing Java development is being able to right-click in a single test, run it, and be able to easily debug it. You can get close to this in Node-land, but it is nowhere near as simple or seamless. You can, however, get kind of close with VS Code (which I’m loving more and more every day) by creating a custom launch configuration that lets you debug a single mocha file.

1. Create a launch configuration that only runs the current file

This creates a launch configuration that passes the current file to the mocha command.

2. If you have a mocha.opts, you may need to override it

A lot of projects have a mocha.opts file that has something like this ‘–recursive test/.’

Command-line args should override options in mocha.opts, but it looks like the file specification part does not get overridden. So, what I did was create a dummy mocha-debug.opts that is empty, then point to it in the config:

3. You can now run and debug a single Mocha file.

Betterment substantially increases fees

I recently became familiar with Betterment since my employer switched to them for our 401k provider. I started looking into the services they provided, and became really intrigued by their automated investing and tax-loss harvesting. I’ve usually stuck with Vanguard and their low fees, but with a wrap fee of .15% if you had over $100k invested with them, it was tempting to try Betterment, since in theory, at least, the tax-loss harvesting would more than pay for the additional fees.

Getting all of my investments to them took a fair amount of time and money (though they made the process as easy as possible), and I was excited when I got my emails this morning saying my last 2 big accounts had been received by them.

Less than half an hour later, I got another email titled, innocuously, “New Betterment service plans for 2017”. Reading through the email, they discussed their new options that would allow you to use the services of a CFP, which is odd, given their pitch about automated investing, but not a big deal. Then, tucked down 4 paragraphs is the real reason for the changes:

Each plan will cost a simple, flat rate. Starting June 1, your Digital plan will be 0.25% per year of your average balance.

For accounts with over $100,000 in them, this represents an increase of 67% (from .15% to .25%)! And Betterment tries to make it as low-key as possible that they are increasing fees on us by a huge amount and not offering anything in return. I am really disappointed, both in the increase in fees and in the way they announced it – I had a high opinion of the company before this.

At this point, it looks like WealthFront is a better option. Both do a lot of the same things and offer similar features at a fixed .25% fee, but WealthFront manages the first $10,000 for free and offers a Direct Indexing service that lets you avoid ETF fees, making the combined fee substantially less than Betterment.

Fees should be getting cheaper as companies like this get more assets under management, not going higher by almost 70%, and companies should be more forthright in the way they raise fees.

Combining data from multiple sources with Spark and Zeppelin

I’ve been doing a lot with Spark lately, and I love how easy it is to pull in data from various locations, in various formats, and have be able to query/manipulate it with a unified interface.

Last week, I had a situation at work where I needed to get a list of users (from our mySQL database), filtered with ancillary data from JSON files in S3 and a CSV file that had been sent to me and was sitting on my local machine. Using Spark and Zeppelin, I was able to do this in just a few minutes – analyzing a few GBs of data from multiple sources in multiple formats from my local machine took only a few minutes to execute, too (this approach would work with much larger data also, you just would want to run it on a cluster..).

I wanted to share a simplified, adapted version of that with others to show the power of Spark and Zeppelin. This is a really trivial example with a tiny amount of data, but hopefully it gives an idea of what is possible – and while it may seem kind of gimmicky, I’ve already used something very similar once for an actual business use case, and I can see lots of other situations where it could come in handy as well.

The Task

Marketing has sent us a list of users – some of whom they sent an email, some they didn’t. They want to know how many pages the users who got sent our email viewed versus users who did not. Unfortunately, our analytics data only has the user id, and marketing only has their email address – we’ll have to use the data in our database to bridge them.

Requirements

Zeppelin
mySQL
AWS S3 (You can obviously change this to a local file, if you want)

Files

I’ve uploaded samples for all of the assets you need, as well as the Zeppelin notebook itself:

SQL file for creating the users table and the users in it
JSON file with analytics data to be loaded from S3
Email list with all of the users that marketing emailed in CSV format
Zeppelin file that has all of the code

I’ve posted screenshots in this post, if you’d like the code, grab the Zeppelin file.

Step 1: Imports

We need to import a mySQL driver, the AWS SDK (so we can load data from S3), and the databricks library that can create RDDs from CSV files.

dependencies

Step 2: Load users from mySQL

We want to create a DataFrame from our users table in our database. Note that it will parse the schema and turn it into a DataFrame with similar column names as are in the table. We now have all of our users and their ids/names/emails available to us.

database

Step 3: Load JSON file from S3

Spark is really awesome at loading JSON files and making them queryable. In this case, we’re doing a little extra work to load it from S3 – just give it your access key, secret key, and then point it at the right bucket and it will download it and turn it into a DataFrame based on the JSON structure.

Now we have a record of all events from all of our users.

events-s3

Step 4: Load the email list CSV from our local filesystem

We have a simple CSV file in the format of <email address>,<promo email sent: true|false>. Let’s read it in now, too, so we know who got an email and who didn’t. In this case, our CSV has a header file that Spark will use to create the DataFrame columns, and it will attempt to infer the schema based on the data in the file – we can also pass in an explicit schema if we need to.

emaillist-csv

Step 5: Query

Now, let’s write a query that joins them all together and tells us who got an email and how many events they produced. As you can see, this looks like any other SQL query, only it is pulling data in from a mySQL database, a JSON file we pulled from S3, and a local CSV file.

query-1

Step 6: Graph

We can even use the %sql interpreter in Zeppelin to write a query without any Scala code and graph the results:

query-2

Conclusion

As you can see, the combination of Spark and Zeppelin is incredibly powerful. Data exploration and complex reporting – even with terabytes of data spread across multiple data sources – is fairly easy to do.

Aliased index rebuilds in Elasticsearch

Aliases are really useful tools in Elasticsearch. They allow you to create something that looks and acts like an index, but is really a pointer to another index (or multiple indices).

One really handy use of aliases is in doing full-index rebuilds. There are a handful of reasons you may need to do a rebuld:

  1. You only rebuild an index periodically – either on a schedule or in response to some sort of event (such as an import of some sort).
  2. You do online updates to your index, but want to periodically rebuild it to keep it in sync with the system of record in cases where you might miss new/updated data for various reasons.
  3. You do online updates to your index, but want to do a full rebuild in background in response to certain events, because handling them online would be inefficient or cause delays to real-time events.
  4. You want to update your index in a way that is backwards compatible, and do so in a way that there is no downtime.

The general idea behind an aliased index rebuild is that you never query an index directly – instead, you query an alias that points to the real index. When you need to rebuild the index, you create a new index, populate it, and then point your alias at it.

There are several advantages to doing this:

  1. Your index can be rebuilt as often as you’d like, with no downtime – even if it takes a long time to rebuild.
  2. You can keep a history (space allowing) of past index instances – if there is a problem with new data, you can easily roll back to a previous index.

There are two main strategies for how you structure your backing indexes:

  1. Rolling index. You choose to have 2 or more backing indices. When you start, it will be on index 0. Subsequent rebuilds increment the index counter, until it reaches the amount of indices you want to keep, then roll over and start at the beginning. So, for example, if you chose to only have 2 indices (one hot, one cold), the first time you built your index, it would point to 0. Then when you rebuilt the index, index 1 would be created and when it was done being populated, the alias would point to it. When you rebuilt it a second time, index 0 would be deleted and rebuilt with new data, then the alias would switch from 1 to 0.
  2. Time-based. You start with an index that has some element of time in it (for example, 2016-04-09) and then create new ones also based on time. The alias always points at the latest-built one. Pruning old indices when they are already needed is done by a separate process.

We’re going to look at how to do the first one – using a rolling index – in this example. Sample code that you can run can be found here. The steps are roughly as follows:

  1. Get a lock to make sure no other process can also attempt to rebuild the index. This would leave the alias/index in a bad state and would possibly result in downtime.
  2. Figure out which index (i), if any, the alias currently belongs to. If the alias doesn’t exist or doesn’t point to any index yet, we’ll start at 0. Otherwise, we start at i + 1. The underlying index name will be <alias name>-<index number>. So, if our alias was called ‘cars’ and we were building it for the first time, the index would be named ‘cars-0’.
  3. If an index with this name already exists, delete it.
  4. Create the index, with the given properties (index name, shards, replica count).
  5. Put the type mapping, if needed.
  6. Rebuild the index, using a user-supplied function.
  7. Create the alias (if it doesn’t exist), or switch an existing alias to point to our newly-created and populated index.
  8. Release the lock (if any errors occur, this probably should happen as well).

The sample project will go through all of these steps using the Java API. It isn’t production-ready code, but it will run out of the box against Elasticsearch 2.3 (run net.uresk.samples.elastic.aliasing.sample.AliasRebuildApp) and should get you pointed in the right direction. All of the interesting code is in net.uresk.samples.elastic.aliasing.AliasingRebuildService, which I’ll walk through now.

Step 1 – Get a lock

We want to create a distributed locking mechanism to ensure that only one thing is attempting to rebuild any given index at a time. If two process were to rebuild an index at the same time, they would overwrite each other in various ways, and would almost certainly result in temporary availability issues with the index.

We can do this by creating an empty document in a special locks index in Elasticsearch. If we try to index a document and set create = true, it will fail if the document already exists. This guarantees that if we successfully create a document, we now hold the lock and can proceed.

This code will attempt to create a document. It will return true if it does, and false if it cannot. If the result is false, we assume something else is rebuilding the index and terminate.

One thing we don’t address here is a timeout on the lock – Elasticsearch does support TTLs on documents, and we could use this so that a lock would expire after a certain amount of time. This is useful to handle cases where a rebuild job crashes for whatever reason and you don’t want future jobs to get blocked. However, figuring out a good TTL value takes a bit of work, and I’ve left it out of this example.

Step 2 – Decide which index to use

Here we want to figure out which index we should be writing to, based on where the alias is currently pointing. If the alias doesn’t exist, we’ll start with <alias name>-0. Otherwise, we’ll use <alias name>-(i+1), where i is the current index. If we’ve reached the maximum number of indices to create, we’ll start back at 0.

These 3 methods are useful for doing this. If we find an index name that matches the alias name we are trying to use, or if the alias is pointing at more than one index, we throw an exception and stop processing because we are in an unknown state and don’t want to break the current setup.

Step 3 – Delete the index if it exists

This is pretty straightforward. If we are writing to a previously used index, we want to delete it first.

Step 4 – Create the index

Again, pretty straightforward – we create the index with our name (derived from the alias name) and the shard/replica counts specified.

Step 5 – Put the type mapping, if needed

In the sample, I’m not doing this, but it is often necessary to specify type mappings (ie, to define how certain should be stored or analyzed, or to tell it to not analyze certain fields). This will load JSON from src/main/mappings/<aliasName>.json and use it to create a type mapping.

Step 6 – Rebuild the index

Here we just call a user-defined function for rebuilding the index. We can pass it the actual index name so it knows where to index documents. It can also return data about its results (a count, or a list of ids) so we can report it back.

Step 7 – Create/Move the alias

Now it is time to either create the alias, or move an existing alias to a new index. It is important that moving/renaming the alias be done atomically, otherwise, you could end up in an inconsistent state (an alias pointing at 2 indices) or without an alias at all.

Step 8 – Release the lock

Now that we’re done, we can release the lock by deleting the document we created in step 1.

That’s it! Now our index is rebuilt and search traffic is hitting it, without any downtime.

Note: If you are doing online updates in addition to rebuilds, you’ll probably want to pre-create all of your indices ahead of time and do your online updates to all of them – this will minimize the amount of stale data that sits in your index due to data that changed or was inserted while the rebuild job was running.

If you have any problems, questions, or ideas for making this better, please let me know!

————————————————–
Also – If you are working with Elasticsearch, I’ve found Elasticsearch: The Definitive Guide to be a really useful resource.

————————————————–