28 Oct 2012

Cosm has had MQTT support for some time now and I’ve had my home energy usage bridged up there from my local RSMB quite happily. One thing I hadn’t played with properly was talking to Cosm directly from an Arduino – aside from a 5 minute proof-I-could-do-it.

I’m putting together a new release of the PubSubClient, and having had the occasional report of problems talking to Cosm with it, I decided to see where things stood. Sure enough, the connection was pretty unstable; often failing to connect, hanging and certainly not reliably sending or receiving messages.

Knowing the client was producing perfectly valid MQTT packets and works with other brokers, the problems had to exist either in the network connection to Cosm or in their broker implementation. The fact my RSMB bridge to Cosm worked from the same network ruled out a fundamental network issue. Equally, there are plenty of MQTT brokers on-line these days to check it wasn’t a problem with the Arduino talking to a remote broker, rather than one on the same subnet.

Having debugged this sort of thing with other implementations, I had a good idea what the issue was: the splitting of MQTT packets over multiple TCP packets – something I knew the PubSubClient did.

When reading an MQTT packet from the network, it’s easy to assume that a read from the network will provide the next byte of the packet. When testing on local networks, that will mostly be true. But when dealing with internet-scale lag between TCP packets, the code doing the reading must handle the case when the next byte hasn’t arrived yet. This is what I think the Cosm broker is tripping over.
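
To make that concrete, here is a minimal Python sketch of a reader that copes with a packet arriving in pieces. It is not Cosm's code or anything from the PubSubClient; the function names read_exact and read_mqtt_packet are made up for the illustration, and it assumes a plain blocking TCP socket.

```python
import socket
from typing import Tuple


def read_exact(sock: socket.socket, count: int) -> bytes:
    """Read exactly `count` bytes, waiting for data that has not arrived yet."""
    chunks = []
    remaining = count
    while remaining > 0:
        # recv may return fewer bytes than asked for, so keep looping
        chunk = sock.recv(remaining)
        if not chunk:
            raise ConnectionError("connection closed mid-packet")
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


def read_mqtt_packet(sock: socket.socket) -> Tuple[int, bytes]:
    """Return (header byte, remaining bytes) for one MQTT packet."""
    header = read_exact(sock, 1)[0]
    length = 0
    # The remaining length uses at most 4 bytes, 7 bits each, low bits first
    for shift in range(0, 28, 7):
        digit = read_exact(sock, 1)[0]
        length |= (digit & 0x7F) << shift
        if not digit & 0x80:  # no continuation bit: length is complete
            break
    else:
        raise ValueError("malformed remaining length")
    return header, read_exact(sock, length)
```

The point is that nothing here assumes the header, length and payload turn up in the same read; each piece is waited for separately.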

A quick test, comparing writing the packets to the network with multiple calls against doing it with a single call, saw all of the reliability issues disappear with the single call. So I decided to update the client library to do just that.

If all you’re after is working code, then you can grab the latest library from GitHub. Note that this code includes an API change on each of the constructors – you need to provide an instance of EthernetClient rather than leave it to the library to create one. You can see what I mean in the updated examples.

In case you’re interested, here’s a run down of the changes I had to make to get this to work.

For the purposes of this exercise, an MQTT packet consists of three components; a single-byte header, between 1 and 4 bytes encoding the remaining length and finally the remaining payload bytes. The existing library provides a function that takes the header and payload, calculates the remaining length bytes and writes each of them to the network in turn – in other words, three separate network writes.

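boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) {
   uint8_t lenBuf[4];
   uint8_t llen = 0;
   uint8_t digit;
   uint8_t pos = 0;
   uint16_t rc;            // wide enough to hold 1+llen+length
   uint16_t len = length;  // 16-bit so lengths above 255 are not truncated
   // Encode the remaining length, 7 bits per byte, low-order bits first
   do {
      digit = len % 128;
      len = len / 128;
      if (len > 0) {
         digit |= 0x80;    // continuation bit: more length bytes follow
      }
      lenBuf[pos++] = digit;
      llen++;
   } while(len>0);

   // Three separate network writes: header, length bytes, payload
   rc = _client->write(header);
   rc += _client->write(lenBuf,llen);
   rc += _client->write(buf,length);
   lastOutActivity = millis();
   return (rc == 1+llen+length);
}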

The crude way to reduce the network writes would be for the function to copy all of those bytes to a single buffer and then pass that single buffer to the network. But that would involve a lot of unnecessary copying of bytes around as well as the increased memory usage for the buffer.

   // Allocate yet more memory
   static uint8_t packet[MQTT_MAX_PACKET_SIZE];
   pos = 0;
   packet[pos++] = header;
   for (int i=0;i<llen;i++) {
      packet[pos++] = lenBuf[i];
   }
   // Copy lots of bytes around
   for (int i=0;i<length;i++) {
      packet[pos++] = buf[i];
   }
   rc = _client->write(packet,pos);

There is already a buffer used for incoming packets which can be reused for the outgoing packets. The difficulty is the remaining length bytes; until the payload is constructed the code doesn’t know how long it will be, which means it does not know how many bytes are needed to encode the length. Without knowing how many bytes are used to encode the length, the client doesn’t know the offset into the buffer to start writing the payload.

The trick is knowing that there will be at most 4 bytes of length and one of header – so the payload can be written starting at the 6th byte of the buffer. Then, once the length bytes are calculated, they can be inserted into the buffer working back from the 5th byte and the header byte can be inserted ahead of the length bytes. This gets the MQTT packet into a single contiguous buffer, with minimal copying of bytes, which can be passed to the network – albeit starting at a known offset from the start of the buffer.

   // llen is the number of length bytes
   buf[4-llen] = header;
   for (int i=0;i<llen;i++) {
      buf[5-llen+i] = lenBuf[i];
   }
   rc = _client->write(buf+(4-llen),length+1+llen);

Simple as that.
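
For anyone who wants to poke at the idea away from an Arduino, the same trick can be sketched in Python. This is only an illustration of the buffer layout, not the library code; the names MAX_HEADER_SIZE, encode_remaining_length and frame_in_place are invented for the example.

```python
MAX_HEADER_SIZE = 5  # 1 header byte + up to 4 remaining-length bytes


def encode_remaining_length(length: int) -> bytes:
    """Encode a length as 1 to 4 bytes, 7 bits per byte, low bits first."""
    if not 0 <= length <= 268435455:
        raise ValueError("length does not fit in 4 bytes")
    encoded = bytearray()
    while True:
        digit = length % 128
        length //= 128
        if length > 0:
            digit |= 0x80  # continuation bit: more length bytes follow
        encoded.append(digit)
        if length == 0:
            return bytes(encoded)


def frame_in_place(buf: bytearray, header: int, payload_len: int) -> memoryview:
    """Finish a packet whose payload already sits at buf[5:5 + payload_len].

    The length bytes are written so that they end at buf[4], the header goes
    immediately before them, and a view of the finished packet is returned
    without copying the payload.
    """
    len_bytes = encode_remaining_length(payload_len)
    start = MAX_HEADER_SIZE - 1 - len(len_bytes)
    buf[start] = header
    buf[start + 1:MAX_HEADER_SIZE] = len_bytes
    return memoryview(buf)[start:MAX_HEADER_SIZE + payload_len]
```

With a short payload the packet starts at offset 3 of the buffer; with a payload of 128 bytes or more it starts at offset 2, and so on. Either way the whole thing can be handed to the network in one write.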

There is one place in the library that can’t be reduced to a single network write; the recently added publish_P function allows the message payload to be stored in PROGMEM. The benefit that brings would be entirely lost if the payload were then copied into regular memory before writing to the network. Not much to be done about that.

30 Aug 2012

I’ve been running a site for the Chale Community Project for a while now that allows members of the community to monitor their energy usage online.

tl;dr; I changed some code that was using MySQL to store meter readings to use redis instead. The rest of this post describes some of the design decisions needed to make the shift.

The energy readings are published using a version of my modified CurrentCost bridge over MQTT. As well as the whole house reading, a meter might have a number of individual appliance monitors attached. Each of these readings is published to its own topic in a very simple hierarchy:


The first two topic levels identify the meter and the second two identify the individual devices attached.

With readings arriving for each device every 6 seconds, I started with simply dumping them into MySQL – with each level of the topic as a column in the table, thrown in with a timestamp. As this granularity of data isn’t needed for the site, a job could then run every 10 minutes to average the readings into 10 minute slots and put them into another table. This was good enough at the time, but it wasn’t a long term solution.

Aside from some serious normalisation of the database (adding the 40 character meter ID in every row turned out not to be the most efficient thing to do), I wanted to move to using redis for the live data. There were a few reasons for this, but primarily it was a combination of wanting to lighten the load on the (shared) MySQL server, as well as to do something more interesting with redis.

The shift from relational table to key-value store required a change in mindset as to how to store the readings. The main requirement was to minimise the work needed to store a reading as well as the work needed to calculate the average reading for each device.

Redis provides a list data type which, when using the receiving topic as the key, is a perfect fit for storing all of the readings for a particular device. But that causes problems in generating the average as the list will contain all values for the device, not just the values for a particular 10 minute timeslot.

The solution is to simply add the timeslot to the key. The timeslot is the current time, such as 2012-08-20 15:21:34 with the last three digits set to 0 – 2012-08-20 15:20:00.
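
As a rough sketch of that rounding in Python (the function name timeslot is just for illustration, and the site's own code may well do it differently):

```python
from datetime import datetime


def timeslot(moment: datetime) -> str:
    """Round a time down to the start of its 10 minute slot."""
    slot = moment.replace(
        minute=moment.minute - moment.minute % 10,
        second=0,
        microsecond=0,
    )
    return slot.strftime("%Y-%m-%d %H:%M:%S")


print(timeslot(datetime(2012, 8, 20, 15, 21, 34)))  # 2012-08-20 15:20:00
```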

This leads to keys such as:


That gets the readings stored pretty efficiently, already split into a list-per-device-per-timeslot.

The task of averaging the lists could then just look at the current time, generate the timeslot string for the previous timeslot, retrieve all keys that begin with that string and process all the lists. But there are two problems with this approach.

First, redis doesn’t recommend using its KEYS pattern command in production. I doubt this system would ever scale to the point where the performance hit of that command became an issue, but still, it’s worth sticking to the appropriate best practices.

The second issue is that it makes a big assumption that the only timeslot that needs to be processed is the previous one. If the task fails to run for any reason, it needs to process all of the outstanding timeslots.

This needed one of the other data types available in redis; the sorted set. This allows you to have an ordered list of values that are guaranteed to be unique. When each reading arrives, the timeslot value is put into a sorted set called, imaginatively, ‘timeslots’. As it is a set, it doesn’t matter that the same timeslot value will be put into the set thousands of times – it will only result in a single entry. As it is sorted, the task doing the averaging can process the outstanding timeslots in order.

Finally, a normal set is used to store all of the reading keys that have been used for a particular timeslot. As this is per-timeslot, the key for the set is ‘readings:’ appended with the timeslot.
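
Putting the three structures together, storing a single reading might look something like the sketch below. It assumes a redis-py style client (version 3.0 or later, where zadd takes a mapping of member to score). The exact key layout, the use of the slot's start time as the sorted set score, and the function name store_reading are all assumptions made for the example.

```python
from datetime import datetime

SLOT_FORMAT = "%Y-%m-%d %H:%M:%S"


def store_reading(client, topic: str, value, slot: str) -> str:
    """Store one reading for `topic` in the timeslot `slot`.

    `client` is expected to behave like redis.Redis from redis-py.
    """
    # Hypothetical key layout: the timeslot, a colon, then the topic
    key = f"{slot}:{topic}"
    # Assumption: score each timeslot by its start time so they sort in order
    score = datetime.strptime(slot, SLOT_FORMAT).timestamp()

    client.rpush(key, value)                 # list per device per timeslot
    client.zadd("timeslots", {slot: score})  # re-adding the same slot is harmless
    client.sadd(f"readings:{slot}", key)     # remember which lists this slot has
    return key
```

That is three cheap writes per reading, and none of them needs to read anything back first.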

So, with that in place, the averaging task takes the following steps.

  1. Get the list of timeslots that there are readings for from the sorted set timeslots.
  2. For each timeslot that is not the current one:
    1. Remove the timeslot from the sorted set
    2. Get the reading keys from the set readings:timeslot
    3. For each reading key, retrieve the list, calculate the average, insert it into the MySQL table and delete the list
    4. Delete the readings:timeslot set
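
The steps above can be sketched in Python as follows. This is an outline rather than the code running on the site: it assumes a redis-py style client created with decode_responses=True (so keys and values come back as strings), and save_average is a hypothetical callback standing in for the MySQL insert.

```python
def average_outstanding(client, current_slot: str, save_average) -> None:
    """Average and clear every stored timeslot except the current one."""
    # 1. All timeslots with readings, oldest first
    for slot in client.zrange("timeslots", 0, -1):
        # 2. Skip the slot that is still being filled
        if slot == current_slot:
            continue
        # 2.1 Remove the timeslot from the sorted set
        client.zrem("timeslots", slot)
        # 2.2 Find the reading lists recorded for this timeslot
        readings_set = f"readings:{slot}"
        for key in client.smembers(readings_set):
            # 2.3 Average the list, hand the result on, then delete the list
            values = [float(v) for v in client.lrange(key, 0, -1)]
            if values:
                save_average(key, slot, sum(values) / len(values))
            client.delete(key)
        # 2.4 Delete the per-timeslot set
        client.delete(readings_set)
```

Because the loop works through whatever is in timeslots, a run that was missed simply leaves more slots for the next run to pick up.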

And that is what I’ve had running for a few weeks now. No grand conclusion.

8 Jun 2012

Cosm, formerly known as Pachube, has had MQTT support for a while now and I’ve been meaning to do something with it.

One of the strengths of a publish/subscribe system, such as MQTT, is the decoupling of the producers and consumers of messages. I don’t have to modify the code that reads my energy usage from the CurrentCost meter each time I think of a new use of the data – I just add a new, self-contained, subscriber.

So to get my energy usage data into a datastream on Cosm, I could simply write a client that subscribes to the house/cc/power topic on my home broker and republishes it to the appropriate topic on the Cosm broker.

But even that’s far more work than is really needed – why write code when the broker can do it for me? RSMB lets you create bridge connections to other brokers and define how topics should be mapped between them. Now, I’ve done this plenty of times, but I still get confused with the topic-mapping syntax RSMB uses in its config file. So, for future reference, here’s how to do it.

connection cosm
try_private false
address 1883
username <COSM_API_KEY>
clientid knolleary
topic "" out house/cc/power /v2/feeds/62907/datastreams/0.csv

The topic line shows how you can achieve a one-to-one mapping between different topics – the non-obvious part being the need to have the empty “” as the topicString.

If you are using Mosquitto, an additional parameter is needed in the topic line to specify the QoS to use:

topic "" out 0 house/cc/power /v2/feeds/62907/datastreams/0.csv

Going the other way, subscribing to a feed on Cosm, is just as easy:

topic "" in inbound/cosm /v1/feeds/XXXXXX/datastreams/0.csv

Or, if you are using Mosquitto:

topic "" in 0 inbound/cosm /v1/feeds/XXXXXX/datastreams/0.csv

Note the use of /v1/ in the remote topic string this time around. As documented, this gets you just the raw data value, rather than the comma-separated time-stamp and data value returned by the /v2/ API. Of course, depending on what you’re doing, you may want the time-stamp. Equally, you may want the full JSON or <shudder> XML formats – in which case you would change the .csv to .json or .xml respectively.

In a future post, I’ll look more at how the topic-mapping in RSMB (and Mosquitto) can be used to good effect in scaling an MQTT system.

11 Apr 2012

A question has come up a couple of times on the MQTT mailing list asking how it can be used for workload distribution; given tasks being published to a topic, how can they be distributed fairly amongst the subscribers?

The short answer is that you can’t – it isn’t how things work in a publish/subscribe system. With MQTT, each subscriber is treated equally and every message published to a topic will go to every subscriber.

Despite the natural urge to shoehorn MQTT into any solution, the correct answer is to use the right tool for the job. In this case, something that provides proper queuing semantics to ensure each message is only consumed once – something like Celery for example.

That said, let’s have a look at one way you could shoehorn MQTT into this particular scenario.

First, let’s define the scenario. Say, for example, you have a stream of tweets being published to MQTT. Specifically a stream of all tweets that contain the text “4sq”. Why would you do this? Well, as explained by Matt Biddulph in his Where 2012 workshop, this gives you a feed of all check-ins on foursquare that have been shared on Twitter. Each tweet contains a url which gives you more metadata about the check-in. To do anything meaningful with the tweets, you need to retrieve the url, which, as it uses the foursquare url-shortener, requires two http requests.

The rate at which the tweets arrive, 25,000 an hour Matt suggests, makes it impractical to do anything but the most basic of operations on them. It certainly isn’t practical to do the two http requests to retrieve the foursquare information for each tweet in real-time.

There isn’t any real magic needed here; a single subscriber to the topic dumps the messages onto a queue of some form that can then have as many active consumers as makes sense for the scenario.

A couple of years ago, I would have jumped straight at dumping the tweets into a MySQL table – how very 2010. These days, it’s so much easier to use something like redis.

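import redis
import mosquitto

# ...
# exercise for the reader to:
# - create MQTT client, connect it and subscribe to the appropriate topic
# - create a redis client and connect it
# ...

# on_message callback for the mosquitto client
def onmqttmessage(client,message):
    # Assumed body: push the tweet onto a redis list keyed by the topic,
    # using the redis client created above (called redis_client here)
    redis_client.lpush(message.topic, message.payload)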

The consumers then become simple redis clients that redis.brpop(topic) to get the next tweet to process. If you want to get extra fancy, then redis.brpoplpush(topic,processing_queue) and a housekeeping thread lets you make sure nothing gets lost if a consumer dies unexpectedly.
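
A minimal sketch of the "extra fancy" consumer, assuming a redis-py style client (3.0 or later for the lrem argument order). The function name consume_one and the handle callback are made up for the example, and the housekeeping thread that re-queues stale items is left out.

```python
def consume_one(client, queue, processing_queue, handle, timeout=5) -> bool:
    """Take one item off `queue`, process it, then clear it from processing.

    Returns False if nothing arrived within `timeout` seconds.
    """
    # Atomically move the oldest item onto the processing list
    item = client.brpoplpush(queue, processing_queue, timeout=timeout)
    if item is None:
        return False
    # If handle() raises or the consumer dies here, the item is still on
    # processing_queue for a housekeeping task to find and re-queue
    handle(item)
    # Done: remove a single copy of the item from the processing list
    client.lrem(processing_queue, 1, item)
    return True
```

Each consumer just calls this in a loop, and as many of them as needed can be run against the same queue.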

Looking at this, you may ask why bother with MQTT at all? Why not just replace the MQTT publishing bits in the twitter streaming code with the redis putting bits? Well, I have used the word “shoehorn” three times in this post.

4 Apr 2012

For a demo I’m putting together at work, I needed to have tweets available on an MQTT topic. The whys and wherefores of the demo might be something I’ll write about in the future, but for now, this post is about getting the tweets into MQTT.

To be honest, it is pretty trivial. The Twitter Streaming API is well documented and can be used to get a stream of tweets based on whatever search terms you choose. As each tweet is received, it just needs to be published with an MQTT client to the topic of choice.

It’s barely worth a blog post.

I’ve put the code up on GitHub.