A Pico-Based Platform for ESProto Sensors

Connected things need a platform to accomplish anything more than sending data. Picos make an ideal system for providing intelligence to connected devices. This post shows how I did that for the ESProto sensor system and talks about the work my lab is currently doing to make that easier than ever.

ESProto

ESProto is a collection of sensor devices based on the ESP8266, a low-cost WiFi-enabled microcontroller that can be programmed with the Arduino toolchain. My friend Scott Lemon is the inventor of ESProto. Scott gave me a couple of early prototypes to play with: a simple temperature sensor and a multi-sensor array (MSA) that includes two temperature transducers (one on a probe), a pressure transducer, and a humidity transducer.

ESProto Multi-Sensor Array
ESProto Multi-Sensor Array
ESProto Temperature Sensor
ESProto Temperature Sensor

One of the things I love about Scott's design is that the sensors aren't hardwired to a specific platform. When setting up a sensor unit, you provide a URL to which the sensor will periodically POST (via HTTP) a standard payload of data. In stark contrast to most of the Internet of Things products we see on the market, ESProto lets you decide where the data goes.1

Setting up an ESProto sensor device follows the standard methodology for connecting something without a user interface to a WiFi network: (1) put the device in access point mode, (2) connect to it from your phone or laptop, (3) fill out a configuration screen, and (4) reboot. The only difference with the ESProto is that in addition to the WiFi configuration, you enter the data POST URL.

Once configured, the ESProto periodically wakes, takes its readings, POSTs the data payload, and then goes back to sleep. The sleep period can be adjusted, but is nominally 10 minutes.
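To make the data flow concrete, here is a minimal Python sketch of the kind of endpoint you could stand up to receive these POSTs. This is not part of ESProto or the pico engine; the payload field names are hypothetical and the handler simply logs whatever JSON the sensor sends.

```python
# Minimal sketch of an HTTP endpoint an ESProto sensor could POST to.
# Assumptions: the sensor sends a JSON body; field names are hypothetical.
import json
from http.server import BaseHTTPRequestHandler, HTTPServer

readings_log = []  # in a real platform this would be a pico's persistent state

class SensorHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        payload = json.loads(self.rfile.read(length))
        readings_log.append(payload)  # store the raw heartbeat as-is
        self.send_response(200)
        self.end_headers()

    def log_message(self, fmt, *args):
        pass  # keep the console quiet

def run(port=8080):
    """Serve forever on the given port (the URL you'd configure the sensor with)."""
    HTTPServer(("", port), SensorHandler).serve_forever()
```

In the pico-based design described below, this role is played by a channel on the device's pico rather than a standalone server.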

The ESProto design can support devices with many different types of transducers in myriad configurations. Scott anticipates that they will be used primarily in commercial settings.

Spimes and Picos

A spime is a computational object that can track metadata about a physical object or concept through space and time. Bruce Sterling coined the neologism as a contraction of space and time. Spimes can contain profile information about an object, provenance data, design information, historical data, and so on. Spimes provide an excellent conceptual model for the Internet of Things.

Picos are persistent compute objects. Picos run in the pico engine and provide an actor-model for distributed programming. Picos are always on; they are continually listening for events on HTTP-based event channels. Because picos have individual identity, persistent state, customizable programming, and APIs that arise from their programming, they make a great platform for implementing spimes.2

Because picos are always online, they are reactive. When used to create spimes, they don't simply hold meta-data as passive repositories, but rather can be active participants in the Internet of Things. While they are cloud-based, picos don't have to run in a single pico engine to work together. Picos employ a hosting model that allows them to be run on different pico engines and to be moved between them.

In our conception of the Internet of Things, we create a spime for each physical object, whether or not it has a processor. In the case of ESProto, we create a pico-based spime for each ESProto device:

MSA_pico
An ESProto MSA connected to its spime

Spimes can also represent concepts. For organizing devices, we not only represent each device with a pico-based spime, we also create a spime for each interesting collection. For example, we might have two spimes representing a multi-sensor array and a temperature sensor. If these are both installed in a hallway, we could create a spime representing the collection of sensors in the hallway. This spime stores and processes meta-data for the hallway, including aggregating data readings from the various sensors installed there.

MSA_Temp_Hallway
MSA and Temperature sensors in a Hallway collection

Spimes can belong to more than one collection. The same two sensors that are part of the hallway collection might also, for example, be part of a battery collection that is collecting low battery notifications and is used by maintenance to ensure the batteries are always replaced. The battery collection could also be used to control the battery conservation strategies of various sensor types. For example, sensors could increase their sleep time after they drop below a given battery level. The battery spime would be responsible for managing these policies and ensuring all the spimes representing battery-powered devices were properly configured.

MSA_Temp_Hallway_Many
Multiple MSA and Temperature Sensors with two in the hallway collection

Spimes and ESProto

Pico-based spimes provide an excellent platform for making use of ESProto and other connected devices. We can create a one-to-one mapping between ESProto devices and a spime that represents them. This has several advantages:

  • things don't need to be very smart—a low-power microcontroller is sufficient for running the ESProto, but it cannot keep up with the computing needs of a large, flexible collection of sensors without a significant increase in cost. Using spimes, we can keep the processing needs of the devices light, and thus inexpensive, without sacrificing processing and storage requirements.
  • things can be low power—the ESProto device is designed to run on battery power and thus needs to be very low power. This implies that they can't be always available to answer queries or respond to events. Having a virtual, cloud-based persona for the device enables the system to treat the devices as if they are always on, caching readings from the device and instructions for the device.
  • things can be loosely coupled—the actor-model of distributed computing used by picos supports loosely coupled collections of things working together while maintaining their independence through isolation of both state and processing.
  • each device and each collection gets its own identity—there is intellectual leverage in closely mapping the computation domain to the physical domain3. We also gain tremendous programming flexibility in creating an independent spime for each device and collection.

Each pico-based spime can present multiple URL-based channels that other actors can use. In the case of ESProto devices, we create a specific channel for the transducer to POST to. The device is tied, by the URL, to the specific spime that represents it.

Using ESProto with Picos

My lab is creating a general, pico-based spime framework. ESProto presents an excellent opportunity to design the new spime framework.

For this experiment, I used manually configured picos to explore how the spime framework should function. To do this, I used our developer tools to create and configure a pico for each individual sensor I own and put them in a collection.

I also created some initial rulesets for the ESProto devices and for a simple collection. The goal of these rulesets is to test readings from the ESProto device against a set of saved thresholds and notify the collection whenever there's a threshold violation. The collection merely logs the violation for inspection.

The Device Pico

I created two rulesets for the device pico: esproto_router.krl and esproto_device.krl. The router is primarily concerned with getting the raw data dump from the ESProto sensor and making sense of it using the semantic translation pattern. For example, the following rule, check_battery, looks at the ESProto data and determines whether or not the battery level is low. If it is, then the rule raises the battery_level_low event:

rule check_battery {
  select when wovynEmitter thingHeartbeat 
  pre {
    sensor_data = sensorData();
    sensor_id = event:attr("emitterGUID");
    sensor_properties = event:attr("property");
  }
  if (sensor_data{"healthPercent"} < healthy_battery_level)
  then noop()
  fired {
    log "Battery is low";
    raise esproto event "battery_level_low"
      with sensor_id = sensor_id
       and properties = sensor_properties
       and health_percent = sensor_data{"healthPercent"}
       and timestamp = time:now();
  } else {
    log "Battery is fine";    
  }
}

The resulting event, battery_level_low, is much more meaningful and precise than the large data dump that the sensor provides. Other rules, in this or other rulesets, can listen for the battery_level_low event and respond appropriately.

Another rule, route_readings, also provides a semantic translation of the ESProto data for each sensor reading. This rule is more general than the check_battery rule, raising the appropriate event for any sensor that is installed in the ESProto device.

rule route_readings {
  select when wovynEmitter thingHeartbeat
  foreach sensorData(["data"]) setting (sensor_type, sensor_readings)
    pre {
      event_name = ("new_" + sensor_type + "_reading").klog("Event ");
    }
    always {
      raise esproto event event_name attributes
        {"readings":  sensor_readings,
         "sensor_id": event:attr("emitterGUID"),
         "timestamp": time:now()
        };
    }
}

This rule constructs the event from the sensor type in the sensor data and will thus adapt to different sensors without modification. In the case of the MSA, this would raise a new_temperature_reading, a new_pressure_reading, and a new_humidity_reading from the sensor heartbeat. Again, other interested rules could respond to these as appropriate.
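The semantic translation at work here can be sketched in Python (not KRL) to show the shape of the transformation: the generic heartbeat is split into one named event per sensor type. The field names "data" and "emitterGUID" follow the rules above; the payload values are made up for illustration.

```python
# Python sketch of the semantic translation done by route_readings:
# one generic heartbeat in, one specific named event per sensor type out.
from datetime import datetime, timezone

def route_readings(heartbeat):
    """Yield a semantic event for each sensor type present in the heartbeat."""
    for sensor_type, readings in heartbeat["data"].items():
        yield {
            "event": f"new_{sensor_type}_reading",   # e.g. new_temperature_reading
            "readings": readings,
            "sensor_id": heartbeat["emitterGUID"],
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }

heartbeat = {
    "emitterGUID": "5CCF7F0EC86F",
    "data": {
        "temperature": [{"name": "probe temp", "temperatureF": 78.8}],
        "humidity": [{"name": "enclosure", "humidity": 41.0}],
    },
}
events = list(route_readings(heartbeat))
```

Because the event name is derived from the data, the same code handles any mix of transducers without modification, which is the point of the KRL rule above.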

The esproto_device ruleset provides the means of setting thresholds. In addition, the check_threshold rule listens for new_*_reading events to check for threshold violations:

rule check_threshold {
  select when esproto new_temperature_reading
           or esproto new_humidity_reading
           or esproto new_pressure_reading
  foreach event:attr("readings") setting (reading)
    pre {
      event_type = event:type().klog("Event type: ");

      // thresholds
      threshold_type = event_map{event_type};
      threshold_map = thresholds(threshold_type);
      lower_threshold = threshold_map{["limits","lower"]};
      upper_threshold = threshold_map{["limits","upper"]};

      // sensor readings
      data = reading.klog("Reading from #{threshold_type}: ");
      reading_value = data{reading_map{threshold_type}};
      sensor_name = data{"name"};

      // decide
      under = reading_value < lower_threshold;
      over = upper_threshold < reading_value;
      msg = under => "#{threshold_type} is under threshold: #{lower_threshold}"
          | over  => "#{threshold_type} is over threshold: #{upper_threshold}"
          |          "";
    }
    if (under || over) then noop();
    fired {
      raise esproto event "threshold_violation" attributes
        {"reading": reading.encode(),
         "threshold": under => lower_threshold | upper_threshold,
         "message": "threshold violation: #{msg} for #{sensor_name}"
        };
    }
}

The rule is made more complex by its generality. Any given sensor can have multiple readings of a given type. For example, the MSA shown in the picture at the top of this post contains two temperature sensors. Consequently, a foreach is used to check each reading for a threshold violation. The rule also constructs an appropriate message to deliver with the violation, if one occurs. The rule conditional checks if the threshold violation has occurred, and if it has, the rule raises the threshold_violation event.
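The core decision logic is easy to see in a Python sketch (not KRL). The threshold structure mirrors the thresholds()/limits shape used in the rule above; the reading field names and values are illustrative only.

```python
# Python sketch of check_threshold's logic: compare each reading of a
# given type against configured limits and build a violation record.
def check_threshold(readings, threshold_type, thresholds):
    """Return a violation record for each out-of-range reading."""
    limits = thresholds[threshold_type]["limits"]
    lower, upper = limits["lower"], limits["upper"]
    violations = []
    for reading in readings:         # a device may report several readings of one type
        value = reading["value"]
        under, over = value < lower, value > upper
        if under or over:
            bound = lower if under else upper
            word = "under" if under else "over"
            violations.append({
                "reading": reading,
                "threshold": bound,
                "message": (f"threshold violation: {threshold_type} is "
                            f"{word} threshold: {bound} for {reading['name']}"),
            })
    return violations

thresholds = {"temperature": {"limits": {"lower": 50, "upper": 76}}}
readings = [{"name": "probe temp", "value": 78.8},
            {"name": "enclosure temp", "value": 72.0}]
violations = check_threshold(readings, "temperature", thresholds)
```

With the limits above, only "probe temp" (78.8) trips the upper bound; "enclosure temp" is in range, so no violation is raised for it.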

In addition to rules inside the device pico that might care about a threshold violation, the esproto_device ruleset also contains a rule dedicated to routing certain events to the collections that the device belongs to. The route_to_collections rule routes all threshold_violation and battery_level_low events to any collection to which the device belongs.

rule route_to_collections {
  select when esproto threshold_violation
           or esproto battery_level_low
  foreach collectionSubscriptions() setting (sub_name, sub_value)
    pre {
      eci = sub_value{"event_eci"};
    }
    event:send({"cid": eci}, "esproto", event:type())
      with attrs = event:attrs();
}

Again, this rule makes use of a foreach to loop over the collection subscriptions and send the event upon which the rule selected to the collection.

This is a fairly simple routing rule that just routes all interesting events to all the device's collections. A more sophisticated router could use attributes on the subscriptions to pick what events to route to which collections.
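The routing behavior reduces to a loop over subscriptions, which a Python sketch (not KRL) makes plain. The subscription shape mirrors collectionSubscriptions() above; the send callback is a stand-in for event:send and the channel identifiers are made up.

```python
# Python sketch of route_to_collections: forward the event to the event
# channel (ECI) of every collection the device is subscribed to.
def route_to_collections(event, subscriptions, send):
    """Send the event to each subscription's event channel."""
    for sub in subscriptions.values():
        send(sub["event_eci"], event)

sent = []
subscriptions = {"hallway": {"event_eci": "eci-hallway"},
                 "battery": {"event_eci": "eci-battery"}}
route_to_collections(
    {"domain": "esproto", "type": "battery_level_low"},
    subscriptions,
    lambda eci, ev: sent.append((eci, ev["type"])))
```

A smarter router would filter on per-subscription attributes before sending, rather than broadcasting every interesting event to every collection.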

The Collection Pico

At present, the collection pico runs a simple rule, log_violation, that merely logs the violation. Whenever it sees a threshold_violation event, it formats the readings and messages and adds a timestamp4:

rule log_violation {
  select when esproto threshold_violation
  pre {
    readings = event:attr("reading").decode();
    timestamp = time:now(); // should come from device
    new_log = ent:violation_log
                .put([timestamp], {"reading": readings,
                                   "message": event:attr("message")})
                .klog("New log ");
  }
  always {
    set ent:violation_log new_log
  }
}

A request to see the violations results in a JSON structure like the following:

{"2016-05-02T19:03:36Z":
     {"reading": {
         "temperatureC": "26",
         "name": "probe temp",
         "transducerGUID": "5CCF7F0EC86F.1.1",
         "temperatureF": "78.8",
         "units": "degrees"
         },
      "message": "threshold violation: temperature is over threshold of 76 for probe temp"
     },
 "2016-05-02T20:03:18Z":
     {"reading": {
         "temperatureC": "27.29",
         "name": "enclosure temp",
         "transducerGUID": "5CCF7F0EC86F.1.2",
         "units": "degrees",
         "temperatureF": "81.12"
         },
      "message": "threshold violation: temperature is over threshold of 76 for enclosure temp"
     },
     ...
}

We have now constructed a rudimentary sensor platform from some generic rules. The platform accommodates multiple collections and records threshold violations for any transducer the ESProto platform supports.

MSA_pico_threshold
An ESProto device, associated pico, and collection pico logging a threshold violation

A more complete system would entail rules that do more than just log the violations, allow for more configuration, respond to low battery conditions, and so on.

Spime Design Considerations

The spime framework that we are building is a generalization of the ideas and functionality developed for the Fuse connected-car platform. At the same time, it leverages lessons learned from the Squaretag system.

The spime framework will make working with devices like ESProto easier because developers will be able to define a prototype for each device type that defines the channels, rulesets, and initialization events for a new pico. For example, the multi-sensor array could be specified using a prototype such as the following:

{"esproto-msa-16266":
    {"channels": {"name": "transducer",
                  "type": "ESProto"
                 },
     "rulesets": [{"name": "esproto_router.krl",
                   "rid": "b16x37"
                  },
                  {"name": "esproto_device.krl",
                   "rid": "b16x38"
                  }
                 ],
     "initialization": [{"domain": "esproto",
                         "type": "reset"
                        }
                       ]
    },
 "esproto-temp-2788": {...},
 ...
}
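To show how a prototype like this could drive pico creation, here is a Python sketch of a hypothetical engine that interprets it: create the channels, install the rulesets, and queue the initialization events. The PicoEngine class and its methods are stand-ins of my own invention, not the real pico engine API.

```python
# Hypothetical sketch of interpreting a device prototype: the class and
# method names are invented; only the prototype shape comes from the post.
class PicoEngine:
    def __init__(self):
        self.picos = {}

    def new_pico(self, name, prototype):
        """Create a pico record from a prototype: channels, rulesets, init events."""
        pico = {"name": name, "channels": [], "rulesets": [], "events": []}
        channel = prototype.get("channels")
        if channel:
            pico["channels"].append(channel)          # e.g. the transducer channel
        for rs in prototype.get("rulesets", []):
            pico["rulesets"].append(rs["rid"])        # install by ruleset id
        for ev in prototype.get("initialization", []):
            pico["events"].append((ev["domain"], ev["type"]))
        self.picos[name] = pico
        return pico

prototypes = {
    "esproto-msa-16266": {
        "channels": {"name": "transducer", "type": "ESProto"},
        "rulesets": [{"name": "esproto_router.krl", "rid": "b16x37"},
                     {"name": "esproto_device.krl", "rid": "b16x38"}],
        "initialization": [{"domain": "esproto", "type": "reset"}],
    },
}

engine = PicoEngine()
msa = engine.new_pico("msa_00", prototypes["esproto-msa-16266"])
```

The value of the prototype is that creating a fully configured device spime collapses to a single call naming the prototype, which is exactly what the new_child() action below does.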

Given such a prototype, a pico representing the spime for the ESProto multi-sensor array could be created by the new_child() rule action5:

wrangler:new_child() with
  name = "msa_00" and
  prototype = "esproto-msa-16266"

Assuming a collection for the hallway already exists, the add_to_collection() action would put the newly created child in the collection6:

spimes:add_to_collection("hallway", "msa_00") with
  collection_role = "" and
  subscriber_role = "esproto_device"

This action would add the spime named "msa_00" to the collection named "hallway" with the proper subscriptions between the device and collection.

Conclusions

This post has discussed the use of picos to create spimes, why spimes are a good organizing idea for the Internet of Things, and demonstrated how they would work using ESProto sensors as a specific example. While the demonstration given here is not sufficient for a complete ESProto system, it is a good foundation and shows the principles that would be necessary to use spimes to build an ESProto platform.

There are several important advantages to the resulting system:

  • Using the spime framework on top of picos is much easier than creating a backend platform from scratch.
  • The use of picos with their actor-model of distributed programming eases the burden associated with programming large collections of independent processes.
  • The system naturally scales to meet demand because of the architecture of picos.
  • The use of rules allows picos to naturally layer on functionality. Customizing a pico-based spime is easily accomplished by installing additional rulesets or replacing the stock rulesets with custom implementations. Each pico has a unique set of rulesets and consequently a unique behavior and API.
  • The hosted model of picos enables them to be created, programmed, and operated on one platform and later moved, without loss of functionality or any necessary reprogramming, to another platform. This supports flexibility and substitutability.

Notes:

  1. I believe this is pretty common in the commercial transducer space. Consumer products build a platform and link their devices to it to provide a simple user experience.
  2. The Squaretag platform was implemented on an earlier version of picos. Squaretag provided metadata about physical objects and was our first experiment with spimes as a model for organizing the Internet of Things.
  3. I am a big fan of domain-driven design and believe it applies as much to physical objects in the Internet of Things as it does to conceptual objects in other programming domains.
  4. Ideally, the timestamp would come from the device pico itself to account for message delivery delays and the collection would only supply one if it was missing, or perhaps add a "received" timestamp.
  5. The wrangler prefix identifies this action as being part of the pico operating system, the home of the operations for managing pico lifecycles.
  6. The spimes prefix identifies this action as the part of the framework for managing spimes. Collections are a spime concept.


Self-Sovereign Identity and Legal Identity

Passport

The Peace of Westphalia, ending the Thirty Years' War in 1648, created the concept of Westphalian sovereignty, the principle of international law that "each nation state has sovereignty over its territory and domestic affairs, to the exclusion of all external powers, on the principle of non-interference in another country's domestic affairs, and that each state (no matter how large or small) is equal in international law."

The next century saw many of these states begin civil registration for their citizens. These registrations, from which our modern system of birth certificates springs, became the basis for personal identity and legal identity in a way which conflated these two concepts.

Birth certificates are both the source of identity and proof of citizenship. People present proof of civil registration for many purposes. The birth certificate is thus the basis for individual identity in most countries. We use our physical control of a piece of paper to prove who we are and, springing from that, our citizenship. Civil registration has become the foundation for how states relate to their citizens. As modern nation states have become more and more powerful in the lives of their citizens, civil registration and its attendant legal identity have come to play a larger and larger role in our lives.

Descartes didn't say "I have a birth certificate, therefore, I am." We are, obviously, more than a legal identity. Nevertheless, civil registration has been with us for almost four centuries and most of us cannot conceive of any basis for trusted identity independent of civil registration.

And yet, presently, 1.8 billion people are without this basic form of identity. As a result, they have difficulty getting basic government services. Most of these people are refugees displaced by war or territorial disputes, victims of famine or ethnic cleansing, outcasts from society, or victims of unscrupulous employers, smugglers, or organized crime. People who want to help them have difficulty because without legal identity they are illegible to state apparatus.

Without a birth certificate, people are in a bind. They have nothing upon which they can establish a legal identity and become legible to governments. And since birth certificates link identity and citizenship, both of these problems have to be solved at once, creating a paradox for authorities trying to help these unidentified people. In this system, they can't be made legible, and thus able to call on governments for aid or protections, without also being granted citizenship of some kind.

We are at a point in the development of identity that it is possible to develop and deploy technologies that allow individuals to create a self-sovereign basis for their identity independent from civil registration.

Such systems allow us to tease apart the purposes of the birth certificate by recognizing a self-sovereign identity independent of the proof of citizenship. This doesn't, by itself, solve the problem of providing legal identity since the self-sovereign identity is self-asserted. But it does provide a foundation upon which a legal identity could be built: specifically it is an identifier that a person can prove they control. Constructing a legal identity on this self-sovereign identity is possible, but would require changes to existing statutes, rules, policy, and processes.

ID2020 is a summit being held at the UN in May with the goal to "by 2030, provide legal identity to all, including birth registration." For this to work, I believe we must succeed in recognizing one or more sources of self-sovereign identity that people can use to bootstrap the process. Such systems must be trustworthy enough that governments will be willing to use them as a basis for a legal identity. Governments must be persuaded to accept this self-declared identity as the basis for establishing a relationship to the state.

The self-sovereign identity will not be all that is needed. The self-sovereign identity won't, at first, be associated with any validated claims. It provides only the identifier that the person can prove they have control over. Beyond that, legal systems will have to provide a route for using that identity to validate the attributes needed for a person to form a recognized relationship with various governments and their agencies.

Please note that this doesn't require that the self-sovereign identity be controlled by the state, only that it be trustable by the state. This also doesn't mean that this same self-sovereign identity wouldn't be usable as the basis for identity in other administrative systems. And using the self-sovereign identity in administrative systems need not diminish its independence in any way.

I'm very excited about developments in self-sovereign identity. There is much to do, but I feel like I can finally see a way forward on an idea many in the identity community have been working on for a decade: to understand how people can use identity in a way that doesn't always rely on some administrative authority to grant that identity.


Note: Christopher Allen's The Path to Self-Sovereign Identity got me thinking about writing this post when he circulated an early draft of his post last week. As he points out, a lot of people have been working on this for many years. User-centric identity was the term we used in the early days to refer to these ideas. The Internet Identity Workshop, which is holding its 22nd meeting this week, was founded to explore user-centric identity on the Internet. As I said, many people in the identity community have been pushing this idea for a long time. And there's finally some light at the end of the tunnel.

Chris is also hosting a Rebooting the Web of Trust workshop in New York in the two days after the UN Summit. This is the second #RebootingWebOfTrust Design Workshop on decentralized identity technologies. The first produced a number of white papers on these ideas.


We're Teaching Programming Wrong

Rule

We're teaching programming the wrong way. An interesting study of how people naturally express solutions to problems concludes that starting with imperative programming languages may not be the best way to teach programming skills. What works? Event-based and rule systems, naturally. Maybe we ought to use KRL for introductory programming. I'm willing to try it.

The majority of the statements written by the participants were in a production-rule or event-based style, beginning with words like if or when. However, the raters observed a significant number of statements using other styles, such as constraints, other declarative statements (that were not constraints), and imperative statements.

The dominance of rule- or event-based statements suggests that a primarily imperative language may not be the most natural choice. One characteristic of imperative languages is explicit control over program flow. Although imperative languages have if statements, they are evaluated only when the program flow reaches them. The participants’ solutions seem to be more reactive, without attention to the global flow of control.

Studying the Language and Structure in Non-Programmers’ Solutions to Programming Problems


Properties of Permissioned and Permissionless Blockchains

chains

According to Robert Sams, there are three properties we want from a decentralized ledger system:

  1. Avoid forgery. He calls these "sins of commission."
  2. Avoid censorship. He calls these "sins of omission." Censorship might be hiding transactions, but it's more often going to be regulatory control over transactions.
  3. Avoid deletion. He calls these "sins of deletion." This amounts to reversing transactions after they've been written to the ledger.

Sams's thesis is that this is kind of like the CAP theorem: we can only get two of the three. Since (1) is necessary in almost all cases and quite easily solved through cryptographic means, we really get to choose between the other two.

Permissionless blockchains (like the ones in Bitcoin, Namecoin, etc.) optimize (2) over (3), while permissioned blockchain systems (like the ones in Ripple, Evernym, etc.) optimize (3) over (2).

Permissionless blockchains are distinguished by their use of proof-of-work (or proof-of-stake) to avoid Sybil attacks caused by the cheap pseudonym problem. Permissionless blockchain systems exhibit features that make banks and other existing institutions leery of them. They are hard to use in a regulatory regime where authorities want to exert control over them. This, for blockchain enthusiasts, is a feature, not a bug. There are places where (2) is more important than (3). This resembles cash, and most of the disadvantages people raise about Bitcoin are similar to those they'd raise about cash.

Permissioned blockchains have a governance process that selects validators and thus don't have to survive Sybil attacks since the validators are known—there are no pseudonymous validators. Permissioned blockchains are useful where the ledger bumps up against physical, so-called "off-chain" transactions and has to make claims about, say, a property title. They value (3) over (2) in that case because (a) we have to rely on the legal mechanism of the state to enforce the asset's ownership since it's off-chain and (b) those mechanisms won't rely on transactions that might be reversed (even if reversal is unlikely and rare) by anonymous verifiers who can't be held legally responsible for their choices.

Permissionless blockchain cheerleaders will say "write the title in the chain." But there are a bunch of people with guns who have a monopoly on violence (i.e., governments) who are unlikely to blithely relinquish their control of property records and the like.

So for some time the world will likely be full of a mix of permissioned and permissionless ledger systems that don't interoperate, or do so only uneasily. The important thing to remember is that they can co-exist.


Reactive Programming Patterns: Examples from Fuse

Radioactive Minifig

Microservices are hot and, consequently, so is reactive programming. Reactive programming, particularly the actor-model, is a natural way to build systems that have the asynchrony and data isolation that good microservices demand. Reactive programming requires different skills from what most programmers learn to complete more traditional programming projects. One way to understand reactive programming is to study common reactive programming patterns.

I realized, as I was getting ready to teach my CS462 class about reactive programming patterns, that I'd used each of the patterns I wanted to talk about as part of the Fuse connected-car project we built at Kynetx in 2014. The Fuse API code is open source and available on GitHub. So Fuse is an excellent resource for showing off these important patterns.

After some background on the technical details of Fuse and the system it's built in, we'll review a selection of reactive programming patterns and show code examples from Fuse that illustrate the pattern for both intra-pico and pico-to-pico interactions.

Fuse Background

Fuse is a connected-car platform. I've written extensively about Fuse. For the purposes of this post, it's important to understand the following:

  • Fuse is built using a reactive programming system called picos (short for persistent-compute objects). Picos are based on the actor model and most pico interaction is event-driven.
  • Picos use a set of pre-built services called Wrangler to manage things like creating and destroying picos, pico-to-pico subscriptions, storing profiles, and so on.
  • Pico functionality is determined by the rules installed in the pico.
  • Rules are collected into rulesets that can share function definitions.
  • Each ruleset has a separate persistent key-value store from every other ruleset. This is manifest in persistent variables. As such, most pico-based systems have no need of an external database.
  • Rules are programmed in a language called KRL.
  • When we create a pico, it is automatically endowed with an event bus that connects all rules installed in the pico.
  • Fuse creates a separate pico for each vehicle.
  • Wrangler provides a ruleset that functions as a persistent data store for the entire pico called the PDS. The PDS provides a standard profile for each pico. Fuse stores all of the vehicle's configuration and identity information in the pico profile.
  • Other vehicle data is stored by the individual Fuse services. For example, the trip service stores information about trips, the fuel service stores information about fuel purchases, and so on.
  • Rules can raise events on the pico's internal event bus, send events to other picos, and use HTTP to interface with other Web-based APIs.

Not only do we create a pico for each vehicle, but we also create one for each owner, and one to represent the fleet. The following diagram shows schematically how these various picos are related to each other.

Fuse System Pico Graph
Fuse pico arrangement

Fuse uses a service called Carvoyant to manage devices and provide an API that is used to get vehicle data. The Carvoyant API is a well-designed RESTful API that uses OAuth for user authorization.

Carvoyant accounts correspond roughly to the concept of a fleet in Fuse. Each account has a collection of vehicles as well as information about the owner. Each vehicle pico in Fuse is linked through a set of event subscriptions (using a specialization of the webhook pattern called the Evented API) to the vehicle in Carvoyant as shown in the following diagram:

Fuse fleet Carvoyant correspondence
Correspondence between Fuse Fleet and Carvoyant Account

Whenever the device in a vehicle detects something interesting (e.g. a change in ignition status, entering or leaving a geofence, a key parameter like fuel level going over or under a threshold, and so on), it communicates that state change to Carvoyant via a cellular network. Carvoyant then raises that event, via an event subscription, to the corresponding vehicle pico.

Intra-Pico Event Patterns

Every pico has an internal event bus. Any event sent to the pico via one of its event channels is raised on the internal bus. Any rule installed in the pico that selects on that event will run in response to the raised event. Rules in the pico can also raise events to the internal event bus.

There are several common patterns that KRL rules use to process events. The following sections discuss each in turn.

Raising Events

Any rule can raise an event on the internal event bus using the raise command. Any rule installed in the pico can see that event and, if it selects on it, will run.

raising events
Raising Events

Here's the set_debug_pref rule from the Fuse Owner ruleset showing a raise command in the postlude:

rule set_debug_pref {
  select when fuse new_debug_value
  pre {
    new_val = event:attr("debug_value");
  }
  always {
    raise pds event "new_settings_attribute" 
          with setRID   = meta:rid() // this rid
           and setAttr  = "debugPreference"
           and setValue = new_val
  }
}

Commentary

  • This rule shows a new_settings_attribute event being raised with a specific set of event attributes.
  • The raised event can contain computed event attributes.
  • The rule's postlude is introduced with the keyword always, meaning the postlude statements will always execute if the rule is selected.

In addition to the with...and... syntax shown here, attributes can be sent with the event using the attributes keyword, which accepts a map (hash) giving the attribute names and values1. If we rewrote the preceding rule to use that format, it would look like this:

rule set_debug_pref {
  select when fuse new_debug_value
  pre {
    settings =  {"setRID"   : meta:rid(), // this rid
                 "setAttr"  : "debugPreference",
                 "setValue" : event:attr("debug_value")
                };
  }
  always {
    raise pds event "new_settings_attribute" attributes settings
  }
}

There's no functional difference between the two forms. The latter allows the entire set of attributes to be computed (including attribute names) rather than just the values.

As you read the following examples that use raise, keep in mind that raise puts events on the pico's internal event bus. Both the event bus and persistent storage of each pico are isolated from those of other picos.
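The mechanics are easy to picture outside KRL. Here's a minimal Python sketch (all names are illustrative, not part of any pico implementation) of an internal event bus: raising an event runs every installed rule that selects on it, and each pico carries its own bus and its own persistent state.

```python
class Pico:
    def __init__(self):
        self.rules = []   # (selector, action) pairs
        self.ent = {}     # per-pico persistent storage ("entity variables")

    def rule(self, domain, etype):
        """Register a rule that selects on domain:etype."""
        def register(action):
            self.rules.append(((domain, etype), action))
            return action
        return register

    def raise_event(self, domain, etype, attrs=None):
        """Put an event on this pico's internal bus; every selecting rule runs."""
        for selector, action in self.rules:
            if selector == (domain, etype):
                action(self, attrs or {})

pico = Pico()

@pico.rule("pds", "new_settings_attribute")
def save_setting(p, attrs):
    # mirrors the set_debug_pref example: store an attribute persistently
    p.ent[attrs["setAttr"]] = attrs["setValue"]

pico.raise_event("pds", "new_settings_attribute",
                 {"setAttr": "debugPreference", "setValue": "verbose"})
```

Creating a second Pico() gives a fresh, empty bus and store, mirroring the isolation between picos.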

Rule Chaining and Abstraction

Often when a rule raises an event, the goal is to signal a general state change in the pico. For example, a rule that is updating the profile might raise the profile_updated event. Any other rule in the pico that is interested in knowing when the profile has been updated can listen for that event and do any necessary processing.

Sometimes, however, the reason for raising an event is more tactical. We simply need to do something that takes two or more rules. This is called rule chaining.

raising events
Rule Chaining

In rule chaining, rule A raises an event with the explicit goal of causing rule B to be selected.

As an example of this, consider the following two rules (slightly simplified) from the Fuse Fuel ruleset:

rule record_fuel_purchase {
  select when fuse new_fuel_purchase
  pre {
    // new records can't have id
    rec = event:attrs().delete(["id"]); 
  }  
  send_directive("Recording fill up") with rec = rec
  fired {
    raise fuse event "updated_fuel_purchase" attributes rec; 
  }
}
  
rule update_fuel_purchase {
  select when fuse updated_fuel_purchase
  pre {

    // if no id, assume new record and create one
    id = event:attr("id") || random:uuid();  
    // build a fuel purchase record
    ...
  }
  if( not volume.isnull() ...
   && not id.isnull()
    ) then {
      send_directive("Updating fill up") with rec = rec
  }
  fired {
    set ent:fuel_purchases{id} rec;
    raise fuse event "fuel_purchase_saved";
  }
}

Commentary:

  • The second rule, update_fuel_purchase, does all the work of storing a fuel purchase record. It's designed to either update an existing record or create a new record when the id attribute is null.
  • The first rule, record_fuel_purchase, is for new fuel purchase records, but all it does is ensure that the incoming record doesn't, for some reason, have an id.
  • record_fuel_purchase raises the updated_fuel_purchase event explicitly to chain to update_fuel_purchase.

Rule chaining is one way for rules to avoid repeating logic (logical coupling). There's only one place where a fuel purchase record is created and stored even though two different events, new_fuel_purchase and updated_fuel_purchase, can signal the need. We're using rule chaining to abstract the updated_fuel_purchase event.

Guard Rules and Idempotence

One of the most important reasons to chain rules is to guard the action a rule takes to ensure idempotence. Pico-based systems are easier to program when responses to an event are idempotent, meaning that they can run multiple times without cumulative effect.

Many operations are idempotent (e.g. installing a ruleset in a pico over and over only results in the ruleset being added once). For operations that aren't naturally idempotent, we can make the rule idempotent using the rule's guard condition, which ensures the rule only fires when specific conditions are met.

There may be reasons why just using the guard condition in the rule isn't satisfactory. First, there are several library calls that are poorly designed and cause side effects in the prelude.2 Second, even when side-effecting code isn't an issue, we might not want to execute an entire, computationally heavy prelude before checking the condition.

guard rule pattern
The Guard Rule Pattern

A guard rule uses rule chaining: it tests the guard condition and raises an event for the second rule only when the condition passes. The guard rule:

  1. responds to the triggering event
  2. tests a condition that ensures idempotence
  3. raises an explicit event in the postlude for which the second rule is listening
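The three steps above can be sketched in Python (a hedged illustration; fleet_channel and the creation side effect are stand-ins for the Fuse code that follows):

```python
# State shared between the guard and the side-effecting step.
state = {"fleet_channel": None, "fleets_created": 0}

def create_fleet():
    # Side-effecting: analogous to creating a child pico. Must not run twice.
    state["fleets_created"] += 1
    state["fleet_channel"] = "channel-%d" % state["fleets_created"]

def need_fleet():
    # Guard rule: test the idempotence condition, and only chain to the
    # side-effecting rule when no fleet exists yet.
    if state["fleet_channel"] is None:
        create_fleet()

need_fleet()
need_fleet()   # a second triggering event is a no-op: the guard makes it idempotent
```

Because the guard tests state that the side-effecting step establishes, delivering the triggering event twice still creates only one fleet.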

Here's an example from the Fuse Owner ruleset. This rule is called when a new Fuse owner account is created.

rule kickoff_new_fuse_instance {
  select when fuse need_fleet
  pre {
    fleet_channel = pds:get_item(common:namespace(),"fleet_channel");
  }
  if(fleet_channel.isnull()) then
    send_directive("requesting new Fuse setup");
  fired {
    raise explicit event "need_new_fleet" 
	  with _api = "sky"
	   and fleet = event:attr("fleet") || "My Fleet";
  }
}

Commentary:

  • When a new account is created, we want to create a fleet pico. But we want that action to be idempotent since any Fuse account should have one and only one fleet pico.
  • The rule that creates the fleet pico, create_fleet, is listening for the need_new_fleet event.
  • We cannot enforce idempotence with the conditional statement in the create_fleet rule because the common:factory() call has the side effect of creating a new child pico.
  • We use the existence of the fleet channel to test for whether or not the owner has already been initialized. If the fleet channel is null we assume the owner pico needs to be initialized.

Self-Healing Systems

Self-healing systems are a hallmark of robust system design. Rules can watch triggering events, check pico state, and raise an event that signals the problem. The trigger could be some regularly occurring event or a scheduled event. Regardless, we only want to signal a problem if one exists. A guard rule is a natural fit here.

self healing
Guard Rules and Self Healing

In this pattern, one or more rules listen for the triggering event, test for the problem, and raise an event signaling it so that other rules can take action to fix the break.

Here's an example from Fuse that illustrates a check for a problem that needs to be corrected. Fuse subscribes to events from Carvoyant like ignition_status that signal some change in the vehicle. These subscriptions are set up when the vehicle is initialized. They can become dirty for various reasons:

  • The vehicle event channel could change and the subscriptions to Carvoyant need to be updated to send events to the pico over the new channel.
  • The design of Fuse might change necessitating new or different event subscriptions to Carvoyant.
  • Something else might go wrong in either Fuse or Carvoyant that requires the subscriptions be resynchronized.

Since picos are independent systems, they cannot be updated en masse using, for example, an SQL statement. Instead, we require that they fix themselves when a fix is needed. For example, adding new functionality to Fuse sometimes requires changing the event subscriptions that a Fuse vehicle pico has with the Carvoyant API. The Fuse system is designed to automatically fix subscriptions so that they match the specification of needed Carvoyant events.

rule check_subscriptions {
  select when fuse subscription_check
  pre {
    vid = carvoyant:vehicle_id(); 
    my_subs = carvoyant:getSubscription(vid);
  }
  if( not subscriptionsOk(my_subs) ) then
  {
    send_directive("subscriptions not OK")
      with my_subscriptions = my_subs
       and should_have = should_have
  }
  fired {
    raise carvoyant event dirty_subscriptions;
  }
}

Commentary:

  • The primary work is done by the subscriptionsOk() predicate. That predicate checks all the needed event subscriptions (from a list) against the existing subscriptions and ensures they're all there.
  • The rules that respond to the dirty_subscriptions event use the same list to find any missing or bad subscriptions and recreate them.
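The reconciliation behind these two bullets can be sketched in Python (illustrative only; the subscription names and helpers are hypothetical, not the Carvoyant API):

```python
# The list of event subscriptions every vehicle pico should have.
should_have = {"ignitionStatus", "lowBattery", "geofence"}

def subscriptions_ok(current):
    # The check: every needed subscription must be present.
    return should_have <= set(current)

def heal(current):
    """Recreate any missing subscriptions; returns the repaired set."""
    missing = should_have - set(current)
    return set(current) | missing

current = {"ignitionStatus"}          # a pico with dirty subscriptions
if not subscriptions_ok(current):     # the check_subscriptions rule's test
    current = heal(current)           # what the dirty_subscriptions handlers do
```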

Event Splitting

Event splitting is useful when a single event can signal different state changes based on its attributes or the current state of the receiving pico.

event splitting pattern
The Event Splitting Pattern

For example, in Fuse, the Carvoyant system sends an ignitionStatus event when the ignition key is turned on or off. The event includes an event attribute, status, that indicates how the ignition state changed.

Fuse treats these two states differently. The ignition being turned on indicates the start of a trip. The Carvoyant system manages trip tracking using telemetry data from the in-vehicle unit, so Fuse doesn't have to do anything with a trip when the ignition is turned on. Fuse does, however, use this as a signal to do a few clean-up and self-healing procedures. For example, the system checks for missed trips when the vehicle starts up and grabs the current vehicle status. We use the ignition being turned on as a convenient (and usually frequent) signal to accomplish tasks while the vehicle pico is not otherwise busy.

On the other hand, when the ignition is turned off, Fuse processes the trip that has just completed, saving it for later use. The ignitionStatus event doesn't contain all of the trip information; it merely signals the state change. Fuse uses Carvoyant's API to gather the detailed trip information, process it, and store the results.

The following two rules from the Fuse Carvoyant ruleset show how Fuse distinguishes and processes these two different state changes:

rule process_ignition_on  {  
  select when carvoyant ignitionStatus where status eq "ON"
  pre {
    ignition_data = normalize_carvoyant_attributes(event:attrs());
  }
  noop();
  always {
    raise fuse event "trip_check" with duration = 2; // recover lost trips
    raise fuse event ignition_processed attributes ignition_data;
  }
}

rule process_ignition_off {  
  select when carvoyant ignitionStatus where status eq "OFF"
  pre {
    tid = event:attr("tripId");
    ignition_data = normalize_carvoyant_attributes(event:attrs());
  }
  if not tid.isnull() then noop();
  fired {
    raise fuse event "new_trip" with tripId = tid;
    raise fuse event ignition_processed attributes ignition_data;
  } else {
    error warn "No trip ID " + ignition_data.encode();
  }
}

Commentary:

  • We use an attribute expression (signaled by the where keyword) in the event expression to split the event and process it differently.
  • The attribute expression creates an event expression that only selects when the specified condition is true.
  • In this case there are only the two options, but you could split an event multiple ways using this technique.

This example uses the event expression to split the incoming event, but you could do it in the rule body as well, computing different events to raise based on the incoming event, the current state of the pico, or even external information from an API call.
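A body-based split might look like this Python sketch (a hypothetical handler; the attribute names follow the rules above):

```python
raised = []   # events raised on the internal bus

def raise_event(domain, etype, attrs=None):
    raised.append((domain, etype, attrs or {}))

def on_ignition_status(attrs):
    # Split on the status attribute inside the handler body instead of
    # in the event expression.
    if attrs.get("status") == "ON":
        raise_event("fuse", "trip_check", {"duration": 2})
    elif attrs.get("status") == "OFF" and attrs.get("tripId"):
        raise_event("fuse", "new_trip", {"tripId": attrs["tripId"]})
    raise_event("fuse", "ignition_processed", attrs)

on_ignition_status({"status": "OFF", "tripId": "t-42"})
```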

Event Logging

Event logging is a great example of the power of loose coupling in event-based systems. A logging rule can log important information for notification, monitoring, or support on the side without inserting the logging logic into the primary event flow. You can add multiple logging rules, change them as needed, or delete them without touching the base system.

event logging pattern
The Event Logging Pattern
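The loose coupling at work here can be sketched in Python: the logging handler subscribes to the same event as the primary handler, so it can be added or deleted without touching the base system. (All names are illustrative.)

```python
subscribers = {}   # event type -> list of handlers

def on(etype, handler):
    subscribers.setdefault(etype, []).append(handler)

def raise_event(etype, attrs):
    for handler in subscribers.get(etype, []):
        handler(attrs)

# A logging rule added on the side, without changing the primary flow.
log = []
on("new_fleet_initialized", lambda a: log.append("fleet for %s" % a["owner"]))

# The primary handler is untouched by adding (or deleting) the logger.
profiles = []
on("new_fleet_initialized", lambda a: profiles.append(a["owner"]))

raise_event("new_fleet_initialized", {"owner": "alice"})
```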

The finalize_new_users rule in the Fuse Owner ruleset demonstrates this:

rule finalize_new_users {
  select when fuse new_fleet_initialized
	  and pds profile_updated
  pre {
    me = pds:get_all_me();
    my_email =  me{"myProfileEmail"} || random:uuid();
    msg = <<
A new fleet was created for #{me.encode()} with ECI #{meta:eci()}
>>;
  }
  sendgrid:send("Kynetx Fleet Team", "fuse-support@kynetx.com",
                "New Fuse Fleet", msg);
  always {
    set app:fuse_users{my_email} makeAcctRecord(me)
  }
}

Commentary:

  • This rule is selected the first time a new Owner pico is set up (signalled by the new_fleet_initialized event).
  • The event expression also requires the profile_updated event (using the and event operator) so that it won't run until after initialization is complete.
  • The rule sends an email to fuse-support using Sendgrid and records a record in an application persistent variable (app:fuse_users).

Note that because the pico's event bus is isolated, there's no danger that a new_fleet_initialized event will combine with a profile_updated event from another pico and mistakenly select. This rule selects only when both those events happen (in any order) in this pico.

Event Errors

Rules can raise error events in the rule postlude using the error statement. Rules that detect an error condition raise an error event, and rules programmed to handle errors respond. A ruleset can designate a specific ruleset for handling its errors using the errors to pragma in the meta block. Error events, unlike events generated by the raise statement, are routed directly to the designated error-handling ruleset.

error event pattern
The Error Event Pattern

The Fuse error handling ruleset is fairly simple-minded. There is one rule, handle_error, that responds to all errors by mailing them to fuse-support. The message contains detailed information about what went wrong.

rule handle_error {
    select when system error 
    pre {
	genus = event:attr("genus");
	species = event:attr("species") || "none";
	level = event:attr("level");
	rid = event:attr("error_rid");
	rule_name = event:attr("rule_name");
	msg = event:attr("msg");
	eci = meta:eci();
	session = CloudOS:currentSession() || "none";
	ent_keys = rsm:entity_keys().encode();
	kre = meta:host();

	send_email=true;

	error_email_body = <<
A Fuse error occurred with the following details:
  Time: #{time:now()}

  RID: #{rid}
  Rule: #{rule_name}
  Host: #{kre}
  

  level: #{level}
  genus: #{genus}
  species: #{species}
  message: #{msg}

  eci: #{eci}
  txn_id: #{meta:txnId()}
  PCI Session Token: #{session}
  RSM Entity Keys: #{ent_keys}
>>;
    }
    if (send_email) then
	sendgrid:send(to_name, to_addr, subject, error_email_body);
}

Commentary:

  • Most of the information sent in the email comes from event attributes automatically generated when the error event is raised.
  • The email body is composed using extended quoting and beestings.

When Fuse was young, this was quite handy since I was told about things that went wrong and was able to track them down.

More sophisticated error handling can attempt to fix the problem and retry. For example, occasionally Carvoyant signals that the ignition has been turned off and doesn't include a trip ID. Without a trip ID, Fuse cannot query the Carvoyant API for the trip's parameters. Because this is caused by a race condition in the Carvoyant system, the trip can be recovered after waiting a short period. The no_trip_id rule in the Fuse error handling ruleset schedules a retry for recovering a trip in one minute:

rule no_trip_id {
  select when system error where msg.match(re/No trip ID/)
  sendgrid:send(to_name, to_addr, subject,
                "Scheduling retry for " + event:attr("msg"));
  always {
    schedule fuse event "trip_check" at time:add(time:now(),{"minutes" : 1}) 
       with duration = 1; 
  }
}

Commentary:

  • The event expression uses an attribute expression to select only when the error was caused by a missing trip ID. This is simply matching the text of the error message with a regular expression.
  • The schedule command is similar to the raise command except that it gives a time for raising the event instead of raising the event immediately.
  • In this case we're calculating the time for scheduling the event to be one minute from now.

Event Abstraction

Because of the nature of rule languages, you may often write several rules that have the same event expression. This goes against the grain for programmers of traditional languages, where repeating yourself is not only wasteful but leads to code maintenance problems (logical coupling). The answer is to abstract the portions of those rules that are repetitive and apt to change frequently.

rule abstraction
The Rule Abstraction Pattern

The post_process_ignition rule from the Fuse Carvoyant ruleset is a good example of this. You'll recall from the Event Splitting pattern that there are separate rules for when the ignitionStatus event is raised with status equal to "ON" and when it's "OFF". Even so, there are some things we want to happen in both cases. The answer is a single rule, selected after either of the ignitionStatus rules has fired, that performs the common activities.

rule post_process_ignition {
  select when fuse ignition_processed
  pre {
    ignition_data = event:attrs();
  }
  noop();
  always {
    raise fuse event "need_vehicle_status";
    raise pds event "new_data_available"
	attributes {
	  "namespace": namespace(),
	  "keyvalue": "ignitionStatus_fired",
	  "value": ignition_data,
	  "_api": "sky"

	};
  }
}

Commentary:

  • This rule selects on the ignition_processed event which all the rules for processing the ignitionStatus event raise.
  • The rule raises several events in response to the incoming event, abstracting the multiple events into a single event using a rule.

Similar to the example in the Rule Chaining and Abstraction section above, we are avoiding logical coupling by not repeating logic that would then have to be updated in multiple places. The key difference between this pattern and that one is that here we're doing it specifically to abstract the events.

Enriching Events

Events are often quite sparse in the information they carry with them. Because events can be frequent, they are also usually lightweight. In addition, events are often broadcast on an event bus and event buses are usually kept simple for performance and administrative reasons. Consequently, events may be seen by systems not authorized for the full payload that could accompany the event.

event enrichment pattern
The Event Enrichment Pattern

Event enrichment deals with these issues by adding information to an event before passing it along to other rules in the system. The save_trip rule in Fuse Trips is an example of this:

rule save_trip {
  select when fuse new_trip 
  pre {
    vid = carvoyant:vehicle_id();
    raw_trip_info = carvoyant:tripInfo(incoming{"tripId"}, vid);
    tid = mkTid(raw_trip_info{"id"}).klog(">>>>> trip ID >>>>>");

    trip_info = raw_trip_info.delete(["data"]);
    
    ...
    
    final_trip_info = trip_info
		 .put(["cost"], trip_summary{"cost"})
		 .put(["interval"], trip_summary{"interval"})
		 .put(["avgSpeed"], trip_summary{"avgSpeed"})
		 .put(["name"], trip_name)
		 .put(["category"], trip_category)
		 ;
  }
  if( end_time neq "ERROR_NO_TIMESTAMP_AVAILABLE" 
   && trip_info{"mileage"} > 0.01
    ) then
  {send_directive("Adding trip #{tid}") with 
    end_time = end_time and
    trip_summary = trip_summary
    ;
  }
  fired {
    raise fuse event trip_saved with 
      tripId = tid and
      tripSummary = trip_summary;
    set ent:trips_by_id{tid} final_trip_info;
    set ent:trip_summaries{tid} trip_summary;
  } else {
    ...
  }
}

Even though I've simplified this rule, it's still fairly complex. I've left in some of the complexity to illustrate the idea of enrichment.

  • The rule prelude makes a call to the Carvoyant API with the tripId to get details about the trip that was just completed.
  • The prelude computes things like trip cost and average speed and adds those to the trip information.
  • It also compares this trip to past trips to see if it should be categorized or named.
  • Finally, in addition to saving that information in a persistent entity variable, it raises a new event, trip_saved, that has been enriched with new information.

Note that the rule condition ensures that the enriched event is only raised if the length of the trip is greater than 0.01 miles as a way of ensuring that we don't record trips when the vehicle ignition was simply turned on and off without the vehicle moving. The condition also checks for other error conditions. Consequently, the enriched event is also a surer signal that a trip happened.

The update_vehicle_data rule in the Fuse Vehicle ruleset is another example of gathering information from various places to create a richer event, updated_vehicle, that contains significantly more information than the original event.

Semantic Translation

Semantic translation is a special kind of event enrichment. Semantic translation interprets an event in light of the context in which it is raised. The result is a new event with a different, usually richer, meaning.

For example, an event that says you're at the airport could be turned into an event that says you're leaving on a trip, if the rule can confirm you have a ticket or your calendar shows a trip. An event that signals leaving on a trip is more specific and thus more meaningful than one that merely indicates presence at the airport. There might be some rules that only care that you're at the airport, but leaving on a trip indicates a different intention.

semantic translation pattern
The Semantic Translation Pattern
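The airport example above can be sketched in Python (a hypothetical handler, not Fuse code): the same low-level observation produces a richer event only when context confirms it.

```python
raised = []

def raise_event(etype, attrs):
    raised.append((etype, attrs))

def on_location(attrs, calendar):
    if attrs["place"] == "airport" and calendar.get("trip_today"):
        # Translate: the same observation, with richer meaning in context.
        raise_event("leaving_on_trip", {"destination": calendar["destination"]})
    else:
        raise_event("at_location", attrs)

on_location({"place": "airport"}, {"trip_today": True, "destination": "SLC"})
```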

The process_ignition_off rule from the Fuse Carvoyant ruleset gives an example of this.

rule process_ignition_off {  
  select when carvoyant ignitionStatus where status eq "OFF"
  pre {
    tid = event:attr("tripId");
    ignition_data = normalize_carvoyant_attributes(event:attrs());
  }
  if not tid.isnull() then noop();
  fired {
    raise fuse event "new_trip" with tripId = tid;
    raise fuse event ignition_processed attributes ignition_data;
  } else {
    error warn "No trip ID " + ignition_data.encode();
  }
}

As we've seen, this rule selects when the ignitionStatus event is raised and the event attribute status is equal to "OFF". If the event also has a tripId, we interpret this as the signal that a trip has ended and raise the new_trip event.

Pico-to-Pico Event Patterns

Pico-to-pico events are the primary way that picos interact. Picos can send messages that contain events to other picos. Most interesting applications for picos are built from systems of picos cooperating to solve a problem. Carl Hewitt, the inventor of the Actor model of programming said "One actor is no actor. Actors come in systems."

Picos send events to a specific pico on a specific channel. There is a difference between raising an event on the internal event bus and sending an event to another pico. Raising an event inside the pico effectively broadcasts the event to all of the rules installed in the pico. On the other hand, pico-to-pico events are point-to-point, from one pico to another. A pico can have as many incoming and outgoing event channels as are needed for receiving events from and sending events to other picos.

pico_to_pico_events
Pico To Pico Events

Once an event is received by a pico, it is raised on the pico's internal event bus and any installed rule can select if its event expression is met. As we've seen, the state of each pico is completely isolated from any other.
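The transport for a pico-to-pico event is typically an HTTP POST to one of the receiving pico's event channels. As a rough Python sketch, assuming the classic "Sky" event URL scheme (host, channel identifier, event id, domain, and type in the path; exact paths vary by pico host, so treat this as illustrative):

```python
from urllib.parse import urlencode

def event_url(host, channel, eid, domain, etype, attrs):
    """Build the URL for sending an event to another pico's channel."""
    base = "https://%s/sky/event/%s/%s/%s/%s" % (host, channel, eid, domain, etype)
    return base + "?" + urlencode(attrs)

# Hypothetical host and channel identifiers.
url = event_url("kre.example.com", "abc-123-eci", "42",
                "fuse", "periodic_vehicle_report",
                {"report_correlation_number": "rcn-1"})
```

An HTTP client would then POST to this URL; the receiving pico raises the event on its own internal bus.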

As we discussed in the Background section, Fuse consists of at least three picos representing the owner, the fleet, and one for each vehicle. These picos interact to provide Fuse's functionality. The following sections describe some common event patterns with examples from the Fuse code base.

Correlating Events

When one pico sends an event to another, it is often expecting an asynchronous response. A correlation identifier associates these two events, linking conversational state across asynchronous interactions.

event_correlation
event correlation

Correlation identifiers are passed as event attributes and can be any string that is unique within the conversation.

Rules use a correlation identifier to ensure that two processes don't get confused with one another. For example, the correlation identifier can be used in the pico's persistent state to keep data about different conversations separate.
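A minimal Python sketch of the idea (illustrative names): generate a unique identifier, attach it to the outgoing event, and key conversational state by it so overlapping conversations stay separate.

```python
import uuid

conversations = {}   # correlation id -> state for that conversation

def start_report(attrs):
    # Reuse an incoming correlation id, or mint a new one.
    rcn = attrs.get("report_correlation_number") or str(uuid.uuid4())
    conversations[rcn] = {"responses": []}
    return dict(attrs, report_correlation_number=rcn)

def on_response(attrs):
    # The responder echoed the id back, so we find the right conversation.
    rcn = attrs["report_correlation_number"]
    conversations[rcn]["responses"].append(attrs["data"])

out = start_report({})
on_response({"report_correlation_number": out["report_correlation_number"],
             "data": "vehicle-1 report"})
```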

The following rule from the Fuse Fleet ruleset shows the calculation and use of a correlation number as part of creating Fuse fleet reports. The rule has been simplified to emphasize the idea of correlation numbers.

rule start_periodic_report {
  select when fuse periodic_report_start
  pre {
    new_rcn = genCorrelationNumber();
    rcn = event:attr("report_correlation_number")
	       .defaultsTo(new_rcn);
    ...
    report_data = {"period": period,
		   "start": start,
		   "end": end,
		   "timezone": tz};
    augmented_attrs = event:attrs()
                       .put(["report_correlation_number"], rcn);
  }
  fired {
    raise explicit event periodic_report_routable
      with attributes augmented_attrs;
    set ent:report_data{rcn} report_data;
    schedule explicit event "periodic_report_timer_expired"
      at time:add(time:now(),{"minutes" : 2}) 
      attributes {"report_correlation_number": rcn, "timezone": tz}
  }
}

Commentary:

  • The correlation number is generated by a function that ensures that it's a unique string.
  • The implicit understanding is that any rule that sees the correlation number will pass it along so that every player can correlate their actions. Future versions of Wrangler, the pico operating system, will provide more automation for correlation.
  • The correlation number is used in internal events (the raise in the postlude).
  • The rule stores the correlation number for later use. In this case it's used as a key for storing other information in a persistent variable (the report_data) that will be used later for interactions involving this event.

Event Recipient Lists

Event or message routing is a critical task in reactive systems. The simplest way to route events to other picos is to keep a list of picos. Sending events to each of the picos on a recipient list is a simple matter. In this pattern one rule sends the same event to each of a number of other picos. The recipient list is analogous to the To: list on an email message.

event_recipient_list
Event Recipient List

The recipient list can be static, based on a particular configuration of picos in the computation, or it can be calculated in various ways. Computed recipient lists allow a pico to act as an event router.

The following rule from Fuse uses an event recipient list that contains all the vehicles in the fleet. The list isn't static; it's computed by calling the activeVehicleSummary() function, so the recipients can change as vehicles are added to or removed from the fleet, or merely as vehicles become inactive (not connected to a device).
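The fan-out itself is simple. In Python terms (active_vehicles() and event_send() are stand-ins for the KRL below):

```python
sent = []   # record of outgoing pico-to-pico events

def event_send(channel, domain, etype, attrs):
    sent.append((channel, domain, etype, attrs))

def active_vehicles():
    # Computed recipient list; could filter out inactive vehicles here.
    return [{"channel": "veh-1"}, {"channel": "veh-2"}]

def route_periodic_report(rcn):
    # Send the same event, with the correlation number, to every recipient.
    for vsum in active_vehicles():
        event_send(vsum["channel"], "fuse", "periodic_vehicle_report",
                   {"report_correlation_number": rcn})

route_periodic_report("rcn-9")
```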

rule process_periodic_report_with_rcn {
  select when explicit periodic_report_routable
  foreach activeVehicleSummary() setting(vsum)
    pre {
      rcn = event:attr("report_correlation_number");
      channel = {"cid": vsum{"channel"}};
      ...
    }
    if(not rcn.isnull()) then {
      event:send(channel, "fuse", "periodic_vehicle_report")
	  with attrs = {
	    "report_correlation_number": rcn,
	    "vehicle_id": vsum{"deviceId"},
	    "start": common:convertToUTC(start),
	    "end": common:convertToUTC(end)
	  };
    }
    ...
}

Commentary:

  • The foreach statement runs the rule once for each vehicle returned by activeVehicleSummary().
  • Each iteration of the foreach loop sets the vsum variable with the vehicle summary for a specific vehicle.
  • The rule routes the periodic_vehicle_report event to each vehicle using event:send() and the event channel in vsum.
  • The correlation number is taken from an event attribute.
  • If the correlation number is missing, the rule doesn't fire.
  • The correlation number is sent with the event to each vehicle pico.

Content-Based Event Routing

Another way to route messages is by content. In content-based event routing, the routing pico knows about some number of other picos and selects where to route the event based on the event domain, name, or attributes and other information such as the current state of the pico and external information from APIs.

The routing rule usually attaches a correlation identifier to the event before routing it.

event_routing
Event Routing

The route_to_owner rule from the Fuse Fleet ruleset is a simple example of this idea. A Fuse fleet can have more than one owner, and the fleet generally needs to keep owners aware of certain things by routing events to them.

rule route_to_owner {
  select when fuse new_fleet
           or fuse reminders_ready
           or fuse email_for_owner
  pre {
    owner_subs =
       CloudOS:subscriptionList(common:namespace(),"FleetOwner");
    // find the owner who contacted us (could be more than one)
    matching_owner = owner_subs.filter(
              function(sub){ sub{"backChannel"} eq meta:eci()
                           }
              );
    // use first owner if no match
    owner_list = matching_owner.length() > 0 => matching_owner
                                              | owner_subs;
    owner = owner_list.head().pick("$.eventChannel");
  }
  {
    send_directive("Routing to owner")
      with channel = owner 
       and attrs = event:attrs();
    event:send({"cid": owner}, "fuse", event:type())
      with attrs = event:attrs();
  }
}

Commentary:

  • The event expression is used to determine what events get routed to the owner. This method is static and bound early. KRL offers no mechanism at present for dynamically computing an event expression, but rule chaining with computed event types could be used to achieve a similar effect.
  • The rule is designed to route events to just one owner. The event is routed to the owner who sent the incoming event or to the first owner, if the incoming event didn't come from an owner.
  • If needed, this rule could be extended to route to all owners.
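Stripped of KRL specifics, content-based routing reduces to a lookup from event content to destination, as in this Python sketch (a static route table; it could equally be computed from pico state or an external API):

```python
# Route table keyed by (event domain, event type).
routes = {
    ("fuse", "new_fleet"):       "owner-channel",
    ("fuse", "reminders_ready"): "owner-channel",
    ("fuse", "low_fuel"):        "vehicle-channel",
}

forwarded = []

def route(domain, etype, attrs):
    # Forward the event, unchanged, to the channel the content selects.
    channel = routes.get((domain, etype))
    if channel is not None:
        forwarded.append((channel, domain, etype, attrs))

route("fuse", "reminders_ready", {"count": 3})
```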

Pico Registration

The most general way to route events is to create a service directory and allow picos to register for events based on specific criteria.

pico registry
Pico Registration

In this pattern, picos send registration events that include information like their name, an event channel identifier, and attributes that are important in routing. The registration pico might be a special pico that serves as a directory in a large system, or the registration might just be a ruleset in a pico with other responsibilities. The registration pico might route events based on pico type, name, or other attributes.

One important feature of a directory is to allow picos to change their event channel for security reasons without losing service. A directory also allows picos to move to other hosting providers without loss of functionality.

A more complex example of this idea is to use a registrar. A registrar is a third party that manages the registry on behalf of the instances. The registrar watches for child_created or child_destroyed events to know when instances are created or destroyed and registers or deregisters them as appropriate. The registrar also periodically checks the health of instances and automatically deregisters those it deems incapacitated. The registrar decouples instances from the registry: instances can be oblivious to the existence of the registry and need never explicitly register themselves.

There is no good example of this pattern in Fuse. However, the following example code shows how this could work. First let's look at a simple rule to process registration events:

rule register_picos {
  select when system pico_registration
  pre {
    topic = event:attr("topic");
    pico_data =  makeRegistrationRecord(event:attrs());
  }
  if(not pico_data{"eci"}.isnull()) then noop();
  fired {
     set ent:registrations{topic} = pico_data
  }
}

Commentary:

  • The rule makes use of a topic event attribute to determine which topic the registering pico is interested in.
  • The rule uses a function, makeRegistrationRecord(), to process the incoming event attributes and create a record of anything important.
  • The rule only fires if the incoming registration event includes an event channel. This check could be extended to require any other necessary information.
  • The rule ultimately stores the registration request in an entity variable called ent:registrations by topic.

The following code routes to registered picos by topic. This rule would be in the same pico as the pico_registration rule shown above.

rule route_to_registered {
  select when fuse events_to_route
           or fuse another_event_to_route
  foreach ent:registrations{event:attr("topic")}
    setting(registered_pico)
    pre {
      channel = {"cid": registered_pico{"eci"}}
    }
    event:send(channel, "fuse", event:type()) with attrs = event:attrs();
}

Commentary:

  • This rule selects on any of the routable events listed in its event expression.
  • The foreach loop will run once for any pico in the ent:registrations variable for the topic specified in the attributes of the incoming event. The topic could be computed rather than relying on it being in the incoming event.
  • All the incoming event attributes and the incoming event type are routed to the picos that are registered for the topic.
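For comparison, here's a hypothetical Python sketch (not KRL) of the registration-and-routing pair above: registrations without an event channel are rejected, and routing fans an event out to every pico registered for its topic:

```python
# Hypothetical sketch (not KRL): a registry that stores registrants by
# topic and routes events to every pico registered for that topic.
class Registry:
    def __init__(self):
        self.registrations = {}          # topic -> list of {"eci": ...}

    def register(self, topic, pico_data):
        # Mirror register_picos: ignore registrations without an event channel.
        if not pico_data.get("eci"):
            return False
        self.registrations.setdefault(topic, []).append(pico_data)
        return True

    def route(self, event, send):
        # Mirror route_to_registered: one send per registered pico.
        for pico in self.registrations.get(event.get("topic"), []):
            send(pico["eci"], event)
```

Here `send` stands in for event:send(); in a real system it would deliver the event over the registered pico's event channel.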

Aggregators

The aggregator pattern listens for incoming events and collects them. Once the required events have been collected, the aggregator raises an event or takes some other action.

event_aggregation
Event Aggregation

The following two rules from the Fuse Fleet ruleset comprise an event aggregator. The first, catch_periodic_vehicle_reports, watches for periodic_vehicle_report_created events from one of the vehicles in the fleet, saves the information in the event as that vehicle's report, and raises an event that indicates the vehicle report was added.

rule catch_periodic_vehicle_reports {
  select when fuse periodic_vehicle_report_created

  pre {
    vehicle_id = event:attr("vehicle_id");
    rcn = event:attr("report_correlation_number");
    updated_vehicle_reports =
        (ent:vehicle_reports{[rcn,"reports"]})
          .defaultsTo([])
          .append(event:attr("vehicle_details").decode());

  }
  noop();
  always {
    set ent:vehicle_reports{[rcn,"reports"]} updated_vehicle_reports;
    raise explicit event periodic_vehicle_report_added with
      report_correlation_number = rcn
  }

}    

Commentary:

  • The report correlation number is sent with the incoming event as an event attribute.
  • The report correlation number is used to store the report in an entity variable. Thus multiple simultaneous reports can be in play at the same time without interfering with each other.
  • The same report correlation number is raised with the periodic_vehicle_report_added event as an attribute.

The second rule determines when sufficient reports have been collected. In this case, it compares the number of reports received with the number of active vehicles. So long as insufficient reports have been received, the rule does nothing.

rule check_periodic_report_status {
  select when explicit periodic_vehicle_report_added
  pre {
    rcn = event:attr("report_correlation_number");
    vehicles_in_fleet = activeVehicleSummary().length();
    number_of_reports_received = (ent:vehicle_reports{[rcn,"reports"]})
                                   .length();
  }
  if ( vehicles_in_fleet <= number_of_reports_received ) then noop();
  fired {
    log "process vehicle reports ";
    raise explicit event periodic_report_ready with
      report_correlation_number = rcn;
  } else {
    log "we're still waiting for " +
        (vehicles_in_fleet - number_of_reports_received) +
        " reports on #{rcn}";
  }
}

Commentary:

  • The prelude calculates how many vehicle reports have been received using the report correlation number.
  • The report correlation number is passed along with the periodic_report_ready event so that any downstream rules can process the right report.
  • We use the log() statement in the else clause of the postlude to show information in the logs about how many reports have been received.
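The aggregation logic reduces to a small amount of state keyed by correlation number. Here's a hedged Python sketch (not the Fuse code), assuming the expected report count is known up front:

```python
# Hypothetical sketch (not KRL): aggregate reports by correlation number
# and signal readiness once every expected report has arrived.
class Aggregator:
    def __init__(self, expected_count):
        self.expected = expected_count
        self.reports = {}                # rcn -> list of reports

    def add(self, rcn, report):
        """Store one report; return True when the set for rcn is complete."""
        collected = self.reports.setdefault(rcn, [])
        collected.append(report)
        return len(collected) >= self.expected
```

Keying the state by correlation number is what lets multiple report runs overlap without interfering, just as in the KRL rules above.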

Scatter-Gather Pattern

The scatter-gather pattern is useful when you need to get some number of picos to do something and then aggregate the results to complete the computation. This is a common pattern for asynchronous processing in reactive systems. Events are sent asynchronously and, consequently, the sending pico does not block and is free to process other data while it's waiting for the result. Similarly, the picos that receive the event can process the event and respond when ready.

Fuse uses the scatter-gather pattern in creating weekly vehicle reports. We've already seen the process_periodic_report_with_rcn rule in the Event Recipient Lists pattern. This rule scatters events telling the vehicles that the fleet needs the vehicle report. The rules we just saw in the Aggregator pattern are the gathering part of this set of rules.

When we combine the pictures from those two patterns, we get a set up that looks like this:

scatter_gather
Scatter Gather

The scatter-gather pattern and its use in Fuse is described in some detail in Using the Scatter-Gather Pattern to Asynchronously Create Fuse Reports. Here's a diagram showing the specific interactions in Fuse:

scatter-gather pattern
scatter-gather pattern

The owner pico kicks everything off by sending the request_periodic_report event to the fleet. The start_periodic_report rule in the fleet pico scatters the periodic_vehicle_report event to each vehicle in the fleet, whether there's 1 or 100. Of course, these events are asynchronous as well. Consequently the vehicle picos are not under time pressure to complete.

When each vehicle pico completes, it sends a periodic_vehicle_report_created event to the fleet pico. The catch_vehicle_reports rule is listening and gathers the reports. Once it has added the vehicle report, it raises the periodic_vehicle_report_added event. Another rule in the fleet pico, check_report_status, checks whether every vehicle has responded. When the number of reports equals the number of vehicles, it raises the periodic_report_data_ready event; the data is then turned into a report and the owner pico is notified that it's ready for emailing.

As we've seen, these rules make extensive use of the report correlation number to ensure that reports are not intermingled if a request_periodic_report event happens to be sent before the previous one finishes.
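The overall flow can be sketched in Python, with asyncio standing in for pico-to-pico events: requests scatter without blocking, and the computation resumes only when all responses have been gathered. This is an illustration of the pattern, not the Fuse implementation:

```python
# Hypothetical sketch (not KRL): scatter a request to several workers
# asynchronously and gather their replies, correlated by an id.
import asyncio

async def vehicle_report(vehicle_id, rcn):
    await asyncio.sleep(0)               # stand-in for real async work
    return {"vehicle": vehicle_id, "rcn": rcn}

async def scatter_gather(vehicle_ids, rcn):
    # Scatter: all requests start without blocking each other.
    tasks = [vehicle_report(v, rcn) for v in vehicle_ids]
    # Gather: resume only when every report is in.
    return await asyncio.gather(*tasks)

reports = asyncio.run(scatter_gather(["v1", "v2", "v3"], "rcn-42"))
```

Each reply carries the correlation id, so overlapping runs stay separate.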

Dealing with Failure

Because events may be lost, asynchronous systems have to be prepared to deal with failure.

event failure
Event Failure

In the case of Fuse, the start_periodic_report rule that we saw in the section on Correlating Events also schedules the periodic_report_timer_expired event for two minutes in the future:

schedule explicit event "periodic_report_timer_expired"
  at time:add(time:now(),{"minutes" : 2}) 
  attributes {"report_correlation_number": rcn, "timezone": tz}

Another rule, retry_from_expired_timer, listens for this event and retries missing vehicles:

rule retry_from_expired_timer {
  select when explicit periodic_report_timer_expired
  pre {
    max_retries = 2;
    rcn = event:attr("report_correlation_number");
    tz = event:attr("timezone");
    vehicle_summaries = vehicleSummary();
    vehicle_reports = ent:vehicle_reports{[rcn,"reports"]}
                        .defaultsTo([]);
    vehicle_summaries_keys = vehicle_summaries
                               .map(function(r){r{"deviceId"}});
    vehicle_reports_keys = vehicle_reports
                             .map(function(r){r{"deviceId"}});

    missing_vehicles = vehicle_summaries_keys
			   .difference(vehicle_reports_keys);
    in_array = function(k,a){
      a.filter(function(x){x eq k}).length() > 0;
    };
    needed = vehicle_summaries.filter(
                 function(s){
		   in_array(s{"deviceId"}, missing_vehicles)
		 });

    rcn_unprocessed = not ent:vehicle_reports{rcn}.isnull();
  }
  if ( needed.length() > 0
    && ent:retry_count < max_retries
     ) then {
    noop();
  }
  fired {
    log "Retrying for " + (needed.length()) + " vehicles";
    set ent:retry_count ent:retry_count+1;
    raise fuse event periodic_report_start attributes {
       "vehicle_summaries": needed,
       "timezone": tz,
       "report_correlation_number": rcn 
      } if rcn_unprocessed;
    schedule explicit event "periodic_report_timer_expired"
      at time:add(time:now(),{"minutes" : 2}) 
      attributes {"report_correlation_number": rcn, "timezone": tz}
  } else {
    clear ent:retry_count;
    raise explicit event periodic_report_ready with
      report_correlation_number = rcn if rcn_unprocessed;
  }
}

Commentary:

  • The retry in two minutes is somewhat arbitrary. Since reports are only generated once a week, waiting two minutes to retry does not seem overly long.
  • The logic in the prelude of this rule is primarily concerned with calculating needed, a list of the vehicles that have not yet sent reports.
  • If reports are needed, the same event, periodic_report_start, is raised, but with a vehicle_summaries attribute.
  • The scheduled event is also reset in case this retry fails as well.
  • The rule is designed to retry a maximum number of times. If the maximum is reached, the rule raises the periodic_report_ready event even if not all reports have been received. The report is processed without them.
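The retry decision itself is simple enough to sketch. The following hypothetical Python function (the names are mine, not Fuse's) captures the rule's core logic: retry while reports are missing and the retry budget lasts, otherwise finish with whatever arrived:

```python
# Hypothetical sketch (not KRL): bounded retry for missing responses,
# mirroring retry_from_expired_timer's logic.
def on_timer_expired(expected, received, retry_count, max_retries=2):
    """Decide what to do when the report timer fires.

    Returns ("retry", missing) to re-request missing vehicles, or
    ("finish", received) to process the report with what we have.
    """
    missing = [v for v in expected if v not in received]
    if missing and retry_count < max_retries:
        return "retry", missing
    # Either everything arrived or we've retried enough: proceed anyway.
    return "finish", received
```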

Conclusions

Part of the challenge in building reactive systems is learning to program in a completely new style. Reactive systems are by nature asynchronous and loosely coupled. Furthermore, program flow doesn't proceed as it does in object-oriented or imperative systems, but instead follows the event flow. Rule-based systems may have any number of independent responses to a given event. Knowing these patterns can help ease the transition to this new style of programming.

This post has demonstrated and explained a number of intra-pico and pico-to-pico event patterns. The list we have reviewed is by no means exhaustive, but it is indicative of the wide variety of event processing that happens in actor-based systems. The Fuse code base is open-source and thus serves as an example of a large Internet of Things system programmed in a reactive style using the actor model.


  1. KRL data structures are just JSON.
  2. In general, KRL is designed so that declarations in the prelude cannot cause side effects to persistent variables.


Reactive Programming Patterns: Examples from Fuse

Radioactive Minifig

Microservices are hot and, consequently, so is reactive programming. Reactive programming, particularly the actor model, is a natural way to build systems with the asynchrony and data isolation that good microservices demand. Reactive programming requires different skills than those most programmers develop on traditional projects. One way to understand reactive programming is to study common reactive programming patterns.

I realized as I was getting ready to teach my CS462 class about reactive programming patterns, that I'd used each of the patterns I wanted to talk about as part of the Fuse Connected Car project we built at Kynetx in 2014. The Fuse API code is open-source and available on Github. So, Fuse is an excellent resource for showing off these important patterns.

After some background on the technical details of Fuse and the system it's built in, we'll review a selection of reactive programming patterns and show code examples from Fuse that illustrate the pattern for both intra-pico and pico-to-pico interactions.

Fuse Background

Fuse is a connected-car platform. I've written extensively about Fuse. For the purposes of this post, it's important to understand the following:

  • Fuse is built using a reactive programming system called picos (short for persistent-compute objects). Picos are based on the actor model and most pico interaction is event-driven.
  • Picos use a set of pre-built services called Wrangler to manage things like creating and destroying picos, pico-to-pico subscriptions, storing profiles, and so on.
  • Pico functionality is determined by the rules installed in the pico.
  • Rules are collected into rulesets that can share function definitions.
  • Each ruleset has a persistent key-value store separate from every other ruleset's. This storage is manifest in the language as persistent variables. As a result, most pico-based systems have no need of an external database.
  • Rules are programmed in a language called KRL.
  • When we create a pico, it is automatically endowed with an event bus that connects all rules installed in the pico.
  • Fuse creates a separate pico for each vehicle.
  • Wrangler provides a ruleset that functions as a persistent data store for the entire pico called the PDS. The PDS provides a standard profile for each pico. Fuse stores all of the vehicle's configuration and identity information in the pico profile.
  • Other vehicle data is stored by the individual Fuse service. For example, the trip service stores information about trips, the fuel service stores information about fuel purchase, and so on.
  • Rules can raise events on the pico's internal event bus, send events to other picos, and use HTTP to interface with other Web-based APIs.

Not only do we create a pico for each vehicle, but we also create one for each owner, and one to represent the fleet. The following diagram shows schematically how these various picos are related to each other.

Fuse System Pico Graph
Fuse pico arrangement

Fuse uses a service called Carvoyant to manage devices and provide an API that is used to get vehicle data. The Carvoyant API is a well-designed RESTful API that uses OAuth for user authorization.

Carvoyant accounts correspond roughly to the concept of a fleet in Fuse. Each account has a collection of vehicles as well as information about the owner. Each vehicle pico in Fuse is linked through a set of event subscriptions (using a specialization of the webhook pattern called the Evented API) to the vehicle in Carvoyant as shown in the following diagram:

Fuse fleet Carvoyant correspondence
Correspondence between Fuse Fleet and Carvoyant Account

Whenever the device in a vehicle detects something interesting (e.g. a change in ignition status, entering or leaving a geofence, key parameters like fuel level going over or under a threshold, and so on), it communicates that state change to Carvoyant via a cellular network. Carvoyant is programmed to raise that event, via an event subscription to the corresponding vehicle pico.

Intra-Pico Event Patterns

Every pico has an internal event bus. Any event sent to the pico via one of its event channels is raised on the internal bus. Any rule installed in the pico that selects on that event will run in response to the raised event. Rules in the pico can also raise events to the internal event bus.

There are several common patterns used by KRL rules to process events. Each is discussed in the sections that follow.

Raising Events

Any rule can raise an event on the internal event bus using the raise command. Any rule installed in the pico can see that event and, if it selects on it, will run.

raising events
Raising Events

Here's the set_debug_pref rule from the Fuse Owner ruleset showing a raise command in the postlude:

rule set_debug_pref {
  select when fuse new_debug_value
  pre {
    new_val = event:attr("debug_value");
  }
  always {
    raise pds event "new_settings_attribute" 
          with setRID   = meta:rid() // this rid
           and setAttr  = "debugPreference"
           and setValue = new_val
  }
}

Commentary

  • This rule shows a new_settings_attribute event being raised with a specific set of event attributes.
  • The raised event can contain computed event attributes.
  • The rule's postlude is introduced with the keyword always, meaning the postlude statements will always execute when the rule is selected.

In addition to the with..and... syntax shown here, attributes can be sent with the event using the attributes keyword which accepts a map (hash) that gives the attribute names and values1. If we rewrote the preceding rule to use that format, it would look like this:

rule set_debug_pref {
  select when fuse new_debug_value
  pre {
    settings =  {"setRID"   : meta:rid(), // this rid
                 "setAttr"  : "debugPreference",
                 "setValue" : event:attr("debug_value")
                };
  }
  always {
    raise pds event "new_settings_attribute" attributes settings
  }
}

There's no functional difference between the two forms. The latter allows the entire set of attributes to be computed (including attribute names) rather than just the values.

As you read the following examples that use raise, keep in mind that raise puts events on the pico's internal event bus. Both the event bus and persistent storage of each pico are isolated from those of other picos.
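A toy Python model of the internal bus may help fix the semantics: raising an event delivers it to every rule whose selector matches, not just the first. This is a sketch to illustrate the idea, not how picos are implemented:

```python
# Hypothetical sketch (not KRL): a per-pico event bus where any rule can
# raise an event and every rule that selects on it runs.
class EventBus:
    def __init__(self):
        self.rules = []                  # list of (selector, action)

    def on(self, domain, ev_type, action):
        # Register a rule: a (domain, type) selector plus an action.
        self.rules.append(((domain, ev_type), action))

    def raise_event(self, domain, ev_type, attrs=None):
        # Every matching rule sees the event, not just the first.
        for selector, action in self.rules:
            if selector == (domain, ev_type):
                action(attrs or {})
```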

Rule Chaining and Abstraction

Often when a rule raises an event, the goal is to signal a general state change in the pico. For example, a rule that is updating the profile might raise the profile_updated event. Any other rule in the pico that is interested in knowing when the profile has been updated can listen for that event and do any necessary processing.

Sometimes, however, the reason for raising an event is more tactical. We simply need to do something that takes two or more rules. This is called rule chaining.

raising events
Rule Chaining

In rule chaining, rule A is raising an event with the explicit goal of causing rule B to be selected.

As an example of this, consider the following two rules (slightly simplified) from the Fuse Fuel ruleset:

rule record_fuel_purchase {
  select when fuse new_fuel_purchase
  pre {
    // new records can't have id
    rec = event:attrs().delete(["id"]); 
  }  
  send_directive("Recording fill up") with rec = rec
  fired {
    raise fuse event "updated_fuel_purchase" attributes rec; 
  }
}
  
rule update_fuel_purchase {
  select when fuse updated_fuel_purchase
  pre {

    // if no id, assume new record and create one
    id = event:attr("id") || random:uuid();  
    // build a fuel purchase record
    ...
  }
  if( not volume.isnull() ...
   && not id.isnull()
    ) then {
      send_directive("Updating fill up") with rec = rec
  }
  fired {
    set ent:fuel_purchases{id} rec;
    raise fuse event "fuel_purchase_saved";
  }
}

Commentary:

  • The second rule, update_fuel_purchase, does all the work of storing a fuel purchase record. It's designed to either update an existing record or create a new record when the id attribute is null.
  • The first rule, record_fuel_purchase, is for new fuel purchase records, but all it does is ensure that the incoming record doesn't, for some reason, have an id.
  • record_fuel_purchase raises an explicit event to chain to updated_fuel_purchase.

Rule chaining is one way for rules to avoid repeating logic (logical coupling). There's only one place where a fuel purchase record is created and stored even though there are two different events that signal a new_fuel_purchase or an updated_fuel_purchase. We're using rule chaining to abstract the updated_fuel_purchase event.
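The chaining pattern translates directly into ordinary function calls in a toy Python sketch (names hypothetical): the "new purchase" handler normalizes its input and chains to the single create-or-update handler:

```python
# Hypothetical sketch (not KRL): rule chaining, where the "new purchase"
# handler normalizes its input and chains to the "update" handler.
purchases = {}

def record_fuel_purchase(attrs):
    # New records can't carry an id; strip it and chain to the updater.
    rec = {k: v for k, v in attrs.items() if k != "id"}
    update_fuel_purchase(rec)

def update_fuel_purchase(attrs):
    # One place creates-or-updates, regardless of which event arrived.
    rec_id = attrs.get("id") or f"id-{len(purchases) + 1}"
    purchases[rec_id] = {**attrs, "id": rec_id}
    return rec_id
```

In KRL the "call" is an event raised on the bus, which keeps the two rules loosely coupled rather than directly invoking one another.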

Guard Rules and Idempotence

One of the most important reasons to chain rules is to guard the action a rule takes to ensure idempotence. Pico-based systems are easier to program when responses to an event are idempotent, meaning that they can run multiple times without cumulative effect.

Many operations are idempotent (e.g., installing a ruleset in a pico over and over only results in the ruleset being added once). For operations that aren't naturally idempotent, we can make the rule idempotent using the rule's guard condition. Using a guard condition we can ensure the rule only fires when specific conditions are met.

There may be reasons why just using the guard condition in the rule isn't satisfactory. First, there are several library calls that are poorly designed and cause side effects in the prelude2. Second, even when side-effecting code isn't an issue, we might not want to execute an entire, computationally heavy prelude before checking the condition.

guard rule pattern
The Guard Rule Pattern

A guard rule uses rule chaining so that the guard rule can test the guard condition and only raise an event for the second rule when it passes. The guard rule:

  1. responds to the triggering event
  2. tests a condition that ensures idempotence
  3. raises an explicit event in the postlude for which the second rule is listening

Here's an example from the Fuse Owner ruleset. This rule is called when a new Fuse owner account is created.

rule kickoff_new_fuse_instance {
  select when fuse need_fleet
  pre {
    fleet_channel = pds:get_item(common:namespace(),"fleet_channel");
  }
  if(fleet_channel.isnull()) then
    send_directive("requesting new Fuse setup");
  fired {
    raise explicit event "need_new_fleet" 
	  with _api = "sky"
	   and fleet = event:attr("fleet") || "My Fleet";
  }
}

Commentary:

  • When a new account is created, we want to create a fleet pico. But we want that action to be idempotent since any Fuse account should have one and only one fleet pico.
  • The rule that creates the fleet pico, create_fleet, is listening for the need_new_fleet event.
  • We cannot enforce idempotence with the conditional statement in the create_fleet rule because the common:factory() call has the side effect of creating a new child pico.
  • We use the existence of the fleet channel to test for whether or not the owner has already been initialized. If the fleet channel is null we assume the owner pico needs to be initialized.
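In miniature, and outside KRL, the guard rule pattern looks like this hedged Python sketch: the guard checks state before invoking the side-effecting operation, making the pair idempotent (all names here are illustrative):

```python
# Hypothetical sketch (not KRL): a guard rule that makes a side-effecting
# operation idempotent by checking state before chaining to it.
state = {"fleet_channel": None, "fleets_created": 0}

def create_fleet():
    # Side-effecting operation we must not run twice.
    state["fleets_created"] += 1
    state["fleet_channel"] = "channel-1"

def kickoff_new_fuse_instance():
    # Guard: only chain to create_fleet if no fleet channel exists yet.
    if state["fleet_channel"] is None:
        create_fleet()
```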

Self-Healing Systems

Self-healing systems are a hallmark of robust system design. Rules can watch triggering events, check pico state, and raise an event that signals the problem. The trigger could be some regularly occurring event or a scheduled event. Regardless, we only want to signal a problem if one exists. A guard rule is a natural fit here.

self healing
Guard Rules and Self Healing

In this pattern, one or more rules listen for a triggering event, check for the problem, and raise an event that signals it so other rules can take action to fix the break.

Here's an example from Fuse that illustrates a check for a problem that needs to be corrected. Fuse subscribes to events from Carvoyant like ignition_status that signal some change in the vehicle. These subscriptions are set up when the vehicle is initialized. They can become dirty for various reasons:

  • The vehicle event channel could change and the subscriptions to Carvoyant need to be updated to send events to the pico over the new channel.
  • The design of Fuse might change necessitating new or different event subscriptions to Carvoyant.
  • Something else might go wrong in either Fuse or Carvoyant that requires the subscriptions be resynchronized.

Since picos are independent systems, they cannot be updated en masse using, for example, an SQL statement. Rather, we require that they fix themselves when a fix is needed. For example, adding new functionality to Fuse sometimes requires changing the event subscriptions that a Fuse vehicle pico has to the Carvoyant API. The Fuse system is designed to automatically fix subscriptions so that they match the specification of needed Carvoyant events.

rule check_subscriptions {
  select when fuse subscription_check
  pre {
    vid = carvoyant:vehicle_id(); 
    my_subs = carvoyant:getSubscription(vid);
  }
  if( not subscriptionsOk(my_subs) ) then
  {
    send_directive("subscriptions not OK")
      with my_subscriptions = my_subs
       and should_have = should_have
  }
  fired {
    raise carvoyant event dirty_subscriptions;
  }
}

Commentary:

  • The primary work is done by the subscriptionsOk() predicate. That predicate checks all the needed event subscriptions (from a list) against the existing subscriptions and ensures they're all there.
  • The rules that respond to the dirty_subscriptions event use the same list to find any missing or bad subscriptions and recreate them.

Event Splitting

Event splitting is useful when a single event can signal different state changes based on its attributes or the current state of the receiving pico.

event splitting pattern
The Event Splitting Pattern

For example, in Fuse, the Carvoyant system sends an ignitionStatus event when the ignition key is turned on or off. The event includes an event attribute, status, that indicates how the ignition state changed.

Fuse treats these two states differently. The ignition being turned on indicates the start of a trip. The Carvoyant system manages trip tracking using telemetry data from the in-vehicle unit, so Fuse doesn't have to do anything with a trip when the ignition is turned on. Fuse does, however, use this as a signal to do a few clean-up and self-healing procedures. For example, the system checks for missed trips when the vehicle starts up and grabs the current vehicle status. We use the ignition being turned on as a convenient (and usually frequent) signal to accomplish tasks while the vehicle pico is not otherwise busy.

On the other hand, when the ignition is turned off, Fuse processes the trip that has just completed, saving it for later use. The ignitionStatus event doesn't contain all of the trip information; it merely signals the state change. Fuse uses Carvoyant's API to gather the detailed trip information, process it, and store the results.

The following two rules from the Fuse Carvoyant ruleset show how Fuse distinguishes and processes these two different state changes:

rule process_ignition_on  {  
  select when carvoyant ignitionStatus where status eq "ON"
  pre {
    ignition_data = normalize_carvoyant_attributes(event:attrs());
  }
  noop();
  always {
    raise fuse event "trip_check" with duration = 2; // recover lost trips
    raise fuse event ignition_processed attributes ignition_data;
  }
}

rule process_ignition_off {  
  select when carvoyant ignitionStatus where status eq "OFF"
  pre {
    tid = event:attr("tripId");
    ignition_data = normalize_carvoyant_attributes(event:attrs());
  }
  if not tid.isnull() then noop();
  fired {
    raise fuse event "new_trip" with tripId = tid;
    raise fuse event ignition_processed attributes ignition_data;
  } else {
    error warn "No trip ID " + ignition_data.encode();
  }
}

Commentary:

  • We use an attribute expression (signaled by the where keyword) in the event expression to split the event and process it differently.
  • The attribute expression creates an event expression that only selects when the specified condition is true.
  • In this case there are only the two options, but you could split an event multiple ways using this technique.

This example uses the event expression to split the incoming event, but you could do it in the rule body as well, computing different events to be raised based on the incoming event, the current state or the pico, or even external information from an API call.
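An attribute-based split is easy to model in Python. This hypothetical sketch mirrors the two rules above, returning the events each branch would raise (names mirror the KRL, but the code is an illustration only):

```python
# Hypothetical sketch (not KRL): splitting one event type on an attribute,
# like selecting on ignitionStatus where status is "ON" or "OFF".
def process_ignition(attrs):
    """Dispatch an ignitionStatus event by its status attribute."""
    raised = []
    if attrs.get("status") == "ON":
        # Start of a trip: use it as a chance to recover lost trips.
        raised += [("fuse", "trip_check"), ("fuse", "ignition_processed")]
    elif attrs.get("status") == "OFF":
        if attrs.get("tripId") is not None:
            raised += [("fuse", "new_trip"), ("fuse", "ignition_processed")]
        else:
            raised.append(("system", "error"))   # no trip ID to process
    return raised
```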

Event Logging

Event logging is a great example of the power of loose coupling in event-based systems. A logging rule can log important information for notification, monitoring, or support on the side without inserting the logging logic into the primary event flow. You can add multiple logging rules, change them as needed, or delete them without touching the base system.

event logging pattern
The Event Logging Pattern

The finalize_new_users rule in the Fuse Owner ruleset demonstrates this:

rule finalize_new_users {
  select when fuse new_fleet_initialized
	  and pds profile_updated
  pre {
    me = pds:get_all_me();
    my_email =  me{"myProfileEmail"} || random:uuid();
    msg = <<
A new fleet was created for #{me.encode()} with ECI #{meta:eci()}
>>;
  }
  sendgrid:send("Kynetx Fleet Team", "fuse-support@kynetx.com",
                "New Fuse Fleet", msg);
  always {
    set app:fuse_users{my_email} makeAcctRecord(me)
  }
}

Commentary:

  • This rule is selected the first time a new Owner pico is set up (signalled by the new_fleet_initialized event).
  • The event expression also requires the profile_updated event (using the and event operator) so that it won't run until after initialization is complete.
  • The rule sends an email to fuse-support using Sendgrid and records a record in an application persistent variable (app:fuse_users).

Note that because the pico's event bus is isolated, there's no danger that a new_fleet_initialized event will combine with a profile_updated event from another pico and mistakenly select. This rule selects only when both those events happen (in any order) in this pico.
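The and event operator's both-events-in-any-order semantics can be sketched in Python as a tiny state machine (a simplification; real KRL event expressions are richer):

```python
# Hypothetical sketch (not KRL): an "and" event expression -- the rule
# fires only after both events have been seen, in either order.
class AndSelector:
    def __init__(self, needed):
        self.needed = set(needed)
        self.seen = set()

    def signal(self, ev_type):
        """Record one event; return True when all needed events have arrived."""
        if ev_type in self.needed:
            self.seen.add(ev_type)
        if self.seen == self.needed:
            self.seen = set()            # reset so the rule can fire again
            return True
        return False
```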

Event Errors

Rules can raise error events in the rule postlude using the error statement. Rules that detect an error condition raise an error event, and rules programmed to handle errors respond. A ruleset can designate a specific ruleset for handling its errors using the errors to pragma in the meta block. Error events, unlike events generated by the raise statement, are routed directly to the designated error-handling ruleset.

error event pattern
The Error Event Pattern

The Fuse error handling ruleset is fairly simple-minded. There is one rule, handle_error, that responds to all errors by mailing them to fuse-support. The message contains detailed information about what went wrong.

rule handle_error {
    select when system error 
    pre {
	genus = event:attr("genus");
	species = event:attr("species") || "none";
	level = event:attr("level");
	rid = event:attr("error_rid");
	rule_name = event:attr("rule_name");
	msg = event:attr("msg");
	eci = meta:eci();
	session = CloudOS:currentSession() || "none";
	ent_keys = rsm:entity_keys().encode();
	kre = meta:host();

	send_email=true;

	error_email_body = <<
A Fuse error occurred with the following details:
  Time: #{time:now()}

  RID: #{rid}
  Rule: #{rule_name}
  Host: #{kre}
  

  level: #{level}
  genus: #{genus}
  species: #{species}
  message: #{msg}

  eci: #{eci}
  txn_id: #{meta:txnId()}
  PCI Session Token: #{session}
  RSM Entity Keys: #{ent_keys}
>>;
    }
    if (send_email) then
	sendgrid:send(to_name, to_addr, subject, error_email_body);
}

Commentary:

  • Most of the information sent in the email comes from event attributes automatically generated when the error event is raised.
  • The email body is composed using extended quoting and beestings.

When Fuse was young, this was quite handy since I was told about things that went wrong and was able to track them down.

More sophisticated error handling can attempt to fix the problem and retry. For example, occasionally Carvoyant signals that the ignition has been turned off and doesn't include a trip ID. Without a trip ID, Fuse cannot query the Carvoyant API for the trip's parameters. Because this is caused by a race condition in the Carvoyant system, the trip can be recovered after waiting a short period. The no_trip_id rule in the Fuse error handling ruleset schedules a retry for recovering a trip in one minute:

rule no_trip_id {
  select when system error where msg.match(re/No trip ID/)
  sendgrid:send(to_name, to_addr, subject,
                "Scheduling retry for " + event:attr("msg"));
  always {
    schedule fuse event "trip_check" at time:add(time:now(),{"minutes" : 1}) 
       with duration = 1; 
  }
}

Commentary:

  • The event expression uses an attribute expression to select only when the error signals that it was caused by a missing trip ID. This is simply matching the text of the error message with a regular expression.
  • The schedule command is similar to the raise command except that it gives a time for raising the event instead of raising the event immediately.
  • In this case we're calculating the time for scheduling the event to be one minute from now.

Event Abstraction

Because of the nature of rule languages, you may often write several rules that share the same event expression. This goes against the grain for programmers of traditional programming languages, where repeating yourself is not only wasteful but leads to code maintenance problems (logical coupling). The answer is to abstract the repetitive portions of those rules, especially the parts that are apt to change frequently.

rule abstraction
The Rule Abstraction Pattern

The post_process_ignition rule from the Fuse Carvoyant ruleset is a good example of this. You'll recall from the Event Splitting pattern that there are separate rules for when the ignitionStatus event is raised with status equal to "ON" and when it's "OFF". Even so, there are some things we want to happen in both cases. The answer is a single rule that does these common activities and is selected after either of the rules processing the ignitionStatus event has fired.

rule post_process_ignition {
  select when fuse ignition_processed
  pre {
    ignition_data = event:attrs();
  }
  noop();
  always {
    raise fuse event "need_vehicle_status";
    raise pds event "new_data_available"
	attributes {
	  "namespace": namespace(),
	  "keyvalue": "ignitionStatus_fired",
	  "value": ignition_data,
	  "_api": "sky"

	};
  }
}

Commentary:

  • This rule selects on the ignition_processed event which all the rules for processing the ignitionStatus event raise.
  • The rule raises several events in response to the incoming event, abstracting the multiple events into a single event using a rule.

Similar to the example in the Rule Chaining and Abstraction section above, we are avoiding logical coupling by not repeating logic that would then have to be updated in multiple places. The key difference between this pattern and that one is that here we're doing it specifically to abstract the events.

Enriching Events

Events are often quite sparse in the information they carry with them. Because events can be frequent, they are also usually lightweight. In addition, events are often broadcast on an event bus and event buses are usually kept simple for performance and administrative reasons. Consequently, events may be seen by systems not authorized for the full payload that could accompany the event.

event enrichment pattern
The Event Enrichment Pattern

Event enrichment deals with these issues by adding information to an event before passing it along to other rules in the system. The save_trip rule in Fuse Trips is an example of this:

rule save_trip {
  select when fuse new_trip 
  pre {
    vid = carvoyant:vehicle_id();
    raw_trip_info = carvoyant:tripInfo(incoming{"tripId"}, vid);
    tid = mkTid(raw_trip_info{"id"}).klog(">>>>> trip ID >>>>>");

    trip_info = raw_trip_info.delete(["data"]);
    
    ...
    
    final_trip_info = trip_info
		 .put(["cost"], trip_summary{"cost"})
		 .put(["interval"], trip_summary{"interval"})
		 .put(["avgSpeed"], trip_summary{"avgSpeed"})
		 .put(["name"], trip_name)
		 .put(["category"], trip_category)
		 ;
  }
  if( end_time neq "ERROR_NO_TIMESTAMP_AVAILABLE" 
   && trip_info{"mileage"} > 0.01
    ) then
  {send_directive("Adding trip #{tid}") with 
    end_time = end_time and
    trip_summary = trip_summary
    ;
  }
  fired {
    raise fuse event trip_saved with 
      tripId = tid and
      tripSummary = trip_summary;
    set ent:trips_by_id{tid} final_trip_info;
    set ent:trip_summaries{tid} trip_summary;
  } else {
    ...
  }
}

Even though I've simplified this rule, it's still fairly complex. I've left in some of the complexity to illustrate the idea of enrichment.

  • The rule prelude makes a call to the Carvoyant API with the tripId to get details about the trip that was just completed.
  • The prelude computes things like trip cost and average speed and adds those to the trip information.
  • It also compares this trip to past trips to see if it should be categorized or named.
  • Finally, in addition to saving that information in a persistent entity variable, it raises a new event, trip_saved, that has been enriched with new information.

Note that the rule condition ensures that the enriched event is only raised if the length of the trip is greater than 0.01 miles as a way of ensuring that we don't record trips when the vehicle ignition was simply turned on and off without the vehicle moving. The condition also checks for other error conditions. Consequently, the enriched event is also a surer signal that a trip happened.

The update_vehicle_data rule in the Fuse Vehicle ruleset is another example of gathering information from various places to create a richer event, updated_vehicle, that contains significantly more information than the original event.

Semantic Translation

Semantic translation is a special kind of event enrichment. Semantic translation interprets an event in light of the context in which it is raised. The result is a new event with a different, usually richer, meaning.

For example, an event that says you're at the airport could be turned into an event that says you're leaving on a trip, if the rule can confirm you have a ticket or your calendar shows a trip. An event that signals leaving on a trip is more specific and thus more meaningful than one that merely indicates presence at the airport. There might be some rules that only care that you're at the airport, but leaving on a trip indicates a different intention.

semantic translation pattern
The Semantic Translation Pattern

The process_ignition_off rule from the Fuse Carvoyant ruleset gives an example of this.

rule process_ignition_off {  
  select when carvoyant ignitionStatus where status eq "OFF"
  pre {
    tid = event:attr("tripId");
    ignition_data = normalize_carvoyant_attributes(event:attrs());
  }
  if not tid.isnull() then noop();
  fired {
    raise fuse event "new_trip" with tripId = tid;
    raise fuse event ignition_processed attributes ignition_data;
  } else {
    error warn "No trip ID " + ignition_data.encode();
  }
}

As we've seen, this rule selects when the ignitionStatus event is raised and the event attribute status is equal to "OFF". If the event also has a tripId, we interpret this as the signal that a trip has ended and raise the new_trip event.

Pico-to-Pico Event Patterns

Pico-to-pico events are the primary way that picos interact. Picos can send messages that contain events to other picos. Most interesting applications for picos are built from systems of picos cooperating to solve a problem. Carl Hewitt, the inventor of the Actor model of programming, said "One actor is no actor. Actors come in systems."

Picos send events to a specific pico on a specific channel. There is a difference between raising an event on the internal event bus and sending an event to another pico. Raising an event inside the pico effectively broadcasts the event to all of the rules installed in the pico. On the other hand, pico-to-pico events are point-to-point, from one pico to another. A pico can have as many incoming and outgoing event channels as are needed for receiving events from and sending events to other picos.

pico_to_pico_events
Pico To Pico Events

Once an event is received by a pico, it is raised on the pico's internal event bus, and any installed rule can select if its event expression is met. As we've seen, the state of one pico is completely isolated from any other.
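The contrast between broadcast on the internal bus and point-to-point delivery can be sketched in a few lines of Python (a toy model with illustrative names, not Wrangler's actual API):

```python
# Toy model: raising an event broadcasts it to every rule installed in the
# pico, while pico-to-pico events are point-to-point on a named channel.

class Pico:
    def __init__(self):
        self.rules = []        # rules installed in this pico
        self.channels = {}     # channel id -> destination pico

    def raise_event(self, event):
        # internal bus: every installed rule sees the event; each rule's
        # event expression decides whether it selects (returns a result)
        results = []
        for rule in self.rules:
            r = rule(event)
            if r is not None:
                results.append(r)
        return results

    def send_event(self, channel_id, event):
        # pico-to-pico: delivered to exactly one pico over one channel,
        # then raised on that pico's own internal bus
        return self.channels[channel_id].raise_event(event)

fleet, vehicle = Pico(), Pico()
vehicle.rules.append(
    lambda e: "selected" if e["name"] == "periodic_vehicle_report" else None)
fleet.channels["veh-1"] = vehicle
result = fleet.send_event("veh-1", {"domain": "fuse",
                                    "name": "periodic_vehicle_report"})
```

Note that send_event targets a single pico, while raise_event offers the event to every installed rule; that's the essential difference.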

As we discussed in the Background section, Fuse consists of at least three picos: one representing the owner, one for the fleet, and one for each vehicle. These picos interact to provide Fuse's functionality. The following sections describe some common event patterns with examples from the Fuse code base.

Correlating Events

When one pico sends an event to another, it often expects an asynchronous response. A correlation identifier can be used to associate the two events. A correlation identifier links conversational state in asynchronous interactions.

event_correlation
Event Correlation

Correlation identifiers are passed as event attributes and can be any string that is unique within the conversation.

Rules use a correlation identifier to ensure that two processes don't get confused with one another. For example, the correlation identifier can be used in the pico's persistent state to keep data about different conversations separate.
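The idea of keying persistent state by correlation identifier can be sketched as follows (a minimal illustration with assumed names, not the Fuse code):

```python
# Each conversation gets a unique correlation number; persistent data for
# that conversation is stored under it, so interleaved conversations never
# collide.

import uuid

report_data = {}   # stands in for a persistent variable keyed by rcn

def start_report(incoming_rcn=None):
    # reuse the incoming correlation number if there is one, else mint one
    rcn = incoming_rcn or uuid.uuid4().hex
    report_data[rcn] = {"reports": []}
    return rcn

def add_vehicle_report(rcn, report):
    # the correlation number routes the report to the right conversation
    report_data[rcn]["reports"].append(report)

rcn_a, rcn_b = start_report(), start_report()
add_vehicle_report(rcn_a, {"vehicle_id": "veh-1"})
add_vehicle_report(rcn_b, {"vehicle_id": "veh-2"})
```

Because each report lands under its own correlation number, two reports running at once never see each other's data.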

The following rule from the Fuse Fleet ruleset shows the calculation and use of a correlation number as part of creating Fuse fleet reports. The rule has been simplified to emphasize the idea of correlation numbers.

rule start_periodic_report {
  select when fuse periodic_report_start
  pre {
    new_rcn = genCorrelationNumber();
    rcn = event:attr("report_correlation_number")
	       .defaultsTo(new_rcn);
    ...
    report_data = {"period": period,
		   "start": start,
		   "end": end,
		   "timezone": tz};
    augmented_attrs = event:attrs()
                       .put(["report_correlation_number"], rcn);
  }
  fired {
    raise explicit event periodic_report_routable
      with attributes augmented_attrs;
    set ent:report_data{rcn} report_data;
    schedule explicit event "periodic_report_timer_expired"
      at time:add(time:now(),{"minutes" : 2}) 
      attributes {"report_correlation_number": rcn, "timezone": tz}
  }
}

Commentary:

  • The correlation number is generated by a function that ensures that it's a unique string.
  • The implicit understanding is that any rule that sees the correlation number will pass it along so that every player can correlate their actions. Future versions of Wrangler, the pico operating system, will provide more automation for correlation.
  • The correlation number is used in internal events (the raise in the postlude).
  • The rule stores the correlation number for later use. In this case it's used as a key for storing other information in a persistent variable (the report_data) that will be used later for interactions involving this event.

Event Recipient Lists

Event or message routing is a critical task in reactive systems. The simplest way to route events to other picos is to keep a list of picos. Sending events to each of the picos on a recipient list is a simple matter. In this pattern one rule sends the same event to each of a number of other picos. The recipient list is analogous to the To: list on an email message.

event_recipient_list
Event Recipient List

The recipient list can be static, based on a particular configuration of picos in the computation, or it can be calculated in various ways. Computed recipient lists allow a pico to act as an event router.

The following example from Fuse shows a rule that uses an event recipient list containing all the vehicles in the fleet. The list isn't static; it's computed by calling the activeVehicleSummary() function. The result can change as vehicles are added to or removed from the fleet, or merely as vehicles become inactive (not connected to a device).

rule process_periodic_report_with_rcn {
  select when explicit periodic_report_routable
  foreach activeVehicleSummary() setting(vsum)
    pre {
      rcn = event:attr("report_correlation_number");
      channel = {"cid": vsum{"channel"}};
      ...
    }
    if(not rcn.isnull()) then {
      event:send(channel, "fuse", "periodic_vehicle_report")
	  with attrs = {
	    "report_correlation_number": rcn,
	    "vehicle_id": vsum{"deviceId"},
	    "start": common:convertToUTC(start),
	    "end": common:convertToUTC(end)
	  };
    }
    ...
}

Commentary:

  • The foreach statement runs the rule once for each vehicle returned by activeVehicleSummary().
  • Each iteration of the foreach loop sets the vsum variable to the vehicle summary for a specific vehicle.
  • The rule routes the periodic_vehicle_report event to each vehicle using event:send() and the event channel in vsum.
  • The correlation number is taken from an event attribute.
  • If the correlation number is missing, the rule doesn't fire.
  • The correlation number is sent with the event to each vehicle pico.

Content-Based Event Routing

Another way to route messages is by content. In content-based event routing, the routing pico knows about some number of other picos and selects where to route the event based on the event domain, name, or attributes and other information such as the current state of the pico and external information from APIs.

The routing rule usually attaches a correlation identifier to the event before routing it.

event_routing
Event Routing

The route_to_owner rule from the Fuse Fleet ruleset is a simple example of this idea. A Fuse fleet can have more than one owner, and the fleet generally needs to keep the owner aware of certain things by routing events.

rule route_to_owner {
  select when fuse new_fleet
           or fuse reminders_ready
           or fuse email_for_owner
  pre {
    owner_subs =
       CloudOS:subscriptionList(common:namespace(),"FleetOwner");
    // find the owner who contacted us (could be more than one)
    matching_owner = owner_subs.filter(
              function(sub){ sub{"backChannel"} eq meta:eci()
                           }
              );
    // use first owner if no match
    owner_list = matching_owner.length() > 0 => matching_owner
                                              | owner_subs;
    owner = owner_list.head().pick("$.eventChannel");
  }
  {
    send_directive("Routing to owner")
      with channel = owner 
       and attrs = event:attrs();
    event:send({"cid": owner}, "fuse", event:type())
      with attrs = event:attrs();
  }
}

Commentary:

  • The event expression is used to determine what events get routed to the owner. This method is static and bound early. KRL offers no mechanism at present for dynamically computing an event expression, but rule chaining with computed event types could be used to achieve a similar effect.
  • The rule is designed to route events to just one owner. The event is routed to the owner who sent the incoming event or to the first owner, if the incoming event didn't come from an owner.
  • If needed, this rule could be extended to route to all owners.

Pico Registration

The most general way to route events is to create a service directory and allow picos to register for events based on specific criteria.

pico registry
Pico Registration

In this pattern, picos send registration events that include information like their name, an event channel identifier, and attributes that are important in routing. The registration pico might be a special pico that serves as a directory in a large system, or the registration might just be a ruleset in a pico with other responsibilities. The registration pico might route events based on pico type, name, or other attributes.

One important feature of a directory is to allow picos to change their event channel for security reasons without losing service. A directory also allows picos to move to other hosting providers without loss of functionality.

A more complex example of this idea is to use a registrar. A registrar is a third party that manages the registry on behalf of the instances. The registrar watches for child_created or child_destroyed events to know when instances are created or destroyed and registers or deregisters them as appropriate. The registrar also periodically checks the health of instances and automatically deregisters those it deems incapacitated. The registrar decouples instances from the registry: they can be oblivious to the existence of the registry and never need to explicitly register.

There is no good example of this pattern in Fuse. However, the following example code shows how this could work. First let's look at a simple rule to process registration events:

rule register_picos {
  select when system pico_registration
  pre {
    topic = event:attr("topic");
    pico_data =  makeRegistrationRecord(event:attrs());
  }
  if(not pico_data{"eci"}.isnull()) then noop();
  fired {
     set ent:registrations{topic} pico_data;
  }
}

Commentary:

  • The rule makes use of a topic event attribute to determine which topic the registering pico is interested in.
  • The rule uses a function, makeRegistrationRecord(), to process the incoming event attributes and create a record of anything important.
  • The rule only fires if the incoming registration event includes an event channel. This check could be extended to require any other necessary information.
  • The rule ultimately stores the registration request in an entity variable called ent:registrations by topic.

The following code routes to registered picos by topic. This rule would be in the same pico as the pico_registration rule shown above.

rule route_to_registered {
  select when fuse events_to_route
           or fuse another_event_to_route
  foreach ent:registrations{event:attr("topic")}
    setting(registered_pico)
    pre {
      channel = {"cid": registered_pico{"eci"}}
    }
    event:send(channel, "fuse", event:type()) with attrs = event:attrs();
}

Commentary:

  • This rule selects on any of the routable events listed in the rule's event expression.
  • The foreach loop will run once for any pico in the ent:registrations variable for the topic specified in the attributes of the incoming event. The topic could be computed rather than relying on it being in the incoming event.
  • All the incoming event attributes and the incoming event type are routed to the picos that are registered for the topic.
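Since Fuse has no registrar, here's a sketch of how one might maintain the registry from lifecycle events (the names and event shapes are entirely hypothetical):

```python
# A registrar watches child_created/child_destroyed events and maintains
# the registry on behalf of the instances, so they never register themselves.

registry = {}   # topic -> {pico name: event channel identifier}

def registrar(event):
    attrs = event["attrs"]
    topic, name = attrs["topic"], attrs["name"]
    if event["name"] == "child_created":
        # register the new instance under its topic
        registry.setdefault(topic, {})[name] = attrs["eci"]
    elif event["name"] == "child_destroyed":
        # deregister; a periodic health check could call this path for
        # incapacitated instances as well
        registry.get(topic, {}).pop(name, None)

registrar({"name": "child_created",
           "attrs": {"topic": "vehicles", "name": "truck", "eci": "abc123"}})
registrar({"name": "child_created",
           "attrs": {"topic": "vehicles", "name": "sedan", "eci": "def456"}})
registrar({"name": "child_destroyed",
           "attrs": {"topic": "vehicles", "name": "truck"}})
```

The routing rule would then consult the registry by topic, exactly as route_to_registered does with ent:registrations above.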

Aggregators

The aggregator pattern listens for incoming events and collects them. Once the required events have been collected, the aggregator raises an event or takes some other action.

event_aggregation
Event Aggregation

The following two rules from the Fuse Fleet ruleset comprise an event aggregator. The first, catch_periodic_vehicle_reports, watches for periodic_vehicle_report_created events from one of the vehicles in the fleet, saves the information in the event as that vehicle's report, and raises an event that indicates the vehicle report was added.

rule catch_periodic_vehicle_reports {
  select when fuse periodic_vehicle_report_created

  pre {
    vehicle_id = event:attr("vehicle_id");
    rcn = event:attr("report_correlation_number");
    updated_vehicle_reports =
        (ent:vehicle_reports{[rcn,"reports"]})
		  .defaultsTo([])
	          .append(event:attr("vehicle_details").decode());

  }
  noop();
  always {
    set ent:vehicle_reports{[rcn,"reports"]} updated_vehicle_reports;
    raise explicit event periodic_vehicle_report_added with
      report_correlation_number = rcn
  }

}    

Commentary:

  • The report correlation number is sent with the incoming event as an event attribute.
  • The report correlation number is used to store the report in an entity variable. Thus multiple reports could be in play at the same time without interfering with each other.
  • The same report correlation number is raised with the periodic_vehicle_report_added event as an attribute.

The second rule, check_periodic_report_status, determines when sufficient reports have been collected. In this case, it compares the number of reports received with the number of active vehicles. So long as insufficient reports have been received, the rule does nothing.

rule check_periodic_report_status {
  select when explicit periodic_vehicle_report_added
  pre {
    rcn = event:attr("report_correlation_number");
    vehicles_in_fleet = activeVehicleSummary().length();
    number_of_reports_received = (ent:vehicle_reports{[rcn,"reports"]})
				   .length();
  }
  if ( vehicles_in_fleet <= number_of_reports_received ) then noop();
  fired {
    log "process vehicle reports ";
    raise explicit event periodic_report_ready with
      report_correlation_number = rcn;
  } else {
    log "we're still waiting for " +
        (vehicles_in_fleet - number_of_reports_received) +
	" reports on #{rcn}";
  }
}

Commentary:

  • The prelude calculates how many vehicle reports have been received using the report correlation number.
  • The report correlation number is passed along with the periodic_report_ready event so that any downstream rules can process the right report.
  • We use the log statement in the else clause of the postlude to show in the logs how many reports we're still waiting for.

Scatter-Gather Pattern

The scatter-gather pattern is useful when you need to get some number of picos to do something and then aggregate the results to complete the computation. This is a common pattern for asynchronous processing in reactive systems. Events are sent asynchronously and, consequently, the sending pico does not block and is free to process other data while it's waiting for the result. Similarly, the picos that receive the event can process the event and respond when ready.
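The shape of the pattern is easy to see in coroutine form (Fuse does this with picos and asynchronous events rather than coroutines; the names below are illustrative):

```python
# Scatter: every request is in flight at once, so the sender never blocks.
# Gather: the replies are collected to finish the computation.

import asyncio

async def vehicle_report(vehicle_id):
    await asyncio.sleep(0)   # stands in for real, variable-latency work
    return {"vehicle_id": vehicle_id, "miles": 12.5}

async def periodic_report(vehicle_ids):
    # scatter the requests, then gather the results in order
    return await asyncio.gather(*(vehicle_report(v) for v in vehicle_ids))

reports = asyncio.run(periodic_report(["veh-1", "veh-2", "veh-3"]))
```

With picos the gather step is a separate rule keyed by a correlation number rather than a blocking call, but the division of labor is the same.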

Fuse uses the scatter-gather pattern in creating weekly vehicle reports. We've already seen the process_periodic_report_with_rcn rule in the Event Recipient Lists pattern. This rule scatters events telling the vehicles that the fleet needs the vehicle report. The rules we just saw in the Aggregator pattern are the gathering part of this set of rules.

When we combine the pictures from those two patterns, we get a set up that looks like this:

scatter_gather
Scatter Gather

The scatter-gather pattern and its use in Fuse is described in some detail in Using the Scatter-Gather Pattern to Asynchronously Create Fuse Reports. Here's a diagram showing the specific interactions in Fuse:

scatter-gather pattern
scatter-gather pattern

The owner pico kicks everything off by sending the request_periodic_report event to the fleet. The start_periodic_report rule in the fleet pico scatters the periodic_vehicle_report event to each vehicle in the fleet, whether there's 1 or 100. Of course, these events are asynchronous as well. Consequently, the vehicle picos are not under time pressure to complete.

When each vehicle pico completes, it sends a periodic_vehicle_report_created event to the fleet pico. The catch_periodic_vehicle_reports rule is listening and gathers the reports. Once it has added the vehicle report, it raises the periodic_vehicle_report_added event. Another rule in the fleet pico, check_periodic_report_status, checks whether every vehicle has responded. When the number of reports equals the number of vehicles, it raises the periodic_report_ready event, the data is turned into a report, and the owner pico is notified that it's ready for emailing.

As we've seen, these rules make extensive use of the report correlation number to ensure that reports are not intermingled if a request_periodic_report event happens to be sent before the previous one finishes.

Dealing with Failure

Because events may be lost, asynchronous systems have to be prepared to deal with failure.

event failure
Event Failure

In the case of Fuse, the start_periodic_report rule that we saw in the section on Correlating Events also schedules the periodic_report_timer_expired event for two minutes in the future:

schedule explicit event "periodic_report_timer_expired"
  at time:add(time:now(),{"minutes" : 2}) 
  attributes {"report_correlation_number": rcn, "timezone": tz}

Another rule, retry_from_expired_timer, listens for this event and retries missing vehicles:

rule retry_from_expired_timer {
  select when explicit periodic_report_timer_expired
  pre {
    max_retries = 2;
    rcn = event:attr("report_correlation_number");
    tz = event:attr("timezone");
    vehicle_summaries = vehicleSummary();
    vehicle_reports = ent:vehicle_reports{[rcn,"reports"]}
                        .defaultsTo([]);
    vehicle_summaries_keys = vehicle_summaries
                               .map(function(r){r{"deviceId"}});
    vehicle_reports_keys = vehicle_reports
                             .map(function(r){r{"deviceId"}});

    missing_vehicles = vehicle_summaries_keys
			   .difference(vehicle_reports_keys);
    in_array = function(k,a){
      a.filter(function(x){x eq k}).length() > 0;
    };
    needed = vehicle_summaries.filter(
                 function(s){
		   in_array(s{"deviceId"}, missing_vehicles)
		 });

    rcn_unprocessed = not ent:vehicle_reports{rcn}.isnull();
  }
  if ( needed.length() > 0
    && ent:retry_count < max_retries
     ) then {
    noop();
  }
  fired {
    log "Retrying for " + (needed.length()) + " vehicles";
    set ent:retry_count ent:retry_count + 1;
    raise fuse event periodic_report_start attributes {
       "vehicle_summaries": needed,
       "timezone": tz,
       "report_correlation_number": rcn 
      } if rcn_unprocessed;
    schedule explicit event "periodic_report_timer_expired"
      at time:add(time:now(),{"minutes" : 2}) 
      attributes {"report_correlation_number": rcn, "timezone": tz}
  } else {
    clear ent:retry_count;
    raise explicit event periodic_report_ready with
      report_correlation_number = rcn if rcn_unprocessed;
  }
}

Commentary:

  • The retry in two minutes is somewhat arbitrary. Since reports are only generated once a week, waiting two minutes to retry does not seem overly long.
  • The logic in the prelude of this rule is primarily concerned with calculating needed, a list of the vehicles that have not yet sent reports.
  • If reports are needed, the same event, periodic_report_start, is raised, but with a vehicle_summaries attribute.
  • The scheduled event is also reset in case this retry fails as well.
  • The rule is designed to retry a maximum number of times. If the maximum is reached, the rule raises the periodic_report_ready event even if not all reports have been received. The report is processed without them.
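Stripped of the KRL details, the retry logic above reduces to a small decision function (the names here are illustrative):

```python
# On timer expiry: retry for the missing participants up to a maximum
# number of times, then proceed with whatever has arrived.

MAX_RETRIES = 2

def on_timer_expired(expected, received, retry_count):
    """Return ('retry', missing) or ('finish', received)."""
    missing = [v for v in expected if v not in received]
    if missing and retry_count < MAX_RETRIES:
        return ("retry", missing)    # re-raise the start event for these
    return ("finish", received)      # report proceeds, possibly incomplete

action, payload = on_timer_expired(["veh-1", "veh-2"], ["veh-1"], 0)
```

The key design choice is the final branch: after the retry budget is spent, the system makes forward progress with a partial result rather than waiting forever.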

Conclusions

Part of the challenge in building reactive systems is learning to program in a completely new style. Reactive systems are by nature asynchronous and loosely coupled. Furthermore, program flow doesn't proceed in the same manner as in object-oriented or imperative systems, but rather follows the event flow. Rule-based systems may have any number of independent responses to a given event. Knowing patterns can help ease the transition to this new style of programming.

This post has demonstrated and explained a number of intra-pico and pico-to-pico event patterns. The list we have reviewed is by no means exhaustive, but it is indicative of the wide variety of event processing that happens in actor-based systems. The Fuse code base is open-source and thus represents an example of a large Internet of Things system programmed in a reactive style using the actor model.


  1. KRL data structures are just JSON.
  2. In general, KRL is designed so that declarations in the prelude cannot cause side effects to persistent variables.


O-Ring Theory of Production

Fascinating look at the way the multiplicative nature of quality in complex tasks leads to exponential effects in overall productivity. The video is about 20 minutes long, but worth the time.

The video is fairly general, but I think anyone involved with software development will see the implications right away. One is that even small drops in the productivity of one member of a team can be disastrous. For jobs that require getting many things right (jobs made from lots of individual tasks), your chance of getting a satisfactory result drops to almost zero with only a few low-quality results on individual tasks. Everyone has to be on top of their game to successfully complete complex tasks.
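The compounding effect is easy to check with a two-line calculation (the numbers below are my own illustration, not from the video):

```python
# If success on a complex job requires getting every one of n tasks right,
# overall quality is the product of the per-task qualities.

def job_success(per_task_quality, n_tasks):
    return per_task_quality ** n_tasks

# A seemingly small per-task difference compounds dramatically over 50 tasks.
high = job_success(0.99, 50)   # roughly 0.6
low = job_success(0.95, 50)    # roughly 0.08
```

A 4-point drop in per-task quality cuts the chance of a satisfactory overall result by nearly a factor of eight.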

Another is that good people want to work with good people. This leads to virtuous or vicious cycles that are hard to get out of. Hiring and keeping good people is vital. And poor performers hurt everyone. If you can't get rid of them, it's best to isolate them.

For a firm, hiring the best people matters...a lot. Putting those people in good teams and a good environment matters...a lot. For an economy, training people to do high quality jobs matters...a lot.

Adrian Colyer applies this to DevOps in The O-Ring Theory of DevOps. He points out that O-ring theory implies that every stage in the DevOps pipeline needs to perform at high quality; failure of even one stage (e.g., failure of tests to catch problems) can cause big problems. Conversely, automating steps well means the quality will be consistent.


Rebuilding KRL

The mountains, the lake and the cloud

The Kynetx Rules Engine, or KRE, is the evaluator that implements persistent compute objects (picos) and evaluates KRL inside them. Picos are actors that are first-class Internet citizens. Thus, KRE supports an actor-model for programming the Internet of Things. Picos are the underlying infrastructural element for my ideas on social things.

The current implementation of KRE is an Apache module written in Perl 5. Apache connects KRE to the Web and functions as an application server. Mark Horstmeier and I wrote it over the past eight years with help from Cid Dennis, Sam Curren, Wade Billings, and others.

I believe that for KRL to be widely used in the Internet of Things, the current implementation will need to be replaced with something much faster. KRE is an excellent test bed and pilot implementation that has hosted multiple production products. But there are some products we avoided due to performance and security issues.

Here are the principles I think should govern a reimplementation:

  • Speed — The new evaluator needs to be several orders of magnitude faster. I believe this is possible because KRE is implemented as a naive, AST-directed interpreter with few optimizations.
  • Internet first — The new evaluator must retain KRE's Internet-first stance. That means that picos must remain first-class Internet citizens. Consequently, picos must
    • be capable of receiving events and queries on Internet channels,
    • interact with other picos, even if they're hosted on another instance of the evaluator somewhere else on the Internet, and
    • call and be called from Internet-hosted APIs.
  • Small deployments — KRE is a pretty complicated beast to build and deploy. Not only does it require Apache, but also over 100 Perl modules, memcached, MongoDB, and Java (for ANTLR). The new evaluator should be fully containerized and capable of being deployed in relatively small execution environments such as home WiFi routers.
  • Attribute-Based Event Authorization — KRE has a simple authorization scheme that has served the products built on it. An attribute-based event authorization scheme would increase the kinds of uses to which picos could be put.
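To illustrate what attribute-based event authorization might look like, here's a minimal sketch in Python. The policy format, attribute names, and wildcard convention are all invented for illustration; this is not KRE's actual scheme. The idea is that a channel carries attributes, and a policy grants the right to raise an event when those attributes match:

```python
# Hypothetical attribute-based policies: each grants event-raising rights
# to channels whose attributes match. "*" is a wildcard for illustration.
policies = [
    # (required channel attributes, allowed event domain, allowed event type)
    ({"role": "sensor"}, "sensor", "reading"),
    ({"role": "admin"},  "*",      "*"),
]

def authorized(channel_attrs, domain, event_type):
    """An event is allowed if any policy's attributes and domain/type match."""
    for required, d, t in policies:
        if all(channel_attrs.get(k) == v for k, v in required.items()) \
           and d in ("*", domain) and t in ("*", event_type):
            return True
    return False

print(authorized({"role": "sensor"}, "sensor", "reading"))  # True
print(authorized({"role": "sensor"}, "admin", "reset"))     # False
```

The advantage over a fixed scheme is that new uses only require new policies, not engine changes.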

This reimplementation won't entail significant changes to the overall functionality of either picos or KRL beyond those required to meet the preceding principles. There will be some cleaning up of syntactic warts. The current parser is mostly good enough.

I'm inclined toward a bytecode-interpreter-based implementation. I think targeting the JVM is a good starting point, but I've not done enough research to know whether that's a good decision. Perhaps some other bytecode interpreter would serve better. I don't want to write my own bytecode interpreter if I can help it.
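For readers unfamiliar with the technique, here's a toy stack-machine interpreter in Python. It illustrates why bytecode evaluation is fast — a tight dispatch loop over simple instructions — and is not a proposal for KRL's actual instruction set:

```python
def run(bytecode):
    """Evaluate a list of (opcode, arg) pairs on a simple stack machine."""
    stack = []
    for op, arg in bytecode:
        if op == "PUSH":
            stack.append(arg)
        elif op == "ADD":
            b, a = stack.pop(), stack.pop()
            stack.append(a + b)
        elif op == "MUL":
            b, a = stack.pop(), stack.pop()
            stack.append(a * b)
    return stack.pop()

# (2 + 3) * 4, as a compiler front end might emit it
program = [("PUSH", 2), ("PUSH", 3), ("ADD", None), ("PUSH", 4), ("MUL", None)]
print(run(program))  # → 20
```

The win over an AST walker is that all the parsing and tree traversal happens once, at compile time; evaluation touches only a flat instruction sequence.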

A bytecode interpreter fits well with KRE's current architecture. For example, KRL rulesets are hosted online (e.g., GitHub) and compiled and optimized on demand. The engine then uses this cached intermediate representation to respond to events and queries. As a result, the parsing and translation of KRL programs could be easily separated from the evaluation of the bytecode when an event or query is received by a pico. That supports small deployments since compile functions could be hosted in the cloud.

The following picture shows the Compile pipeline. KRL is ingested on one end. Bytecode is produced as a result. In between, KRL is parsed, optimized, and analyzed before being translated into bytecode. In addition, the pipeline creates the salience graph, the data structure the scheduler uses to determine which rules are applicable to a given event.

Compile Pipeline
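At its simplest, a salience graph is an index from event domain and type to the rules that select on them. Here's a minimal sketch of how the compile pipeline might build one — the rule names and the flat dictionary structure are invented; the real data structure is richer:

```python
# Toy "select when" declarations, as the parser might extract them:
# each rule lists the (domain, type) pairs it selects on.
rules = {
    "track_temperature": [("sensor", "reading")],
    "alert_on_threshold": [("sensor", "reading"), ("sensor", "threshold")],
}

# Invert the declarations into an event → rules index (the salience graph).
salience = {}
for rule, selects in rules.items():
    for key in selects:
        salience.setdefault(key, []).append(rule)

# At runtime the scheduler looks up an incoming event in one step:
print(salience[("sensor", "reading")])
# → ['track_temperature', 'alert_on_threshold']
```

Building this index at compile time is what lets the scheduler avoid scanning every rule for every event.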

Once the bytecode is created, it would be used by both the Event Evaluation pipeline and the Query Evaluation pipeline. The following diagrams show those pipelines. The event (or query) decoding creates a decode object and an evaluation context. The determination of which rules are scheduled or which functions respond is made at runtime. For the Event Evaluation pipeline, the execution of a rule may cause additional rules to be scheduled.

Event Evaluation Pipeline
Query Evaluation Pipeline
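The scheduling step of the Event Evaluation pipeline can be sketched as a work queue: rules matched through the salience index run, and a running rule may raise further events that schedule more rules. The rule names and actions below are invented for illustration:

```python
from collections import deque

# Toy salience index and rule actions (names invented for illustration).
salience = {("sensor", "reading"): ["record"], ("sensor", "high"): ["alert"]}

def record(attrs, raise_event):
    if attrs.get("temp", 0) > 30:
        raise_event(("sensor", "high"), attrs)   # rule raises a follow-on event
    return "recorded"

def alert(attrs, raise_event):
    return "alerted"

actions = {"record": record, "alert": alert}

def evaluate(event, attrs):
    """Process an event; rules may schedule more rules by raising events."""
    queue, log = deque([(event, attrs)]), []
    while queue:
        ev, at = queue.popleft()
        for rule in salience.get(ev, []):
            log.append(actions[rule](at, lambda e, a: queue.append((e, a))))
    return log

print(evaluate(("sensor", "reading"), {"temp": 35}))  # ['recorded', 'alerted']
```

A hot reading schedules both rules because the first rule raises a second event; a normal reading stops after the first.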

I have money to proceed with this project. Right now I'm in the planning stage. I'm looking for someone who is interested in helping with the planning and implementation. The position is a contract position that will last for at least one year. I prefer someone who can be local (i.e. sit near me at BYU). If you're interested in getting paid to build a programming language, let me know.


Decentralized Public Key Infrastructure

Svastra obliqua, m, face, Dorchester Co_2015-12-01-18.04

Decentralized Public Key Infrastructure

Abstract

Today’s Internet places control of online identities into the hands of third-parties. Email addresses, usernames, and website domains are borrowed or "rented" through DNS, X.509, and social networks. This results in severe usability and security challenges Internet-wide. This paper describes a possible alternate approach called decentralized public key infrastructure (DPKI), which returns control of online identities to the entities they belong to. By doing so, DPKI addresses many usability and security challenges that plague traditional public key infrastructure (PKI). DPKI has advantages at each stage of the PKI life cycle. It makes permissionless bootstrapping of online identities possible and provides for the simple creation of stronger SSL certificates. In usage, it can help “Johnny” to finally encrypt thanks to its relegation of public key management to secure decentralized datastores. Finally, it includes mechanisms to recover lost or compromised identifiers.

This paper was prepared by (alphabetical by last name) Christopher Allen, Arthur Brock, Vitalik Buterin, Jon Callas, Duke Dorje, Christian Lundkvist, Pavel Kravchenko, Jude Nelson, Drummond Reed, Markus Sabadello, Greg Slepak, Noah Thorp, and Harlan T Wood as part of the Rebooting the Web of Trust project.


Evaluating KRL Declarations

I can't tell you how much I rely on browser consoles to test and debug JavaScript. Programming KRL has been made more difficult by the fact that there's no command line. KRL is evaluated online in the context of a pico. You couldn't just run a few lines to test a complex set of operations or figure out a dumb syntax error.

I just built a simple KRL declaration evaluator that you can use to run KRL declarations and see the results. Here's a screenshot:

KRL Declaration Evaluator
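The core loop of such an evaluator is simple: evaluate each declaration in order, binding the result to a name so later declarations can use it, then show all the bindings. Modeled in Python rather than KRL (a rough analogy, not the actual implementation, which reuses the KRL parser and evaluator):

```python
def eval_declarations(declarations):
    """Evaluate 'name = expression' lines in order; later lines see earlier bindings."""
    bindings = {}
    for line in declarations:
        name, expr = (s.strip() for s in line.split("=", 1))
        # Evaluate with the prior bindings in scope (builtins stripped, for safety)
        bindings[name] = eval(expr, {"__builtins__": {}}, dict(bindings))
    return bindings

result = eval_declarations([
    "a = 21 * 2",
    "b = a + 1",
    "c = [a, b]",
])
print(result["c"])  # → [42, 43]
```

Seeing every intermediate binding at once is what makes this faster than the old cycle of adding logging statements and re-uploading the ruleset.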

The sad part is this only took about three hours because all the pieces were just lying around in the code that makes picos work. I just had to hook them up. This is going to save me a lot of time. And I'm sure my students will enjoy it. Much nicer than adding logging statements, uploading the code to the pico, evaluating, and then looking at the logs. Why didn't I do this years ago?

I'm sure there are weird expression results I might not have accounted for. And the evaluator can't help with expressions that contain references to persistent variables, event attributes, and other expressions that only make sense in the context of a pico.

Right now, the editor is using a JavaScript syntax highlighter, which is close, but not perfect. For example, the image shows some errors (red x's in the left margin) that are not really KRL errors; the code it flags is perfectly fine KRL. If you're interested in writing a KRL syntax highlighter for Ace, I'd be much obliged. For now, I've left syntax highlighting on but turned syntax checking off. No worries: the results will show you syntax errors if you submit something with bad syntax.

KRL Declaration evaluator with error

I made this work with the help of the Ace Editor, Bootstrap, and jQuery.