30 Aug 2012

I’ve been running a site for the Chale Community Project for a while now that allows members of the community to monitor their energy usage online.

tl;dr: I changed some code that was using MySQL to store meter readings to use redis instead. The rest of this post describes some of the design decisions needed to make the shift.

The energy readings are published using a version of my modified CurrentCost bridge over MQTT. As well as the whole house reading, a meter might have a number of individual appliance monitors attached. Each of these readings is published to its own topic in a very simple hierarchy:

sm00/CF35D16315BF93EC053E4EFFC614E3E944C2A626/1/2

The first two topic levels identify the meter and the second two identify the individual devices attached.

With readings arriving for each device every 6 seconds, I started with simply dumping them into MySQL – with each level of the topic as a column in the table, thrown in with a timestamp. As this granularity of data isn’t needed for the site, a job could then run every 10 minutes to average the readings into 10 minute slots and put them into another table. This was good enough at the time, but it wasn’t a long term solution.

Aside from some serious normalisation of the database (adding the 40 character meter ID in every row turned out not to be the most efficient thing to do), I wanted to move to using redis for the live data. There were a few reasons for this, but primarily it was a combination of wanting to lighten the load on the (shared) MySQL server, as well as to do something more interesting with redis.

The shift from relational table to key-value store required a change in mindset as to how to store the readings. The main requirement was to minimise the work needed to store a reading as well as the work needed to calculate the average reading for each device.

Redis provides a list data type which, when using the receiving topic as the key, is a perfect fit for storing all of the readings for a particular device. But that causes problems in generating the average as the list will contain all values for the device, not just the values for a particular 10 minute timeslot.

The solution is to simply add the timeslot to the key. The timeslot is the current time rounded down to the start of its 10-minute slot – so 2012-08-20 15:21:34 becomes 2012-08-20 15:20:00.

This leads to keys such as:

20120820152000:sm00:CF35D16315BF93EC053E4EFFC614E3E944C2A626:1:2
20120820153000:sm00:CF35D16315BF93EC053E4EFFC614E3E944C2A626:1:2

That gets the readings stored pretty efficiently, already split into a list-per-device-per-timeslot.
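
As a rough sketch, the keys might be built in Python along these lines (the helper names here are mine, not from the actual code):

from datetime import datetime

def timeslot(now=None):
    # current time rounded down to the start of its 10-minute slot,
    # formatted as YYYYMMDDHHMMSS, e.g. '20120820152000'
    now = now or datetime.utcnow()
    slot = now.replace(minute=now.minute - now.minute % 10, second=0, microsecond=0)
    return slot.strftime("%Y%m%d%H%M%S")

def reading_key(topic, slot):
    # per-device, per-timeslot list key, built from the MQTT topic
    return slot + ":" + topic.replace("/", ":")

# reading_key("sm00/CF35.../1/2", timeslot()) -> "20120820152000:sm00:CF35...:1:2"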

The task of averaging the lists could then just look at the current time, generate the timeslot string for the previous timeslot, retrieve all keys that begin with that string and process all the lists. But there are two problems with this approach.

First, the redis documentation recommends against using the KEYS pattern command in production. I doubt this system would ever scale to the point where the performance hit of that command became an issue, but still, it’s worth sticking to the appropriate best practices.

The second issue is that it makes a big assumption that the only timeslot that needs to be processed is the previous one. If the task fails to run for any reason, it needs to process all of the outstanding timeslots.

This needed one of the other data types available in redis: the sorted set. This gives you an ordered collection of values that are guaranteed to be unique. When each reading arrives, the timeslot value is put into a sorted set called, imaginatively, ‘timeslots’. As it is a set, it doesn’t matter that the same timeslot value will be put into the set thousands of times – it will only result in a single entry. As it is sorted, the task doing the averaging can process the outstanding timeslots in order.

Finally, a normal set is used to store all of the reading keys that have been used for a particular timeslot. As this is per-timeslot, the key for the set is ‘readings:’ followed by the timeslot.
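
Storing a reading then comes down to three redis operations: append the value to the per-device list, add the timeslot to the sorted set and add the reading key to the per-timeslot set. A minimal sketch, re-using the helpers above and assuming a local redis server with the redis-py client:

import redis

r = redis.Redis(decode_responses=True)       # assumes a local redis server

def store_reading(topic, value):
    slot = timeslot()
    key = reading_key(topic, slot)
    r.rpush(key, value)                      # append the reading to the per-device list
    r.zadd('timeslots', {slot: int(slot)})   # record the timeslot (redis-py 3+ signature)
    r.sadd('readings:' + slot, key)          # record the key used in this timeslot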

So, with that in place, the averaging task takes the following steps (there’s a rough sketch of the code after the list).

  1. Gets the list of timeslots that there are readings for from the sorted set timeslots.
  2. For each timeslot that is not the current one:
    1. Remove the timeslot from the set
    2. Get the set of reading keys from readings:timeslot
    3. For each reading key, retrieve the list, calculate the average, insert it into the MySQL table and delete the list
    4. Delete the readings:timeslot set
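
A rough sketch of that task, with the same assumptions as before (the redis client and timeslot() helper from the earlier sketches, plus a hypothetical save_average() standing in for the MySQL insert):

def process_timeslots():
    current = timeslot()
    # outstanding timeslots, oldest first
    for slot in r.zrange('timeslots', 0, -1):
        if slot == current:
            continue                          # leave the slot still being filled alone
        r.zrem('timeslots', slot)             # 1. remove the timeslot from the sorted set
        for key in r.smembers('readings:' + slot):   # 2. the reading keys for this timeslot
            values = [float(v) for v in r.lrange(key, 0, -1)]
            if values:
                # 3. hypothetical helper that does the INSERT into the MySQL table
                save_average(key, slot, sum(values) / len(values))
            r.delete(key)                     # ...and delete the per-device list
        r.delete('readings:' + slot)          # 4. delete the readings:timeslot set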

And that is what I’ve had running for a few weeks now. No grand conclusion.

8 Jun 2012

Cosm, formerly known as Pachube, has had MQTT support for a while now and I’ve been meaning to do something with it.

One of the strengths of a publish/subscribe system, such as MQTT, is the decoupling of the producers and consumers of messages. I don’t have to modify the code that reads my energy usage from the CurrentCost meter each time I think of a new use of the data – I just add a new, self-contained, subscriber.

So to get my energy usage data into a datastream on Cosm, I could simply write a client that subscribes to the house/cc/power topic on my home broker and republishes it to the appropriate topic on the Cosm broker.

But even that’s far more work than is really needed – why write code when the broker can do it for me? RSMB lets you create bridge connections to other brokers and define how topics should be mapped between them. Now, I’ve done this plenty of times, but I still get confused with the topic-mapping syntax RSMB uses in its config file. So, for future reference, here’s how to do it.

connection cosm
try_private false
address 216.52.233.121 1883
username <COSM_API_KEY>
clientid knolleary
topic "" out house/cc/power /v2/feeds/62907/datastreams/0.csv

The topic line shows how you can achieve a one-to-one mapping between different topics – the non-obvious part being the need to have the empty “” as the topicString.

If you are using Mosquitto, an additional parameter is needed in the topic line to specify the QoS to use:

topic "" out 0 house/cc/power /v2/feeds/62907/datastreams/0.csv

Going the other way, subscribing to a feed on Cosm, is just as easy:

topic "" in inbound/cosm /v1/feeds/XXXXXX/datastreams/0.csv

Or, if you are using Mosquitto:

topic "" in 0 inbound/cosm /v1/feeds/XXXXXX/datastreams/0.csv

Note the use of /v1/ in the remote topic string this time around. As documented, this gets you just the raw data value, rather than the comma-separated time-stamp and data value returned by the /v2/ api. Of course, depending on what you’re doing, you may want the time-stamp. Equally, you may want the full JSON or <shudder> XML formats – in which case you would change the .csv to .json or .xml respectively.

In a future post, I’ll look more at how the topic-mapping in RSMB (and Mosquitto) can be used to good effect in scaling an MQTT system.

11 Apr 2012

A question has come up a couple of times on the MQTT mailing list asking how it can be used for workload distribution: given tasks being published to a topic, how can they be distributed fairly amongst the subscribers?

The short answer is that you can’t – it isn’t how things work in a publish/subscribe system. With MQTT, each subscriber is treated equally and every message published to a topic will go to every subscriber.

Despite the natural urge to shoehorn MQTT into any solution, the correct answer is to use the right tool for the job. In this case, something that provides proper queuing semantics to ensure each message is only consumed once – something like Celery for example.

That said, let’s have a look at one way you could shoehorn MQTT into this particular scenario.

First, let’s define the scenario. Say, for example, you have a stream of tweets being published to MQTT. Specifically, a stream of all tweets that contain the text “4sq”. Why would you do this? Well, as explained by Matt Biddulph in his Where 2012 workshop, this gives you a feed of all check-ins on foursquare that have been shared on Twitter. Each tweet contains a url which gives you more metadata about the check-in. To do anything meaningful with the tweets, you need to retrieve the url, which, as it uses the foursquare url-shortener, requires two http requests.

The rate at which the tweets arrive (25,000 an hour, Matt suggests) makes it impractical to do anything but the most basic of operations on them. It certainly isn’t practical to do the two http requests needed to retrieve the foursquare information for each tweet in real-time.

There isn’t any real magic needed here; a single subscriber to the topic dumps the messages onto a queue of some form that can then have as many active consumers as makes sense for the scenario.

A couple of years ago, I would have jumped straight at dumping the tweets into a MySQL table – how very 2010. These days, it’s so much easier to use something like redis.

import redis
import mosquitto

# ...
# exercise for the reader to:
# - create the mosquitto client, connect it and subscribe to the appropriate topic
# - create a redis client and connect it, e.g. r = redis.Redis()
# ...

# on_message callback for the mosquitto client
def on_mqtt_message(client, message):
    # push the tweet onto a redis list keyed by the topic it arrived on
    r.lpush(message.topic, message.payload)

The consumers then become simple redis clients that call brpop(topic) to get the next tweet to process. If you want to get extra fancy, then brpoplpush(topic, processing_queue) and a housekeeping thread let you make sure nothing gets lost if a consumer dies unexpectedly.
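
For illustration, a consumer along those lines might look something like this. It’s a sketch assuming the redis-py client, with the topic name, the processing queue and process_tweet() all placeholders rather than anything from a real system:

import redis

r = redis.Redis(decode_responses=True)

topic = 'twitter/4sq'                    # hypothetical topic the subscriber pushes to
processing = topic + ':processing'       # this consumer's 'in flight' queue

while True:
    # atomically pop the next tweet and park it on the processing queue
    tweet = r.brpoplpush(topic, processing, timeout=0)
    try:
        process_tweet(tweet)             # hypothetical: resolve the 4sq url, etc.
        r.lrem(processing, 1, tweet)     # done, so drop it from the processing queue
    except Exception:
        # leave it on the processing queue for a housekeeping thread
        # to push back onto the main queue later
        pass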

Looking at this, you may ask why bother with MQTT at all? Why not just replace the MQTT publishing bits in the twitter streaming code with the redis putting bits? Well, I have used the word “shoehorn” three times in this post.

4 Apr 2012

For a demo I’m putting together at work, I needed to have tweets available on an MQTT topic. The whys and wherefores of the demo might be something I’ll write about in the future, but for now, this post is about getting the tweets into MQTT.

To be honest, it is pretty trivial. The Twitter Streaming API is well documented and can be used to get a stream of tweets based on whatever search terms you choose. As each tweet is received, it just needs to be published with an MQTT client to the topic of choice.
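
For what it’s worth, the loop looks roughly like this (a sketch using the pre-v4 tweepy streaming API and the old mosquitto Python client, with placeholder credentials, client id and topic rather than the actual code from the repository):

import json
import mosquitto
import tweepy

client = mosquitto.Mosquitto("tweet-publisher")     # placeholder client id
client.connect("localhost")                         # the local broker

class Publisher(tweepy.StreamListener):
    def on_status(self, status):
        # publish each tweet to the topic of choice
        client.publish("twitter/4sq", json.dumps({'id': status.id, 'text': status.text}))
        client.loop()                               # service the MQTT network loop

# placeholders for your own Twitter API credentials
auth = tweepy.OAuthHandler("consumer-key", "consumer-secret")
auth.set_access_token("access-token", "access-token-secret")
tweepy.Stream(auth, Publisher()).filter(track=['4sq'])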

It’s barely worth a blog post.

I’ve put the code up on github.

8 Mar 2012

A new version of the Arduino MQTT client is available – from the usual place.

This release brings a handful of changes, but there are a couple I wanted to draw attention to, particularly as one will require a minor change to sketches using this library.

When I wrote the first prototype of the client, I made the decision to limit packets to 128 bytes. This was a pragmatic decision to balance memory usage and usefulness; who would possibly want to publish more than 128 bytes from an Arduino?

I knew the limit was just a #define in the code, so anyone could change the limit if they wanted. But I didn’t think that all the way through and it turns out there were a couple of issues with this:

  • uint8_t was used in lots of places to store lengths, which imposed an internal limit of 256 bytes,
  • the packet length was always encoded as a single byte – which is only correct for packets shorter than 128 bytes due to the multi-byte integer encoding MQTT uses.

So, with this release, you can now modify the #define in PubSubClient.h to a larger value and it’ll work.

The second change is a wide-ranging tidy-up of the types used in the library, both internally and on the API. The external part of this is primarily that all of the functions that were defined to return an int type to indicate success or failure now return a boolean type. This change should be compatible with existing sketches.

A knock-on effect of both of these changes is that any sketch providing a message callback will need a small change. The length argument of the callback function signature has changed from int to unsigned int. When you try to compile a sketch without this change, you’ll get an error along the lines of:

error: invalid conversion from 
  'void (*)(char*, byte*, int)' 
 to
  'void (*)(char*, uint8_t*, unsigned int)'

Hopefully that will help the google-juice for anyone who hits this error and hasn’t read the change log.

As ever, check out the project page for more details and a download link.