MQTT Detection via Bro Script

Bro is a network intrusion detection system (NIDS) that can be extended through the use of "Broscript" a built-in scripting language that utilizes events generated by the Bro engine to exchange data with a user's handcrafted logic.

MQTT is a highly-efficient IoT protocol used for relaying all sorts of information between resource-constrained IoT devices.


Above: The smallest MQTT subscribe-all packet

As part of the WWU Advanced Network Security class, I was tasked with building a script that could parse an MQTT packet for a "Subscribe All" event - an attack where a client device begins scraping all MQTT traffic on the network - and alert the Bro admin (via an alert in Notice.log). Additionally, there was a requirement to log all MQTT subscribe events (whether "Subscribe All" or not).


Broscript requires an export section where you can define environmental settings unique to your script and extend other Bro components. Here, I'm adding a new notice and log type for Mqtt and defining a record which contains pertinent information for the mqtt.log (any subscribe) event.

module Mqtt;

export {
  redef enum Notice::Type += { Mqtt::Subscribe };

  redef enum Log::ID += { LOG };

  type Info: record {
    ts: time &log;
    src_ip: addr &log;
    src_port: port &log;
    dst_ip: addr &log;
    dst_port: port &log;
    length: count &log;
    payload: string &log;
  };
}

There are a number of different types for an Mqtt packet defined in the documentation for the protocol format - the type is given in the first nibble of the packet, and any flags are set in the lower nibble. For instance 0x82 is the subscribe type (0x80) and a constant flag (0x02).

# Returns the type of Mqtt packet we are looking at (first 4 bits)
function mqtt_parse_type(contents: string) : count {
  local mqtt_type_byte = (sub_bytes(contents, 1, 1));

  local mqtt_type_num = bytestring_to_count(hexstr_to_bytestring(string_to_ascii_hex(mqtt_type_byte)));

  # mqtt_type_byte was a 2-digit hex value, type is the MSD
  local mqtt_type = (mqtt_type_num / 16);

  # Flag is current unused - for subscribe it is reserved and set to 0x2
  local mqtt_type_flag = (mqtt_type_num % 16);

  return mqtt_type;
}

The normal way to get Mqtt length involves bit-shift operators (which Bro claims to support for the count type, but which don't work in practice). This function moves to the next byte whenever the current byte is greater than 127 (0x8F) - since this indicates that the most signifigant bit is set in the current byte.

# Mqtt uses a maximum 4-byte value to express up to 256MB of packet length
# This function parses the format for remaining_length in Mqtt
function mqtt_parse_remaining_length(contents: string) : table[string] of count {
  # Skip first byte
  local index = 2;
  local rem_length = "";

  # Local temp variables for storing place, data and decoding values
  local value = 0;
  local multiplier = 1;
  local done_flag = 0;
  local number_count = 1;

  local current_digits = 0;

  # Iterate through 4 bytes for value, checking for continuation bit
  while (number_count < 5) {
    # Get the decimal vale of the current digits
    current_digits = 
    bytestring_to_count(
      hexstr_to_bytestring(
        string_to_ascii_hex(
          sub_bytes(contents, index, 1)
        )
      )
    );

    if (current_digits > 128) {
      current_digits -= 128;
    } else {
        done_flag = 1;
    }

    value = value + (current_digits * multiplier);

    if (done_flag == 1) { break; }

    multiplier = multiplier * 128;
    number_count += 1;
    index += 1;
  }

  local return_values = table(
    ["Value"] = value,
    ["Length"] = number_count
  );

  return return_values;
}

The MQTT payload can differ based on the type of packet we're dealing with. For the subscribe packet type it is neccessary to first skip any constant bytes (these are the type and length bytes and 4 header bytes). Then truncate the last byte (a QoS byte) before returning the resolved string. This function does all of that.

# Get the payload as a string type
function mqtt_get_payload_string(contents: string, rem_length: count) : string {
  # Always skip the first two bytes (type and remaining length)
  local index = 3;

  # Skip the next two bytes, they are the variable header
  # This is when I realised I only need to parse subscribe packets
  # I would make this dynamic based on the packet but SUBSCRIBE has two bytes of VH 
  index += 2;

  # Skip the next two bytes, they are the payload length values
  # Might be useful for other types, but for subscribe I can just truncate the last byte
  index += 2;

  # Get the payload string - starts at index, length is:
  #    rem_length - 2 byte (type and remaining_length bytes)
  local payload_string = sub_bytes(contents, index, (rem_length - (index - 2)));

  return payload_string;
}

Most of the parsing is handled by the above functions, here I take that information and use it to determine what actions to take with Bro. If the packet is of type "Subscribe" then I create an entry in the log file - once that is complete I can determine whether a subscribe all indicator is present (# in the string) and raise an alert when that is the case.

# This function parses a single packet of Mqtt data
# Returns the length of the packet (from remaining length)
function mqtt_parse_packet(c: connection, remaining_contents: string) : int {
  local mqtt_type = mqtt_parse_type(remaining_contents);
  local rem_length_table = mqtt_parse_remaining_length(remaining_contents);

  local rem_length = rem_length_table["Value"];
  local rem_length_length = rem_length_table["Length"];

  # Mqtt type 8 is the SUBSCRIBE type - write to the log
  if (mqtt_type == 8) {
    local mqtt_payload_string = mqtt_get_payload_string(remaining_contents, rem_length);

    local rec: Mqtt::Info = [
      $ts = network_time(),
      $src_ip = c$id$orig_h,
      $src_port = c$id$orig_p,
      $dst_ip = c$id$resp_h,
      $dst_port = c$id$resp_p,
      $length = rem_length + 2,
      $payload = mqtt_payload_string
    ];

    Log::write(Mqtt::LOG, rec);

    # Now check if the payload string has a # which indicates a subscribe all
    if ("#" in mqtt_payload_string) {
      NOTICE([
        $note = Mqtt::Subscribe,
        $msg = fmt("%s attempts to subscribe to all topics.", c$id$orig_h)
      ]);
    }
  }

  return (rem_length + rem_length_length + 1);
}

The above function works only when passed the start of a single packet - so I used this helper function to increment through the payload string.

# This function deals with multiple packets being present in a single
# contents string and indexes through the string for the Mqtt parser 
function mqtt_parse_contents(c: connection, contents: string) {
  # Use 1 for index start because 0 and 1 are Bro first string index
  # Using 0 will create an off-by-one for first packet
  local index = 1;

  while (index <= |contents|) {

    local rem_length = mqtt_parse_packet(c, sub_bytes(contents, index, |contents|));

    index = index + rem_length;
  } 
}

The packet_contents event is incredibly resource-hungry, and so is not suitable for production environments. For the assignment I am parsing short pcap files - the efficieny loss is not relevant on even a fairly lowly single-core machine. The last chunk (bro_init) simply creates a stream for writing the log file - a mandatory step in the creation of additional files.

event packet_contents (c: connection, contents: string) {
  if (c$id$resp_p == 1883/tcp) {
    mqtt_parse_contents(c, contents);
  }
}

event bro_init() &priority=5 {
  Log::create_stream(Mqtt::LOG, [$columns=Info, $path="mqtt"]);
}