Reactive Programming Patterns: Scatter Gather

Introduction

Fuse is an open-source, connected-car platform that was used to experiment with techniques for building a true Internet of Things. Fuse was built using picos.

Fuse sends the fleet owner a periodic (currently weekly) report with the details of trips and fuel fillups for each vehicle during the reporting period. The report also aggregates that detailed information, first for each vehicle and then for the whole fleet. Here's the start of a weekly report for my fleet:

The owner is represented by a pico, as are the fleet and each of the vehicles. Each of these picos is independent, network addressable, stores data for itself, and executes processes that are defined by functions and rules. They can respond to requests (via functions) or events (via rules). They communicate with each other directly without intermediation.

Synchronous Request-Response Solution

The most straightforward way to create a report, and the one I used initially, is for the fleet to make a request of each of its vehicles, asking them to compile details about trips and fillups and return the resulting JSON structure. Then the fleet formats that and sends an event to the owner pico indicating the report is ready to email. That process is represented by the following diagram. The methods for coding it are straightforward and will be familiar to anyone who's used an API.

The owner pico kicks everything off by sending the periodic_report event to the fleet pico. The fuse_periodic_report rule in the fleet pico calls a fleet-pico function called fleetDetails() that makes synchronous requests to each of the vehicles over HTTP using their API. Once all the vehicles have responded, the rule formats the report and tells the owner pico it's ready via the periodic_report_ready event.
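To make the failure mode concrete, here is a small Python model (not KRL) of the synchronous approach. It assumes the requests are made one after another and that each vehicle's response time is known in advance. The names `fleet_details`, `latencies`, and `timeout` are hypothetical; the real fleetDetails() makes HTTP calls.

```python
def fleet_details(latencies, timeout):
    """Hypothetical model of the synchronous approach: ask each vehicle in turn
    and keep only the answers that arrive before the per-request timeout.

    `latencies` maps an illustrative vehicle id to its response time in seconds.
    Returns (reports, missed, elapsed), where `elapsed` is how long the fleet
    rule is blocked waiting.
    """
    reports, missed, elapsed = {}, [], 0.0
    for vehicle_id, latency in latencies.items():
        if latency <= timeout:
            elapsed += latency
            reports[vehicle_id] = {"vehicle_id": vehicle_id, "trips": [], "fillups": []}
        else:
            # The request timed out, so this vehicle silently drops out of the report.
            elapsed += timeout
            missed.append(vehicle_id)
    return reports, missed, elapsed


reports, missed, elapsed = fleet_details(
    {"truck-1": 0.4, "truck-2": 3.5, "van-3": 1.2}, timeout=2.0)
print(sorted(reports), missed)  # ['truck-1', 'van-3'] ['truck-2']
```

One slow vehicle costs the whole timeout and still leaves a hole in the report, and the fleet rule can do nothing else while it waits.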

This works pretty well as long as the vehicles respond in a timely manner. For performance reasons, I have the HTTP timeouts set fairly short, so any big delay causes a vehicle to be missed when its request times out. For people with a few vehicles in their fleet, this is fairly rare. But the chance grows with every vehicle added: with around 10 vehicles in the fleet, the odds that at least one vehicle times out become fairly high.
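The growth in failures with fleet size follows from basic probability. Assume, hypothetically, that each vehicle independently misses the timeout with probability p. Then the chance that at least one of n vehicles is missed is 1 - (1 - p)^n. A quick Python check:

```python
def prob_any_timeout(p, n):
    """Chance that at least one of n independent requests times out,
    assuming each one times out with probability p (an illustrative model)."""
    return 1 - (1 - p) ** n


for n in (1, 3, 10, 20):
    print(n, round(prob_any_timeout(0.05, n), 3))
```

Take an illustrative 5% per-vehicle timeout rate (not a measured Fuse figure). The chance of an incomplete report then rises from 5% for one vehicle to about 40% at ten vehicles and about 64% at twenty.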

If my only tool were synchronous request-response-style interactions, this would be a pretty big problem. I could increase the timeout, but that's a band-aid that would only mask the problem for a while. I could make the vehicleDetails() function more performant, but that's a lot of work for reasons having to do with how the underlying platform does queries in Mongo, and that's a can of worms I'd rather not open now. Besides, something can still be delayed by network latency or some other problem, no matter how fast the underlying platform is.

Scatter-Gather Solution

A more entertaining and intellectually interesting solution is to use a scatter-gather pattern of rules to process everything asynchronously.

Vaughn Vernon describes the scatter-gather pattern on page 272 of his new book Reactive Messaging Patterns with the Actor Model [1]. Scatter-gather is useful when you need to get some number of picos to do something and then aggregate the results to complete the computation. That's exactly the problem we face here: have each vehicle pico get its trip and fillup details for the reporting period, then gather those results and process them to produce a report.

The diagram below shows the interactions between picos to create the report. A few notes about the diagram:

  • Each pico in the diagram with the same name is actually the same pico, reproduced to show the specific interaction at a given point in the flow.

  • Rules send events to a pico as a whole, not to a specific rule. Each pico provides an event bus that rules use to subscribe to events, and any number of rules can listen for a given event.

  • There are no requests (function calls) in this flow, only asynchronous events.
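Here is a minimal Python model of the event-bus idea in these notes. Events are addressed to a pico, and every rule whose selector matches the event's domain and type runs. The class and method names (`Pico`, `add_rule`, `signal`) and the second "audit" rule are hypothetical illustrations, not the pico engine's API. Delivery is synchronous here for simplicity.

```python
from collections import defaultdict


class Pico:
    """Hypothetical toy pico: events are addressed to the pico, not to a rule.
    Delivery is synchronous for simplicity; real picos queue events."""

    def __init__(self, name):
        self.name = name
        self.rules = defaultdict(list)  # (domain, type) -> subscribed rules

    def add_rule(self, domain, event_type, rule):
        # Any number of rules may subscribe to the same event.
        self.rules[(domain, event_type)].append(rule)

    def signal(self, domain, event_type, attrs=None):
        """Run every rule selecting on this event; return how many ran."""
        matching = self.rules.get((domain, event_type), [])
        for rule in matching:
            rule(self, attrs or {})
        return len(matching)


log = []
fleet = Pico("fleet")
fleet.add_rule("fuse", "periodic_vehicle_report_created",
               lambda pico, attrs: log.append(("gather", attrs["vehicle_id"])))
# A second, hypothetical rule listening for the same event.
fleet.add_rule("fuse", "periodic_vehicle_report_created",
               lambda pico, attrs: log.append(("audit", attrs["vehicle_id"])))
fleet.signal("fuse", "periodic_vehicle_report_created", {"vehicle_id": "truck-1"})
```

The sender only knows the pico. Which rules react, and how many, is decided by the subscriptions inside it.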

Here's how it works.

The owner pico kicks everything off by sending the request_periodic_report event to the fleet. Because events are asynchronous, it's free to do other tasks once it has done so. The start_periodic_report rule in the fleet pico scatters the periodic_vehicle_report event to each vehicle in the fleet, whether there's 1 or 100. These events are asynchronous as well, so the vehicle picos are not under time pressure to complete.

When each vehicle pico completes, it sends a periodic_vehicle_report_created event to the fleet pico. The catch_vehicle_reports rule is listening and gathers the reports. Once it has added a vehicle report, it raises the periodic_vehicle_report_added event. Another rule in the fleet pico, check_report_status, checks whether every vehicle has responded. When the number of reports equals the number of vehicles, it raises the periodic_report_data_ready event. The data is then turned into a report, and the owner pico is notified that it's ready for emailing.

Some Messy Details

You might have noticed a few issues that have to be addressed in the preceding narrative.

First, while unlikely, it's possible that the process could be started anew before the first run has completed. To avoid clashes and keep the events and data straight, each report process has a unique report correlation number (rcn). Each report is kept separate, even if multiple reports are being processed at the same time. This is not strictly necessary for this task, since reports run once per week and are extremely unlikely to overlap. But it's good practice to use correlation numbers to keep independent process flows independent.

Second, the check_report_status rule uses events from the vehicle picos to determine when it's done. But event delivery is not guaranteed: if one or more vehicle picos fail to produce a vehicle report, no fleet report is delivered to the owner. There are several tactics we could use:

  • We could accept the failure and tell owners that sometimes reports will fail, possibly giving them or someone else the opportunity to intervene manually and regenerate the report.

  • We can set a timeout and continue, generating a report with some missing vehicles.

  • We can set a timeout and reissue events to vehicle picos that failed to respond. This is more complicated because in the event that the vehicle pico still fails to respond after some number of retries, we have to adopt the strategy of continuing without the data.

I adopted the second strategy. Picos can schedule events for some future time (either once or repeating). I chose 2 minutes as the timeout period, which is plenty of time for the vehicles to respond.

This idea of creating timeouts with scheduled events is very important. Unlike an operating system, picos don't have an internal timer tick; they only respond to events. So it's up to the programmer to decide when a timer tick is necessary and to schedule one. While it's possible to use recurring scheduled events to create a regular, short-delay timer tick for a pico, I discourage it because it's generally unnecessary and wastes processing power.
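Because a pico has no timer tick, a timeout is just an event scheduled for a future time. The Python sketch below models this with a hypothetical `EventScheduler` that keeps (due time, event) pairs in a heap and releases only the ones that are due. In a pico the engine delivers scheduled events itself; here a caller polls `run_due(now)` to stand in for that.

```python
import heapq
import itertools


class EventScheduler:
    """Hypothetical model of scheduled events: nothing happens until one is due."""

    def __init__(self):
        self._queue = []               # min-heap of (due_time, seq, type, attrs)
        self._seq = itertools.count()  # tie-breaker so attrs dicts are never compared

    def schedule(self, at, event_type, attrs):
        heapq.heappush(self._queue, (at, next(self._seq), event_type, attrs))

    def run_due(self, now):
        """Remove and return every event whose due time is <= now, earliest first."""
        due = []
        while self._queue and self._queue[0][0] <= now:
            _, _, event_type, attrs = heapq.heappop(self._queue)
            due.append((event_type, attrs))
        return due


sched = EventScheduler()
now = 0  # seconds; stands in for time:now()
sched.schedule(now + 120, "periodic_report_timer_expired",
               {"report_correlation_number": "rcn-1", "timezone": "UTC"})
early = sched.run_due(60)   # empty: the two-minute timeout has not elapsed
late = sched.run_due(120)   # the timeout event, delivered once
```

No CPU is spent between `schedule` and the moment the event comes due. That is the advantage over a recurring short-delay tick.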

How it Works

The following sections give details about the specific patterns that make scatter-gather work.

Correlating Events

When one pico sends an event to another, it often expects an asynchronous response. A correlation identifier can be used to associate these two events; it links conversational state across asynchronous interactions.

Correlation identifiers are passed as event attributes and can be any string that is unique within the conversation. 

Rules use a correlation identifier to ensure that two processes don't get confused with one another. For example, the correlation identifier can be used in the pico's persistent state to keep data about different conversations separate. 

The following rule from the Fuse Fleet ruleset shows the calculation and use of a correlation number as part of creating Fuse fleet reports. The rule has been simplified to emphasize the idea of correlation numbers.

```
rule start_periodic_report {
  select when fuse periodic_report_start
  pre {
    new_rcn = genCorrelationNumber();
    rcn = event:attr("report_correlation_number")
            .defaultsTo(new_rcn);
    ...
    report_data = {"period": period,
                   "start": start,
                   "end": end,
                   "timezone": tz};
    augmented_attrs = event:attrs()
                        .put(["report_correlation_number"], rcn);
  }
  fired {
    raise explicit event "periodic_report_routable"
      with attributes augmented_attrs;
    ent:report_data{rcn} := report_data;
    schedule explicit event "periodic_report_timer_expired"
      at time:add(time:now(),{"minutes" : 2})
      attributes {"report_correlation_number": rcn, "timezone": tz}
  }
}
```

Commentary:

  • The correlation number is generated by a function that ensures that it's a unique string.

  • The implicit understanding is that any rule that sees the correlation number will pass it along so that every player can correlate their actions. Future versions of Wrangler, the pico operating system, will provide more automation for correlation.

  • The correlation number is used in internal events (the raise in the postlude).

  • The rule stores the correlation number for later use. In this case it's used as a key for storing other information in a persistent variable (the report_data) that will be used later for interactions involving this event. 
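The same bookkeeping can be modeled in Python. This sketch assumes a hypothetical `gen_correlation_number()` built on `uuid.uuid4()`. The source only says genCorrelationNumber() returns a unique string, not how it does so. As in the rule, an incoming correlation number is reused if present, and per-report data is keyed by it.

```python
import uuid

report_data = {}  # models ent:report_data, keyed by correlation number


def gen_correlation_number():
    # Hypothetical stand-in for genCorrelationNumber(): any unique string works.
    return uuid.uuid4().hex


def start_periodic_report(attrs, period, start, end, tz):
    """Model of start_periodic_report; returns the events it would raise/schedule."""
    rcn = attrs.get("report_correlation_number") or gen_correlation_number()
    report_data[rcn] = {"period": period, "start": start, "end": end, "timezone": tz}
    augmented = dict(attrs, report_correlation_number=rcn)  # pass the rcn along
    return [
        ("raise", "periodic_report_routable", augmented),
        ("schedule", "periodic_report_timer_expired",
         {"report_correlation_number": rcn, "timezone": tz}),
    ]


a = start_periodic_report({}, "weekly", "s", "e", "UTC")
b = start_periodic_report({}, "weekly", "s", "e", "UTC")
# Two overlapping runs get distinct keys, so their data never mixes.
```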

Event Recipient Lists

Event or message routing is a critical task in reactive systems. The simplest way to route events to other picos is to keep a list of picos. Sending events to each of the picos on a recipient list is a simple matter. In this pattern one rule sends the same event to each of a number of other picos. The recipient list is analogous to the To: list on an email message. 

The recipient list can be static, based on a particular configuration of picos in the computation, or it can be calculated in various ways. Computed recipient lists allow a pico to act as an event router. 

The following rule from Fuse uses an event recipient list that contains all the vehicles in the fleet. The list isn't static; it's computed by calling the activeVehicleSummary() function. That function could change the recipient list as vehicles are added to or removed from the fleet, or when vehicles become inactive (not connected to a device).

```
rule process_periodic_report_with_rcn {
  select when explicit periodic_report_routable
  foreach activeVehicleSummary() setting(vsum)
    pre {
      rcn = event:attr("report_correlation_number");
      channel = {"cid": vsum{"channel"}};
      ...
    }
    if(not rcn.isnull()) then {
      event:send(channel, "fuse", "periodic_vehicle_report")
        with attrs = {
          "report_correlation_number": rcn,
          "vehicle_id": vsum{"deviceId"},
          "start": common:convertToUTC(start),
          "end": common:convertToUTC(end)
        };
    }
    ...
}
```

Commentary:

  • The foreach statement runs the rule once for each vehicle returned by activeVehicleSummary().

  • Each iteration of the foreach loop sets the vsum variable with the vehicle summary for a specific vehicle.

  • The rule routes the periodic_vehicle_report event to each vehicle using event:send() and the event channel in vsum.

  • The correlation number is taken from an event attribute.

  • If the correlation number is missing, the rule doesn't fire.

  • The correlation number is sent with the event to each vehicle pico. 
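Here is a Python model of this rule's routing logic. It assumes a hypothetical `active_vehicle_summary` callable that returns dicts with `channel` and `deviceId` keys, as in the KRL, and a `send` callback standing in for event:send(). The UTC conversion is omitted. The model sends one copy of the event per recipient, and nothing at all when the correlation number is missing.

```python
def process_periodic_report(attrs, active_vehicle_summary, send, start, end):
    """Model of process_periodic_report_with_rcn; returns how many events were sent."""
    rcn = attrs.get("report_correlation_number")
    if rcn is None:
        return 0  # mirrors the rule's guard: no rcn, no events sent
    sent = 0
    for vsum in active_vehicle_summary():  # recipient list is computed per event
        send({"cid": vsum["channel"]}, "fuse", "periodic_vehicle_report", {
            "report_correlation_number": rcn,
            "vehicle_id": vsum["deviceId"],
            "start": start,
            "end": end,
        })
        sent += 1
    return sent


outbox = []
fleet = lambda: [{"channel": "eci-1", "deviceId": "d1"},
                 {"channel": "eci-2", "deviceId": "d2"}]  # hypothetical vehicles
n = process_periodic_report({"report_correlation_number": "r1"}, fleet,
                            lambda ch, dom, typ, a: outbox.append((ch["cid"], a["vehicle_id"])),
                            "s", "e")
```

Because the list is computed on every event, adding a vehicle to the fleet changes who gets the next report request with no change to the routing rule.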

Content-Based Event Routing

Another way to route messages is by content. In content-based event routing, the routing pico knows about some number of other picos and selects where to route the event based on the event domain, name, or attributes and other information such as the current state of the pico and external information from APIs. 

The routing rule usually attaches a correlation identifier to the event before routing it. 

The route_to_owner rule from the Fuse Fleet ruleset is a simple example of this idea. A Fuse fleet can have more than one owner, and the fleet generally keeps its owners informed by routing certain events to them.

```
rule route_to_owner {
  select when fuse new_fleet
           or fuse reminders_ready
           or fuse email_for_owner
  pre {
    owner_subs = wrangler:established()
                   .filter(function(sub){sub{"Tx_role"} eq "FleetOwner"});
    // find the owner who contacted us (could be more than one)
    matching_owner = owner_subs.filter(
                       function(sub){ sub{"backChannel"} eq meta:eci }
                     );
    // use first owner if no match
    owner_list = matching_owner.length() > 0
               => matching_owner
                | owner_subs;
    owner = owner_list.head().pick("$.eventChannel");
  }
  {
    send_directive("Routing to owner")
      with channel = owner
       and attrs = event:attrs();
    event:send({"cid": owner}, "fuse", event:type())
      with attrs = event:attrs();
  }
}
```

Commentary:

  • The event expression is used to determine what events get routed to the owner. This method is static and bound early. KRL offers no mechanism at present for dynamically computing an event expression, but rule chaining with computed event types could be used to achieve a similar effect.

  • The rule is designed to route events to just one owner. The event is routed to the owner who sent the incoming event or to the first owner, if the incoming event didn't come from an owner. 

  • If needed, this rule could be extended to route to all owners.
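The owner-selection logic is easy to test in isolation. This Python sketch mirrors the prelude of route_to_owner. It treats subscriptions as dicts with `Tx_role`, `backChannel`, and `eventChannel` keys, as in the KRL, and takes the incoming channel as a parameter in place of meta:eci. Returning `None` when there are no owners is an assumption of the sketch. The second function shows the suggested extension to all owners.

```python
def select_owner_channel(subscriptions, incoming_eci):
    """Model of route_to_owner's prelude: pick one owner's event channel."""
    owners = [s for s in subscriptions if s.get("Tx_role") == "FleetOwner"]
    # Prefer the owner who sent the incoming event ...
    matching = [s for s in owners if s.get("backChannel") == incoming_eci]
    # ... otherwise fall back to the first owner.
    owner_list = matching or owners
    return owner_list[0]["eventChannel"] if owner_list else None


def all_owner_channels(subscriptions):
    """Extension: route to every owner instead of just one."""
    return [s["eventChannel"] for s in subscriptions if s.get("Tx_role") == "FleetOwner"]
```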

Pico Registration

The most general way to route events is to create a service directory and allow picos to register for events based on specific criteria. 

In this pattern, picos send registration events that include information like their name, an event channel identifier, and attributes that are important in routing. The registration pico might be a special pico that serves as a directory in a large system, or the registration might just be a ruleset in a pico with other responsibilities. The registration pico might route events based on pico type, name, or other attributes.

One important feature of a directory is to allow picos to change their event channel for security reasons without losing service. A directory also allows picos to move to other hosting providers without loss of functionality. 

A more complex example of this idea is to use a registrar. A registrar is a third party that manages the registry on behalf of the instances. The registrar watches for child_created or child_destroyed events to know when instances are created or destroyed and registers or deregisters them as appropriate. The registrar also periodically checks the health of instances and automatically deregisters those it deems incapacitated. The registrar decouples instances from the registry. They can be oblivious to the existence of the registry so that they never need explicitly register. 
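A registrar can be sketched the same way. In this Python model, a hypothetical `Registrar` reacts to child_created and child_destroyed events and drops instances whose health check fails. The event attribute names (`id`, `eci`, `name`) and the health-check callable are assumptions. In a pico, `sweep()` would run in response to a scheduled event. The registered picos never register themselves.

```python
class Registrar:
    """Hypothetical registrar: keeps the registry up to date on behalf of instances."""

    def __init__(self, health_check):
        self.registry = {}                # pico id -> registration record
        self.health_check = health_check  # assumed callable(record) -> bool

    def on_event(self, event_type, attrs):
        if event_type == "child_created":
            self.registry[attrs["id"]] = {"eci": attrs["eci"], "name": attrs.get("name")}
        elif event_type == "child_destroyed":
            self.registry.pop(attrs["id"], None)

    def sweep(self):
        """Periodic health check: deregister instances deemed incapacitated."""
        dead = [pid for pid, rec in self.registry.items() if not self.health_check(rec)]
        for pid in dead:
            del self.registry[pid]
        return dead
```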

There is no good example of this pattern in Fuse. However, the following example code shows how this could work. First let's look at a simple rule to process registration events:

```
rule register_picos {
  select when system pico_registration
  pre {
    topic = event:attr("topic");
    pico_data = makeRegistrationRecord(event:attrs());
  }
  if(not pico_data{"eci"}.isnull()) then noop();
  fired {
    // keep a list per topic so every registered pico is routed to
    ent:registrations{topic} := ent:registrations{topic}.defaultsTo([]).append(pico_data)
  }
}
```

Commentary:

  • The rule makes use of a topic event attribute to determine which topic the registering pico is interested in. 

  • The rule uses a function, makeRegistrationRecord() to process the incoming event attributes and create a record of anything important.

  • The rule only fires if the incoming registration event includes an event channel. This check could be extended to cover any other required information.

  • The rule ultimately stores the registration request in an entity variable called ent:registrations by topic.

The following code routes to registered picos by topic. This rule would be in the same pico as the pico_registration rule shown above. 

```
rule route_to_registered {
  select when fuse events_to_route
           or fuse another_event_to_route
  foreach ent:registrations{event:attr("topic")} setting(registered_pico)
    pre {
      channel = {"cid": registered_pico{"eci"}}
    }
    event:send(channel, "fuse", event:type())
      with attrs = event:attrs();
}
```

Commentary:

  • This rule selects on whichever routable events are listed in its event expression.

  • The foreach loop runs once for each pico registered in the ent:registrations variable under the topic given in the attributes of the incoming event. The topic could also be computed rather than taken from the incoming event.

  • All the incoming event attributes and the incoming event type are routed to the picos that are registered for the topic.
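Taken together, register_picos and route_to_registered behave like a small publish/subscribe directory. The Python model below assumes a hypothetical `make_registration_record()` that keeps only `eci` and `name`. It stores a list of registrations per topic, so routing reaches every pico registered for that topic. The topic name "maintenance" is illustrative.

```python
from collections import defaultdict

registrations = defaultdict(list)  # models ent:registrations: topic -> records


def make_registration_record(attrs):
    # Hypothetical: keep only the fields needed for routing.
    return {"eci": attrs.get("eci"), "name": attrs.get("name")}


def register_pico(attrs):
    """Model of register_picos; returns True if the registration was stored."""
    record = make_registration_record(attrs)
    if record["eci"] is None:
        return False  # like the rule's guard: no channel, no registration
    registrations[attrs["topic"]].append(record)
    return True


def route_to_registered(event_type, attrs, send):
    """Forward the event, unchanged, to every pico registered for its topic."""
    recipients = registrations.get(attrs.get("topic"), [])
    for record in recipients:
        send({"cid": record["eci"]}, "fuse", event_type, attrs)
    return len(recipients)
```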

Aggregators

The aggregator pattern listens for incoming events and collects them. Once the required events have been collected, the aggregator raises an event or takes some other action.

The following two rules from the Fuse Fleet ruleset comprise an event aggregator. The first, catch_periodic_vehicle_reports, watches for periodic_vehicle_report_created events from one of the vehicles in the fleet, saves the information in the event as that vehicle's report, and raises an event that indicates the vehicle report was added. 

```
rule catch_periodic_vehicle_reports {
  select when fuse periodic_vehicle_report_created
  pre {
    vehicle_id = event:attr("vehicle_id");
    rcn = event:attr("report_correlation_number");
    updated_vehicle_reports =
        (ent:vehicle_reports{[rcn,"reports"]})
          .defaultsTo([])
          .append(event:attr("vehicle_details").decode());
  }
  noop();
  always {
    ent:vehicle_reports{[rcn,"reports"]} := updated_vehicle_reports;
    raise explicit event periodic_vehicle_report_added
      with report_correlation_number = rcn
  }
}
```

Commentary:

  • The report correlation number is sent with the incoming event as an event attribute.

  • The report correlation number is used as the key for storing the report in an entity variable. Thus multiple reports could be in play at the same time without interfering with each other.

  • The same report correlation number is raised with the periodic_vehicle_report_added event as an attribute. 

The second rule determines when sufficient reports have been collected. In this case, it compares the number of reports received with the number of active vehicles. As long as too few reports have been received, the rule only logs how many are still outstanding.

```
rule check_periodic_report_status {
  select when explicit periodic_vehicle_report_added
  pre {
    rcn = event:attr("report_correlation_number");
    vehicles_in_fleet = activeVehicleSummary().length();
    number_of_reports_received = (ent:vehicle_reports{[rcn,"reports"]})
                                   .length();
  }
  if ( vehicles_in_fleet <= number_of_reports_received ) then noop();
  fired {
    log info "process vehicle reports ";
    raise explicit event "periodic_report_ready"
      with report_correlation_number = rcn;
  } else {
    log info "we're still waiting for "
           + (vehicles_in_fleet - number_of_reports_received)
           + " reports on #{rcn}";
  }
}
```

Commentary:

  • The prelude calculates how many vehicle reports have been received using the report correlation number.

  • The report correlation number is passed along with the periodic_report_ready event so that any downstream rules can process the right report.

  • We use the log statement in the else clause of the postlude to show information in the logs about how many reports have been received.
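The two aggregator rules can be modeled as two small methods that share state keyed by correlation number. In this Python sketch, a hypothetical `vehicles_in_fleet` callable stands in for activeVehicleSummary().length(). Returning a tuple plays the role of raising periodic_report_ready, and `print` stands in for `log info`.

```python
class ReportAggregator:
    """Hypothetical model of catch_periodic_vehicle_reports + check_periodic_report_status."""

    def __init__(self, vehicles_in_fleet):
        self.vehicles_in_fleet = vehicles_in_fleet  # callable returning the active count
        self.vehicle_reports = {}                   # models ent:vehicle_reports

    def catch_vehicle_report(self, rcn, vehicle_details):
        """Gather one report under its rcn, then check status (the _added event)."""
        self.vehicle_reports.setdefault(rcn, {"reports": []})["reports"].append(vehicle_details)
        return self.check_status(rcn)

    def check_status(self, rcn):
        received = len(self.vehicle_reports.get(rcn, {}).get("reports", []))
        expected = self.vehicles_in_fleet()
        if expected <= received:
            return ("periodic_report_ready", rcn)
        print(f"we're still waiting for {expected - received} reports on {rcn}")
        return None
```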

Scatter-Gather Pattern

The scatter-gather pattern is useful when you need to get some number of picos to do something and then aggregate the results to complete the computation. This is a common pattern for asynchronous processing in reactive systems. Events are sent asynchronously and, consequently, the sending pico does not block and is free to process other data while it's waiting for the result. Similarly, the picos that receive the event can process the event and respond when ready. 

Fuse uses the scatter-gather pattern in creating weekly vehicle reports. We've already seen the process_periodic_report_with_rcn rule in the Event Recipient Lists pattern. This rule scatters events telling the vehicles that the fleet needs the vehicle report. The rules we just saw in the Aggregator pattern are the gathering part of this ruleset. 

When we combine the pictures from those two patterns, we get a setup that looks like this:

The owner pico kicks everything off by sending the request_periodic_report event to the fleet. The start_periodic_report rule in the fleet pico scatters the periodic_vehicle_report event to each vehicle in the fleet, whether there's 1 or 1000. These events are asynchronous as well, so the vehicle picos are not under time pressure to complete.

When each vehicle pico completes, it sends a periodic_vehicle_report_created event to the fleet pico. The catch_vehicle_reports rule is listening and gathers the reports. Once it has added a vehicle report, it raises the periodic_vehicle_report_added event. Another rule in the fleet pico, check_report_status, checks whether every vehicle has responded. When the number of reports equals the number of vehicles, it raises the periodic_report_data_ready event. The data is then turned into a report, and the owner pico is notified that it's ready for emailing.

As we've seen, these rules make extensive use of the report correlation number to ensure that reports are not intermingled if a request_periodic_report event happens to be sent before the previous one finishes. 
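Here is the whole flow as a compact asyncio model in Python. Each vehicle is a coroutine with a made-up delay. The fleet scatters one request per vehicle without waiting on any of them. A gather loop then collects replies tagged with the correlation number until every vehicle has answered or a deadline passes. The delays, the `vehicle` coroutine, and `scatter_gather` are illustrative assumptions, not Fuse code. Fuse uses a scheduled timer event and a separate status-checking rule rather than `wait_for`, but the shape is the same.

```python
import asyncio


async def vehicle(vehicle_id, delay, rcn, inbox):
    """A vehicle pico: does its (simulated) work, then sends its report back."""
    await asyncio.sleep(delay)
    await inbox.put({"report_correlation_number": rcn, "vehicle_id": vehicle_id})


async def scatter_gather(delays, rcn, deadline):
    inbox = asyncio.Queue()  # the fleet pico's incoming events
    # Scatter: start every vehicle without waiting on any of them.
    tasks = [asyncio.create_task(vehicle(v, d, rcn, inbox)) for v, d in delays.items()]
    reports = {}
    loop = asyncio.get_running_loop()
    stop_at = loop.time() + deadline
    # Gather: collect until all vehicles have answered or the deadline passes.
    while len(reports) < len(delays):
        remaining = stop_at - loop.time()
        if remaining <= 0:
            break
        try:
            event = await asyncio.wait_for(inbox.get(), timeout=remaining)
        except asyncio.TimeoutError:
            break
        if event["report_correlation_number"] == rcn:  # ignore other reports
            reports[event["vehicle_id"]] = event
    for t in tasks:
        t.cancel()  # stragglers are abandoned; no retry in this sketch
    return reports


reports = asyncio.run(scatter_gather({"d1": 0.01, "d2": 0.02, "d3": 5.0}, "r1", deadline=0.2))
print(sorted(reports))  # ['d1', 'd2']
```

The fleet finishes in about the deadline rather than the slowest vehicle's time, and produces a partial report instead of blocking.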

Dealing with Failure

Because events may be lost, asynchronous systems have to be prepared to deal with failure.

In the case of Fuse, the start_periodic_report rule that we saw in the section on Correlating Events also schedules the periodic_report_timer_expired event for two minutes in the future:

```
schedule explicit event "periodic_report_timer_expired"
  at time:add(time:now(),{"minutes" : 2})
  attributes {"report_correlation_number": rcn, "timezone": tz}
```

Another rule, retry_from_expired_timer, listens for this event and retries missing vehicles:

```
rule retry_from_expired_timer {
  select when explicit periodic_report_timer_expired
  pre {
    max_retries = 2;
    retry_count = ent:retry_count.defaultsTo(0);  // unset until the first retry
    rcn = event:attr("report_correlation_number");
    tz = event:attr("timezone");
    vehicle_summaries = vehicleSummary();
    vehicle_reports = ent:vehicle_reports{[rcn,"reports"]}
                        .defaultsTo([]);
    vehicle_summaries_keys = vehicle_summaries.map(function(r){r{"deviceId"}});
    vehicle_reports_keys = vehicle_reports.map(function(r){r{"deviceId"}});
    missing_vehicles = vehicle_summaries_keys
                         .difference(vehicle_reports_keys);
    in_array = function(k,a){
      a.filter(function(x){x eq k}).length() > 0;
    };
    needed = vehicle_summaries.filter(function(s){in_array(s{"deviceId"}, missing_vehicles)});
    rcn_unprocessed = not ent:vehicle_reports{rcn}.isnull();
  }
  if ( needed.length() > 0 && retry_count < max_retries ) then {
    noop();
  }
  fired {
    log info "Retrying for " + (needed.length()) + " vehicles";
    ent:retry_count := retry_count + 1;
    raise fuse event "periodic_report_start"
      attributes {
        "vehicle_summaries": needed,
        "timezone": tz,
        "report_correlation_number": rcn
      }
      if rcn_unprocessed;
    schedule explicit event "periodic_report_timer_expired"
      at time:add(time:now(),{"minutes" : 2})
      attributes {"report_correlation_number": rcn, "timezone": tz}
  } else {
    clear ent:retry_count;
    raise explicit event "periodic_report_ready"
      with report_correlation_number = rcn
      if rcn_unprocessed;
  }
}
```

Commentary:

  • The retry in two minutes is somewhat arbitrary. Since reports are only generated once a week, waiting two minutes to retry does not seem overly long.

  • The logic in the prelude of this rule is primarily concerned with calculating needed, a list of the vehicles that have not yet sent reports. 

  • If reports are still missing, the same event, periodic_report_start, is raised again, this time with a vehicle_summaries attribute that lists only the vehicles still needed.

  • The timer event is scheduled again in case this retry also fails.

  • The rule is designed to retry a maximum number of times. If the maximum is reached, the rule raises the periodic_report_ready event even if not all reports have been received. The report is processed without them. 
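The retry decision reduces to a set difference plus a counter. This Python sketch mirrors the logic of retry_from_expired_timer. It assumes each gathered report carries a `deviceId`, as the rule's prelude expects. It keeps the retry count per correlation number, a deliberate departure from the single ent:retry_count in the KRL, so that overlapping reports cannot consume each other's retries.

```python
MAX_RETRIES = 2


def on_timer_expired(rcn, vehicle_summaries, vehicle_reports, retry_counts):
    """Decide what to do when periodic_report_timer_expired arrives.

    Returns ("retry", needed) to re-scatter to the missing vehicles and
    reschedule the timer, or ("ready", None) to build the report with
    whatever has been gathered.
    """
    received = {r["deviceId"] for r in vehicle_reports.get(rcn, [])}
    needed = [s for s in vehicle_summaries if s["deviceId"] not in received]
    count = retry_counts.get(rcn, 0)  # per-rcn counter (sketch's assumption)
    if needed and count < MAX_RETRIES:
        retry_counts[rcn] = count + 1
        return ("retry", needed)
    retry_counts.pop(rcn, None)  # like clear ent:retry_count
    return ("ready", None)
```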

Conclusion

Using the scatter-gather pattern for generating reports adds some complexity over the synchronous solution. But that point is moot since the synchronous solution fails to work reliably. While more complicated, the scatter-gather solution is only a handful of additional rules and none of them are very long (142 additional lines of code in total). Each rule does a single, easy-to-understand task. Using the scatter-gather solution for generating reports increases the reliability of the report generating system at an acceptable cost.

The scatter-gather solution makes better use of resources since the fleet pico isn't sitting around waiting for all the vehicles to complete before it does other important tasks. The fleet pico is free to respond to other events that may come up while the vehicles are completing their reports.

The concurrent processing is done without locks of any kind. Because each pico is independent, they have no need of locks when operating concurrently. The fleet pico could receive events from multiple vehicles, but they are queued and handled in turn. Consequently, we don't need locks inside the pico either. Lockless concurrency is a property of Actor-model systems like picos.
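The lock-free property comes from the actor model's mailbox discipline. Many senders may enqueue events concurrently, but a single consumer handles them one at a time, so the actor's state is touched by only one handler. In this Python sketch (the `FleetActor` name is hypothetical), `queue.Queue` synchronizes internally, much as the pico engine's event queue would. The handler code and the state it updates need no locks.

```python
import queue
import threading


class FleetActor:
    """Hypothetical actor: many threads may send, one thread handles events in turn."""

    def __init__(self):
        self.mailbox = queue.Queue()  # thread-safe queue plays the event queue
        self.report_count = 0         # plain state with no lock around it

    def send(self, event):
        self.mailbox.put(event)

    def run(self):
        while True:
            event = self.mailbox.get()
            if event is None:         # sentinel: shut down
                return
            self.report_count += 1    # only this thread ever mutates state


fleet = FleetActor()
consumer = threading.Thread(target=fleet.run)
consumer.start()
senders = [threading.Thread(target=fleet.send, args=({"vehicle_id": i},))
           for i in range(200)]
for t in senders:
    t.start()
for t in senders:
    t.join()
fleet.send(None)  # every real event was enqueued before the sentinel
consumer.join()
print(fleet.report_count)  # 200
```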


Notes

  1. I recommend Vaughn's book for anyone interested in picos. While the language/framework (Scala and Akka) is different, the concepts are all very similar. There's a lot of good information that can be directly applied to programming picos.